Dubbo服务引用与调用原理

1.概述

Dubbo服务暴露原理一文已经详细介绍了服务暴露的原理以及过程,本文会对剩下的服务引用和调用的原理及其过程进行详细介绍。在服务消费方中,接口并不包含具体的实现逻辑,具体实现都放在服务提供方,但是当我们在调用接口的时候,却能做到与调用本地方法没有区别,原因在于调用方提供了一个代理类,在运行时与该接口绑定,当接口中的方法被调用时,实际是作用于该代理类上,代理类封装了远程调用的逻辑,把请求参数发送给远程服务提供方,获取结果后再返回。因此在调用具体方法之前,需要获取到这个代理对象,这便是服务引用的过程。

服务调用动态代理.png

Dubbo在服务引用的时候还会订阅zookeeper以保证能时刻监听provider节点新增或者减少的信息,实现动态感知,对应subscribe流程。
RPC原理.png

2.Dubbo服务引用

dubbo-demo-consumer配置文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

      http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
  -->
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xmlns="http://www.springframework.org/schema/beans"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
       http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">

    <!-- consumer's application name, used for tracing dependency relationship (not a matching criterion),
    don't set it same as provider -->
    <dubbo:application name="demo-consumer"/>

    <!-- use multicast registry center to discover service -->
    <dubbo:registry address="zookeeper://127.0.0.1:2181"/>

    <!-- generate proxy for the remote service, then demoService can be used in the same way as the
    local regular interface -->
    <dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService"/>
</beans>

Java代码示例如下:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.demo.consumer;

import com.alibaba.dubbo.demo.DemoService;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class Consumer {

    public static void main(String[] args) {
        //Prevent to get IPV6 address,this way only work in debug mode
        //But you can pass use -Djava.net.preferIPv4Stack=true,then it work well whether in debug mode or not
        System.setProperty("java.net.preferIPv4Stack", "true");
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[]{"META-INF/spring/dubbo-demo-consumer.xml"});
        context.start();
        /*服务引用*/
        DemoService demoService = (DemoService) context.getBean("demoService"); // get remote service proxy

        while (true) {
            try {
                Thread.sleep(1000);
                /*服务调用*/
                String hello = demoService.sayHello("world"); // call remote method
                System.out.println(hello); // get result

            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
        }

    }
}

整个服务引用的流程如下图所示:


dubbo服务引用时序图.png

可以概括为以下四个步骤:

  • 创建invoker对象,并对其加工(在后续的服务调用中是以invoker对象为载体完成的)
  • 建立与zookeeper的连接并订阅注册中心消息
  • 初始化Netty Client并建立与服务提供方Netty Server的连接
  • 返回代理对象以便服务调用时使用
2.1创建并加工invoker

ReferenceConfig类中refprotocol属性通过Dubbo SPI机制初始化,被赋值为Protocol$Adaptive

    private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

当调用refer方法时,会执行如下逻辑,其中extensionProtocolListenerWrapper。此处的设计与服务暴露中是一致的,因为ProtocolListenerWrapperProtocolFilterWrapperProtocol接口的包装类,在整个调用过程中会优先调用这两个类的refer方法。

package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;

public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
    ...
    public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        com.alibaba.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
        com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

ProtocolListenerWrapper类实现:

/**
 * ListenerProtocol
 */
public class ProtocolListenerWrapper implements Protocol {
    ...
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        /*url.getProtocol() == registry*/
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        ...
    }
}

ProtocolFilterWrapper类实现:

/**
 * ListenerProtocol
 */
public class ProtocolFilterWrapper implements Protocol {
    ...
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        /*url.getProtocol() == registry*/
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        ...
    }
}

整个流程如下:


registry refer.png
2.2建立zookeeper连接并订阅注册中心信息

registryFactory.getRegistry方法会创建ZookeeperRegistry对象,在ZookeeperRegistry构造函数中会建立consumer与zookeeper的连接,directory.subscribe会订阅注册中心消息,这里最终会调用到CuratorZookeeperClient#addTargetChildListener实现,订阅地址为/dubbo/com.alibaba.dubbo.demo.DemoService/providers即服务暴露时注册的地址。

/**
 * RegistryProtocol
 *
 */
