08-dubbo服务引入和调用源码分析

开头

上一节讲到了服务的导出,即服务端如何将自己的接口提供成dubbo服务的过程,这一节就是讲服务的调用了,消费端是如何调用服务端的接口的呢?

主要流程

1.spring启动时,会给@Reference注解的属性赋值,赋值的时候会调用referenceBean.get方法
2.准备初始化invoker对象,MockClusterInvoker,生成这个是最终目的
3.在注册中心初始化服务目录RegistryDirectory
4.将消费端信息注册到zk
5.构造路由链、服务订阅
6.根据服务目录得到最终的invoker对象MockClusterInvoker
8.最终调用MockClusterInvoker.invoke方法执行请求发送数据,里面调用了netty.send方法
9.通过netty channel,执行nettyServerHandler方法处理请求和结果返回

源码流程
流程图地址:https://www.processon.com/view/link/60e02b8d637689510d6c4184

服务引入.jpg

1.程序入口

在spring启动的时候,会对@Reference注解的属性赋值,生成ReferenceBean,在ReferenceAnnotationBeanPostProcessor.doGetInjectedBean方法中
可以看到,最终调用了 referenceBean.get()方法,这个方法最后返回了一个ref对象,这个ref对象看到最后就是一个Invoke代理对象,也就是主要流程的第二步,准备初始化invoker对象,MockClusterInvoker,生成这个是最终目的

@Override
    protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
                                       InjectionMetadata.InjectedElement injectedElement) throws Exception {

    
        return getOrCreateProxy(referencedBeanName, referenceBeanName, referenceBean, injectedType);
    }
 private Object getOrCreateProxy(String referencedBeanName, String referenceBeanName, ReferenceBean referenceBean, Class<?> serviceInterfaceType) {
        if (existsServiceBean(referencedBeanName)) { // If the local @Service Bean exists, build a proxy of ReferenceBean
            return newProxyInstance(getClassLoader(), new Class[]{serviceInterfaceType},
                    wrapInvocationHandler(referenceBeanName, referenceBean));
        } else {                                    // ReferenceBean should be initialized and get immediately
            // 这里
            return referenceBean.get();
        }
    }

public synchronized T get() {
        checkAndUpdateSubConfigs();

        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {
            // 入口
            init();
        }
        return ref;  // Invoke代理
    }

2.准备初始化invoker对象,MockClusterInvoker

由init()->createProxy(map),这个方法太长了,留了三个主要的方法:
1.加载注册中心url地址

  1. invoker = REF_PROTOCOL.refer调用registry.refer,这里又是spi机制,最终调用了registryProtocol.refer方法
private T createProxy(Map<String, String> map) {
     List<URL> us = loadRegistries(false);
    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
     invoker = CLUSTER.join(new StaticDirectory(u, invokers));

    }

3.在注册中心初始化服务目录RegistryDirectory
留下了主要代码,可以看到这里初始化了一个注册目录,也就是我们最终在zk上看到的consumers节点文件夹。
registry.register(directory.getRegisteredConsumerUrl());这里最终会调用ZookeeperRegistry.doRegister方法,用zk客户端向zk服务端创建节点,将消费端信息注册到zk,可以看到这里创建的是临时节点

private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
      
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);

         
          registry.register(directory.getRegisteredConsumerUrl());
       

    
        directory.buildRouterChain(subscribeUrl);

      
        directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY,
         
    

        return invoker;
    }

 @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

4.构造路由链、服务订阅
directory.buildRouterChain(subscribeUrl);
directory.subscribe(subscribeUrl.addParameter(CATEGORY_KEY

3.生成最终的invoker对象MockClusterInvoker

    Invoker invoker = cluster.join(directory);

这里又是SPI机制,由于Cluster有一个包装类,所以会先调用MockClusterWrapper.join方法,原理可参照我之前单独写的一节SPI源码分析
可以看到,这里最终生成MockClusterInvoker

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }

}

4.服务调用

第3步骤中生成了一个MockClusterInvoker对象,所以最终调用服务的方法实际上就是调用MockClusterInvoker.invoke方法,会依次调用AbstractClusterInvoker.invoke->FailoverClusterInvoker.doInvoke->DubboInvoker.doInvoke

@Override
    public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;

        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
            //no mock
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            if (logger.isWarnEnabled()) {
                logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //fail-mock
            try {
                result = this.invoker.invoke(invocation);

                //fix:#4585
                if(result.getException() != null && result.getException() instanceof RpcException){
                    RpcException rpcException= (RpcException)result.getException();
                    if(rpcException.isBiz()){
                        throw  rpcException;
                    }else {
                        result = doMockInvoke(invocation, rpcException);
                    }
                }

            }
        return result;
    }

我们直接看DubboInvoker.doInvoke方法
1.首先会拿到一个 ExchangeClient客户端
2.异步请求currentClient.request,最终调用HeaderExchangeChannel.request->调用netty的方法channel.send

@Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

 
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
           
            currentClient = clients[index.getAndIncrement() % clients.length];
        }

        try {
         
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);

            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);

                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);

                CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);

                asyncRpcResult.subscribeTo(responseFuture);

                return asyncRpcResult;
            }
        }
    }

5.服务请求处理

由于使用的netty通信,所有客户端发送消息后,netty服务端会在NettyServerHandler.channelRead中接到消息,这里调用了很多handler,就不展开看了。
1.MultiMessageHandler
2.HeartbeatHandler
3.AllChannelHandler
4.DecodeHandler
5.HeaderExchangeHandler
6.ExchangeHandlerAdapter

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            handler.received(channel, msg);
        } finally {

            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }

总结

服务的引入的目的就是在消费端@Reference标注一个服务端接口,这个注解会去将消费端消息注册到zk,最终会生成一个调用服务端的代理对象invoker,消费端调用服务端接口的时候最后调用的就是invoker.invoke方法,而这个方法采用的通信框架是netty,实现了远程调用。
dubbo源码写的很好,比如里面的SPI机制运用的很巧妙,还有一些抽象工厂设计模式等,源码值得品读。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,406评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,732评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,711评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,380评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,432评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,301评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,145评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,008评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,443评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,649评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,795评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,501评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,119评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,731评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,865评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,899评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,724评论 2 354