三 捋代码--dubbo源码服务发布

分析完spi,开始分析服务发布,先看一张官网的服务发布时序图:


服务发布时序图

下面开始分析服务发布逻辑,我们的入口是我们的使用方式-----配置文件:

       <dubbo:application name="kai-nina-server"/>
       <dubbo:registry id="nina-register" address="N/A" port="2181" protocol="zookeeper" file="/Users/kai.yang/Documents/学习/dubbo"/>
       <dubbo:registry id="nina-register2" address="127.0.0.1" port="2181" protocol="zookeeper" file="/Users/kai.yang/Documents/学习/dubbo"/>
       <dubbo:protocol id="dubbo" port="20880" name="dubbo" server="netty4"/>
       <dubbo:service interface="com.alibaba.dubbo.kai.api.HelloApi" ref="helloApi" version="0.0"  />
       <bean id="helloApi" class="com.alibaba.dubbo.kai.api.imp.HelloApiImpl"/>

很熟悉吧,dubbo的常用配置文件启动方式,当然dubbo有api的方式,如下:


// 连接注册中心配置
RegistryConfig registry = new RegistryConfig();
registry.setAddress("10.20.130.230:9090");
registry.setUsername("aaa");
registry.setPassword("bbb");
 
// 服务提供者协议配置
ProtocolConfig protocol = new ProtocolConfig();
protocol.setName("dubbo");
protocol.setPort(12345);
protocol.setThreads(200);

ServiceConfig<XxxService> service = new ServiceConfig<XxxService>(); // 此实例很重,封装了与注册中心的连接,请自行缓存,否则可能造成内存和连接泄漏
service.setApplication(application);
service.setRegistry(registry); // 多个注册中心可以用setRegistries()
service.setProtocol(protocol); // 多个协议可以用setProtocols()
service.setInterface(XxxService.class);
service.setRef(xxxService);
service.setVersion("1.0.0");


service.export();

当然实际逻辑大同小异,一个用spring,一个不用spring而已,那我们就看spring中的,发布服务的入口就是 <dubbo:service >标签。
熟悉spring自定义标签的同学,对这个一定不陌生,其实现关键点为:

  • 1、handler--对应一个NamseSpaceHandler类
  • 2、parser---对应一个spring的bean解析类
  • 3、 schemas--对应两个指定xsd和handler的配置文件
       分析入口自然在handler中,dubbo的handler为
public class DubboNamespaceHandler extends NamespaceHandlerSupport {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new DubboBeanDefinitionParser(AnnotationBean.class, true));
    }

}

   很明显入口到了DubboBeanDefinitionParser解析类,服务发布明细是service入口,看到此解析会生成一个ServiceBean的beand定义,因此我们的方向被带到了ServiceBean类(spring自定义注解我们就不做过多解释了,都是spring生成bean的老一套,解析标签,依赖注入,生成bean定义)。
   下面开始重点看ServiceBean类