public class RegistryProtocol implements Protocol {
    ...
    @Override
    @SuppressWarnings("unchecked")
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        /**设置registry属性值为dubbo**/
        url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
        /**建立与注册中心的连接**/
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        String group = qs.get(Constants.GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
                    || "*".equals(group)) {
                return doRefer(getMergeableCluster(), registry, type, url);
            }
        }
        return doRefer(cluster, registry, type, url);
    }

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
        URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
                && url.getParameter(Constants.REGISTER_KEY, true)) {
            URL registeredConsumerUrl = getRegisteredConsumerUrl(subscribeUrl, url);
            registry.register(registeredConsumerUrl);
            directory.setRegisteredConsumerUrl(registeredConsumerUrl);
        }
        /*订阅注册中心消息*/
        directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
                Constants.PROVIDERS_CATEGORY
                        + "," + Constants.CONFIGURATORS_CATEGORY
                        + "," + Constants.ROUTERS_CATEGORY));

        Invoker invoker = cluster.join(directory);
        ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
        return invoker;
    }
    ...
}

在注册中心完成消息订阅后,会根据服务暴露时的url转换为invoker对象

服务引用ZookeeperRegistry -_ Protocol$Adaptive.png

因为protocol具有两个包装类,ProtocolListenerWrapperProtocolFilterWrapper,因此整个调用过程会先经过这两个类

/**
 * ListenerProtocol
 */
public class ProtocolListenerWrapper implements Protocol {
    ...
    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        ...
        return new ListenerInvokerWrapper<T>(protocol.refer(type, url),
                Collections.unmodifiableList(
                        ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
    }
    ...
}

ListenerInvokerWrapper类的设计与在服务暴露中分析的ListenerExporterWrapper类的设计是非常相似的,在构造函数中遍历调用InvokerListenerreferred方法,此处作为扩展点,开发者可以在接口的实现类中加入自己的逻辑也可以定制化实现类以达到监听器的效果,destroy方法同理。

/**
 * ListenerInvoker
 */
public class ListenerInvokerWrapper<T> implements Invoker<T> {
    ...
    public ListenerInvokerWrapper(Invoker<T> invoker, List<InvokerListener> listeners) {
        if (invoker == null) {
            throw new IllegalArgumentException("invoker == null");
        }
        this.invoker = invoker;
        this.listeners = listeners;
        if (listeners != null && !listeners.isEmpty()) {
            for (InvokerListener listener : listeners) {
                if (listener != null) {
                    try {
                        /*调用InvokerListener接口实现类的referred方法*/
                        listener.referred(invoker);
                    } catch (Throwable t) {
                        logger.error(t.getMessage(), t);
                    }
                }
            }
        }
    }

    ...
    @Override
    public void destroy() {
        try {
            invoker.destroy();
        } finally {
            if (listeners != null && !listeners.isEmpty()) {
                for (InvokerListener listener : listeners) {
                    if (listener != null) {
                        try {
                            /*调用InvokerListener接口实现类的destroyed方法*/
                            listener.destroyed(invoker);
                        } catch (Throwable t) {
                            logger.error(t.getMessage(), t);
                        }
                    }
                }
            }
        }
    }
}

ProtocolFilterWrapper作为过滤器,主要负责包装invoker类,在调用invoke方法的时候会触发Fliter实现类的invoke方法,开发者可以定义自己的Filter以达到扩展框架的作用。

/**
 * ListenerProtocol
 */
public class ProtocolFilterWrapper implements Protocol {
    ...
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {

                    @Override
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }

                    @Override
                    public URL getUrl() {
                        return invoker.getUrl();
                    }

                    @Override
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }

                    /*filter包装invoker对象*/
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }

                    @Override
                    public void destroy() {
                        invoker.destroy();
                    }

                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        ...
        return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }

}

整体流程如图:


服务引用 refer.png
2.3初始化Netty Client并建立与服务提供方Netty Server的连接

该阶段目的为与服务提供方建立长连接,成功后会返回ExchangeClient对象并作为Invoker的属性保存,在服务调用阶段,实现接口的代理类直接选择invoker中保存的clients向服务提供者发送网络请求。其中Dubbo协议支持两种模式的网络连接,一种是共享client即所有调用共用一个连接,另一种是创建多个连接。

/**
 * dubbo protocol support.
 */
public class DubboProtocol extends AbstractProtocol {

