Dubbo源码解读

ExtensionLoader源码解析

https://www.jianshu.com/p/2f4eeb8ef93a

ExtensionLoader

服务暴露过程

1、spring会先解析xml生成BeanDefeination,DubboBeanDefinitionParser的parse方法中完成dubbo 命名空间的解析。

2、在applicationContext启动的refresh阶段会广播ApplicationEvent事件,触发ApplicatinListener.onApplicationEvent()方法的执行,ServiceBean实现了ApplicatuinListener接口,所以onApplicationEvent()方法开始执行。

3、ProxyFactory$Adaptive是dubbo通过javassist字节码动态产生的一个类,主要功能是利益dubbo的接口扩展机制适配指定的实现

4、JavassistProxyFactory.getInvoker()实现就是反射调用指定的方法。封装成了Wrapper类

5、Protocol$Adaptive的原理同3

6、RegistryProtocol.export使用同样的原理,通过Protocol$Adaptive适配器,将真正的export工作代理给了DubboProtocol

7、DubboProtocol.export 核心代码参考openServer方法,创建dubbo 服务的监听socket等。

8、ProtocolFilterWrapper.export将filter加入到Invoker中,最核心的代码在buildInvokerChain方法中,为每个filter产生一个Invoker,然后绑定到Invoker链中。

9、RegistryProtocol.getRegistry获取注册中心的实现,根据<dubbo:registry/> 元素配置获取对应的RegistryFactory实现,获取Registry对象。

10、ZookeeperRegistry.registry调用具体的注册中心完成服务的注册。至此,服务暴露完成。

ProxyFactory(代理)

代理分为两种,静态代理和动态代理。

  1. 静态代理:如果代理类在程序运行前就已经存在,那么这种代理就是静态代理。

  2. 动态代理:代理类在程序运行时创建的代理方式。动态代理关系由两组静态代理关系组成,这就是动态代理的原理。

动态代理的底层原理就是字节码技术,dubbo提供了两种方式来实现代理:

dubbo对于动态代理有两种实现方式

1、javassist

javassist是一款java字节码引擎工具,能够在运行时编译生成class。该方法也是代理的默认方法

2、jdk

它内置在JDK中,因此不依赖第三方jar包,但是功能相对较弱,当调用Proxy的静态方法创建动态代理类时,类名格式是"$ProxyN",N代表第N次生成的动态代理类,如果重复创建动态代理类会直接返回原先创建的代理类。但是以这种格式命名的类是继承Proxy类的,并且实现了其所代理的一组接口,这里就出现了它的一个局限性,由于java的类只能单继承,所以JDK动态代理仅支持接口代理。

dubbo的服务端和消费端启动都默认采用javassist代理

dubbo不能对非接口类进行代理的

ProxyFactory

public class JavassistProxyFactory extends AbstractProxyFactory {

    // 获取代理
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName, 
                                      Class<?>[] parameterTypes, 
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
    }

}

getProxy:引用服务时调用,将Invoker对象转化为proxy代理对象。

getInvoker:暴露服务时调用,将ref(真正的服务实现类)转化为invoker。

Protocol

https://my.oschina.net/u/2377110/blog/1857642

protocol的接口方法

/**
 * Protocol. (API/SPI, Singleton, ThreadSafe)
 */
@SPI("dubbo")
public interface Protocol {

    /**
     * Get default port when user doesn't config the port.
     *
     * @return default port
     */
    int getDefaultPort();

    /**
     * Export service for remote invocation: <br>
     * 1. Protocol should record request source address after receive a request:
     * RpcContext.getContext().setRemoteAddress();<br>
     * 2. export() must be idempotent, that is, there's no difference between invoking once and invoking twice when
     * export the same URL<br>
     * 3. Invoker instance is passed in by the framework, protocol needs not to care <br>
     *
     * @param <T>     Service type
     * @param invoker Service invoker
     * @return exporter reference for exported service, useful for unexport the service later
     * @throws RpcException thrown when error occurs during export the service, for example: port is occupied
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * Refer a remote service: <br>
     * 1. When user calls `invoke()` method of `Invoker` object which's returned from `refer()` call, the protocol
     * needs to correspondingly execute `invoke()` method of `Invoker` object <br>
     * 2. It's protocol's responsibility to implement `Invoker` which's returned from `refer()`. Generally speaking,
     * protocol sends remote request in the `Invoker` implementation. <br>
     * 3. When there's check=false set in URL, the implementation must not throw exception but try to recover when
     * connection fails.
     *
     * @param <T>  Service type
     * @param type Service class
     * @param url  URL address for the remote service
     * @return invoker service's local proxy
     * @throws RpcException when there's any error while connecting to the service provider
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * Destroy protocol: <br>
     * 1. Cancel all services this protocol exports and refers <br>
     * 2. Release all occupied resources, for example: connection, port, etc. <br>
     * 3. Protocol can continue to export and refer new service even after it's destroyed.
     */
    void destroy();

}

