Dubbo 单注册中心和多注册中心下的Invoker

开篇

  • 这篇文章的目的主要是阐述单注册中心和多注册场景下Invoker的初始化以及调用过程。


服务引用过程

public class ReferenceConfig<T> extends AbstractReferenceConfig {

    private T createProxy(Map<String, String> map) {
        URL tmpUrl = new URL("temp", "localhost", 0, map);

        // 省略代码

        if (isJvmRefer) {
          // 省略代码
        } else {
            if (url != null && url.length() > 0) { 
                // 省略代码,忽略处理直连逻辑
            } else { // 处理非直连来自注册中心的场景
                List<URL> us = loadRegistries(false);
                if (us != null && !us.isEmpty()) {
                    for (URL u : us) {
                        URL monitorUrl = loadMonitor(u);
                        if (monitorUrl != null) {
                            map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                        }
                        urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
                    }
                }
            }

            if (urls.size() == 1) {
                // 单注册中心的情况
                invoker = refprotocol.refer(interfaceClass, urls.get(0));
            } else {
                // 多注册中心的情况
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {
                    invokers.add(refprotocol.refer(interfaceClass, url));
                    if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { // registry url is available
                    // use AvailableCluster only when register's cluster is available
                    URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
                    invoker = cluster.join(new StaticDirectory(u, invokers));
                } else { // not a registry url
                    invoker = cluster.join(new StaticDirectory(invokers));
                }
            }
        }

        // 省略相关代码

        // create service proxy
        return (T) proxyFactory.getProxy(invoker);
    }
}
  • ReferenceConfig#createProxy内部根据注册中心的个数分别走不同的分支逻辑。
  • 单注册中心情况下:urls.size() == 1,通过refprotocol.refer(interfaceClass, urls.get(0))来实现refer过程。
  • 多注册中心情况下:通过refprotocol.refer(interfaceClass, url)来实现单个注册中心下的服务引用,通过invoker = cluster.join(new StaticDirectory(u, invokers))来实现不同注册中心invoker的聚合。
  • 多注册中心和单注册中心的区别在于多注册中心存在二次聚合的join逻辑


单注册中心下的ClusterInvoker

单注册中心
  • 单注册中心下每个Interface对应一个MockClusterInvoker,MockClusterInvoker包含FailoverClusterInvoker。
  • MockClusterInvoker可以理解为FailoverClusterInvoker的装饰类。
  • 每个FailoverClusterInvoker包含一个RegistryDirectory,RegistryDirectory的维度 = Interface + 注册中心,也就是每个注册中心+每个Interface=一个RegistryDirectory。


多注册中心下的ClusterInvoker

多注册中心
  • 多注册中心下的ClusterInvoker是每个注册中心下的ClusterInvoker的聚合。
  • 外层聚合的ClusterInvoker为AvailableCluster。
  • AvailableCluster的内部包含StaticDirectory,StaticDirectory包含所有注册中心下的MockClusterInvoker对象(MockClusterInvoker内部包含FailoverClusterInvoker),通过上图可以看到包含注册中心[127.0.0.1:2181]和注册中心[127.0.0.1:2182]下的Invoker对象。


AvailableCluster的Invoker的生成代码
invoker = cluster.join(new StaticDirectory(u, invokers));

public class AvailableCluster implements Cluster {

    public static final String NAME = "available";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new AbstractClusterInvoker<T>(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker<T> invoker : invokers) {
                    if (invoker.isAvailable()) {
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };
    }
}


public class StaticDirectory<T> extends AbstractDirectory<T> {

    private final List<Invoker<T>> invokers;
    public StaticDirectory(List<Invoker<T>> invokers) {
        this(null, invokers, null);
    }

    public StaticDirectory(List<Invoker<T>> invokers, List<Router> routers) {
        this(null, invokers, routers);
    }

    public StaticDirectory(URL url, List<Invoker<T>> invokers) {
        this(url, invokers, null);
    }

    public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
        super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);
        if (invokers == null || invokers.isEmpty())
            throw new IllegalArgumentException("invokers == null");
        this.invokers = invokers;
    }
}
  • AvailableCluster的join方法返回的是AbstractClusterInvoker对象,参数为StaticDirectory对象。
  • StaticDirectory包含了Interface在每个注册中心下的invoker对象,即MockClusterInvoker对象(MockClusterInvoker内部包含FailoverClusterInvoker)。


多注册中心查找调用关系