    private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
    ...
    @Override
    public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);
        // create rpc invoker.
        /**创建dubboInvoker对象,该对象会有ExchangeClient[] client属性存储与server端的长连接**/
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

    /**获取与server端的连接**/
    private ExchangeClient[] getClients(URL url) {
        // whether to share connection
        boolean service_share_connect = false;
        /**获取配置中的connections属性,不配置默认为0**/
        int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
        // if not configured, connection is shared, otherwise, one connection for one service
        if (connections == 0) {
            service_share_connect = true;
            connections = 1;
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            /**connections属性不配置或者配置为0**/
            if (service_share_connect) {
                /**只创建一个公共的客户端**/
                clients[i] = getSharedClient(url);
            } else {
                /**创建多个client,调用时轮流使用**/
                clients[i] = initClient(url);
            }
        }
        return clients;
    }
    ...
}

共享连接的模式下,在一个消费者应用中,使用Dubbo协议进行通讯时会创建一个DubboProtocol类对象,对于公共连接,该对象会使用referenceClientMap这个属性进行维护,其中key为从配置中心获取到的服务提供方的ip:port,value就为创建出来的client。对于每一个rpc接口类来说(在某个特定的协议下,这里我们使用的是Dubbo协议),都会生成一个对应的invoker对象在调用时使用,同时invoker对象中会有属性来保存DubboProtocol提供的这个公共client以便在调用时直接使用,如下图所示:

service reference sharedClient.png

DubboProtocol类中的实现如下所示:

/**
 * dubbo protocol support.
 */
public class DubboProtocol extends AbstractProtocol {

    private final Map<String, ReferenceCountExchangeClient> referenceClientMap = new ConcurrentHashMap<String, ReferenceCountExchangeClient>(); // <host:port,Exchanger>
    ...
    /**
     * Get shared connection
     */
    private ExchangeClient getSharedClient(URL url) {
        /**key=ip:port**/
        String key = url.getAddress();
        /**从缓存中获取client**/
        ReferenceCountExchangeClient client = referenceClientMap.get(key);
        if (client != null) {
            if (!client.isClosed()) {
                /**已经初始化过client,记录的连接数增加1(每关闭一个连接会减少1,直接为0时才能销毁)**/
                client.incrementAndGetCount();
                return client;
            } else {
                referenceClientMap.remove(key);
            }
        }

        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            if (referenceClientMap.containsKey(key)) {
                return referenceClientMap.get(key);
            }

            ExchangeClient exchangeClient = initClient(url);
            /**无client对象,初始化**/
            client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
            /**放入map中缓存**/
            referenceClientMap.put(key, client);
            /**删除懒加载client保存的key,因为这时需要立即建立client与server的连接**/
            ghostClientMap.remove(key);
            locks.remove(key);
            return client;
        }
    }
    ...
}

对于公共client,会使ReferenceCountExchangeClient类包装,其中refenceCount属性会记录连接数,每有一个invoker使用该client就会增加1,其目的是在断开连接时使用,每断开一个连接该值就会减少1,为0时说明无连接,这时client就能够被安全销毁,ReferenceCountExchangeClient类如下所示:

/**
 * dubbo protocol support class.
 */
@SuppressWarnings("deprecation")
final class ReferenceCountExchangeClient implements ExchangeClient {
    ...
    private final URL url;
    /**refenceCount变量记录连接数,每有一个invoker使用shared client就会增加一**/
    private final AtomicInteger refenceCount = new AtomicInteger(0);

    //    private final ExchangeHandler handler;
    private final ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap;
    private ExchangeClient client;


    public ReferenceCountExchangeClient(ExchangeClient client, ConcurrentMap<String, LazyConnectExchangeClient> ghostClientMap) {
        this.client = client;
        /**初始化时增加到1**/
        refenceCount.incrementAndGet();
        this.url = client.getUrl();
        if (ghostClientMap == null) {
            throw new IllegalStateException("ghostClientMap can not be null, url: " + url);
        }
        this.ghostClientMap = ghostClientMap;
    }
    ...
}

在多个连接的模式下,消费端可以通过connections属性配置,不配置该属性则默认共享一个client(详见:Dubbo连接控制),与sharedClient模式同理,在每个rpc接口类所对应的invoker对象中,会有一个数组用于保存多个client,在调用时轮流使用,与共享模式不同的是DubboProtocol只会负责帮助生成这些client,并不会保存,如下图所示:

service reference multiClient.png

DubboProtocol类中的实现如下所示:

/**
 * dubbo protocol support.
 */
