本系列主要参考官网文档、芋道源码的源码解读和《深入理解Apache Dubbo与实战》一书。Dubbo版本为2.6.1。
文章内容顺序:
1.服务引用的介绍
2.服务引用的入口方法getObject()=>createProxy()方法介绍3.本地引用
- 3.1是否为本地引用的判别方法,InjvmProtocol#isInjvmRefer
- 3.2createProxy()中的本地引用链路
- 3.3InjvmIvoker介绍,引出proxyFactory
4.proxyFactory扩展点
- 4.1proxyFactory的包装类StubProxyFactoryWrapper,本地存根的作用
- 4.2扩展类JavassistProxyFactory的getProxy(Invoker)
- 4.3Proxy 实例中传入了InvokerInvocationHandler类的意义
- 4.4JavassistProxyFactory生成的代码样例及作用
- 4.5服务引用存在哪?
5.远程引用
- 5.1 createProxy()中的远程引用链路
- 5.2只配置了一个注册中心的远程引用
- 5.3RegistryProtocol#refer()
- 5.4直连时的远程引用
- 5.5new DubboInvoker时的getClients(url)
- 5.6getClients(url)中的getSharedClient(url)
- 5.7getClients(url)中的initClient(url)
- 5.8initClient(url)中的自适应Exchangers#connect()
- 5.9多注册中心时链路
1.服务引用的介绍
来自官网的服务引用介绍:
- Dubbo 服务引用的时机有两个,第一个是在 Spring 容器调用 ReferenceBean 的
afterPropertiesSet
方法时引用服务,第二个是在 ReferenceBean对应的服务被注入到其他类中时引用。这两个引用服务的时机区别在于,第一个是饿汉式的,第二个是懒汉式的。- 默认情况下,Dubbo 使用懒汉式引用服务。如果需要使用饿汉式,可通过配置 <dubbo:reference> 的 init 属性开启。下面我们按照 Dubbo 默认配置进行分析,整个分析过程从 ReferenceBean 的
getObject
方法开始。当我们的服务被注入到其他类中时,Spring 会第一时间调用getObject
方法,并由该方法执行服务引用逻辑。- 按照惯例,在进行具体工作之前,需先进行配置检查与收集工作。接着根据收集到的信息决定服务用的方式,有三种,第一种是引用本地 (JVM) 服务,第二是通过直连方式引用远程服务,第三是通过注册中心引用远程服务。不管是哪种引用方式,最后都会得到一个 Invoker 实例。
- 如果有多个注册中心,多个服务提供者,这个时候会得到一组 Invoker 实例,此时需要通过集群管理类 Cluster 将多个 Invoker 合并成一个实例。合并后的 Invoker 实例已经具备调用本地或远程服务的能力了,但并不能将此实例暴露给用户使用,这会对用户业务代码造成侵入。此时框架还需要通过代理工厂类 (ProxyFactory) 为服务接口生成代理类,并让代理类去调用 Invoker 逻辑。避免了 Dubbo 框架代码对业务代码的侵入,同时也让框架更容易使用。
本图暂不考虑集群容错、网络调用、序列化反序列
2.服务引用的入口,getObject()=>createProxy()方法介绍
那么就从ReferenceBean的getObject
开始吧:
public Object getObject() throws Exception {
return get();
}
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("Already destroyed!");
}
// 检测 ref (即service对象)是否为空,为空则通过 init 方法创建
if (ref == null) {
// init 方法主要用于处理配置,以及调用 createProxy 生成代理类
init();
}
return ref;
}
注意这个ReferenceConfig#init()
,主要逻辑就是配置解析封装(实在太长就不贴了,而且解析配置不在讨论的点),该实例包含了事件通知配置,比如 onreturn、onthrow、oninvoke 等。并在最后调用 createProxy ()
创建代理对象。
从createProxy 我们也将开始真正的引用链路。
//map为 `side`,`dubbo`,`timestamp`,`pid`等 参数
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 是否本地引用
final boolean isJvmRefer;
// injvm 属性为空,不通过该属性判断
if (isInjvm() == null) {
// 直连服务提供者,参见文档《直连提供者》https://dubbo.gitbooks.io/dubbo-user-book/demos/explicit-target.html
if (url != null && url.length() > 0) { // if a url is specified, don't do local reference
isJvmRefer = false;
// 通过 `tmpUrl` 判断,是否需要本地引用
} else if (InjvmProtocol.getInjvmProtocol().isInjvmRefer(tmpUrl)) {
// by default, reference local service if there is
isJvmRefer = true;
// 默认不是
} else {
isJvmRefer = false;
}
// 通过 injvm 属性。
} else {
isJvmRefer = isInjvm();
}
// 本地引用
if (isJvmRefer) {
// 创建本地服务引用 URL 对象。
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 引用服务,返回 Invoker 对象
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
// 正常流程,一般为远程引用
} else {
// 定义直连地址,可以是服务提供者的地址,也可以是注册中心的地址
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
// 拆分地址成数组,使用 ";" 分隔。
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
// 循环数组,添加到 `url` 中。
if (us != null && us.length > 0) {
for (String u : us) {
// 创建 URL 对象
URL url = URL.valueOf(u);
// 设置默认路径
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
// 注册中心的地址,带上服务引用的配置参数
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
// 服务提供者的地址
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
// 注册中心
} else { // assemble URL from register center's configuration
// 加载注册中心 URL 数组
List<URL> us = loadRegistries(false);
// 循环数组,添加到 `url` 中。
if (us != null && !us.isEmpty()) {
for (URL u : us) {
// 加载监控中心 URL
URL monitorUrl = loadMonitor(u);
// 服务引用配置对象 `map`,带上监控中心的 URL
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 注册中心的地址,带上服务引用的配置参数
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map))); // 注册中心,带上服务引用的配置参数
}
}
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
// 单 `urls` 时,引用服务,返回 Invoker 对象
if (urls.size() == 1) {
// 引用服务
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {
// 循环 `urls` ,引用服务,返回 Invoker 对象
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 引用服务
invokers.add(refprotocol.refer(interfaceClass, url));
// 使用最后一个注册中心的 URL
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
// 有注册中心
if (registryURL != null) { // registry url is available
// 对有注册中心的 Cluster 只用 AvailableCluster
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
// 无注册中心,全部都是服务直连
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
// 启动时检查
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true; // default true
}
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("Failed to check the status of the service " + interfaceName + ". No provider available for the service " + (group == null ? "" : group + "/") + interfaceName + (version == null ? "" : ":" + version) + " from the url " + invoker.getUrl() + " to the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion());
}
if (logger.isInfoEnabled()) {
logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
}
// 创建 Service 代理对象
// create service proxy
return (T) proxyFactory.getProxy(invoker);
}
3.createProxy()中的本地引用链路
3.1是否为本地引用的判别方法,InjvmProtocol#isInjvmRefer
同样的,服务引用也分为本地引用和远程引用,本地引用还是远程引用是从URL来辨别的,先从本地引用来讲起。
这边来简单介绍下他的判别方法InjvmProtocol#isInjvmRefer
,也比较简单
public boolean isInjvmRefer(URL url) {
final boolean isJvmRefer;
String scope = url.getParameter(Constants.SCOPE_KEY);
// Since injvm protocol is configured explicitly, we don't need to set any extra flag, use normal refer process.
// 当 `protocol = injvm` 时,本身已经是 jvm 协议了,走正常流程就是了。
if (Constants.LOCAL_PROTOCOL.toString().equals(url.getProtocol())) {
isJvmRefer = false;
// 当 `scope = local` 或者 `injvm = true` 时,本地引用
} else if (Constants.SCOPE_LOCAL.equals(scope) || (url.getParameter("injvm", false))) {
// if it's declared as local reference
// 'scope=local' is equivalent to 'injvm=true', injvm will be deprecated in the future release
isJvmRefer = true;
// 当 `scope = remote` 时,远程引用
} else if (Constants.SCOPE_REMOTE.equals(scope)) {
// it's declared as remote reference
isJvmRefer = false;
// 当 `generic = true` 时,即使用泛化调用,远程引用。
} else if (url.getParameter(Constants.GENERIC_KEY, false)) {
// generic invocation is not local reference
isJvmRefer = false;
// 当本地已经有该 Exporter 时,本地引用
} else if (getExporter(exporterMap, url) != null) {
// by default, go through local reference if there's the service exposed locally
isJvmRefer = true;
// 默认,远程引用
} else {
isJvmRefer = false;
}
return isJvmRefer;
}
}
上面的方法没什么好说的,都已经注释好啦。
createProxy()中的本地引用链路
这边我们先走本地引用的链路,再贴一下createProxy()
的部分代码如下,注意到在创建本地服务引用 URL 对象时,已经把Protocol设置成InjvmProtocol了,
// 本地引用
if (isJvmRefer) {
// 创建本地服务引用 URL 对象。
URL url = new URL(Constants.LOCAL_PROTOCOL, NetUtils.LOCALHOST, 0, interfaceClass.getName()).addParameters(map);
// 引用服务,返回 Invoker 对象
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
看到直接调用了Protocol#refer(interface, url)
,根据url获得对应 Protocol 拓展实现为 InjvmProtocol 。同样的还是SPI机制,调用链路是:
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => InjvmProtocol
,已经在SPI和服务暴露一文提过很多次了,包装类里export()
和refer()
的逻辑都差不多,就不再介绍。直接来看Protocol#refer(interface, url)
做了什么吧。
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
return new InjvmInvoker<T>(serviceType, url, url.getServiceKey(), exporterMap);
}
非常朴素的new了InjvmInvoker,在进去看看他干了什么。
3.3InjvmIvoker介绍,引出proxyFactory
/**
* Exporter 集合
*
* key: 服务键
*
* 该值实际就是 {@link com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap}
*/
private final Map<String, Exporter<?>> exporterMap;
InjvmInvoker(Class<T> type, URL url, String key, Map<String, Exporter<?>> exporterMap) {
super(type, url);
this.key = key;
this.exporterMap = exporterMap;
}
- 最后返回的就是这个InjvmIvoker,Invoker 是 Dubbo 的核心模型,代表一个可执行体,这个InjvmIvoker他将一些属性和本地的缓存聚合到一起形成了一个Invoker。
每个InjvmInvoker都会持有一个指向本地缓存的指针。- 拿到这个Invoker后,在
ReferenceConfig#createProxy
最后调用// 创建 Service 代理对象 // create service proxy return (T) proxyFactory.getProxy(invoker);
调用
ProxyFactory#getProxy(invoker)
方法,创建 Service 代理对象。这边的proxyFactory
同样也是扩展点,由传入的invoker.url来决定调用哪个。
顺带一提 Service 代理对象的内部,可以调用Invoker#invoke(Invocation)
方法,进行 Dubbo 服务的调用。
那么最后,getProxy()到底是怎么创建代理对象的呢?
4.proxyFactory扩展点
ProxyFactory同样也是个扩展类,有Stub包装类,还有两个扩展类实现,可以自由决定用哪个Factory。Invoker通过Javassist动态代理或者JDK动态代理,两者都是通过生成字节码实现的。
而技术的选型可以看下这张图作者的解释
4.1proxyFactory的包装类StubProxyFactoryWrapper
ProxyFactory#getProxy(invoker)`链路其实是这样的,StubProxyFactoryWrapper的作用就是生成本地存根。
在此之前我们先要知道本地存根的作用(也就是为什么要调用
StubProxyFactoryWrapper
):
远程服务后,客户端通常只剩下接口,而实现全在服务器端,但提供方有些时候想在客户端也执行部分逻辑,比如:做 ThreadLocal 缓存,提前验证参数,调用失败后伪造容错数据等等,此时就需要在 API 中带上 Stub,客户端生成 Proxy 实例,会把 Proxy 通过构造函数传给 Stub ,然后把 Stub 暴露给用户,Stub 可以决定要不要去调 Proxy。
说多了不如看一个简单的实例:Dubbo本地存根
StubProxyFactoryWrapper#getProxy(invoker)
代码如下
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
// 获得 Service Proxy 对象,这边调用的就是指定的proxyFactory了
T proxy = proxyFactory.getProxy(invoker);
if (GenericService.class != invoker.getInterface()) { // 非泛化引用
// 获得 `stub` 配置项
String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY));
if (ConfigUtils.isNotEmpty(stub)) {
Class<?> serviceType = invoker.getInterface();
// `stub = true` 的情况,使用接口 + `Stub` 字符串。
if (ConfigUtils.isDefault(stub)) {
if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) {
stub = serviceType.getName() + "Stub";
} else {
stub = serviceType.getName() + "Local";
}
}
try {
// 加载 Stub 类
Class<?> stubClass = ReflectUtils.forName(stub);
if (!serviceType.isAssignableFrom(stubClass)) {
throw new IllegalStateException("The stub implementation class " + stubClass.getName() + " not implement interface " + serviceType.getName());
}
try {
Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType);
// 创建 Stub 对象,使用带 Service Proxy 对象的构造方法
proxy = (T) constructor.newInstance(new Object[]{proxy});
// 【TODO 8033】参数回调
//export stub service
URL url = invoker.getUrl();
if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)) {
url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ","));
url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString());
try {
export(proxy, (Class) invoker.getInterface(), url);
} catch (Exception e) {
LOGGER.error("export a stub service error.", e);
}
}
} catch (NoSuchMethodException e) {
throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implementation class " + stubClass.getName(), e);
}
} catch (Throwable t) {
LOGGER.error("Failed to create stub implementation class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t);
// ignore
}
}
}
return proxy;
}
- 动态的用proxyFactory(默认是JavassistFactory)来获取Service Proxy 对象
- 获取配置中的stub 配置项,而后调用
ReflectUtils#forName(stub)
方法,加载我们自己写的 Stub 类,注意是加载哦,就是拿到Class,初始化对象是在下面完成的。- 之后创建 Stub 对象,使用带 Service Proxy 对象作为参数的构造方法。例如,
public DemoServiceStub(DemoService demoService)
。通过这样的方式,我们就拥有了一个内部有 Proxy Service 对象的 Stub 对象啦,可以实现各种 OOXX 啦。最后将这个Stub对象返回。- 再次提醒,所以我们有的,是这个Stub对象,可以用这个对象来对执行方法进行一些本地的AOP拦截
4.2扩展类JavassistProxyFactory的getProxy(Invoker)
StubProxyFactoryWrapper#getProxy(invoker)
第一行就调用了扩展类的getProxy(invoker)
既然默认实现是javassist,那么我们就来看看javassist都干了什么吧
先来看看他的父类,AbstractProxyFactory
,
public abstract class AbstractProxyFactory implements ProxyFactory {
public <T> T getProxy(Invoker<T> invoker) throws RpcException {
Class<?>[] interfaces = null;
// TODO 8022 芋艿
String config = invoker.getUrl().getParameter("interfaces");
if (config != null && config.length() > 0) {
String[] types = Constants.COMMA_SPLIT_PATTERN.split(config);
if (types != null && types.length > 0) {
interfaces = new Class<?>[types.length + 2];
interfaces[0] = invoker.getInterface();
interfaces[1] = EchoService.class;
for (int i = 0; i < types.length; i++) {
interfaces[i + 1] = ReflectUtils.forName(types[i]);
}
}
}
// 增加 EchoService 接口,用于回生测试。参见文档《回声测试》https://dubbo.gitbooks.io/dubbo-user-book/demos/echo-service.html
if (interfaces == null) {
interfaces = new Class<?>[]{invoker.getInterface(), EchoService.class};
}
return getProxy(invoker, interfaces);
}
public abstract <T> T getProxy(Invoker<T> invoker, Class<?>[] types);
}
可以看到,该抽象类,主要是实现了 #getProxy(invoker)
方法,获得需要生成代理的接口们,而后调用了我们的子类JavassistProxyFactory#getProxy(invoker, types)
。接着往下看
public class JavassistProxyFactory extends AbstractProxyFactory {
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
/// 生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
// TODO Wrapper类不能正确处理带$的类名
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
JavassistProxyFactory#getProxy(invoker, types)
return的时候生成 Proxy 子类(Proxy 是抽象类)。并调用 Proxy 子类的 newInstance 方法创建 Proxy 实例。注意:其中传入的参数是 InvokerInvocationHandler
类,通过这样的方式,让 proxy 和真正的逻辑代码解耦。
那我们来看看这个InvokerInvocationHandler
类到底是什么
4.3Proxy 实例中传入了InvokerInvocationHandler类
public class InvokerInvocationHandler implements InvocationHandler {
/**
* Invoker 对象
*/
private final Invoker<?> invoker;
public InvokerInvocationHandler(Invoker<?> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
// wait 等Object类的方法,直接反射调用
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
// 基础方法,不使用 RPC 调用
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// RPC 调用
return invoker.invoke(new RpcInvocation(method, args)).recreate();
}
}
- 这个类就是拦截一些接口类调用的用途。将一些简单的方法本地调用,就不用浪费网络了通信了。
- 这里注意
InvokerInvocationHandler#invoke
return的调用invoker.invoke(new RpcInvocation(method, args)).recreate()
,如果消费者调用invoke的话,会在这里终于开始了网络调用
4.4JavassistProxyFactory生成的代码样例及作用
最终JavassistProxyFactory生成的代码样例如下:
package org.apache.dubbo.common.bytecode;
public class proxy0 implements org.apache.dubbo.demo.DemoService {
public static java.lang.reflect.Method[] methods;
private java.lang.reflect.InvocationHandler handler;
public proxy0() {
}
public proxy0(java.lang.reflect.InvocationHandler arg0) {
handler = $1;
}
public java.lang.String sayHello(java.lang.String arg0) {
Object[] args = new Object[1];
args[0] = ($w) $1;
Object ret = handler.invoke(this, methods[0], args);
return (java.lang.String) ret;
}
}
- 这个proxy实现了我们的业务接口方法,使得我们自己调用业务方法,比如
sayHello
的时候,可以进行这样的链路:
InvokerInvocationHandler对象(拦截基本方法,使其不进行网络调用,如toString()) => 网络调用
- 而使用方对此是没有感觉的,他只需要调用消费者的接口,Dubbo帮他实现了一切。
4.5服务引用存在哪?
别急,这就来
XML中的服务引用配置样例如下:<dubbo:reference interface="cn.com.wang.service.UserService" id="userService"> </dubbo:reference>
createProxy()
方法最终返回的一个Stub(如果实现了本地存根的话),getObject()
的返回值正是这个Stub,存储在ReferenceBean中。- ReferenceBean 有个特殊之处,实现了FactoryBean ,FactoryBean就是spring的工厂bean,工厂bean也就是说当我们要获取dubbo:reference中interface时也就是我们的UserService,我们会直接注入到使用类中,spring就得从容器中找。
- 也因为它是工厂bean,才会有上文我们提到的,调用FactoryBean的getObject()方法,这个方法返回的对象就会作为标签配置返回的对象。所以我们的引用,也就是最后返回Service 代理对象,的实际上是存在Spring容器里的。
至此,我们简单分析了本地引用和proxyFactory的动态代理以及本地存根的作用,接下来来看看远程引用与本地引用的区别吧。
5.远程调用
5.1 createProx()中的远程引入的链路
private T createProxy(Map<String, String> map) {
URL tmpUrl = new URL("temp", "localhost", 0, map);
// 【省略代码】是否本地引用
final boolean isJvmRefer;
if (isJvmRefer) {
// 【省略代码】本地引用
} else {// 远程引用
// url 不为空,表明用户可能想进行点对点调用
if (url != null && url.length() > 0) {
// 当需要配置多个 url 时,可用分号进行分割,这里会进行切分
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
// 设置接口全限定名为 url 路径
url = url.setPath(interfaceName);
}
// 检测 url 协议是否为 registry,若是,表明用户想使用指定的注册中心
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
// 将 map 转换为查询字符串,并作为 refer 参数的值添加到 url 中
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
// 合并 url,移除服务提供者的一些配置(这些配置来源于用户配置的 url 属性),
// 比如线程池相关配置。并保留服务提供者的部分配置,比如版本,group,时间戳等
// 最后将合并后的配置设置为 url 查询字符串中。
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else {
// url为空,加载注册中心 url,
//注意是这里开始将不同的注册中心循环添加到urls
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
// 加载监控中心 URL
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
// 服务引用配置对象 `map`,带上监控中心的 URL
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 添加 refer 参数到 url 中,并将 url 添加到 urls 中
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
// 未配置注册中心,抛出异常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference...");
}
}
// 单个注册中心或服务提供者(服务直连,下同)
if (urls.size() == 1) {
// 调用 RegistryProtocol 的 refer 构建 Invoker 实例
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多个注册中心或多个服务提供者,或者两者混合
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
// 获取所有的 Invoker
for (URL url : urls) {
// 通过 refprotocol 调用 refer 构建 Invoker,refprotocol 会在运行时
// 根据 url 协议头加载指定的 Protocol 实例,并调用实例的 refer 方法
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url;
}
}
if (registryURL != null) {
// 如果注册中心链接不为空,则将使用 AvailableCluster
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
// 创建 StaticDirectory 实例,并由 Cluster 对多个 Invoker 进行合并
invoker = cluster.join(new StaticDirectory(u, invokers));
} else {
invoker = cluster.join(new StaticDirectory(invokers));
}
}
}
Boolean c = check;
if (c == null && consumer != null) {
c = consumer.isCheck();
}
if (c == null) {
c = true;
}
// invoker 可用性检查
if (c && !invoker.isAvailable()) {
throw new IllegalStateException("No provider available for the service...");
}
// 生成代理类
return (T) proxyFactory.getProxy(invoker);
}
- 简单概括下远程引用的逻辑:读取直连配置项,或注册中心 url,并将读取到的 url 存储到 urls 中。然后根据 urls 元素数量进行后续操作。
- 若 urls 元素数量为1,则直接通过 Protocol 自适应拓展类构建 Invoker 实例接口。
- 若 urls 元素数量大于1,即存在多个注册中心或服务直连 url,此时先根据 url 构建 Invoker。然后再通过 Cluster 合并多个 Invoker,最后调用 ProxyFactory 生成代理类。
- 对于上述的判断,可以看下多注册中心官网的配置样例来辅助理解。多注册中心
Invoker 的构建过程以及代理类的过程比较重要,就是refprotocol.refer(type, url)
这一方法的调用链路。
5.2只配置了一个注册中心的远程引用:
截取上面的方法部分如下:
// 单个注册中心或服务提供者
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多个注册中心或多个服务提供者,或者两者混合
} else {
//…………
}
在上述方法中,如果只配置了一个注册中心的话,urls属性样例如下(配置了直连会有不同,下面会介绍)
registry://224.5.6.7:1234/com.alibaba.dubbo.registry.RegistryService?application=demo-consumer&dubbo=2.0.0&pid=26540&qos.port=33333&refer=application%3Ddemo-consumer%26check%3Dfalse%26dubbo%3D2.0.0%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D26540%26qos.port%3D33333%26register.ip%3D192.168.1.102%26side%3Dconsumer%26timestamp%3D1594622397172®istry=multicast×tamp=1594622397254
调用
refprotocol.refer(type, url)
方法,引用服务,返回 Invoker 对象。又到了我们喜闻乐见的SPI自适应特性,自动根据 URL 参数,获得对应的拓展实现。
在这里我们根据url可以得到调用的是RegistryProtocol
,#refer(...)
方法的调用顺序是:
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => RegistryProtocol
与服务暴露时类似,ProtocolFilterWrapper 用于创建过滤链,
ProtocolListenerWrapper
返回了一个ListenerInvokerWrapper
对象,ListenerInvokerWrapper
装饰invoker, 在构造器中遍历listeners构建referer的监听链,这两个类都会放行RegistryProtocol。也就是什么都不做。
5.35.3RegistryProtocol#refer()
直接来看RegistryProtocol的refer(type, url)做了什么吧
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 获得真实的注册中心的 URL
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
// 获得注册中心
Registry registry = registryFactory.getRegistry(url);
// TODO 芋艿
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// 将 url 查询字符串转为 Map,获得服务引用配置参数集合
// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
// 分组聚合,参见文档 http://dubbo.io/books/dubbo-user-book/demos/group-merger.html
if (group != null && group.length() > 0) {
if ((Constants.COMMA_SPLIT_PATTERN.split(group)).length > 1
|| "*".equals(group)) {
// 通过 SPI 加载 MergeableCluster 实例,并调用 doRefer 继续执行服务引用逻辑
return doRefer(getMergeableCluster(), registry, type, url);
}
}
// 执行服务引用
return doRefer(cluster, registry, type, url);
}
上面代码首先为 url 设置协议头,然后根据 url 参数从registryFactory加载注册中心实例,如果用的是zk的协议,那么注册器就是zk的注册器。然后获取 group 配置,根据 group 配置决定
doRefer()
第一个参数的类型。这里的重点是doRefer()
方法
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
// 创建 RegistryDirectory 对象,并设置注册中心和协议
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// 服务引用配置集合
Map<String, String> parameters = new HashMap<String, String>(directory.getUrl().getParameters());
// 生成服务消费者链接
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
// 向注册中心注册自己(服务消费者)
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 向注册中心订阅服务提供者 + 路由规则 + 配置规则
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 创建 Invoker 对象
Invoker invoker = cluster.join(directory);
// 向本地注册表,注册消费者
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
- doRefer 方法创建一个
RegistryDirectory
实例,然后生成服务者消费者链接,并向注册中心进行注册。- 注册完毕后,紧接着订阅 providers、configurators、routers 等节点下的数据。完成订阅后,RegistryDirectory 会收到这几个节点下的子节点信息。
- 由于一个服务可能部署在多台服务器上,这样就会在 providers 产生多个节点,这个时候就需要 Cluster 将多个服务节点合并为一个,并生成一个 Invoker。
注意这个cluster,他也是通过SPI机制依赖注入来的,(关于Cluster,RegistryDirectory的内容本篇暂不讨论,等我下一篇再说! )
5.45.4直连时的远程引用
上面讲的是没有配置直连的情况,这次我们改一下配置,将引用改成直连看看会咋样。
<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" url="dubbo://localhost:20880" />
同样是到这一行代码,
// 单个注册中心或服务提供者
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
// 多个注册中心或多个服务提供者,或者两者混合
} else {
//…………
}
此时的urls属性如下
dubbo://localhost:20880/com.alibaba.dubbo.demo.DemoService?application=demo-consumer&check=false&dubbo=2.0.0&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2144&qos.port=33333®ister.ip=192.168.1.102&side=consumer×tamp=1594651670644
注意到头部已经被换成dubbo了,与上文的registry类似,这边就是调用DubboProtocol来进行引用了,链路如下
Protocol$Adaptive => ProtocolFilterWrapper => ProtocolListenerWrapper => DubboProtocol
ProtocolFilterWrapper
创建好过滤器链,ProtocolListenerWrapper
开启监听后,就开始调用DubboProtocol#refer()
了,来看一下这个方法做了什么吧
5.5DubboProtocol#refer()
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// 初始化序列化优化器
optimizeSerialization(url);
// 获得远程通信客户端数组
// 创建 DubboInvoker 对象
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
// 添加到 `invokers`
invokers.add(invoker);
return invoker;
}
上面方法看起来比较简单,不过这里有一个调用需要我们注意一下,即
getClients(url)
。这个方法用于获取客户端实例,实例类型为ExchangeClient
。ExchangeClient
实际上并不具备通信能力,它需要基于更底层的客户端实例进行通信。比如NettyClient
、MinaClient
等,默认情况下,Dubbo 使用NettyClient
进行通信。接下来,我们简单看一下getClients()
方法的逻辑。
5.5new DubboInvoker时的getClients(url)
private ExchangeClient[] getClients(URL url) {
// 是否共享连接
boolean service_share_connect = false;
// 获取连接数,默认为0,表示未配置
int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0);
// 如果未配置 connections,则共享连接
if (connections == 0) {
service_share_connect = true;
connections = 1;
}
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (service_share_connect) {
// 获取共享客户端
clients[i] = getSharedClient(url);
} else {
// 初始化新的客户端
clients[i] = initClient(url);
}
}
return clients;
}
这里根据 connections 数量决定是获取共享客户端还是创建新的客户端实例,默认情况下,使用共享客户端实例。
getSharedClient
方法中也会调用initClient 方法
,因此下面我们一起看一下这两个方法,看看他是怎么共享,怎么初始化的。
5.6getClients(url)中的getSharedClient(url)
private ExchangeClient getSharedClient(URL url) {
// 从集合中,查找 ReferenceCountExchangeClient 对象
String key = url.getAddress();
ReferenceCountExchangeClient client = referenceClientMap.get(key);
if (client != null) {
// 若未关闭,增加指向该 Client 的数量,并返回它
if (!client.isClosed()) {
client.incrementAndGetCount();
return client;
// 若已关闭,移除
} else {
referenceClientMap.remove(key);
}
}
// 同步,创建 ExchangeClient 对象。
synchronized (key.intern()) {
// 创建 ExchangeClient 对象
ExchangeClient exchangeClient = initClient(url);
// 将 `exchangeClient` 包装,创建 ReferenceCountExchangeClient 对象
client = new ReferenceCountExchangeClient(exchangeClient, ghostClientMap);
// 添加到集合
referenceClientMap.put(key, client);
// 从 `ghostClientMap`移除
ghostClientMap.remove(key);
return client;
}
}
- 上面方法先访问缓存,若缓存未命中,则通过
initClient
方法创建新的 ExchangeClient 实例,并将该实例传给ReferenceCountExchangeClient
构造方法创建一个带有引用计数功能的ExchangeClient
实例。ReferenceCountExchangeClient
类内部使用引用计数的方式记录共享的数量。
ghostClientMap
,这个名字说实话确实有点难理解,这个实际上是一个存储LazyConnectExchangeClient
的集合。
这个时候我们就要理解LazyConnectExchangeClient
的作用,当服务引用时,我们并不想此时就是开始通信,而是在调用的时候再与服务端通信,LazyConnectExchangeClient
就像是一个缓存,在服务调用的时候才会创建真正的Client去连接,节省了资源.- LazyConnectExchangeClient 在每次数据传输前,先判断tcp连接状态,若连接断开则先执行connect建立连接。
5.7getClients(url)中的initClient(url)
在DubboProtocol#getSharedClient()
也也调用了DubboProtocol#initClient()
,看来是绕不过这方法了,来继续看他的代码。
private ExchangeClient initClient(URL url) {
// 获取客户端类型,默认为 netty
String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT));
// 校验 Client 的 Dubbo SPI 拓展是否存在,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
// 设置编解码器为 Dubbo ,即 DubboCountCodec
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
// 默认开启 heartbeat
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 连接服务器,创建客户端
ExchangeClient client;
try {
// 懒连接,创建 LazyConnectExchangeClient 对象
if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
// 直接连接,创建 HeaderExchangeClient 对象
} else {
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
- DubboProtocol的
requestHandler
是ExchangeHandler
的实现,是remoting层接收数据后的回调。只定义了一个回复请求结果的方法,返回的是请求结果initClient()
方法首先获取用户配置的客户端类型,默认为netty
。然后检测用户配置的客户端类型是否存在,不存在则抛出异常。最后根据 lazy 配置决定创建什么类型的客户端。这里的LazyConnectExchangeClient
代码并不是很复杂,该类会在 request 方法被调用时通过Exchangers#connect
方法创建 ExchangeClient 客户端,该类的代码本节就不分析了。下面我们分析一下Exchangers#connect()
方法。
5.8initClient(url)中的自适应Exchangers#connect()
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger 实例,默认为 HeaderExchangeClient
return getExchanger(url).connect(url, handler);
Exchangers#getExchanger
会通过 SPI 加载HeaderExchanger
实例(在Dubbo中只有这一个实例),来就来看看HeaderExchanger#connect()
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
// 这里包含了多个调用,分别如下:
// 1. 创建 HeaderExchangeHandler 对象
// 2. 创建 DecodeHandler 对象
// 3. 通过 Transporters 构建 Client 实例
// 4. 创建 HeaderExchangeClient 对象
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
我们这里重点看一下
Transporters#connect
方法
public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
ChannelHandler handler;
if (handlers == null || handlers.length == 0) {
handler = new ChannelHandlerAdapter();
} else if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handler 数量大于1,则创建一个 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取 Transporter 自适应拓展类,并调用 connect 方法生成 Client 实例
return getTransporter().connect(url, handler);
}
如上,
getTransporter()
方法返回的是自适应拓展类,该类会在运行时根据客户端类型加载指定的Transporter
实现类。若用户未配置客户端类型,则默认加载NettyTransporter
,并调用该类的connect()
方法。如下:
public Client connect(URL url, ChannelHandler listener) throws RemotingException {
// 创建 NettyClient 对象
return new NettyClient(url, listener);
}
到这里就不继续跟下去了,在往下就是通过 Netty 提供的 API 构建 Netty 客户端了。
5.9多注册中心时链路
else {
// 循环 `urls` ,引用服务,返回 Invoker 对象
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 引用服务
invokers.add(refprotocol.refer(interfaceClass, url));
// 使用最后一个注册中心的 URL
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
registryURL = url; // use last registry url
}
}
// 有注册中心
if (registryURL != null) { // registry url is available
// 对有注册中心的 Cluster 只用 AvailableCluster
// use AvailableCluster only when register's cluster is available
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
// 无注册中心,全部都是服务直连
} else { // not a registry url
invoker = cluster.join(new StaticDirectory(invokers));
}
}
可以看到多注册中心的时候多了一步 cluster.join(),其他与上文分析的单注册中心并无区别,仅仅是Cluster 将多个服务节点合并为一个,并生成一个 Invoker,这个我们留待下面章节来主要分析Cluster。
至此,我们的DubboProcotol#refer()也分析完毕告一段落,我们知道了他里面创建了一个DubboInvoker,其中有我们的Clients实例,基于此,我们才可以通过它来进行远程调用了。