public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware {

他实现了InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener, BeanNameAware 几个spring的接口,根据Spring的执行流程会做相应注入,最后到afterPropertiesSet及onApplicationEvent中,按例先看大图说话:


dubbo发布服务过程-图太大请查看原图

   我们可以看出图中已经包含了所有发布过程,让我一起了解下。
首先我们介绍下服务发布的对象模型


dubbo中服务发布模型

dubbo的发布服务层层包装后最终得到exporter对象,看起来很牛的样子,层层包装,调用有20来层(加filter)。下面让我们看看他们形成的过程。
大图中显示serviceBean通过afterPropertiesSet开始赋值,然后判断是否配置delay:

   if (!isDelay()) {
            export();
        }

看下isDelay这个方法

  private boolean isDelay() {
        Integer delay = getDelay();
        ProviderConfig provider = getProvider();
        if (delay == null && provider != null) {
            delay = provider.getDelay();
        }
        return supportedApplicationListener && (delay == null || delay == -1);
    }

delay如果你配置了>0的值,将会进入export()方法,在export()方法中将会通过
开启异步延时线程发布服务

  if (delay != null && delay > 0) {
            delayExportExecutor.schedule(new Runnable() {
                public void run() {
                    doExport();
                }
            }, delay, TimeUnit.MILLISECONDS);
        } else {
            doExport();
        }

否则将会不执行export()方法,那将会什么是否发布呢?
对了,大图中展示了这个流程,将会在ApplicationListener接口的事件通知方法执行

 public void onApplicationEvent(ApplicationEvent event) {
        if (ContextRefreshedEvent.class.getName().equals(event.getClass().getName())) {
            if (isDelay() && !isExported() && !isUnexported()) {
                if (logger.isInfoEnabled()) {
                    logger.info("The service ready on spring started. service: " + getInterface());
                }
                export();
            }
        }
    }

方法中监听ContextRefreshedEvent(spring的context加载完毕)事件,进行发布export方法
好了,终于开始发布了,export方法慢慢进入doExport(),首先一些基本参数的校验,图中显示会new出个provider,其中会有一段判断是否为GenericService的逻辑,此处如果泛化类,后面地址中methods会为*、generic为true,否则就是找出实际接口的方方法名字,generic为false。泛化类大家可能有使用过,其特点入参和反参都可用map,调用只需指定方法名,不需要依赖实际接口,写个插曲看下使用:

 //1.server端配置
    <dubbo:service interface="com.alibaba.dubbo.kai.api.HelloApi" ref="helloApi" version="0.0"   generic="true"/>
........
//2.client配置
    <dubbo:reference interface="com.alibaba.dubbo.kai.api.HelloApi" id="helloApi2" protocol="dubbo" check="true"  version="0.0" timeout="10000" generic="true"/>
.......
//3.客户端调用,不需要依赖接口,直接使用
 public static void main(String[] args) throws IOException {
        ClassPathXmlApplicationContext classPathXmlApplicationContext=new ClassPathXmlApplicationContext("dubbo-client.xml");
        classPathXmlApplicationContext.start();
        GenericService bean = classPathXmlApplicationContext.getBean(GenericService.class);
        Object sayHello = bean.$invoke("sayHello", null, null);
        System.out.println(sayHello);
        System.in.read();
    }

回到正题,此处判断泛化赋值,为的都是生成最终的dubbo url。

    protected synchronized void doExport() {
         .....//参数检查整理,没有配置provier时new出一个
        
        checkApplication();
        checkRegistry();
        checkProtocol();
        appendProperties(this);
        //检查mock相关注入
        checkStubAndMock(interfaceClass);
        if (path == null || path.length() == 0) {
            path = interfaceName;
        }
        //进行发布方法
        doExportUrls();
    }

继续看代码,此时略过一系列检查,最终进入了ServiceConfig的doExportUrls中,开始做发布服务前的准备。首先loadRegistries方法会根据registry的配置生成对应需要注册的url:

  //进行发布
    private void doExportUrls() {
        List<URL> registryURLs = loadRegistries(true);
        for (ProtocolConfig protocolConfig : protocols) {
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

此时的loadRegistries生成的地址(我们只配置了一个注册中心,所以只有一个):

registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=kai-nina-server&dubbo=2.0.0&file=/Users/kai.yang/Documents/学习/dubbo&pid=50109&registry=zookeeper&timestamp=1540349411225

注意,此处协议头是registry,并不是我们配置zookeeper,

 protected List<URL> loadRegistries(boolean provider) {
                    .......
                    //此处为zookeeper协议头
                    List<URL> urls = UrlUtils.parseURLs(address, map);
                    for (URL url : urls) {
                        url = url.addParameter(Constants.REGISTRY_KEY, url.getProtocol());
                        //最终的url都是registry协议头
                        url = url.setProtocol(Constants.REGISTRY_PROTOCOL);
                        if ((provider && url.getParameter(Constants.REGISTER_KEY, true))
                                || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) {
                            registryList.add(url);
                        }
                    }
                }
            }
        }
        return registryList;
    }

大家可以记下,原有地址是

zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=kai-nina-server&dubbo=2.0.0&file=/Users/kai.yang/Documents/学习/dubbo&pid=52976&timestamp=1540349933667

此方法返回所有需要注册的注册中心地址。继续,进入 doExportUrlsFor1Protocol(protocolConfig, registryURLs)方法,显而易见,此方法会循环发布每个protocol,并注册到所有注册中心。
doExportUrlsFor1Protocol方法第一步会判断上面赋值到generic,当不是generic时,dubbo会给你生成一个wapper,类似泛化类当包装类,

    private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {

        .....
        //判断是否为返泛化类,泛化类则生成wrapper,泛化所有方法名,否则methods为*
        if (ProtocolUtils.isGeneric(generic)) {
            map.put("generic", generic);
            map.put("methods", Constants.ANY_VALUE);
        } else {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put("revision", revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("NO method found in service interface " + interfaceClass.getName());
                map.put("methods", Constants.ANY_VALUE);
            } else {
                map.put("methods", StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
            }
            ......
        //获取ip,port
        String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
        Integer port = this.findConfigedPorts(protocolConfig, name, map);
      //生成服务的url,dubbo://172.18.166.201:20880/com.alibaba.dubbo.kai.api.HelloApi?anyhost=true&application=kai-nina-server&bind.ip=172.18.166.201&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.HelloApi&methods=sayHello,sayNihao&pid=63104&revision=0.0&server=netty4&side=provider&timestamp=1540352013631&version=0.0

        URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);

        if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                .hasExtension(url.getProtocol())) {
            url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
                    .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
        }
            //最终发布,下面介绍
      .....
    }