dubboProtocol是dubbo的核心类,具体实现了暴露服务和引用服务,即export方法

首先根据url生成servicekey,也就是这个invoker的唯一标识,然后放到exportMap中缓存起来。

openServer会启动一个socket服务(默认监听20880端口),里面最终也会调用createServer方法。

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        
        // export service.
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);
        // .....
        openServer(url);
        
        return exporter;
    }

createServer方法:在 server = Exchangers.bind(url, requestHandler) 中,放置了一个requestHandler

来处理所有的请求,其实这个handler也是一个dispatcher

private ExchangeServer createServer(URL url) {
        url = URLBuilder.from(url)
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
                .addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT))
                .addParameter(Constants.CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }

        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return server;
    }

reply方法就是requestHandler核心方法,getInvoker会计算inv的serviceKey,然后从exporter缓存中找到exporter,进而拿到invoker,调用返回结果。

public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                // ....
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

export方法执行完成后就标志服务端可以接受客户端的请求了,但是还需要向注册中心注册.

下面的就是RegistryProtocol的export方法,可以看到调用getRegistry后,其实Registry是一个代理,调用了原创服务的Registry的register方法完成注册的。

public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // .....
    }

以上就是服务端暴露服务的过程。

客户端引用的过程:客户端在通过getBean方法最终会调用createProxy,然后是refer方法:

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());
        if (! Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
                    Constants.CHECK_KEY, String.valueOf(false)));
        }
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, 
                Constants.PROVIDERS_CATEGORY 
                + "," + Constants.CONFIGURATORS_CATEGORY 
                + "," + Constants.ROUTERS_CATEGORY));
        return cluster.join(directory);
}

这是RegistryProtocol的doRefer方法,可以看到是向Registry请求感兴趣的服务地址,Registry返回的每一个url都会被dubboProtocol使用refer封装成一个invoker,最后RegistryProtocol使用cluster把所有invoker封装成一个invoker,也就是代理。

Asm技术

Java字节码操控框架,能被用来动态生成类或者增强既有类的功能。ASM可以直接产生二进制class文件,也可以在类被加载入Java虚拟机之前改变类行为。Java class被存储在严格格式定义的.class文件里面。这些类文件拥有足够的元数据来解析类中的所有速算。ASM从类文件中读入信息后,能够改变类行为,分析类信息,甚至能够根据用户要求生成新类。

ThreadLocal

ThreadLocal是如何实现为线程提供变量副本的:

首先我们要知道每一个线程下都有一个私有变量map,当我们使用ThreadLocal进行set(val)变量时,会向当前线程下的map中put一个键为当前ThreadLocal对象(虚引用),值为val的键值对,这样当使用ThreadLocal的get方法时,会直接向当前线程下的map获得键为此ThreadLocal的值。由于此操作只在当前线程下,所以完美的避免了并发。

threadpool的线程池类型