public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    protected final Directory<T> directory;
    protected final boolean availablecheck;

    public AbstractClusterInvoker(Directory<T> directory) {
        this(directory, directory.getUrl());
    }

    public AbstractClusterInvoker(Directory<T> directory, URL url) {
        if (directory == null)
            throw new IllegalArgumentException("service directory == null");

        this.directory = directory;
        this.availablecheck = url.getParameter(Constants.CLUSTER_AVAILABLE_CHECK_KEY, Constants.DEFAULT_CLUSTER_AVAILABLE_CHECK);
    }

    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;

        // binding attachments into invocation.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
        // 查找invokers
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 执行doInvoke动作
        return doInvoke(invocation, invokers, loadbalance);
    }

    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }
}
  • AbstractClusterInvoker作为ClusterInvoker的基类,AbstractClusterInvoker#invoker封装了调用过程的逻辑,包括查找invoker过程和执行invoker过程。
  • List<Invoker<T>> invokers = list(invocation)表示查找invoker过程,list方法内部通过doList返回对应的invoker对象,然后通过路由进行一层过滤
  • doInvoke(invocation, invokers, loadbalance)表示invoker的执行过程。
  • 多注册中心场景下dubbo的调用关系分为两级查找,先从StaticDirectory当中查找,再从RegistryDirectory当中查找。


StaticDirectory查找和调用

public class StaticDirectory<T> extends AbstractDirectory<T> {

    private final List<Invoker<T>> invokers;

    public StaticDirectory(URL url, List<Invoker<T>> invokers, List<Router> routers) {
        super(url == null && invokers != null && !invokers.isEmpty() ? invokers.get(0).getUrl() : url, routers);

        this.invokers = invokers;
    }

    @Override
    public boolean isAvailable() {
        if (isDestroyed()) {
            return false;
        }
        for (Invoker<T> invoker : invokers) {
            if (invoker.isAvailable()) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
        return invokers;
    }

}
  • StaticDirectory#doList内部只是简单的返回invokers对象。
  • 通过上面的分析得知StaticDirectory的invokers包含Interface在所有注册中心下的MockClusterInvoker对象(MockClusterInvoker内部包含FailoverClusterInvoker),包含注册中心[127.0.0.1:2181]和注册中心[127.0.0.1:2182]下的FailoverClusterInvoker对象。
  • StaticDirectory的查找是以注册中心为维度进行查找的,返回Interface在单个注册中心下的FailoverClusterInvoker对象

public class AvailableCluster implements Cluster {

    public static final String NAME = "available";

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {

        return new AbstractClusterInvoker<T>(directory) {
            @Override
            public Result doInvoke(Invocation invocation, List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
                for (Invoker<T> invoker : invokers) {
                    if (invoker.isAvailable()) {
                        return invoker.invoke(invocation);
                    }
                }
                throw new RpcException("No provider available in " + invokers);
            }
        };
    }
}
  • AvailableCluster#doInvoke先执行FailoverClusterInvoker#isAvailable方法判断可用性。
  • AvailableCluster#doInvoke再执行FailoverClusterInvoker#invoke方法传递调用。


RegistryDirectory查找和调用

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
    @Override
    public List<Invoker<T>> doList(Invocation invocation) {

        List<Invoker<T>> invokers = null;
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }
}
  • RegistryDirectory中返回的是Interface在某个注册中心下的所有invoker对象。
  • RegistryDirectory#invoker代表的是MockClusterInvoker,包含FailoverClusterInvoker。
  • RegistryDirectory查找是以单个注册中心为维度查找,查找该Interface在单个注册中心下所有provider对应的invoker对象。


public abstract class AbstractClusterInvoker<T> implements Invoker<T> {

    @Override
    public Result invoke(final Invocation invocation) throws RpcException {
        checkWhetherDestroyed();
        LoadBalance loadbalance = null;

        // binding attachments into invocation.
        Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();
        if (contextAttachments != null && contextAttachments.size() != 0) {
            ((RpcInvocation) invocation).addAttachments(contextAttachments);
        }
        // list查找invokers
        List<Invoker<T>> invokers = list(invocation);
        if (invokers != null && !invokers.isEmpty()) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        // 执行invoke调用
        return doInvoke(invocation, invokers, loadbalance);
    }

    protected abstract Result doInvoke(Invocation invocation, List<Invoker<T>> invokers,
                                       LoadBalance loadbalance) throws RpcException;

    protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
        // directory表示RegistryDirectory
        List<Invoker<T>> invokers = directory.list(invocation);
        return invokers;
    }
}
  • AbstractClusterInvoker#list返回该Interface在某个注册中心下所有provider对应的invoker对象。
  • List<Invoker<T>> invokers = directory.list(invocation)执行的就是该查找过程。


RegistryDirectory