请看生成wrapper泛化类示例:

public class Wrapper0 extends Wrapper{
    public static String[] pns;
    public static java.util.Map pts;
    public static String[] mns = new String[]{"sayHello"};
    public static String[] dmns = new String[]{"sayHello"};
    public static Class[] mts0;

    public String[] getPropertyNames() {
        return pns;
    }

    public String[] getDeclaredMethodNames() {
        return dmns;
    }

    public String[] getMethodNames() {
        return mns;
    }


    public Class getPropertyType(String n) {
        return (Class) pts.get(n);
    }


    public boolean hasProperty(String n) {
        return pts.containsKey(n);
    }

    public void setPropertyValue(Object o, String n, Object v) {
        com.alibaba.dubbo.kai.api.HelloApi w;
        try {
            w = ((com.alibaba.dubbo.kai.api.HelloApi) o);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + n + "\" filed or setter method in class com.alibaba.dubbo.kai.api.HelloApi.");
    }

    public Object getPropertyValue(Object o, String n) {
        com.alibaba.dubbo.kai.api.HelloApi w;
        try {
            w = ((com.alibaba.dubbo.kai.api.HelloApi) o);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchPropertyException("Not found property \"" + n + "\" filed or setter method in class com.alibaba.dubbo.kai.api.HelloApi.");
    }

    public Object invokeMethod(Object o, String n, Class[] p, Object[] v) throws java.lang.reflect.InvocationTargetException {
        com.alibaba.dubbo.kai.api.HelloApi w;
        try {
            w = ((com.alibaba.dubbo.kai.api.HelloApi) o);
        } catch (Throwable e) {
            throw new IllegalArgumentException(e);
        }
        try {
            //和cglib写法类似,不通过反射调用,效率更高
            if ("sayHello".equals(n) && p.length == 0) {
                return  w.sayHello();
            }
        } catch (Throwable e) {
            throw new java.lang.reflect.InvocationTargetException(e);
        }
        throw new com.alibaba.dubbo.common.bytecode.NoSuchMethodException("Not found method \"" + n + "\" in class com.alibaba.dubbo.kai.api.HelloApi.");
    }


}

我们回头看下服务发布对象图,此wrapper为底层实现类,直接引用实际类进行调用,这中包装方式是不是有点像cglib代理当实现。
下一步dubbo调用findConfigedHosts方法获取本机服务器ip(非127.0.0.1的,可以学习下),findConfigedPorts获取要发布的port,然后根据这一些列参数最终生成了服务地址:

dubbo://172.18.166.201:20880/com.alibaba.dubbo.kai.api.HelloApi?anyhost=true&application=kai-nina-server&bind.ip=172.18.166.201&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.HelloApi&methods=sayHello,sayNihao&pid=63104&revision=0.0&server=netty4&side=provider&timestamp=1540352013631&version=0.0

有地址了,是不是该发布了,继续:

   String scope = url.getParameter(Constants.SCOPE_KEY);
        //配置为none不暴露
        if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {

            //配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
            if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
                exportLocal(url);
            }
            //如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
            if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
                if (logger.isInfoEnabled()) {
                    logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                }
                if (registryURLs != null && registryURLs.size() > 0
                        && url.getParameter("register", true)) {
                    for (URL registryURL : registryURLs) {
                        //发布远程服务
                        Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
                        Exporter<?> exporter = protocol.export(invoker);
                        exporters.add(exporter);
                    }
                } else {
                    //没有配置注册中心时,只发布不注册
                    Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
                    Exporter<?> exporter = protocol.export(invoker);
                    exporters.add(exporter);
                }
            }
        }

scope参数就不再解释了,dubbo会先发布本地服务,表示在一个jvm中,可以引用不通过远程服务(现在版本默认都会开启,大图有中详细流程),然后开始发布远程服务,我们直接调到远程发布都核心代码(本地发布可以参考大图,逻辑类似,只是本地会经过dubbo的层层filer生成最终的invoker,此时invoker将是通过dubbo包装过的):

  Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
  Exporter<?> exporter = protocol.export(invoker);
  exporters.add(exporter);