FixedThreadPool :固定大小线程池,启动时创建线程,不关闭,一直特有(缺省)

 public Executor getExecutor(URL url) {
        // 获取线程名称前缀,默认Dubbo
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 获取核心线程和最大线程数,fixed是相等的,默认200
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        // 获取队列大小,默认为0 ,SynchronousQueue,否则为LinkedBlockingQueue
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

注意:如果使用这个线程池,默认的线程池数量为200,默认阻塞队列大小为0,默认使用的阻塞队列为SynchronousQueue。如果业务时间并发较高或者处理时间较长,请适当的调整阻塞队列的大小,即queues变量,否则会导致大量的请求被丢弃。该线程池也是 Dubbo 默认使用的线程池,估计出事的挺多。

CachedThreadPool:缓存线程池,空闲一分钟自动删除,需要时重建。

 public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // 获取核心线程数 corethreads,默认0
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 获取最大线程数 默认Integer.MAX_VALUE 2147483647
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // 获取线程池队列 默认0
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 获取线程存活时间 默认1分钟
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

注意:它并不是正经的cache,该线程池的阻塞队列虽然默认是SynchronousQueue,但是如果用户配置了queues变量,且其值较大,使用的阻塞队列就是LinkBolckingQueue,此时一旦corethreads再使用默认值0,就会导致处理时间阻塞。

LimitedThreadPool:可伸缩线程池,但池中的线程数只会增长不会收缩(为避免收缩时突然来了大流量引起的性能问题)

public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 存活无限长时间
        return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}

注意:这个线程池中的线程数可以一直增长到上限,永不回收,所以threads变量即线程最大限制的值不能太大,使用默认200即可,避免00M。

EagerThreadPool

AbortPolicyWithReport

dubbo自定义的线程池拒绝策略,任务被拒绝后输出堆栈信息dumpJStack()

dubbo架构介绍以及各个模块的关系

https://www.cnblogs.com/wangzhuxing/p/9725096.html

dubbo-common 公共逻辑模块:包括Util类和通用模型

dubbo-remoting 远程通讯模块:相当于Dubbo协议的实现,如果RPC用RMI协议,则不需要使用此包

dubbo-rpc 远程调用模块:抽象各种协议,以及动态代理,只包含一对一的调用,不关心集群的管理。

dubbo-cluster 集群模块:将多个服务提供方伪装为一个提供方,包括:负载均衡,容错,路由等,集群的地址列表可以是静态配置的,也可以是由注册中心下发。

dubbo-registry 注册中心模块:基于注册中心下发地址的集群方式,以及对各种注册中心的抽象。

dubbo-monitor监控模块:统计服务调用次数,调用时间的,调用链跟踪的服务。

dubbo-config 配置模块:是Dubbo对外的API,用户通过config使用dubbo,因此dubbo的所有细节。

dubbo-container容器模块:是一个Standlone的容器,以简单的Main加载Spring启动,因为服务通常不需要Tomcat/JBoss等WEB容器的特性,没必要用WEB容器去加载服务。

logger模块

第三方日志框架的优先级

Log4J 最高 (默认就用这个)

SLF4J 次高 (上面没有采用这个)

Common Logging(jcl就是common logging) 次低(Log4j和SLF4J在项目中均么有用这个)

JDK log 最低(最好的选择)

有和没有指的是项目classpath下面有没有对应的jar包,如果有则表示支持对应的日志实现。

dubbo选择日志提供方的代码

// 查找常用的日志框架
     static {
        String logger = System.getProperty("dubbo.application.logger", "");
        switch (logger) {
            case "slf4j":
                setLoggerAdapter(new Slf4jLoggerAdapter());
                break;
            case "jcl":
                setLoggerAdapter(new JclLoggerAdapter());
                break;
            case "log4j":
                setLoggerAdapter(new Log4jLoggerAdapter());
                break;
            case "jdk":
                setLoggerAdapter(new JdkLoggerAdapter());
                break;
            case "log4j2":
                setLoggerAdapter(new Log4j2LoggerAdapter());
                break;
            default:
                List<Class<? extends LoggerAdapter>> candidates = Arrays.asList(
                        Log4jLoggerAdapter.class,
                        Slf4jLoggerAdapter.class,
                        Log4j2LoggerAdapter.class,
                        JclLoggerAdapter.class,
                        JdkLoggerAdapter.class
                );
                for (Class<? extends LoggerAdapter> clazz : candidates) {
                    try {
                        setLoggerAdapter(clazz.newInstance());
                        break;
                    } catch (Throwable ignored) {
                    }
                }
        }
    }

热部署和热替换

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,122评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,070评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,491评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,636评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,676评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,541评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,292评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,211评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,655评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,846评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,965评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,684评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,295评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,894评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,012评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,126评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,914评论 2 355

推荐阅读更多精彩内容