public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {

    private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
    private static final RouterFactory routerFactory = ExtensionLoader.getExtensionLoader(RouterFactory.class).getAdaptiveExtension();
    private static final ConfiguratorFactory configuratorFactory = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class).getAdaptiveExtension();
    private final String serviceKey; // Initialization at construction time, assertion not null
    private final Class<T> serviceType; // Initialization at construction time, assertion not null
    private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
    private final URL directoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
    private final String[] serviceMethods;
    private final boolean multiGroup;
    private Protocol protocol; // Initialization at the time of injection, the assertion is not null
    private Registry registry; // Initialization at the time of injection, the assertion is not null
    private volatile boolean forbidden = false;
    private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
    private volatile URL registeredConsumerUrl;
    private volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference
    private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
    private volatile Map<String, List<Invoker<T>>> methodInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference

    // Set<invokerUrls> cache invokeUrls to invokers mapping.
    private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference

    public RegistryDirectory(Class<T> serviceType, URL url) {
        // serviceType表示com.alibaba.dubbo.demo.DemoService
        // zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.2&pid=62794&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.2%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D62794%26qos.port%3D33333%26register.ip%3D192.168.0.8%26side%3Dconsumer%26timestamp%3D1587823349524&timestamp=1587823349613
        super(url);
        this.serviceType = serviceType;
        this.serviceKey = url.getServiceKey();
        this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
        this.overrideDirectoryUrl = this.directoryUrl = url.setPath(url.getServiceInterface()).clearParameters().addParameters(queryMap).removeParameter(Constants.MONITOR_KEY);
        String group = directoryUrl.getParameter(Constants.GROUP_KEY, "");
        this.multiGroup = group != null && ("*".equals(group) || group.contains(","));
        String methods = queryMap.get(Constants.METHODS_KEY);
        this.serviceMethods = methods == null ? null : Constants.COMMA_SPLIT_PATTERN.split(methods);
    }

    public List<Invoker<T>> doList(Invocation invocation) {
        List<Invoker<T>> invokers = null;
        Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
        if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
            String methodName = RpcUtils.getMethodName(invocation);
            Object[] args = RpcUtils.getArguments(invocation);
            if (args != null && args.length > 0 && args[0] != null
                    && (args[0] instanceof String || args[0].getClass().isEnum())) {
                invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // The routing can be enumerated according to the first parameter
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(methodName);
            }
            if (invokers == null) {
                invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
            }
            if (invokers == null) {
                Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
                if (iterator.hasNext()) {
                    invokers = iterator.next();
                }
            }
        }
        return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
    }
}


public abstract class AbstractDirectory<T> implements Directory<T> {

    private final URL url;
    private volatile boolean destroyed = false;
    private volatile URL consumerUrl;
    private volatile List<Router> routers;

    public List<Invoker<T>> list(Invocation invocation) throws RpcException {
        if (destroyed) {
            throw new RpcException("Directory already destroyed .url: " + getUrl());
        }
        // doList由各子类实现具体的实现返回invokers
        List<Invoker<T>> invokers = doList(invocation);
        // 执行路由选择
        List<Router> localRouters = this.routers; // local reference
        if (localRouters != null && !localRouters.isEmpty()) {
            for (Router router : localRouters) {
                try {
                    if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                        invokers = router.route(invokers, getConsumerUrl(), invocation);
                    }
                } catch (Throwable t) {
                }
            }
        }
        return invokers;
    }
}
  • RegistryDirectory的含义需要特别解释下,希望重点关注了解下。
  • RegistryDirectory对象代表的是Inteface服务在某个注册中心的对象,包含了Interface在注册中心所有的refer对象(即对应provider的服务引用对象)。
  • Dubbo的整体设计是面向接口设计的,所有才会有Interface+注册中心唯一确定一个RegistryDirectory对象。
  • RegistryDirectory的参数当中Class<T> serviceType代表服务的Interface, URL url代表注册中心的注册地址,例子见上述代码中的注释。
  • AbstractDirectory#list内部执行查找和路由选择功能,所有通过AbstractDirectory#list的方法都会执行查询和路由选择步骤。


ClusterInvoker介绍

Invoker类图
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 前言 本文继续分析dubbo的cluster层,此层封装多个提供者的路由及负载均衡,并桥接注册中心,以Invoke...
    Java大生阅读 4,563评论 0 0
  • 先看官网两张图【引用来自官网】:image.png 官网说明: 1.首先 ServiceConfig 类拿到对外提...
    致虑阅读 5,333评论 1 4
  • 1.简介 为了避免单点故障,现在的应用通常至少会部署在两台服务器上。对于一些负载比较高的服务,会部署更多的服务器。...
    虫师_银古阅读 1,486评论 0 0
  • 本篇重点关注 Dubbo 服务引入的实现细节。 服务消费配置如下: 这里配置 reference 的 init=t...
    列苗_tech阅读 4,585评论 0 0
  • 陌生的一起乘车 为了工作 为了家庭 为了一切 车开动,去工作 车开动,回家 车开动,有希望
    大碴粥阅读 1,004评论 0 0

友情链接更多精彩内容