Dubbo服务暴露原理

1.概述

RPC作为分布式系统中不可或缺的中间件,在业界已经具有相当成熟的技术实现,其中Dubbo应用得特别广泛,本文将对Dubbo服务暴露的流程进行介绍。在正式进入Dubbo原理探究之前,需要先弄清楚RPC的基本模型:

RPC原理.png

consumer代表服务调用方,provider代表服务提供方,registry代表注册中心。当服务提供方启动时会将自己的信息(服务ip,port等)记录在注册中心,这样在调用方调用的时候,会先从注册中心获取到提供方的基本信息,然后发送网络请求给provider完成调用;同时consumer在启动的时候,会向注册中心订阅消息,这样就能在provider发生变更的时候获取到最新的信息,保证请求路由到正确的provider

2.Dubbo服务暴露概览

Dubbo服务暴露就是指providerregistry注册的过程,以zookeeper作为注册中心,netty作为通讯框架,本文基于2.6.x版本代码分析。使用Dubbo官方提供的demo运行,dubbo-demo-provider配置文件如下:

    <!-- provider's application name, used for tracing dependency relationship -->
    <dubbo:application name="demo-provider"/>

    <!-- use multicast registry center to export service -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <!-- use dubbo protocol to export service on port 20880 -->
    <dubbo:protocol name="dubbo" port="20880"/>

    <!-- service implementation, as same as regular local bean -->
    <bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>

    <!-- declare the service interface to be exported -->
    <dubbo:service proxy="jdk" interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>

服务暴露的整个过程大致可以分为四个阶段:

  • 创建invoker对象并对其加工
  • 开启netty服务,用于后续接收消费方发起的网络请求
  • 获取zookeeper连接并将服务注册成为zookeeper上的节点(服务引用的时候通过获取节点则能够找到provider然后建立网络连接)
  • 返回Exporter对象并放入ServiceConfig中的exporters变量中存储
    dubbo服务启动时序图.png

3.创建invoker对象并对其加工

ServiceConfig类中会先调用exportLocal完成本地暴露,然后调用proxyFactory.getInvoker获取invoker对象,先关注下该类中的静态变量proxyFactory,它会调用ExtensionLoader中的getAdaptiveExtension方法完成初始化,该类是Dubbo框架自身实现的一种SPI机制,能够根据配置文件灵活选择接口的具体实现

package com.alibaba.dubbo.config;


/**
 * ServiceConfig
 *
 * @export
 */
public class ServiceConfig<T> extends AbstractServiceConfig {

    private static final long serialVersionUID = 3033787999037024738L;

    /**根据dubbo spi机制加载**/
    private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

    /**根据dubbo spi机制加载**/
    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

    private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();

    private Class<?> interfaceClass;

    // don't export when none is configured
    if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

        // export to local if the config is not remote (export to remote only when config is remote)
        if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
            /**本地暴露**/
            exportLocal(url);
        }
        // export to remote if the config is not local (export to local only when config is local)
        if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
            if (registryURLs != null && !registryURLs.isEmpty()) {
                for (URL registryURL : registryURLs) {
                    /**获取invoker对象**/
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                    DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this)
                    Exporter<?> exporter = protocol.export(wrapperInvoker);
                    exporters.add(exporter);
                }
            }
        }
    }
}

proxyFactory在初始化的时候会被赋值为ProxyFactory$Adaptive,该类是通过字节码增强实现的

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        /**获取url中的proxy参数,默认为javassist**/
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        /**根据proxy参数获取对应的ProxyFactory接口的实现**/
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }
}