public class DubboProtocol extends AbstractProtocol {
    ...
    /**
     * Create new connection
     */
    private ExchangeClient initClient(URL url) {

        // client type setting.
        String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));

        url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
        // enable heartbeat by default
        url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));

        // BIO is not allowed since it has severe performance issue.
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            /**懒加载属性lazy,默认为false**/
            // connection should be lazy
            if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
                /**配置懒加载时,只有在调用时才能建立连接**/
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                /**与netty server建立连接**/
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }
    ...
}
2.4返回代理对象以便服务调用时使用

ReferenceConfig类中通过refprotocol.refer获取到invoker对象后,执行(T) proxyFactory.getProxy(invoker)返回代理对象,这里以JDK的动态代理为例

/**
 * JavaassistRpcProxyFactory
 */
public class JdkProxyFactory extends AbstractProxyFactory {

    @Override
    @SuppressWarnings("unchecked")
    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        /**InvokerInvocationHandler继承jdk自带的InvocationHandler实现动态代理**/
        return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
    }
    ...
}

当调用rpc接口类的方法时,就会被拦截到InvokerInvocationHandler中处理,执行invoker.invoke(new RpcInvocation(method, args)).recreate()

/**
 * InvokerHandler
 */
public class InvokerInvocationHandler implements InvocationHandler {

    private final Invoker<?> invoker;

    public InvokerInvocationHandler(Invoker<?> handler) {
        this.invoker = handler;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ... 
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

3.Dubbo服务调用

整个服务调用可以划分为四步:

  • 消费者封装方法、入参,选择远程服务提供方ip、port发起网络请求
  • 生产者接收到消费者发送的请求,根据发送过来的调用方法、入参匹配到接口实现,执行调用并获取结果
  • 生产者将返回结果回送给消费者
  • 消费者接收到调用结果并将结果返回给业务代码


    服务调用示意图.png
3.1消费者发起方法调用

消费者发起方法调用流程大致如下:

consumer invoke时序图.png

在服务引用阶段Dubbo为每个rpc接口返回了对应的代理类,服务调用时所有的方法都会被拦截到代理类中执行invoker.invoke,其中invokerMockClusterInvoker,在服务引用阶段ReferenceConfig类中调用cluster.join返回的

/**
 * InvokerHandler
 */
public class InvokerInvocationHandler implements InvocationHandler {

    ...
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        ...
        /**当业务代码调用接口的方法时触发,method为方法名,args为入参,args以数组的方式记录,多个入参时从左往右依次记录在数组中**/
        return invoker.invoke(new RpcInvocation(method, args)).recreate();
    }

}

MockClusterInvokerFailoverClusterInvoker这两个阶段会选择出执行调用的invoker对象,整个阶段称为集群容错的过程,包括根据路由策略过滤invoker,根据不同的负载均衡策略选择出要调用的invoker,面对调用失败的场景进行特殊处理(例如:重试、快速失败等)。选择出要执行调用的invoker对象后,会进入到Filter的调用链,因为在服务引用阶段生成invoker时,对该invoker对象使用Filter进行包装。

/**
 * ListenerProtocol
 */
public class ProtocolFilterWrapper implements Protocol {

    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (!filters.isEmpty()) {
            for (int i = filters.size() - 1; i >= 0; i--) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    ...
                    @Override
                    public Result invoke(Invocation invocation) throws RpcException {
                        /**依次调用filter的invoke方法**/
                        return filter.invoke(next, invocation);
                    }
                    ...
                };
            }
        }
        return last;
    }
}

完成Filter过滤器链的调用后,最终到DubboInvoker执行远程调用逻辑,在服务引用阶段invoker对象中会保存初始化成功的client对象,在调用时直接使用以发送网络请求。其中index属性主要用来在多连接的场景下计数,每次调用doInvoke都会增加1,使用client时会根据index值与连接数量取模来选取以达到每个client能被均衡使用到的目的。调用分为三类:

  • 调用不需要服务提供者返回结果:直接返回一个空的RpcResult对象
  • 异步调用返回结果:在请求上下文中放置一个Future对象后返回一个空的RpcResult对象,用户使用RpcContext.getContext().getFuture就能获取Future对象,使用Future对象提供的get方法就能阻塞当前线程直到结果返回
  • 同步调用返回结果,框架主动调用Future对象提供的get方法进行阻塞,直到服务提供方结果返回

同步与异步调用的不同之处在于是框架自身调用Future对象提供的get方法还是由业务代码控制调用,当由业务代码来控制阻塞的时候就能够在结果返回之前执行与本次rpc调用无关的代码逻辑以达到异步化的目的。