重点来了!看方法名意思是dubbo通过代理工厂生成一个invoker,我们先看入参

  • ref --- 依赖注入的引用
  • interfaceClass --- 引用接口类型
  • url ---服务发布url
    前两个就不赘述,主要看第三个url,此时生成传入的url又是什么呢?
    registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString())方法我们看到是在registryUrl中添加了个map参数,key是export(上面记录的dubbo服务地址),所以此时传入的是registry协议头的注册地址:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=kai-nina-server&dubbo=2.0.0&export=dubbo%3A%2F%2F10.254.1.113%3A20880%2Fcom.alibaba.dubbo.kai.api.HelloApi%3Fanyhost%3Dtrue%26application%3Dkai-nina-server%26bind.ip%3D10.254.1.113%26bind.port%3D20880%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.kai.api.HelloApi%26methods%3DsayHello%2CsayNihao%26pid%3D79497%26revision%3D0.0%26server%3Dnetty4%26side%3Dprovider%26timestamp%3D1540364859493%26version%3D0.0&file=/Users/kai.yang/Documents/学习/dubbo&pid=79497&registry=zookeeper&timestamp=1540364857840

入参解决了,再来看下此时的proxyFactory是什么?

//根据扩展点添加的适配工厂
    private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

那么我们要怎么找到这个类呢?记得dubbo的spi原理的,一定知道此时先看配置文件,找其是否又适配器类,没有的话,看接口是否又适配器方法,此时有,dubbo一定会动态生成一个适配器工厂类来使用(具体内容,可以看上一篇文章),闲言少叙,我们最终会找到此类为:

public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
    public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg2 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg2;
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getInvoker(arg0, arg1, arg2);
    }

    public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
        com.alibaba.dubbo.common.URL url = arg0.getUrl();
      //默认采用javassist工厂
        String extName = url.getParameter("proxy", "javassist");
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
        com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
        return extension.getProxy(arg0);
    }
}

我们发现此时工厂适配器会在你不指定默认proxy的情况下,默认采用javassistFactory(name=javassist的工厂类,配置文件中可以找到),那么我们看看javassistFactory里面做了什么

 public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
//生成对应的泛化wapper,此时class为实现类
 final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//每次返回一个新的代理invoker,实际调用上面的wrapper
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
            }
        };
}

到此,模型图中最下面三个已出世


服务发布invoker模型

当然如果你仔细看了我的大图的话,会发现此时少了一个步骤:


StubProxyFactoryWrapper

那这个StubProxyFactoryWrapper是哪来的呢?对了,还是靠dubbo的扩展机制,记得上篇介绍了createExtension(String name)方法,此时会找到配置文件中的所有的wrapper类进行层层包装,不过此类在发布过程中没有实际作用,所以此地忽略。

获取到invoker了,下面进入最最最核心的发布了!

  Exporter<?> exporter = protocol.export(invoker);

先思考下,protocol现在是哪个类?熟悉套路的朋友们一定猜到了,此处为Protocol$Adaptive类。上篇中我们专门解释过此类的由来,这下用到了。还记得其中关键代码:

       String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
       if (extName == null)
           throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
       com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);

那此时适配到的extension应该是哪个类呢?哈哈,可能有人说是DubboProtocol,或者RegistryProtocol,其实此时获取到的是个层层wrapper包装后的RegistryProtocol,再看眼配置文件

registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol

其中有两个wrapper,所以此处会先进过ProtocolFilterWrapper和ProtocolListenerWrapper最后才会到RegistryProtocol中,当然两个wrapper的export方法都屏蔽了REGISTRY_PROTOCOL的协议,所以暂且跳过这两类,
直接进入RegistryProtocol中。

  if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
  //registry策略直接进行调用           
           return protocol.export(invoker);
    }

看下RegistryProtocol:

        //重点一、生成最终的exporter
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //重点二、注册provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        registry.register(registedProviderUrl);
        // 重点三、订阅数据
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
  • 重点一 生成最终的exporter
    请看大图,进入doLocalExport()方法后会先创建个invoker的委托类
    new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker))
    此时出入的url我们解析出是:
dubbo://10.254.1.113:20880/com.alibaba.dubbo.kai.api.HelloApi?anyhost=true&application=kai-nina-server&bind.ip=10.254.1.113&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.HelloApi&methods=sayHello,sayNihao&pid=88182&revision=0.0&server=netty4&side=provider&timestamp=1540366683633&version=0.0
  

下面创建了个new ExporterChangeableWrapper()类用来做为缓存的可变动重新发布包装类,而其入参有两个:

  • originInvoker --->我们上一步就生成好的invoker,原始invoker
  • protocol.export(invokerDelegete)-->目标协议发布的invoker

此时的protocol肯定还是Protocol$Adaptive,按照上面的分析,此时得到的最终类肯定是DubboProtocol了。当然结果是这个,过程一样很崎岖,还有两个wrapper等这个呢,先看ProtocolFilterWrapper:

  Invoker<T> last = invoker;
        //获取所有配置Activate的此group=provider下的filter类,形成filer链
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                   .....
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                  .....
            }
        }

