6.4Dubbo 的服务引入过程
dubbo 服务的引入过程,是在 referencebean 的实例化过程中实现的。当dubbo 启动过程中,遇到@reference,即会创建一个 referencebean 的实例。
此实例一样实现了 InitializingBean 接口,在其调用的afterPropertiesSet 方法中,会为服务调用方创建一个远程代理对象:
ref 是通过 interface 和 url 信息生成的代理:
意思即是说,protocol 协议制作了一个 invoker 对象,你可以通过invoker 对象,向 protocol 协议发送信息(网络传输)。借用 springRMI协议来说明一个这个过程:
一个 protocol 协议建立后,会得到一个 object 输出对象,输出到网络信息的动作,全仗此对象进行。
本来,此对象类型已经是我们的业务接口类型,我们可以直接使用此对象进行通信了。但是,考虑到 protocol 本身不应该跟具体的业务接口耦合,于是,我们再次插入了 invoker 实体来解耦双方:
1、将 protocol 生成的输出对象 object,包装成 invoker 对象
2、在业务操作端,为了方便操作,再做一个代理对象,来转请求到 invoker上
PS:Dubbo 中的 invoker 概念,作用不仅仅于此,它统一了 dubbo 中各组件间相互交流的规范,大家统一都用 invoker 进行粘合(书同文、车同轴)。
6.5rpc 过程概述
我们再归纳一个服务暴露与服务引入的过程,如下图:
1、网络数据的传输的过程,由 protocol 组件负责。无论你是什么协议实现,主要目标,就是将消费端的调用信息(interface 接口描述),传递到服务端(java 反射使用)。
2、服务端使用 protocol 来监听网络,当数据信息到来时,需要按信息指示(interface 描述),调用对应的 service 服务来响应。
3、消费端发起远程调用。它需要将本地代理对象动作,转换成调用信息(interface 描述),通过 protocol 发送到网络上。
4、整个过程自始至终涉及到的信息有两种,一种为 url 信息(协议头://ip:port/path),一种 interface 参数。为了简化三个部分的协作,Dubbo提出一个实体域对象 invoker(内部封装了 url 和 interface)。
5、invoker 的思想:万事万物只有一个调用入口,invoker 的 invoke 方法。因此,服务端 protocol 对本地 service 的调用,被封装成了 invoker;消费端发起远程调用到网络的动作,也被封装成了 invoker。
6、从此,服务端监听到的网络请求,将自动触发服务端绑定 invoker.invoke, 消费端直接调用消费端的 invoker.invoke 也自动将信息发送到了 protocol 网络上。
6.6Dubbo 的服务注册与发现机制
在上面的 dubbo 服务暴露和引入过程中,核心动作是两个,通过protocol.export(serviceInvoker)将服务暴露出去,通过protocol.refer(DemoService.class, registryUrl)将网络上的服务引入进来。
整个 rpc 链条已成闭环。那么 dubbo 的动态服务发布,注册中心又是如何集成上去的呢?
按照开闭原则,dubbo 希望只增加代码来完成功能的增强,杜绝更改代码的,如何做呢?我们前面的 SPI 思想此时就展示出它的威力了。
我们的 protocol 对象,本身只是个 SPI 适配对象,它可以根据 URL的不同,选择不同的 protocol 实现类。于是,dubbo 在此基础上,进行一层嵌套:将服务的注册/发现功能,抽象成一个 protocol 协议,即RegistryProtocol(并不是个真的协议实现,是来做服务发现注册使用), 并在 RegistryProtocol 对象内,再次嵌套一个真实的 protocol,整个结构如下图:
6.6.1 服务暴露时注册服务
ServiceConfig.doExportUrls 执行服务 export,而 doExportUrls 方法分成两部分:
1、获取所有注册中心的 URL
2、遍历所有协议 ProtocolConfig,将每个 protocol 发布到所有注册中心上
private void doExportUrls() {
// 1.获取注册中心
URLList registryURLs = loadRegistries(true);
// 2.遍历所有协议,export protocol 并注册到所有注册中心
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
其中,loadRegistries 获取配置的注册中心 URL,首先执行 checkRegistry,判断是否有配置注册中心,如果没有,则从默认配置文件dubbo.properties中读取dubbo.registry.address组装成RegistryConfig。
AbstractInterfaceConfig.checkRegistry();
if (registries == null || registries.isEmpty()) {
String address = ConfigUtils.getProperty("dubbo.registry.address");
if (address != null && address.length() > 0) {registries = new ArrayList();
String[] as = address.split("\\s*[|]+\\s*");
for (String a : as) {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress(a);
registries.add(registryConfig);
}
}
}
然后根据 RegistryConfig 的配置,组装 registryURL,形成的 URL 格式如下:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provid
er&istry=zookeeper
这个 URL 表示它是一个 registry 协议(RegistryProtocol),地址是注册中心的ip:port,服务接口是 RegistryService,registry 的类型为 zookeeper。
doExportUrlsFor1Protocol 发布服务和注册因为 dubbo 支持多协议配置,对于每个 ProtocolConfig 配置,组装 protocolURL,注册到每个注册中心上。
首先根据 ProtocolConfig 构建协议的 URL:
1. 设置 side=provider,dubbo={dubboVersion},timestamp=时间戳,pid=进程 id
2. 从 application,module,provider,protocol 配置中添加 URL 的 parameter
3. 遍历 dubbo:service 下的 dubbo:method 及 dubbo:argument 添加 URL 的 parameter
4. 判断是否泛型暴露,设置 generic,及 methods=*,否则获取服务接口的所有 method
5. 获取 host 及 port,host 及 port 都是通过多种方式获取,保证最终不为空
// 获取绑定的 ip,1 从系统配置获取, 2 从 protocolConfig 获取, 3 从 providerConfig 获取
// 4 获取 localhost 对应得 ipv45 连接注册中心获取 6 直接获取 localhost 对应的 ip(127.0.0.1)
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
// 获取绑定的 port,1 从系统配置 2 从 protocolConfig3 从 providerConfig4 从 defaultPort 之上随机取可用的
Integer port = this.findConfigedPorts(protocolConfig, name, map);
最终构建 URL 对象:
// 创建 protocol export url
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" :
contextPath + "/") + path, map);
构建出的 protocolURL 格式如下:
dubbo://192.168.199.180:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=192.168.199.180&bind.port=20880&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5744&qos.port=22222&side=provider×tamp=1530746052546
这个 URL 表示它是一个 dubbo 协议(DubboProtocol),地址是当前服务器的 ip,端口是要暴露的服务的端口号,可以从dubbo:protocol 配置,服务接口为 dubbo:service 配置发布的接口。
遍历所有的 registryURL,执行以下操作:
1. 给 registryURL 设置 EXPORT_KEY 为上面构建的 protocolURL。
2. 根据实现对象,服务接口 Class 和 registryuRL 通过 ProxyFactory 获取代理 Invoker(继承于 AbstractProxyInvoker)。
3. 将 Invoker 对象和 ServiceConfig 组装成 MetaDataInvoker,通过 protocol.export(invoker)暴露出去。
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
// 组装监控 URL
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
// 以 registryUrl 创建 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// 包装 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new
DelegateProviderMetaDataInvoker(invoker, this);
// 以 RegistryProtocol 为主,注册和订阅注册中心,并暴露本地服务端口
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
这里的 protocol 是一个适配代理对象,根据 SPI 机制,这里的 procotol.export 执行时,会根据 Invoker 的 URL 的 protocol 来选择具体的实现类,此处 URL 的协议头为 registry,因此方法会交由 RegistryProtocol 处理 export 过程。
RegistryProtocol.export 暴露服务,doLocalExport 内执行服务的暴露逻辑,后续执行注册中心信息注册逻辑。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 发布本地 invoker,暴露本地服务,打开服务器端口
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 根据 url 从 registryFactory 中获取对应的 registry
final Registry registry = getRegistry(originInvoker);
//......(省略部分代码)
if (register) {
// 向注册中心注册 providerUrl
register(registryUrl, registedProviderUrl);
// 本地注册表设置此 provider 注册完成
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
//......(省略部分代码)
// 向注册中心订阅提供者 url 节点
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//......(省略部分代码)
}
从 Invoker 中获取 providerURL,同传入的 Invoker 对象组装成 InvokerDelegete,通过 protocol.export 根据 providerURL(还是 SPI 自适配逻辑)暴露服务,打开服务器端口,获得 Exporter 缓存到本地。
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
// 获取 cache key
String key = getCacheKey(originInvoker);
// 是否存在已绑定的 exporter
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// 封装 invoker 和 providerUrl
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker,
getProviderUrl(originInvoker));
// export provider invoker,Protocol 的具体实现是由 Url 中的 protocol 属性决定的
// 封装创建出的 exporter 和 origin invoker
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)
protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
6.6.2 服务引入时订阅服务地址
ReferenceConfig.init()方法,最后会来创建代理对象。
ref = createProxy(map);
private T createProxy(Map map) {
//......(省略部分代码)
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
}
//......(省略部分代码)
// 创建服务代理
return (T) proxyFactory.getProxy(invoker);
}
看这一段代码,前面部分,主要用来找到并校验配置的 urls,此 url 一般只是一个注册中心的 url。值类似下面这样:
registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?registry=zookeeper
所以当 refprotocol.refer 调用时,直接还是适配到 RegistryProtocol。
RegistryProtocol 的 refer 动作,会转发到 doRefer 方法:
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String,
String>(directory.getUrl().getParameters());
// 初始化订阅 URL
URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL,
parameters.remove(Constants.REGISTER_IP_KEY), 0, type.getName(), parameters);
if (!Constants.ANY_VALUE.equals(url.getServiceInterface())
&& url.getParameter(Constants.REGISTER_KEY, true)) {
registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY,
Constants.CONSUMERS_CATEGORY,
Constants.CHECK_KEY, String.valueOf(false)));
}
// 注册监听节点,即向注册中心订阅服务
directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY,
Constants.PROVIDERS_CATEGORY
+ "," + Constants.CONFIGURATORS_CATEGORY
+ "," + Constants.ROUTERS_CATEGORY));
// 包装一个 invoker 集群返回
Invoker invoker = cluster.join(directory);
ProviderConsumerRegTable.registerConsumer(invoker, url, subscribeUrl, directory);
return invoker;
}
这一段代码逻辑非常清晰,就是组装注册中心 URL,订阅服务。
6.6.3 注册/订阅逻辑
dubbo 的注册/订阅动作,主要涉及以下接口:
org.apache.dubbo.registry.Registry
其中 Registry继承自 RegistryService,负责注册/订阅动作:
public interface RegistryService { // Registry extends RegistryService
/**
* 注册服务.
* @param url 注册信息,不允许为空,如:
dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void register(URL url);
/**
* 取消注册服务.
* @param url 注册信息,不允许为空,如:
dubbo://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
*/
void unregister(URL url);
/**
* 订阅服务. *
* @param url 订阅条件,不允许为空,如:
consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener 变更事件监听器,不允许为空
*/
void subscribe(URL url, NotifyListener listener);
/**
* 取消订阅服务.
* @param url 订阅条件,不允许为空,如:
consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @param listener 变更事件监听器,不允许为空
*/
void unsubscribe(URL url, NotifyListener listener);
/**
* 查询注册列表,与订阅的推模式相对应,这里为拉模式,只返回一次结果。
* @param url 查询条件,不允许为空,如:
consumer://10.20.153.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
* @return 已注册信息列表,可能为空,
*/
List<URL> lookup(URL url);
}
订阅方法需要一个监听器参数:
NotifyListener.java:
public interface NotifyListener {
/**
* 当收到服务变更通知时触发。
* @param urls 已注册信息列表,总不为空
*/
void notify(List<URL> urls);
}
方法动作很好理解,register 方法就是将 url 写入注册中心,subscribe 则将监听器注册到 url 上,当服务 url 有变化时,则触发 notify 方法。其每个服务最终注册的信息结构,示例如下(以消费端在 zookeeper 为例):
当某个服务发生变动,notify 触发回来的 urls 信息也同样包含这些信息当然,Dubbo 此处定义的 Registry 服务,是个扩展点,有很多实现,可心通过 SPI 机制适配实现类。
扩展点接口:org.apache.dubbo.registry.RegistryFactory
此处的扩展,与这个配置相关联,SPI 的机制不再展开细说。
<!-- 定义注册中心 -->
<dubbo:registry id="xxx1" address="xxx://ip:port" />
<!-- 引用注册中心,如果没有配置 registry 属性,将在 ApplicationContext 中自动扫描 registry 配置 -->
<dubbo:service registry="xxx1" />
<!-- 引用注册中心缺省值,当<dubbo:service>没有配置 registry 属性时,使用此配置 -->
<dubbo:provider registry="xxx1" />
有兴趣的,可以去看下 Dubbo 的监听实现逻辑类 RegistryDirectory,它的包装了 Dubbo 的发布订阅逻辑并且其本身也是看监听器。监听触发逻辑在 notify 方法中,主要职责便是监听到的 url 信息转化为 invoker 实体,提供给Dubbo 使用。
为了性能,在 RegistryDirectory 中,可以看到有很多的缓存容器,urlInvokerMap/ methodInvokerMap/ cachedInvokerUrls 等用来缓存服务的信息。也就是说,notify 的作用是更改这些缓存信息,而 Dubbo在 rpc 过程中,则是直接使用缓存中的信息。
这里要强调一下,在 Dubbo 中,URL 是整个服务发布和调用流程的串联信息,它包含了服务的基本信息(服务名、服务方法、版本、分组),注册中心配置,应用配置等等信息,还包括在 dubbo 的消费端发挥作用的各种组件信息如:filter、loadbalance、cluster 等等。
在消费端 notify 中收到这些 url 信息时,意味着这个组件信息也已经得到了。Dubbo 此时便扩展逻辑,来加入这些组件功能了。
最后,完整描述下服务注册与发现机制:
基于注册 中心的事件通知(订阅与发布),一切支持事件订阅与发布的框架都可以作为 Dubbo 注册中心的选型。
服务提供者在暴露服务时,会向注册中心注册自己,具体就是在${serviceinterface}/providers 目录下添加 一个节点(临时),服务提供者需要与注册中心保持长连接,一旦连接断掉(重试连接)会话信息失效后,注册中心会认为该服务提供者不可用(提供者节点会被删除)。
消费者在启动时,首先也会向注册中心注册自己,具体在${interfaceinterface}/consumers 目录下创建一个节点。
消费者订阅${service interface}/ [ providers、configurators、routers ]三个目录,这些目录下的节点删除、新增事件都会通知消费者,根据通知,重构服务调用器(Invoker)。