Dubbo针对每一个需要根据配置决定具体实现的接口都会在运行时生成以$Adaptive后缀结尾的类,目的是能够根据配置文件,在运行时灵活地选择接口的具体实现,例如:String extName = url.getParameter("proxy", "javassist")这行代码,如果url中的proxy属性为空即没有在配置文件中指定,则默认使用javassist作为extName的值,加载出JavassistProxyFactory类作为ProxyFactory接口的缺省实现。因为这里配置<dubbo:service proxy="jdk" interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>,所以会加载JdkProxyFactory。但通过debug发现,com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName)返回的是StubProxyFactoryWrapper,原因是ProxyFactory接口默认有一个包装类实现,无论是JavassistProxyFactory还是JdkProxyFactory都是在StubProxyFactoryWrapper中通过构造函数完成proxyFactory属性的设置,当ProxyFactory$Adaptive中执行extension.getInvoker(arg0, arg1, arg2)的时候,先会调用到StubProxyFactoryWrappergetInvoker方法,该方法的实现为proxyFactory.getInvoker(proxy, type, url),所以最后会调用到JdkProxyFactorygetInvoker方法。对于这种有包装类实现的接口,$Adaptive会优先加载出包装类,而根据配置所对应的具体实现则是通过构造函数的形式作为包装类的属性被注入,在调用的时候先调用包装类从而间接调用到配置所对应的实现,这里使用了装饰器模式,可以在调用之间增加额外的处理。整个流程可以归纳如下:

  • ProxyFactory$Adaptive 获取extension
  • 初始化 StubProxyFactoryWrapper,构造函数设置proxyFactory属性为JdkProxyFactory
public class StubProxyFactoryWrapper implements ProxyFactory {

    private final ProxyFactory proxyFactory;

   /**构造函数中设置proxyFactory值为配置所对应的接口实现**/
    public StubProxyFactoryWrapper(ProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
        return proxyFactory.getInvoker(proxy, type, url);
    }

}
  • ProxyFactory$Adaptive调用extension.getInvoker等同于StubProxyFactoryWrapper调用getInvoker

对应时序图如下所示:


getInvoker.png

返回invoker对象后,进入到protocol.export方法,protocol初始化为Protocol$AdaptiveProtocol接口有两个包装类ProtocolListenerWrapperProtocolFilterWrapper,顾名思义监听器与过滤器,该过程会先调用ProtocolListenerWrapper,该类的protocol属性在初始化的时候会被设置为ProtocolFilterWrapper,在ServiceConfig中调用protocol.export时会直接进入到ProtocolListenerWrapper中的protocol.export(invoker)方法,接着进入ProtocolFilterWrapper中的protocol.export(invoker)方法,ProtocolFilterWrapper初始化时会设置protocol属性为RegistryProtocol,因此该过程最终会调用到RegistryProtocolexport方法


/**
 * ListenerProtocol
 */
public class ProtocolListenerWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolListenerWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            /**ServiceConfig调用protocol.export直接进入到该方法**/
            return protocol.export(invoker);
        }
        /**RegistryProtocol调用protocol.export时候进入**/
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

}

根据ProtocolFilterWrapper类的命名,可以看出它拥有与Spring Filter类似的功能,本质上是一个过滤器

  /**
 * ListenerProtocol
 */
public class ProtocolFilterWrapper implements Protocol {

    private final Protocol protocol;

    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }


    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
           /**ServiceConfig调用protocol.export直接进入到该方法**/
            return protocol.export(invoker);
        }
       /**RegistryProtocol调用protocol.export时候进入**/
        return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
    }

}

因为设置<dubbo:protocol name="dubbo" port="20880"/>RegistryProtocol中的protocol会被设置为DubboRegistry,当调用protocol.export(invokerDelegete)的时候,会再次进入到ProtocolListenerWrapper包装类中,这次会执行new ListenerExporterWrapper<T>


/**
 * ListenerExporter
 */
public class ListenerExporterWrapper<T> implements Exporter<T> {