这就是dubbo的过滤器调用链的形成,在后面的调用过程中都会用到。下面ProtocolListenerWrapper类的export()方法:


    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
         ....
      //ListenerExporterWrapper包装了监听器进去
        return new ListenerExporterWrapper<T>(protocol.export(invoker),
                Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
                        .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
    }

再看一下ListenerExporterWrapper:

    public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
        if (exporter == null) {
            throw new IllegalArgumentException("exporter == null");
        }
        this.exporter = exporter;
        this.listeners = listeners;
        if (listeners != null && listeners.size() > 0) {
            RuntimeException exception = null;
            for (ExporterListener listener : listeners) {
                if (listener != null) {
                    try {
                      //监听者模式,进行发布完后的事件处理
                        listener.exported(this);
                    } catch (RuntimeException t) {
                        logger.error(t.getMessage(), t);
                        exception = t;
                    }
                }
            }
            if (exception != null) {
                throw exception;
            }
        }
    }

显然这是一个后置的监听者插入口,那就和我们的发布过程关系不大,先跳过,继续向下走,终于该到我们的DubboProtocol发布了:

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // export service. 缓存key
        String key = serviceKey(url);
        //重点 一 dubbo 的expoter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
        exporterMap.put(key, exporter);

        //export an stub service for dispaching event,
        Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }
            } else {
                stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
            }
        }
      // 重点二 开启服务
        openServer(url);

        return exporter;
    }

看重点(stub存根不影响主流程,此处略过),

  • new DubboExporter<T>(invoker, key, exporterMap)-->生成了一个对应dubbo的Exporter

  • openServer(url) -->开启socket服务
    这里边DubboExporter对象为最终返回对象,它缓存了我们的invoker,下面重点就是openServer去发布服务了。

    private void openServer(URL url) {
        // find server.
        String key = url.getAddress();
        //client 也可以暴露一个只有server可以调用的服务。
        boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
        if (isServer) {
            ExchangeServer server = serverMap.get(key);
            if (server == null) {
                //开启新服务
                serverMap.put(key, createServer(url));
            } else {
                //server支持reset,配合override功能使用
                server.reset(url);
            }
        }
    }

开启服务,进到了createServer(url)方法内

private ExchangeServer createServer(URL url) {
     ....
      //开通服务方法
            server = Exchangers.bind(url, requestHandler);
      ....
    }

此处重点落到了Exchangers.bind(url, requestHandler)中,首先分析入参

  • url--》dubbo协议头服务地址,前面透出过来
  • requestHandle--》服务通信调用的最终处理类,让我看下此类
 private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
                Invoker<?> invoker = getInvoker(channel, inv);
                //如果是callback 需要处理高版本调用低版本的问题
                if (Boolean.TRUE.toString().equals(inv.getAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || methodsStr.indexOf(",") == -1) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: " + message == null ? null : (message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

        @Override
        public void connected(Channel channel) throws RemotingException {
            invoke(channel, Constants.ON_CONNECT_KEY);
        }

        @Override
        public void disconnected(Channel channel) throws RemotingException {
            if (logger.isInfoEnabled()) {
                logger.info("disconected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
            }
            invoke(channel, Constants.ON_DISCONNECT_KEY);
        }

        private void invoke(Channel channel, String methodKey) {
            Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
            if (invocation != null) {
                try {
                    received(channel, invocation);
                } catch (Throwable t) {
                    logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                }
            }
        }

        private Invocation createInvocation(Channel channel, URL url, String methodKey) {
            String method = url.getParameter(methodKey);
            if (method == null || method.length() == 0) {
                return null;
            }
            RpcInvocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
            invocation.setAttachment(Constants.PATH_KEY, url.getPath());
            invocation.setAttachment(Constants.GROUP_KEY, url.getParameter(Constants.GROUP_KEY));
            invocation.setAttachment(Constants.INTERFACE_KEY, url.getParameter(Constants.INTERFACE_KEY));
            invocation.setAttachment(Constants.VERSION_KEY, url.getParameter(Constants.VERSION_KEY));
            if (url.getParameter(Constants.STUB_EVENT_KEY, false)) {
                invocation.setAttachment(Constants.STUB_EVENT_KEY, Boolean.TRUE.toString());
            }
            return invocation;
        }
    };

熟悉netty编程的同学应该会感动眼熟,这个handler和netty中处理message的channelHandler很相似。没错,这个handler最终就是会被包在netty的channelHandler中的实际消息处理类。
我们此处不分析处理类,调用过程会分析。我们进入bind方法:

public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
      ....
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).bind(url, handler);
    }

dubbo在这里去getExchander(url),用来方便用户可以自己扩展通信框架,继续

    public static Exchanger getExchanger(URL url) {
        String type = url.getParameter(Constants.EXCHANGER_KEY, Constants.DEFAULT_EXCHANGER);
        return getExchanger(type);
    }

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