/**
 * DubboInvoker
 */
public class DubboInvoker<T> extends AbstractInvoker<T> {

    /**服务引用时生成的client保存在该属性中以便在调用时使用**/
    private final ExchangeClient[] clients;

    /**使用AtomicPositiveInteger类保证并发安全,每次调用doInvoke时增加1**/
    private final AtomicPositiveInteger index = new AtomicPositiveInteger();
    ...

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            /**拥有多个client连接时,每次调用时index增加1取模client总数量获取client连接,达到每个client被轮流使用到的效果**/
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            /**消费者调用时不需要生产者返回**/
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                /**异步调用,直接返回RpcResult,由用户使用RpcContext.getContext().getFuture获取future对象来阻塞**/
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                /**同步调用使用get方法阻塞直到结果返回**/
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    ...
}
3.2生产者接收网络请求,执行真正的接口调用

DubboProtocol中的requestHandler会处理消费者发送过来的RpcInvocation对象,根据ip:port获取exporterMap中的DubboExporter对象(服务暴露阶段存储的),再根据exporter对象获取到invoker,对invoker进行调用。

/**
 * dubbo protocol support.
 */
public class DubboProtocol extends AbstractProtocol {
    ...
    private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {

        @Override
        public Object reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                Invocation inv = (Invocation) message;
               /**获取服务暴露阶段创建的invoker**/
                Invoker<?> invoker = getInvoker(channel, inv);
                ...
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                return invoker.invoke(inv);
            }
            throw new RemotingException(channel, "Unsupported request: "
                    + (message == null ? null : (message.getClass().getName() + ": " + message))
                    + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
        }

    };
    ...
    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        ...
        /**生成serviceKey,ip:port**/
        String serviceKey = serviceKey(port, path, inv.getAttachments().get(Constants.VERSION_KEY), inv.getAttachments().get(Constants.GROUP_KEY));
        /**根据serviceKey获取服务暴露阶段存储的exporter**/
        DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

        if (exporter == null)
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + inv);

        /**获取invoker**/
        return exporter.getInvoker();
    }
    ...

}

调用时序图如下所示:

provider invoke.png

DubboProtocol调用invoker提供的invoke方法后会依次执行Filter调用链中的invoke方法,因为该invoker在服务暴露阶段时被Filter调用链所包装,完成Filter中的调用后会进入之前生成好的JdkProxyFactory代理工厂,完成最终的DemoServiceImpl的调用并获取结果。

/**
 * JavaassistRpcProxyFactory
 */
public class JdkProxyFactory extends AbstractProxyFactory {
    ...
    @Override
    public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
        return new AbstractProxyInvoker<T>(proxy, type, url) {
            @Override
            protected Object doInvoke(T proxy, String methodName,
                                      Class<?>[] parameterTypes,
                                      Object[] arguments) throws Throwable {
                Method method = proxy.getClass().getMethod(methodName, parameterTypes);
                /**调用DemoServiceImpl完成最终的调用**/
                return method.invoke(proxy, arguments);
            }
        };
    }

}

3.3生产者将返回结果回送给消费者

获取到DemoServiceImpl的执行结果后,在HeaderExchangeHandler类中通过调用channel.send将结果返回给consumer

/**
 * ExchangeReceiver
 */
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
    ...
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
        ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
        try {
            if (message instanceof Request) {
                // handle request.
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        /**将获取到的结果返回给consumer**/
                        channel.send(response);
                    } else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            }
            ...
        } finally {
            HeaderExchangeChannel.removeChannelIfDisconnected(channel);
        }
    }
}
3.4消费者接收到调用结果并将结果返回给业务代码

该过程与向服务提供方发送请求时的顺序相反,时序图如下:


consumer invoker return.png

4.总结

Dubbo服务引用的过程就是为消费者生成代理类的过程,在服务调用阶段所有对方法的调用会直接作用于代理类,代理类帮助完成请求的封装、网络请求的发送、结果的接收等等,其中invoker作为实现这些操作的桥梁,所有的调用最终都是作用于invoker对象上的,接口类、协议与远程地址这三个维度共同决定一个invoker对象。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,755评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,369评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,799评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,910评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,096评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,159评论 3 411
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,917评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,360评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,673评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,814评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,509评论 4 334
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,156评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,882评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,123评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,641评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,728评论 2 351