本篇重点关注 Dubbo 服务引入的实现细节。
服务消费配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<dubbo:application name="wlm" />
<dubbo:registry protocol="zookeeper" address="127.0.0.1:2181"/>
<dubbo:protocol name="dubbo" port="20880" />
<dubbo:reference protocol="dubbo" id="helloWorld" version="1.0"
interface="com.wlm.dubbo.service.HelloWorld" check="false"
init="true" timeout="600000" />
</beans>
这里配置 reference 的 init=true,这样在项目启动的时候就会执行服务引入逻辑。
服务引入入口
前面介绍服务导出时说过,Dubbo 以 Spring 方式启动时,标签属性的解析都由 DubboBeanDefinitionParser 完成,并由 Spring 容器完成实例对象的创建、初始化,最终得到对应 bean 的实例对象,Dubbo 服务引入对应的 bean 为 ReferenceBean。
ReferenceBean 实现了 InitializingBean 接口,因此在 bean 初始化时会被调用 afterPropertiesSet() 方法。ReferenceBean 的实现中,首先针对各类型的配置,判断未配置时设置默认值,比如:consumer、application、module、monitor 等,最后判断如果设置了 init=true 属性,则直接执行服务引入流程,否则在实际使用到时再执行:
getObject() 是 FactoryBean 接口的方法,调用它可以获取这个 bean 的实例,内部实现如下:
checkAndUpdateSubConfigs() 方法内部对相关的配置进行检查和覆盖;再判断 ref 属性是否为 null,也就是是否执行过服务引入流程,否则调用 init() 方法进行服务引入。
服务引入整体的时序图如下:
init() 方法内部的逻辑主要分为两部分:组装属性和创建服务代理。下面分别看看这两部分的实现。
组装属性
组装的属性有:dubboVersion、side、interface、pid、timestamp 等,同时还调用 appendParameters 方法将 application、consumer 等标签配置的属性设置到 map 对象中:
appendParameters 方法用到第地方比较多,这里大概讲一下逻辑。
调用 appendParameters 方法时,将各标签对应的 config 对象传入,比如 ApplicationConfig、ConsumerConfig,appendParameters 内部通过反射获取类的所有 public 方法,分为两种情况处理:
1.通过 MethodUtils.isGetter 判断某个方法是否为 getter 类型:
public static boolean isGetter(Method method) {
String name = method.getName();
return (name.startsWith("get") || name.startsWith("is"))
&& !"get".equals(name) && !"is".equals(name)
&& !"getClass".equals(name) && !"getObject".equals(name)
&& Modifier.isPublic(method.getModifiers())
&& method.getParameterTypes().length == 0
&& ClassUtils.isPrimitive(method.getReturnType());
}
这里要注意的是,判断 getter 类型条件里有一个:方法返回类型必须为 “原型”,而 Dubbo 对原型的定义和 Java 中不太一样:
public static boolean isPrimitive(Class<?> type) {
return type.isPrimitive()
|| type == String.class
|| type == Character.class
|| type == Boolean.class
|| type == Byte.class
|| type == Short.class
|| type == Integer.class
|| type == Long.class
|| type == Float.class
|| type == Double.class
|| type == Object.class;
}
接下来就是从 config 对象中获取属性值,并设置到 parameters 的过程,实现如下:
这里主要逻辑为:
- 方法返回类型为 Object,或者getter 方法设置的 @Parameter 注解属性 excluded=true,则跳过该方法;
- 计算属性 key;
- 从 config 对象和 parameters 对象中根据 key 获取属性值 value,如果存在多个则拼接,并以逗号分隔;
- 最后将 key、value 设置到 parameters 对象中;
2.methodName = getParameters,且返回类型为 Map,则获取并遍历该 Map,将所有数据设置到 parameters 对象中:
创建服务代理
Dubbo 在 init() 方法中调用 createProxy 方法创建服务代理:
createProxy 内部主要有四个步骤:
- 获取服务地址列表;
- 遍历服务地址列表,调用 Protocol.refer 方法引入服务,得到远程服务的本地代理 invoker 对象;
- 如果得到多个 invoker 对象,则调用 Cluster.join 将它们合并为一个 invoker 对象;
- 调用 ProxyFactory.getProxy 创建代理,将 invoker 对象转换为 interface 对应的 Proxy 对象。
接下来就看看这四个步骤的实现。
1. 获取服务地址列表
Dubbo 根据是否配置了引用同一个 jvm,分为两种情况调用 refer 方法,即本地服务和远程服务。判断方式如下:
是否引用同一个 jvm 的判断步骤如下:
- 配置了 injvm 属性,则直接返回该属性值;
- reference 配置是否指定了 url 属性,指定了则返回 false;
- 判断 scope 属性,为 local 则返回 true;
- 判断引用的服务是否在当前 jvm,是则返回 true;
如果调用 shouldJvmRefer 结果为 true,则服务地址为本地,并直接引入本地服务:
如果调用 shouldJvmRefer 结果为 false,则引用远程服务,也是我们重点关注的逻辑。
获取远程服务的地址逻辑如下:
这里根据是否设置了 url 属性分成两种情况处理。
1.设置了 url 属性。比如:
<dubbo:reference url="127.0.0.1:20880" interface="com.wlm.dubbo.service.HelloWorld" id="helloWorld" version="1.0" />
url 内可以拼接多个地址,地址内容即可以是引用的服务的地址,也可以是注册中心的地址,根据协议头区分。
Dubbo 首先将地址字符串转换为 URL 对象,然后判断地址类型是否为注册中心(即协议头是否为 registry):
- 地址类型为注册中心地址:将前面组装的属性拼接好,作为 refer 属性添加到注册中心 URL 对象;
- 地址类型为远程服务地址:将前面组装的属性和 URL 对象的属性合并。
2.未设置 url 属性。则判断 scope 属性,如果 scope != local,则加载注册中心的地址,与服务导出时调用的是同一个方法,通过入参 isProvider 区分是服务提供者还是消费者。然后将前面组装的属性拼接好,作为 refer 属性添加到注册中心 URL 对象。
注:这里还会加载监控信息,如果有的话会作为 monitor 属性添加到 URL 对象。这部分属于监控相关的逻辑,
服务导出时也会加载,不属于本文重点,后续单独介绍。
2. Protocol.refer
获取到服务地址列表后,接下来就是遍历服务列表,调用 Protocol.refer 引入服务:
如果获取的服务地址数量大于 1,则调用完 refer 方法后,还要调用 Cluster.join 将多个 invoker 合并成一个 cluster invoker,即集群类型,这部分逻辑在下面介绍。
不管 url 的数量有多少,调用 refer 方法的逻辑都是一样的。这里 Protocol 支持 SPI 扩展,而 Protocol 接口的 Wrapper 类型的实现类,在服务导出时介绍过,多个 Wrapper 类会形成 Protocol 调用链。
这里以未显示设置 url 为例,即 urls 的数据类型都是注册中心(协议头为 registry),形成的 Protocol 调用链为:
前面示例中配置的协议为 "dubbo",对应的 Protocol 调用链为:
整体的流程如下:
主要分为以下几个部分:
- 启动 qos server;
- 注册消费者;
- 构建路由策略链;
- 订阅数据
- notify
- 真正的服务引入
- 发布服务引入事件
- 构建 filter 链
- 合并 invokers
其中 1 在 QosProtocolWrapper 类,2、3、4、5、9 在 RegistryProtocol 类,5 在 AbstractRegistry 类,6 在 DubboProtocol 类,7 在 ProtocolListenerWrapper 类,8 在 ProtocolFilterWrapper 类。
服务导出时也有 notify 操作,与服务导出相比,服务引入多了 构建路由策略链、合并 invokers 两个步骤。
2.1 启动 qos server
启动 qos server 的逻辑与服务导出一致,依赖于 netty,启动时注册解码器 QosProcessHandler 和监听 qos 端口,这里就不赘述:
2.2 注册消费者
接下来就进入 RegistryProtocol.refer 的逻辑:
先转换 URL 的协议头,转换结果如下:
// 转换协议头前的 url
registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=wlm
&dubbo=2.0.2&pid=85812&refer=application%3Dwlm%26check%3Dfalse%26dubbo%3D2.0.2%26init
%3Dtrue%26interface%3Dcom.wlm.dubbo.service.HelloWorld%26lazy%3Dfalse%26methods%3D
sayHello%26pid%3D85812%26protocol%3Ddubbo%26register.ip%3D192.168.199.243%26release
%3D2.7.3%26revision%3D1.0%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D600000%26
timestamp%3D1578817753832%26version%3D1.0®istry=zookeeper&release=2.7.3
×tamp=1578817754368
// 转换协议头后的 url
zookeeper://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=wlm
&dubbo=2.0.2&pid=85812&refer=application%3Dwlm%26check%3Dfalse%26dubbo%3D2.0.2%26init
%3Dtrue%26interface%3Dcom.wlm.dubbo.service.HelloWorld%26lazy%3Dfalse%26methods%3D
sayHello%26pid%3D85812%26protocol%3Ddubbo%26register.ip%3D192.168.199.243%26release
%3D2.7.3%26revision%3D1.0%26side%3Dconsumer%26sticky%3Dfalse%26timeout%3D600000%26
timestamp%3D1578817753832%26version%3D1.0&release=2.7.3×tamp=1578817754368
然后调用 RegistryFactory 获取注册中心,支持 SPI 扩展,这里配置的是 "zookeeper",因此调用 ZookeeperRegistryFactory 创建注册中心,最终得到 ZookeeperRegistry 注册中心。
再接下来判断是否配置了 group 属性,会影响到后面 "2.9 合并invokers" 的实现:如果配置了则传入的 cluster 实现为 MergeableClusterInvoker,如果未配置则根据 SPI 获取,此处使用默认实现 FailoverCluster。
最后调用 doRefer 进行服务引入:
这里关注注册消费者的实现,其他实现在下文其他部分单独介绍。
先根据注册中心的属性生成消费者 url,得到 subscribeUrl 如下:
consumer://192.168.199.243/com.wlm.dubbo.service.HelloWorld?application=wlm
&check=false&dubbo=2.0.2&init=true&interface=com.wlm.dubbo.service.HelloWorld
&lazy=false&methods=sayHello&pid=86042&protocol=dubbo&release=2.7.3&revision=1.0
&side=consumer&sticky=false&timeout=600000×tamp=1578818975240&version=1.0
然后在 subscribeUrl 基础上添加 category 属性,作为待注册的消费者 url,得到 registeredConsumerUrl:
consumer://192.168.199.243/com.wlm.dubbo.service.HelloWorld?application=wlm
&category=consumers&check=false&dubbo=2.0.2&init=true&interface=com.wlm.dubbo.service.HelloWorld
&lazy=false&methods=sayHello&pid=86042&protocol=dubbo&release=2.7.3&revision=1.0
&side=consumer&sticky=false&timeout=600000×tamp=1578818975240&version=1.0
然后将 registeredConsumerUrl 作为 Registry.register 的入参,创建 zookeeper 目录节点:
public void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
最终消费者地址注册到 zookeeper 路径:/dubbo/com.wlm.dubbo.service.HelloWorld/consumers/,内容如下:
2.3 构建路由策略链
Dubbo 调用 RegistryDirectory.buildRouterChain 构建路由策略链实例对象,入参为 subscribeUrl:
public void buildRouterChain(URL url) {
this.setRouterChain(RouterChain.buildChain(url));
}
public static <T> RouterChain<T> buildChain(URL url) {
return new RouterChain<>(url);
}
private RouterChain(URL url) {
List<RouterFactory> extensionFactories = ExtensionLoader.getExtensionLoader(RouterFactory.class)
.getActivateExtension(url, (String[]) null);
List<Router> routers = extensionFactories.stream()
.map(factory -> factory.getRouter(url))
.collect(Collectors.toList());
initWithRouters(routers);
}
在实例化 RouterChain 对象时,通过路由策略工厂 RouterFactory 获取路由策略,RouterFactory 支持 SPI 扩展,通过 getActivateExtension 获取到的默认实现有四个:
循环调用 RouterFactory.getRouter 得到的路由策略如下:
2.4 订阅数据
接下来就是调用 RegistryDirectory.subscribe 订阅数据,订阅之前为 subscribeUrl 添加属性 category=providers,configurators,routers,得到入参 url 如下:
consumer://192.168.199.243/com.wlm.dubbo.service.HelloWorld?application=wlm
&category=providers,configurators,routers&check=false&dubbo=2.0.2&init=true
&interface=com.wlm.dubbo.service.HelloWorld&lazy=false&methods=sayHello&pid=86252
&protocol=dubbo&release=2.7.3&revision=1.0&side=consumer&sticky=false&timeout=600000
×tamp=1578819977201&version=1.0
RegistryDirectory.subscribe 的实现如下:
public void subscribe(URL url) {
setConsumerUrl(url);
// 订阅配置中心
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
// 订阅 url
registry.subscribe(url, this);
}
RegistryDirectory 实现了 NotifyListener 接口,RegistryDirectory 的实例对象作为传入 Registry.subscribe 方法的参数,在订阅的数据发生变化时会被通知,Registry 为前面获取到的注册中心实例 ZookeeperRegistry。
这里订阅 url 数据和服务导出的实现也是一样的:
区别在于 toCategoriesPath 获取到的 category 目录列表不一样,此处为:
/dubbo/com.wlm.dubbo.service.HelloWorld/providers
/dubbo/com.wlm.dubbo.service.HelloWorld/configurators
/dubbo/com.wlm.dubbo.service.HelloWorld/routers
接下来遍历 category 目录列表,创建目录节点并注册 ChildListener 监听器。
这里先启动了服务提供者,因此 providers 目录节点下有数据,最终得到的 urls 如下:
dubbo://192.168.199.243:20880/com.wlm.dubbo.service.HelloWorld?anyhost=true
&application=wlm&bean.name=com.wlm.dubbo.service.HelloWorld&deprecated=false
&dubbo=2.0.2&dynamic=true&generic=false&interface=com.wlm.dubbo.service.HelloWorld
&methods=sayHello&pid=77895®ister=true&release=2.7.3&revision=1.0&service.filter=dubboFilter
&side=provider×tamp=1578734146864&version=1.0
empty://192.168.199.243/com.wlm.dubbo.service.HelloWorld?application=wlm&category=configurators
&check=false&dubbo=2.0.2&init=true&interface=com.wlm.dubbo.service.HelloWorld&lazy=false
&methods=sayHello&pid=86460&protocol=dubbo&release=2.7.3&revision=1.0&side=consumer
&sticky=false&timeout=600000×tamp=1578821094174&version=1.0
empty://192.168.199.243/com.wlm.dubbo.service.HelloWorld?application=wlm&category=routers
&check=false&dubbo=2.0.2&init=true&interface=com.wlm.dubbo.service.HelloWorld&lazy=false
&methods=sayHello&pid=86460&protocol=dubbo&release=2.7.3&revision=1.0&side=consumer
&sticky=false&timeout=600000×tamp=1578821094174&version=1.0
如果某个服务没有服务提供者,也是能进行服务引入操作的,因为这里会创建一个 providers 目录,并注册监听器,后续服务提供者上线后,会调用 notify 通知消费者。
接下来主动触发 notify。
2.5 notify
notify 是 AbstractRegistry 定义的方法,当服务提供者发生变化时,会调用该方法进行通知:
先将 urls 转换为 map 格式,以 category 作为 key,此处转换后的 result 数据如下:
然后遍历 result,调用 NotifyListener.notify 通知监听器。
前面介绍订阅数据流程时说过,传入的 NotifyListener 入参是 RegistryDirectory 对象,notify 的实现如下:
先将 urls 进行分类,目前有三种:configurators、routers、providers。如果有数据的话,再将 url 数据转换成对应的内部对象,添加到本地属性中。
这里重点关注 providers 分类的 url 数据,调用 refreshOverrideAndInvoker 方法转换成 invoker 的逻辑。
前面展示的 urls 中,有些协议头为 "empty" 的,是无效 url,因此在转换 url 的过程中,会被跳过。
最终调用 Protocol.refer 引入服务,此处协议头为 "dubbo",因此会调用 DubboProtocol:
这里生成的 key 为 url 的 fullString:
dubbo://192.168.199.243:20880/com.wlm.dubbo.service.HelloWorld?anyhost=true
&application=wlm&bean.name=com.wlm.dubbo.service.HelloWorld&check=false&deprecated=false
&dubbo=2.0.2&dynamic=true&generic=false&init=true&interface=com.wlm.dubbo.service.HelloWorld
&lazy=false&methods=sayHello&pid=86862&protocol=dubbo®ister=true®ister.ip=192.168.199.243
&release=2.7.3&remote.application=wlm&revision=1.0&service.filter=dubboFilter&side=consumer
&sticky=false&timeout=600000×tamp=1578734146864&version=1.0
2.6 真正的服务引入
DubboProtocol 继承了 AbstractProtocol,AbstractProtocol 调用子类实现的 protocolBindingRefer 方法,并将结果封装在 AsyncToSyncInvoker 返回:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
DubboProtocol 的实现如下:
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
Dubbo 调用 getClients 获取和服务端的连接,并封装成 DubboInvoker 对象返回。
Dubbo 中的连接分为共享连接和独立连接,如果配置了 connections 属性,则使用独立连接,否则使用共享连接。有关这部分后续单独介绍,这里重点关注如何建立连接,也就是 initClient 的实现。
initClient 的时序图如下:
Exchanger、Transporter、Client 都支持 SPI 扩展,相关的概念已经在介绍服务导出流程时解释过,此处不再赘述。
此处使用了 Dubbo 的默认的传输协议 netty,对应 NettyClient,在实例化时会建立和服务端的连接。
先调用 doOpen 初始化,注册编解码器 NettyCodecAdapter(和服务导出一致):
再调用 doConnect 建立连接,host 和 port 都从服务提供者 url 中获取:
2.7 发布服务引入事件
ProtocolListenerWrapper 调用完 refer 方法后,会返回 ListenerInvokerWrapper 包装类的实例对象,通过 Dubbo SPI 机制获取监听器 InvokerListener 列表,作为入参传递到 ListenerInvokerWrapper 的构造器中:
并在对象构造器内调用 InvokerListener.referred() 发布服务引入事件:
2.8 构建 filter 链
接下来进入 ProtocolFilterWrapper 构建 filter 链的逻辑:
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
if (REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
return buildInvokerChain(protocol.refer(type, url), REFERENCE_FILTER_KEY, CommonConstants.CONSUMER);
}
这里与服务导出时构建 filter 链的逻辑一致,就不赘述,区别在于此处传入的 key=reference.filter,group=consumer。
默认的服务消费者 filter 链如下:
2.9 合并 invokers
到这里,RegistryDirectory.subscribe 的逻辑执行完成,RegistryDirectory 包含了注册中心地址、注册中心内的服务提供者 invoker 列表、路由策略链等数据:
服务提供者 invoker 也是一个嵌套的结构,从上到下对应:ConsumerContextFilter -> FutureFilter -> MonitorFilter -> AsyncToSyncInvoker -> DubboInvoker,而 DubboInvoker 内包含了和服务端的连接 ExchangeClient 数组:
Dubbo 调用 Cluster.join 将该 Directory 对象内的多个 invokers 合并在一起,Cluster 支持 SPI 扩展,最终得到一个嵌套的 invoker 对象:MockClusterInvoker -> FailoverClusterInvoker。
3. 合并 invokers
如果前面获取服务地址列表是,得到的地址数量 > 1,则会进入合并 invokers 的处理:
这里的合并 invokers 与前面 Protocol.refer 的区别在于:
- Protocol.refer 针对的是某一个 url,如果 url 类型是注册中心,最终会得到一批服务提供者 invoker 列表,因此该合并针对的是同一个注册中心;
- 此处 url 有多个时,针对每个 url 执行 Protocol.refer 都会得到一个 invoker 对象,因此该合并针对的是多注册中心或多 url。
合并的实现都是基于 Cluster.join 接口,就不赘述。
4. 创建代理
前面的过程主要是把服务提供者转换成 invoker 对象,而这里是将 invoker 对象转换为服务的本地代理对象。
ProxyFactory 支持 SPI 扩展,默认获取到的是一个 ProxyFactory 链:StubProxyFactoryWrapper -> JavassistProxyFactory。
JavassistProxyFactory 依赖于 javassist 组件:
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
先根据引用的服务的 Class 对象创建代理 Proxy 对象,再将 invoker 对象封装到 InvokerInvocationHandler 对象,作为 Proxy 对象实例化的入参。
生成动态代理类的实现不是本文重点,感兴趣的读者自行了解 javassist 动态生成代理类的过程。
总结
本篇文章侧重于 Dubbo 服务引入的实现细节,主要包括:服务引入入口,获取服务地址列表,启动 qos server,注册消费者,构建路由策略链,订阅数据,notify,服务引入,发布服务引入事件,构建 filter 链,合并 invokers 等。其中省略了很多细节,限于篇幅,读者可自行查看。