在没有指定exchanger时,dubbo默认实现
DEFAULT_EXCHANGER = "header"
所以此处的exchanger从配置文件中我们可以找到HeaderExchanger

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

此处又new出了个new HeaderExchangeServer(),入参通过Transporters的静态方法生成了一个server

Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))

继续

    public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            handler = new ChannelHandlerDispatcher(handlers);
        }
    //一样的套路,将会获取ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()类,此处获取到Transporter$Adaptive
        return getTransporter().bind(url, handler);
    }

让我们看下获取的Transporter$Adaptive类:

public class Transporter$Adaptive implements com.alibaba.dubbo.remoting.Transporter {
    public com.alibaba.dubbo.remoting.Client connect(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        String extName = url.getParameter("client", url.getParameter("transporter", "netty"));
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([client, transporter])");
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.connect(arg0, arg1);
    }

    public com.alibaba.dubbo.remoting.Server bind(com.alibaba.dubbo.common.URL arg0, com.alibaba.dubbo.remoting.ChannelHandler arg1) throws com.alibaba.dubbo.remoting.RemotingException {
        if (arg0 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg0;
        // 不指定默认为netty
        String extName = url.getParameter("server", url.getParameter("transporter", "netty"));
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Transporter) name from url(" + url.toString() + ") use keys([server, transporter])");
        com.alibaba.dubbo.remoting.Transporter extension = (com.alibaba.dubbo.remoting.Transporter) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension(extName);
        return extension.bind(arg0, arg1);
    }
}

显然在不指定任务通讯协议时,采用netty协议,让我们进入NettyTransporter

 public class NettyTransporter implements Transporter {

   public static final String NAME = "netty4";
   
   public Server bind(URL url, ChannelHandler listener) throws RemotingException {
       return new NettyServer(url, listener);
   }

   public Client connect(URL url, ChannelHandler listener) throws RemotingException {
       return new NettyClient(url, listener);
   }

}

明显此处调用new了一个NettyServer(url,listener),进入NettyServer,构造方法中调用父类的构造方法,抽象类处理完基本参数后,调用子类实现的模版方法的doOpen(),此处熟悉的netty操作映入眼帘(终于发布服务了!)。

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
       super(url, handler);
       localAddress = getUrl().toInetSocketAddress();

       String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
       int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
       if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
           bindIp = NetUtils.ANYHOST;
       }
       bindAddress = new InetSocketAddress(bindIp, bindPort);
       this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
       this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
       try {
       //子类实现模版方法,开启服务
           doOpen();
           if (logger.isInfoEnabled()) {
               logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
           }
       } catch (Throwable t) {
           throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                   + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
       }
       //fixme replace this with better method
       DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
       executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
   }
 public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        //调用父类的抽象方法
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
    //模版方法
    @Override
    protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();

        bootstrap = new ServerBootstrap();

        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                //该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                //对应于套接字选项中的SO_REUSEADDR,这个参数表示允许重复使用本地地址和端口
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                //分配Buf,管理类指定
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

而其消息处理的channelHander为,此处注意传入实际消息处理类为this!说明NetterServer一定实现了ChannelHander,让我们找找

  final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

果然:


nettyServer类图

在其AbstractPeer父抽象类中,我们看到

public abstract class AbstractPeer implements Endpoint, ChannelHandler {
    //实际处理消息handler
    private final ChannelHandler handler;

    private volatile URL url;

    // closing closed分别表示关闭流程中、完成关闭
    private volatile boolean closing;

    private volatile boolean closed;

那此刻的这个ChannelHandler又是谁呢?让我们回到NettyServer的构造

  super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));

注意这个ChannelHandlers.wrap()方法,这就是我们最终生成的handler!
那他又是什么东西呢?

public class ChannelHandlers {

    private static ChannelHandlers INSTANCE = new ChannelHandlers();

    protected ChannelHandlers() {
    }

    public static ChannelHandler wrap(ChannelHandler handler, URL url) {
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected static ChannelHandlers getInstance() {
        return INSTANCE;
    }

    static void setTestingChannelHandlers(ChannelHandlers instance) {
        INSTANCE = instance;
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
}

发现他又又是层层包装,每一层只处理一部分任务

  • MultiMessageHandler 处理Multi类型,一般如文件流
  • HeartbeatHandler包装了心跳请求消息
  • ExtensionLoader.getExtensionLoader(Dispatcher.class)
    .getAdaptiveExtension().dispatch()最终得到类
public class Dispatcher$Adaptive implements com.alibaba.dubbo.remoting.Dispatcher {

    public com.alibaba.dubbo.remoting.ChannelHandler dispatch(com.alibaba.dubbo.remoting.ChannelHandler arg0, com.alibaba.dubbo.common.URL arg1) {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = url.getParameter("dispatcher", url.getParameter("dispather", url.getParameter("channel.handler", "all")));
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.Dispatcher) name from url(" + url.toString() + ") use keys([dispatcher, dispather, channel.handler])");
        com.alibaba.dubbo.remoting.Dispatcher extension = (com.alibaba.dubbo.remoting.Dispatcher) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Dispatcher.class).getExtension(extName);
        return extension.dispatch(arg0, arg1);
    }
}