    private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);

    private final Exporter<T> exporter;

    private final List<ExporterListener> listeners;

    public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
        if (exporter == null) {
            throw new IllegalArgumentException("exporter == null");
        }
        this.exporter = exporter;
        this.listeners = listeners;
        if (listeners != null && !listeners.isEmpty()) {
            RuntimeException exception = null;
            for (ExporterListener listener : listeners) {
                if (listener != null) {
                    try {
                        /**调用listener的exported方法,作为hook函数**/
                        listener.exported(this);
                    } catch (RuntimeException t) {
                        logger.error(t.getMessage(), t);
                        exception = t;
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }

    @Override
    public Invoker<T> getInvoker() {
        return exporter.getInvoker();
    }

    @Override
    public void unexport() {
        try {
            exporter.unexport();
        } finally {
            if (listeners != null && !listeners.isEmpty()) {
                RuntimeException exception = null;
                for (ExporterListener listener : listeners) {
                    if (listener != null) {
                        try {
                            listener.unexported(this);
                        } catch (RuntimeException t) {
                            logger.error(t.getMessage(), t);
                            exception = t;
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        }
    }

}

重点关注ListenerExporterWrapper的构造函数,该函数主要完成两件事,一是赋值exporter变量,将外部传入的Exporter对象保存,二是遍历外部传入的ExporterListener列表并调用其exported方法。此处为扩展点Dubbo对于ExporterListener接口只给出了ExporterListenerAdapter这一实现且exported方法实现为空,开发者可以自己实现该方法以达到扩展的目的,当然也可以自己实现ExporterListener接口,依据Dubbo SPI机制加载执行自定义的业务逻辑。除构造函数以外,unexport也拥有类似的功能,当调用unexport的时候会遍历之前保存的listeners并调用其unexport方法。
继续回到ProtocolFilterWrapper,当调用return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER))时,会触发buildInvokerChain

private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
    Invoker<T> last = invoker;
    List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
    if (!filters.isEmpty()) {
        for (int i = filters.size() - 1; i >= 0; i--) {
            final Filter filter = filters.get(i);
            final Invoker<T> next = last;
            last = new Invoker<T>() {

                @Override
                public Class<T> getInterface() {
                    return invoker.getInterface();
                }

                @Override
                public URL getUrl() {
                    return invoker.getUrl();
                }

                @Override
                public boolean isAvailable() {
                    return invoker.isAvailable();
                }

                @Override
                public Result invoke(Invocation invocation) throws RpcException {
                    /**包装invoker对象**/
                    return filter.invoke(next, invocation);
                }

                @Override
                public void destroy() {
                    invoker.destroy();
                }

                @Override
                public String toString() {
                    return invoker.toString();
                }
            };
        }
    }
    return last;
}

遍历通过SPI加载出来的Filter来包装invoker对象,当invoker调用invoke方法时,就会触发Filter中的实现逻辑,此处使用责任链设计模式,当然开发者也可以实现自己的Filter来对此处逻辑进行扩展。整个流程时序图如下所示:

ServiceConfig -_ RegistryProtocol -_ DubboProtocol.png

4.开启netty服务

RegistryProtocol中调用exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)进入到DubboProtocol类中的export方法,

/**
 * dubbo protocol support.
 */
public class DubboProtocol extends AbstractProtocol {

    public static final String NAME = "dubbo";

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service.
        /**获取key,com.alibaba.dubbo.demo.DemoService:20880**/
        String key = serviceKey(url);
        /**创建exporter对象**/
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        /**保存exporter对象**/
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }

        /**创建netty server**/
        openServer(url);
        /**初始化序列化器**/
        optimizeSerialization(url);
        return exporter;
    }

    private void openServer(URL url) {
        // find server.
        /**ip+port**/
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            /**从缓存中获取server对象**/
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                /**缓存中没有则创建server**/
                serverMap.put(key, createServer(url));
            } else {
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

    private ExchangeServer createServer(URL url) {
        // send readonly event when server closes, it's enabled by default
        url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
        String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);

        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
       /**设置传输协议为dubbo**/
        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        ExchangeServer server;
        try {
            /**netty Server初始化**/
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }
        str = url.getParameter(Constants.CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }
        return server;
    }
}

首先会创建DubboExporter对象,并把该对象保存到exporterMap中,其中类:端口会被作为key,例如:com.alibaba.dubbo.demo.DemoService:20880openServer中是获取netty Server的具体逻辑,serverMap中会存放创建好的server,key是ip:port,如果在serverMap中没有存储server,则调用createServer方法创建。

5.获取zookeeper连接并将服务注册成为zookeeper上的节点

完成Netty Server启动后,通过RegistryProtocol中的getRegistry方法创建ZookeeperRegistry对象,该对象的构造函数会进行zookeeper的连接。

private Registry getRegistry(final Invoker<?> originInvoker) {
    URL registryUrl = getRegistryUrl(originInvoker);
    return registryFactory.getRegistry(registryUrl);
}

通过返回的ZookeeperRegistry对象,调用subscribe方法便将服务注册成为了zookeeper的节点,/dubbo/com.alibaba.dubbo.demo.DemoService/providers,值为dubbo://ip:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1644&proxy=jdk&side=provider&timestamp=1603026176171,里面包含服务的ip、端口、接口名、接口中包含的方法、代理模式等等。

6.总结

整个服务暴露的过程就是服务向注册中心注册的过程,除了基本的实现以外,Dubbo在该过程中还提供了ListenerFilter这两个扩展点方便开发者进行定制化的实现。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351