所以默认会生成AllDispatcher,然后

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}

AllDispatcher委派了AllChannelHandler处理消息任务。
我去,终于找到最终处理消息任务的handler。。。下面让我们整体看下dubbo中exporter服务发布的相关实体:


dubbo服务端模型

图中大致画出了ChannelHander从属于Server,invoker从属于Exporter,中线通过protocol进行管理,最终回归到ServiceConfig中。到此,服务终于发布出来了。
但你先别停,还有个注册订阅呢。。。。。。
继续看大图,注册逻辑也在RegistryProtocol中

 public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        //export invoker
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
        //registry provider
        final Registry registry = getRegistry(originInvoker);
        final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
        //注册zookeeper
        registry.register(registedProviderUrl);
        // 订阅override数据
        // FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

首先 getRegistry(originInvoker)获取了Registry实体,又是老套路

 private Registry getRegistry(final Invoker<?> originInvoker) {
        URL registryUrl = originInvoker.getUrl();
        if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
            String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);
//重新赋值协议头,将REGISTRY_KEY=registry的参数取出,此处为zookeeper
            registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
        }
        return registryFactory.getRegistry(registryUrl);
    }

从registryFactory获取实体,这工厂有是谁?

public class RegistryProtocol implements Protocol {
     ....
    private RegistryFactory registryFactory;
    private ProxyFactory proxyFactory;

我们发现registryFactory是成员变量,根据上节spi知识,此处会通过 injectExtension(T instance) 方法自动注入,结果为RegistryFactory$Adaptive,url已被改成zookeeper。你懂得,想必获得的一定是个ZookeeperRegistry类(因为此时协议头已被改为zookeeper,根据扩展规则,会找到ZookeeperRegistry)。
没错,就是他。
明白了ZookeeperRegistry是谁,就简单了,直接看代码,先回调父类FailbackRegistry的 register(URL url):

 public void register(URL url) {
        if (destroyed.get()){
            return;
        }
        super.register(url);
        failedRegistered.remove(url);
        failedUnregistered.remove(url);
        try {
            // 模版方法,向服务器端发送注册请求
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // 如果开启了启动时检测,则直接抛出异常,对应标签check
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // 将失败的注册请求记录到失败列表,定时重试
            failedRegistered.add(url);
        }
    }

方法中做了错误重试的容错机制,还有对配置文件中服务端check参数对使用(是否启动检查),然后还是一个模版方法,调用具体类对实现,当然调用的是zookeeperRegister的doRegister方法:

protected void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

代码又都熟悉了吧,zkClient直接添加地址,地址为/dubbo/接口名/服务url,如:

/dubbo/com.alibaba.dubbo.kai.api.HelloApi/providers/dubbo%3A%2F%2F192.168.199.130%3A20880%2Fcom.alibaba.dubbo.kai.api.HelloApi%3Fanyhost%3Dtrue%26application%3Dkai-nina-server%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.kai.api.HelloApi%26methods%3DsayHello%2CsayNihao%26pid%3D61946%26revision%3D0.0%26server%3Dnetty4%26side%3Dprovider%26timestamp%3D1540390897149%26version%3D0.0

到这里,注册结束。按照我们使用zookeeper的习惯,是不是该开始添加监听了?
对,dubbo也是一样的逻辑。

         final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

首先获得了个overrideSubscribeUrl,用来做为缓存监听器key使用,url为

provider://192.168.199.130:20880/com.alibaba.dubbo.kai.api.HelloApi?anyhost=true&application=kai-nina-server&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.kai.api.HelloApi&methods=sayHello,sayNihao&pid=70508&revision=0.0&server=netty4&side=provider&timestamp=1540416661404&version=0.0

发现上边又一个FIXME的注释,说明当前版本有此问题,还并没有解决(因为都改为了provider协议,所以同一jvm引用统一服务时,监听器会发生覆盖),但我们暂时没有重复服务,还没有此问题。
最后到了registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener)方法。
和 register(URL url)方法写法类似,一个失败重试模版方法,最后调用zookeeperRegistry到 doSubscribe(url, listener)方法:

protected void doSubscribe(final URL url, final NotifyListener listener) {
      try {
          if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            ...
        //略过此逻辑,泛化支持
          } else {
              List<URL> urls = new ArrayList<URL>();
              for (String path : toCategoriesPath(url)) {
                  ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                  if (listeners == null) {
                      zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                      listeners = zkListeners.get(url);
                  }
                  ChildListener zkListener = listeners.get(listener);
                  if (zkListener == null) {
                      listeners.putIfAbsent(listener, new ChildListener() {
                          public void childChanged(String parentPath, List<String> currentChilds) {
                              ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                          }
                      });
                      zkListener = listeners.get(listener);
                  }
                  zkClient.create(path, false);
                  //添加节点监听
                  List<String> children = zkClient.addChildListener(path, zkListener);
                  if (children != null) {
                      urls.addAll(toUrlsWithEmpty(url, path, children));
                  }
              }
              notify(url, listener, urls);
          }
      } catch (Throwable e) {
          throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
      }
  }

添加订阅监听方法就不赘述(不熟悉都朋友可以学习下zk操作),但注意服务端监听的地址为/dubbo/com.alibaba.dubbo.kai.api.HelloApi/configurators,表示你如果在admin后台更新相关配置时,会监听到,同步服务端。
最后dubbo调用了 notify(url, listener, urls)方法,使用监听者模式来通知更新操作。

  protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // 将失败的通知请求记录到失败列表,定时重试
            Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
            if (listeners == null) {
                failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
                listeners = failedNotified.get(url);
            }
            listeners.put(listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }

一样都套路,模版模式,调用doNotify(url, listener, urls)方法:

      protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        ......
        //调用监听者的通知方法       
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList);
            saveProperties(url);
            listener.notify(categoryList);
        }
    }

看,最终又通知会了一开始创建的监听者OverrideListener,其为RegistryProtocol的内部类:
private class OverrideListener implements NotifyListener {

    private final URL subscribeUrl;
    private final Invoker originInvoker;

    public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
        this.subscribeUrl = subscribeUrl;
        this.originInvoker = originalInvoker;
    }

    /**
     * @param urls 已注册信息列表,总不为空,含义同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
     */
    public synchronized void notify(List<URL> urls) {
        logger.debug("original override urls: " + urls);
        List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
        logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
        //没有匹配的
        if (matchedUrls.isEmpty()) {
            return;
        }

        List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls);

        final Invoker<?> invoker;
        if (originInvoker instanceof InvokerDelegete) {
            invoker = ((InvokerDelegete<?>) originInvoker).getInvoker();
        } else {
            invoker = originInvoker;
        }
        //最原始的invoker
        URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
        String key = getCacheKey(originInvoker);
        ExporterChangeableWrapper<?> exporter = bounds.get(key);
        if (exporter == null) {
            logger.warn(new IllegalStateException("error state, exporter should not be null"));
            return;
        }
        //当前的,可能经过了多次merge
        URL currentUrl = exporter.getInvoker().getUrl();
        //与本次配置merge的
        URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
        if (!currentUrl.equals(newUrl)) {
            RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
            logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl);
        }
    }

    private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) {
        List<URL> result = new ArrayList<URL>();
        for (URL url : configuratorUrls) {
            URL overrideUrl = url;
            // 兼容旧版本
            if (url.getParameter(Constants.CATEGORY_KEY) == null
                    && Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
                overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
            }

            //检查是不是要应用到当前服务上
            if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) {
                result.add(url);
            }
        }
        return result;
    }

    //合并配置的url
    private URL getConfigedInvokerUrl(List<Configurator> configurators, URL url) {
        for (Configurator configurator : configurators) {
            url = configurator.configure(url);
        }
        return url;
    }
}

最主要的notify方法,逻辑合情合理了,判断了从zookeeper变动中获取到当前的变动生成新的url,判断和以前的exporter的url是否一致,不一致,则调用RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);方法改变服务:

  private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
        String key = getCacheKey(originInvoker);
        final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            logger.warn(new IllegalStateException("error state, exporter should not be null"));
        } else {
            final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
            exporter.setExporter(protocol.export(invokerDelegete));
        }
    }

看,这也就对上了ExporterChangeableWrapper这层包装的作用,用来当服务配置发生改变时,更新服务端exporter。
到这里,dubbo的服务发布过程我们终于分析完了。。。。。。。。
回顾下,我们印象最深的设计

  • 扩展spi的灵活使用,似的dubbo的可扩展行更强
  • 大量的模版模式的使用,抽象思想
  • 领域模型设计,所有发布对象都在Protocol中进行管理
  • 层层包装,每一层都又自己的指责,互不影响,使用可灵活的进行添加新的扩展层。
    等等等等,你还有什么喜欢的地方,可以留言一起讨论学习下,稍后一起看dubbo源码分析服务引用端

下一篇     dubbo源码分析服务引用
首页     dubbo源码欣赏简介

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容