5.1 配置解析
目前Dubbo提供3中配置方式:XML,注解和属性文件(properties和ymal),最常用的是xml和注解。
5.1.1 schema解析
Spring框架对Java产生了深远的影响,Dubbo框架也直接继承了Spring的能力,利用了Spring配置文件扩展出自定义的解析方式。Dubbo配置约束文件在dubbo-config/dubbo-config-spring/src/main/resources/dubbo.xsd中,在IDEA中能自动找到这个文件,当用户使用属性时进行自动提示。
dubbo.xsd文件用来约束使用xml配置时的标签和属性。Spring在解析到自定义的namespace标签时(如<dubbo:service>标签),会查找对应的spring.schemas和spring.handlers文件,最终触发Dubbo的DubboNamespaceHandler类进行初始化和解析。
// spring.schemas 文件
http\://dubbo.apache.org/schema/dubbo/dubbo.xsd=META-INF/dubbo.xsd
http\://code.alibabatech.com/schema/dubbo/dubbo.xsd=META-INF/compat/dubbo.xsd
// spring.handlers文件
http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
其中spring.schemas文件指定了约束文件的具体路径,spring.handlers文件指定了用DubboNamespaceHandler类来解析标签。因为Dubbo经历了从闭源到捐给apache开源的过程,所以两个文件各自包含两行内容。
Dubbo设计之初考虑到属性最大限度的使用,因此对schema进行了精心的设计,Dubbo schema层级的详细设计如下:
Dubbo设计的粒度很多都是针对方法级别的,比如方法级别的timeout、retries和mock特性。图中上边是有继承关系的类型,下边是独立的类型。
- consumerType 配置消费方全局的配置,比如tcp连接数connections,注意客户端的配置会覆盖providerType透传的属性。
- referenceType 配置消费方接口范围信息,比如引用的接口名称和是否泛化调用标志等。
- providerType 配置服务提供方的全局配置,比如服务费timeout。
- serviceType 配置服务提供方接口范围信息,比如服务暴露的接口和具体实现类。
- applicationType 配置应用级别信息。
- protocolType 配置服务提供者暴露的协议。
- registryType 配置注册中心地址和协议。
- monitorType 配置应用监控上报相关地址。
- parameterType 选项参数配置,可以作为dubbo:protocol、service、reference、provider、consumer的子标签,方便添加自定义参数,会透传到框架的url中。
- argumentType 配置应用方法参数等辅助信息,如高级特性中异步参数回调索引的配置。
- methodType 配置方法级别参数,主要用于dubbo:service、reference的子标签
- moduleType 配置应用所属模块相关信息
图中还少一个annotationType模块,主要用于配置项目要扫描的注解包。
下面以protocolType为例,我们看一个dubbo.xsd的真实配置:
<xsd:complexType name="protocolType">
<xsd:sequence minOccurs="0" maxOccurs="unbounded">
<xsd:element ref="parameter" minOccurs="0" maxOccurs="unbounded" />
</xsd:sequence>
...
<xsd:attribute name="default" type="xsd:string">
<xsd:annotation>
<xsd:documentation><![CDATA[ Is default. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:anyAttribute namespace="##other" processContents="lax" />
</xsd:complexType>
上面代码,我们可以简单理解为其为协议定义约束字段,只有在这里定义的属性才会在Dubbo的xml配置文件中智能提示,当我们基于Dubbo做二次开发时,应该在schema中添加合适的字段,同时应该在dubbo-config-api对应的Config类中添加属性和get&set方法,这样用户在配置属性时框架会自动注入这个值。光属性定义还不够,为了让Spring正确解析标签,我们要定义element标签,与上面的protocolType进行绑定,示例如下:
<xsd:element name="protocol" type="protocolType">
<xsd:annotation>
<xsd:documentation><![CDATA[ Service provider config ]]></xsd:documentation>
<xsd:appinfo>
<tool:annotation>
<tool:exports type="org.apache.dubbo.config.ProtocolConfig" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:element>
目前绝大多数场景使用默认的Dubbo配置就足够了,如果新增特性,比如增加epoll特性,则只需要在providerType、consumerType、ProviderConfig和ConsumerConfig中增加epoll属性和方法即可。如果使用已经存在schema类型(比如protocolType),则只需要添加新属性即可,也不需要定义新的element标签。如果接口是级别通用的,一般我们只需要在interfaceType中增加属性即可,继承自interfaceType的类型会拥有该字段。同理,在Dubbo对应类AbstractInterfaceConfig中增加属性和方法即可。(估计这里说的应该是源代码基础上直接修改的扩展,而不是通过继承或者配置的方式)
5.1.2 XML配置解析
前面我们已经了解了dubbo.xsd中约束的定义,以及如何扩展字段,接下来我们探讨框架是如何解析配置的。主要解析逻辑入口是在DubboNamespaceHandler类中完成的:
DubboNamespaceHandler主要把不同的标签关联到解析实现类中。registerBeanDefinitionParser方法约定了在Dubbo框架中遇到标签application、module和registry等都会委托给DubboBeanDefinitionParser处理。需要注意的是,在新版本中重写了注解实现,主要解决了以前实现的很多缺陷(比如无法处理AOP等)。
接下来我们看看DubboBeanDefinitionParser的实现,因为解析逻辑比较长,分段拆解来讲解。
前面的逻辑主要负责把标签解析成对应的Bean定义,并注册到Spring上下文中,同时保证id不重复。
接下来分析具体的标签是如何解析的:
上面代码的逻辑主要处理了嵌套标签的场景,其他的属性处理还在后面。
以上是核心属性值解析的注释代码,本质上都是把属性注入到Spring框架的BeanDefinition。如果属性是引用对象,则Dubbo会创建RuntimeBeanReference类型注入,运行时由Spring注入引用对象。通过对属性解析代码的理解,我们知道,其实Dubbo只做了属性提取的事情,运行时属性注入和转换都是Spring处理的。而且Dubbo最终生成的BeanDefinition也是委托Spring来创建其对应的Java对象。
5.1.3 注解配置解析
重启开源后,Dubbo的注解已经完全重写了,因为原来的注解是基于AnnotationBean实现的,主要存在以下几个问题:
- 注解支持不充分,需要XML配置<dubbo:annotation>
- @ServiceBean不支持Spring AOP
- @Reference不支持字段继承性
在原来的实现思路的基础上无法解决历史遗留问题,但采用另外一种思路实现可以很好地修复并改进遗留问题。
Dubbo注解核心组件:
@EnableDubbo的配置等同于@EnableDubboConfig+@DubboComponentScan(注意这种注解组合机制是由Spring实现的,并非Jdk提供的)。
先看@EnableDubboConfig:
@Import注解是Spring提供的自动导入BeanDefinition机制,Spring会自动调用其参数DubboConfigConfigurationRegistrar的registerBeanDefinitions方法:
内部通过调用了AnnotatedBeanDefinitionRegistryUtils#registerBeans方法来进行BeanDefinition的读取:
AnnotatedBeanDefinitionRegistryUtils#registerBeans方法通过新建一个Spring提供的AnnotatedBeanDefinitionReader来读取传进来的DubboConfigConfiguration.Single类上的注解。
Single类上使用了EnableConfigurationBeanBindings注解:
EnableConfigurationBeanBindings注解又一次使用了@Import注解来触发ConfigurationBeanBindingsRegister的registerBeanDefinitions方法的执行:
ConfigurationBeanBindingsRegister的registerBeanDefinitions方法又通过ConfigurationBeanBindingRegistrar去遍历EnableConfigurationBeanBindings注解中配置的每一个EnableConfigurationBeanBinding:
然后读取EnableConfigurationBeanBinding注解中配置的prefix和type类,进一步调用registerConfigurationBeans方法:
通过prefix从enviroment中将所有匹配的前缀配置读到configurationProperties这个map中,然后获取beanName(map中设置的id或者没有设置的话就根据类名生成),最后调用registerConfigurationBean去创建BeanDefinition并注册:
就这样,@EnableDubboConfig完成了配置文件与配置类的绑定与注册,如dubbo.application前缀的配置与ApplicationConfig这个Bean的BeanDefinition的绑定与注册。
另外要注意ConfigurationBeanBindingRegistrar#registerConfigurationBean的最后一行registerConfigurationBindingBeanPostProcessor方法,会向Spring注册一个ConfigurationBeanBindingPostProcessor,该类实现了BeanPostProcessor接口,意在postProcessBeforeInitialization方法中,对刚才注册的bean,在实例化之后,初始化之前,做属性绑定,即配置文件里面加载的属性map与bean的绑定。
接下来,我们再看@EnableDubbo引入的另一个注解@DubboComponentScan:
它会触发DubboComponentScanRegistrar的registerBeanDefinitions方法:
其中registerServiceAnnotationBeanPostProcessor方法会向Spring注册一个ServiceAnnotationBeanPostProcessor,该类实现了Spring的BeanDefinitionRegistryPostProcessor接口,在spring所有beanDefinition加载完但还没实例化的时候,注册Dubbo监听器以及扫描@DubboComponentScan定义的包中@Service注解,并作为ServiceBean类型的BeanDefinition向Spring注册:
这里千万要注意做了两次扫描,第一次对@Service注解的类注册普通bean的定义,第二次注册对应的Dubbo真正负责协议暴露和通信的ServiceBean。红框中是遍历第二次扫描结果,通过registerServiceBean方法,向Spring注册ServiceBean。
DubboComponentScanRegistrar除了扫描注册@Service以外,最后还调用了registerCommonBeans()方法(这个方法其实在@EnableDubboConfig中的DubboConfigConfigurationRegistrar中也有调用):
向Spring注册了一些监听器和处理器,其中的处理器ReferenceAnnotationBeanPostProcessor实现了Spring接口MergedBeanDefinitionPostProcessor,在Bean初始化前,这个处理器会查找bean中所有标注了@Reference或@DubboReference的字段和方法,对字段和方法与引用进行反射绑定,即将所引用的ReferenceBean注入进去。
补充:关于xml解析中没有讲到与@DubboComponentScan向对应的扫描@Service的逻辑,这个其实是在DubboNamespaceHandler的init方法中注册了一个处理注解的parser
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
这个parser内部实现了跟@DubboComponentScan
5.2 服务暴露的实现原理
5.2.1 配置承载初始化
不管是服务暴露还是服务消费,Dubbo框架都会根据优先级对配置信息做聚合处理,目前默认覆盖策略主要遵循以下几点:
(1) -D传递给JVM参数的优先级最高,如-Ddubbo.protocol.port=20880
(2) 代码或XML配置优先级次高,如xml指定<dubbo:protocol port="20880" />
(3) 配置文件优先级最低,如dubbo.properties中指定ddubbo.protocol.port=20880
Dubbo的配置还会收到provider的影响,这个属于运行期属性值的影响,同样遵循以下2点:
(1)如果只有provider端指定了配置,则会自动透传到客户端(即作为客户端的默认值)。
(2)如果客户端也配置了相应的属性,则覆盖服务端的配置。
5.2.2 远程服务的暴露机制
上图是整体RPC的暴露原理,Dubbo框架的服务暴露分量大部分,第一步是将持有的服务示例通过代理转成Invoker,第二步是把Invoker具体的协议(如Dubbo)转换成对应的Exporter。这里的Invoker可以简单理解为一个真实的服务对象实例,是Dubbo框架的实体域,所有模型都会向它靠拢,可以向它发起invoke调用。它可能是一个本地实现,也可能是一个远程实现,还可能是一个集群实现。
5.2.3 服务暴露的前置工作
接下来我们深入框架的实现细节,框架真正进行服务暴露的入口点是ServiceConfig#doExport方法,无论xml还是注解配置,最终都会解析成ServiceBean(ServiceConfig的子类)。我们先大致了解一下看看doExport方法是怎么被调用到的,前面@DubboComponentScan的注册器中提到最后注册了一些监听其,其中一个是DubboBootstrapApplicationListener,这个类实现了ApplicationListener接口,在Spring启动完成得到ContextRefreshedEvent事件通知后,会调用DubboBootstrap的start方法,start方法中调用的exportServices()就会遍历所有的ServiceConfig,并调用其export方法,进而调用doExport方法。(篇幅有限就不贴代码了)
Dubbo支持多注册中心同时写,如果配置了同时注册多个注册中心,则会在doExportUrls方法中依次暴露。同时也支持相同服务暴露多个协议,比如同时暴露Dubbo和REST协议,协议内部会依次对使用的协议都做一次服务暴露,每个协议注册元数据都会写入多个注册中心。而真正的暴露逻辑是在 doExportUrlFor1Protocol()方法中:
第一步,组装url
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
// 如果协议名为空,或空串,则将协议名变量设置为 dubbo
if (name == null || name.length() == 0) {
name = "dubbo";
}
Map<String, String> map = new HashMap<String, String>();
// 添加 side、版本、时间戳以及进程号等信息到 map 中
map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE);
map.put(Constants.DUBBO_VERSION_KEY, Version.getProtocolVersion());
map.put(Constants.TIMESTAMP_KEY, String.valueOf(System.currentTimeMillis()));
if (ConfigUtils.getPid() > 0) {
map.put(Constants.PID_KEY, String.valueOf(ConfigUtils.getPid()));
}
// 通过反射将对象的字段信息添加到 map 中
appendParameters(map, application);
appendParameters(map, module);
appendParameters(map, provider, Constants.DEFAULT_KEY);
appendParameters(map, protocolConfig);
appendParameters(map, this);
// methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
if (methods != null && !methods.isEmpty()) {
// 这段代码用于添加 Callback 配置到 map 中,代码太长,待会单独分析
}
// 检测 generic 是否为 "true",并根据检测结果向 map 中添加不同的信息
if (ProtocolUtils.isGeneric(generic)) {
map.put(Constants.GENERIC_KEY, generic);
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put("revision", revision);
}
// 为接口生成包裹类 Wrapper,Wrapper 中包含了接口的详细信息,比如接口方法名数组,字段信息等
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
// 添加方法名到 map 中,如果包含多个方法名,则用逗号隔开,比如 method = init,destroy
if (methods.length == 0) {
logger.warn("NO method found in service interface ...");
map.put(Constants.METHODS_KEY, Constants.ANY_VALUE);
} else {
// 将逗号作为分隔符连接方法名,并将连接后的字符串放入 map 中
map.put(Constants.METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 添加 token 到 map 中
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
// 随机生成 token
map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(Constants.TOKEN_KEY, token);
}
}
// 判断协议名是否为 injvm
if (Constants.LOCAL_PROTOCOL.equals(protocolConfig.getName())) {
protocolConfig.setRegister(false);
map.put("notify", "false");
}
// 获取上下文路径
String contextPath = protocolConfig.getContextpath();
if ((contextPath == null || contextPath.length() == 0) && provider != null) {
contextPath = provider.getContextpath();
}
// 获取 host 和 port
String host = this.findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = this.findConfigedPorts(protocolConfig, name, map);
// 组装 URL
URL url = new URL(name, host, port, (contextPath == null || contextPath.length() == 0 ? "" : contextPath + "/") + path, map);
// 省略无关代码
}
上面的代码首先是将一些信息,比如版本、时间戳、方法名以及各种配置对象的字段信息放入到 map 中,map 中的内容将作为 URL 的查询字符串。构建好 map 后,紧接着是获取上下文路径、主机名以及端口号等信息。最后将 map 和主机名等数据传给 URL 构造方法创建 URL 对象。需要注意的是,这里出现的 URL 并非 java.net.URL,而是com.alibaba.dubbo.common.URL。
上面省略了一段代码,这里简单分析一下。这段代码用于检测 <dubbo:method> 标签中的配置信息,并将相关配置添加到 map 中。代码如下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// ...
// methods 为 MethodConfig 集合,MethodConfig 中存储了 <dubbo:method> 标签的配置信息
if (methods != null && !methods.isEmpty()) {
for (MethodConfig method : methods) {
// 添加 MethodConfig 对象的字段信息到 map 中,键 = 方法名.属性名。
// 比如存储 <dubbo:method name="sayHello" retries="2"> 对应的 MethodConfig,
// 键 = sayHello.retries,map = {"sayHello.retries": 2, "xxx": "yyy"}
appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
// 检测 MethodConfig retry 是否为 false,若是,则设置重试次数为0
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
// 获取 ArgumentConfig 列表
List<ArgumentConfig> arguments = method.getArguments();
if (arguments != null && !arguments.isEmpty()) {
for (ArgumentConfig argument : arguments) {
// 检测 type 属性是否为空,或者空串(分支1 ⭐️)
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
if (methods != null && methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// 比对方法名,查找目标方法
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
if (argument.getIndex() != -1) {
// 检测 ArgumentConfig 中的 type 属性与方法参数列表
// 中的参数名称是否一致,不一致则抛出异常(分支2 ⭐️)
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
// 添加 ArgumentConfig 字段信息到 map 中,
// 键前缀 = 方法名.index,比如:
// map = {"sayHello.3": true}
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config error: ...");
}
} else { // 分支3 ⭐️
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
// 从参数类型列表中查找类型名称为 argument.type 的参数
if (argclazz.getName().equals(argument.getType())) {
appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("argument config error: ...");
}
}
}
}
}
}
}
// 用户未配置 type 属性,但配置了 index 属性,且 index != -1
} else if (argument.getIndex() != -1) { // 分支4 ⭐️
// 添加 ArgumentConfig 字段信息到 map 中
appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("argument config must set index or type");
}
}
}
}
}
// ...
}
上面这段代码 for 循环和 if else 分支嵌套太多,导致层次太深,不利于阅读,需要耐心看一下。
大家在看这段代码时,注意把几个重要的条件分支找出来。只要理解了这几个分支的意图,就可以弄懂这段代码。请注意上面代码中⭐️符号,这几个符号标识出了4个重要的分支,下面用伪代码解释一下这几个分支的含义。
// 获取 ArgumentConfig 列表
for (遍历 ArgumentConfig 列表) {
if (type 不为 null,也不为空串) { // 分支1
1. 通过反射获取 interfaceClass 的方法列表
for (遍历方法列表) {
1. 比对方法名,查找目标方法
2. 通过反射获取目标方法的参数类型数组 argtypes
if (index != -1) { // 分支2
1. 从 argtypes 数组中获取下标 index 处的元素 argType
2. 检测 argType 的名称与 ArgumentConfig 中的 type 属性是否一致
3. 添加 ArgumentConfig 字段信息到 map 中,或抛出异常
} else { // 分支3
1. 遍历参数类型数组 argtypes,查找 argument.type 类型的参数
2. 添加 ArgumentConfig 字段信息到 map 中
}
}
} else if (index != -1) { // 分支4
1. 添加 ArgumentConfig 字段信息到 map 中
}
}
第二步,暴露服务:
有了前置工作,接下来就可以进行服务暴露了。服务暴露分为暴露到本地 (JVM),和暴露到远程。在深入分析服务导出的源码前,我们先来从宏观层面上看一下服务导出逻辑。如下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
// 省略无关代码
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
// 加载 ConfiguratorFactory,并生成 Configurator 实例,然后通过实例配置 url
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
String scope = url.getParameter(Constants.SCOPE_KEY);
// 如果 scope = none,则什么都不做
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// scope != remote,导出到本地
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
// scope != local,导出到远程
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent(Constants.DYNAMIC_KEY, registryURL.getParameter(Constants.DYNAMIC_KEY));
// 加载监视器链接
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
// 将监视器链接作为参数添加到 url 中
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
String proxy = url.getParameter(Constants.PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(Constants.PROXY_KEY, proxy);
}
// 为服务提供类(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用于持有 Invoker 和 ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 导出服务,并生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
// 不存在注册中心,仅导出服务
} else {
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
this.urls.add(url);
}
上面代码根据 url 中的 scope 参数决定服务导出方式,分别如下:
- scope = none,不导出服务
- scope != remote,导出到本地
- scope != local,导出到远程
不管是导出到本地,还是远程。进行服务导出之前,均需要先创建 Invoker,这是一个很重要的步骤。因此下面先来分析 Invoker 的创建过程。
从前面的代码知道Invoker 是由 ProxyFactory 创建而来,Dubbo 默认的 ProxyFactory 实现类是 JavassistProxyFactory,我们来看看JavassistProxyFactory的getInvoker方法实现:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 为目标类创建 Wrapper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
// 创建匿名 Invoker 类对象,并实现 doInvoke 方法。
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 调用 Wrapper 的 invokeMethod 方法,invokeMethod 最终会调用目标方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
如上,JavassistProxyFactory 创建了一个继承自 AbstractProxyInvoker 类的匿名对象,并覆写了抽象方法 doInvoke。覆写后的 doInvoke 逻辑比较简单,仅是将调用请求转发给了 Wrapper 类的 invokeMethod 方法。Wrapper 用于“包裹”目标类,Wrapper 是一个抽象类,仅可通过 getWrapper(Class) 方法创建子类。在创建 Wrapper 子类的过程中,子类代码生成逻辑会对 getWrapper 方法传入的 Class 对象进行解析,拿到诸如类方法,类成员变量等信息。以及生成 invokeMethod 方法代码和其他一些方法代码。代码生成完毕后,通过 Javassist 生成 Class 对象,最后再通过反射创建 Wrapper 实例。
5.2.4 本地暴露
前面代码中已经看到了,通过调用ServiceConfig的exportLocal方法进行本地暴露:
private void exportLocal(URL url) {
// 如果 URL 的协议头等于 injvm,说明已经导出到本地了,无需再次导出
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL) // 设置协议头为 injvm
.setHost(LOCALHOST)
.setPort(0);
ServiceClassHolder.getInstance().pushServiceClass(getServiceClass(ref));
// 创建 Invoker,并导出服务,这里的 protocol 会在运行时调用 InjvmProtocol 的 export 方法
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
}
exportLocal 方法比较简单,首先根据 URL 协议头决定是否导出服务。若需导出,则创建一个新的 URL 并将协议头、主机名以及端口设置成新的值。然后创建 Invoker,并调用 InjvmProtocol 的 export 方法导出服务。下面我们来看一下 InjvmProtocol 的 export 方法都做了哪些事情:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
// 创建 InjvmExporter
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
如上,InjvmProtocol 的 export 方法仅创建了一个 InjvmExporter,无其他逻辑。
5.2.5 远程暴露
与导出服务到本地相比,导出服务到远程的过程要复杂不少,其包含了服务导出与服务注册两个过程。这两个过程涉及到了大量的调用,比较复杂。按照代码执行顺序,本节先来分析服务导出逻辑,服务注册逻辑将在下一节进行分析。下面开始分析,我们把目光移动到 RegistryProtocol 的 export 方法上。
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 导出服务
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
// 获取注册中心 URL,以 zookeeper 注册中心为例,得到的示例 URL 如下:
// zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.17.48.52%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider
URL registryUrl = getRegistryUrl(originInvoker);
// 根据 URL 加载 Registry 实现类,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
// 获取已注册的服务提供者 URL,比如:
// dubbo://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL registeredProviderUrl = getRegisteredProviderUrl(originInvoker);
// 获取 register 参数
boolean register = registeredProviderUrl.getParameter("register", true);
// 向服务提供者与消费者注册表中注册服务提供者
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
// 根据 register 的值决定是否注册服务
if (register) {
// 向注册中心注册服务
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// 获取订阅 URL,比如:
// provider://172.17.48.52:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false&anyhost=true&application=demo-provider&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
// 创建监听器
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 向注册中心进行订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 创建并返回 DestroyableExporter
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
}
看上去比较复杂,主要做了:
- 调用doLocolExport暴露服务
- 向注册中心注册服务
- 向注册中心订阅override数据(即订阅configurators节点)
- 创建并返回DestroyableExporter
先看doLocolExport:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
// 访问缓存
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
// 创建 Invoker 为委托类对象
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// 调用 protocol 的 export 方法导出服务
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
// 写缓存
bounds.put(key, exporter);
}
}
}
return exporter;
}
假设运行时协议为 dubbo,此处的 protocol 变量会在运行时加载 DubboProtocol,并调用 DubboProtocol 的 export 方法。所以,接下来我们目光转移到 DubboProtocol 的 export 方法上:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// 获取服务标识,理解成服务坐标也行。由服务组名,服务名,服务版本号以及端口组成。比如:
// demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
// 创建 DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
// 将 <key, exporter> 键值对放入缓存中
exporterMap.put(key, exporter);
// 本地存根相关代码
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
// 省略日志打印代码
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
// 启动服务器
openServer(url);
// 优化序列化
optimizeSerialization(url);
return exporter;
}
如上,我们重点关注 DubboExporter 的创建以及 openServer 方法,其他逻辑看不懂也没关系,不影响理解服务导出过程。另外,DubboExporter 的代码比较简单,就不分析了。下面分析 openServer 方法。
private void openServer(URL url) {
// 获取 host:port,并将其作为服务器实例的 key,用于标识当前的服务器实例
String key = url.getAddress();
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
// 访问缓存
ExchangeServer server = serverMap.get(key);
if (server == null) {
// 创建服务器实例
serverMap.put(key, createServer(url));
} else {
// 服务器已创建,则根据 url 中的配置重置服务器
server.reset(url);
}
}
}
如上,在同一台机器上(单网卡),同一个端口上仅允许启动一个服务器实例。若某个端口上已有服务器实例,此时则调用 reset 方法重置服务器的一些配置。接下来分析服务器实例的创建过程。如下:
private ExchangeServer createServer(URL url) {
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY,
// 添加心跳检测配置到 url 中
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
// 获取 server 参数,默认为 netty
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
// 通过 SPI 检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
// 添加编码解码器参数
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
// 创建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server...");
}
// 获取 client 参数,可指定 netty,mina
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
// 获取所有的 Transporter 实现类名称集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 检测当前 Dubbo 所支持的 Transporter 实现类名称列表中,
// 是否包含 client 所表示的 Transporter,若不包含,则抛出异常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type...");
}
}
return server;
}
如上,createServer 包含三个核心的逻辑。第一是检测是否存在 server 参数所代表的 Transporter 拓展,不存在则抛出异常。第二是创建服务器实例。第三是检测是否支持 client 参数所表示的 Transporter 拓展,不存在也是抛出异常。但创建服务器的操作目前还不是很清晰,我们继续往下看。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 获取 Exchanger,默认为 HeaderExchanger。
// 紧接着调用 HeaderExchanger 的 bind 方法创建 ExchangeServer 实例
return getExchanger(url).bind(url, handler);
}
上面代码比较简单,就不多说了。下面看一下 HeaderExchanger 的 bind 方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 创建 HeaderExchangeServer 实例,该方法包含了多个逻辑,分别如下:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
HeaderExchanger 的 bind 方法包含的逻辑比较多,但目前我们仅需关心 Transporters 的 bind 方法逻辑即可。该方法的代码如下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handlers == null || handlers.length == 0) {
throw new IllegalArgumentException("handlers == null");
}
ChannelHandler handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素数量大于1,则创建 ChannelHandler 分发器
handler = new ChannelHandlerDispatcher(handlers);
}
// 获取自适应 Transporter 实例,并调用实例方法
return getTransporter().bind(url, handler);
}
如上,getTransporter() 方法获取的 Transporter 是在运行时动态创建的,类名为 TransporterAdaptive 会在运行时根据传入的 URL 参数决定加载什么类型的 Transporter,默认为 NettyTransporter。下面我们继续跟下去,这次分析的是 NettyTransporter 的 bind 方法。
public Server bind(URL url, ChannelHandler listener) throws RemotingException {
// 创建 NettyServer
return new NettyServer(url, listener);
}
这里仅有一句创建 NettyServer 的代码,无需多说,我们继续向下看。
public class NettyServer extends AbstractServer implements Server {
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// 调用父类构造方法
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}
}
public abstract class AbstractServer extends AbstractEndpoint implements Server {
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
// 调用父类构造方法,这里就不用跟进去了,没什么复杂逻辑
super(url, handler);
localAddress = getUrl().toInetSocketAddress();
// 获取 ip 和端口
String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
// 设置 ip 为 0.0.0.0
bindIp = NetUtils.ANYHOST;
}
bindAddress = new InetSocketAddress(bindIp, bindPort);
// 获取最大可接受连接数
this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
try {
// 调用模板方法 doOpen 启动服务器
doOpen();
} catch (Throwable t) {
throw new RemotingException("Failed to bind ");
}
DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
}
protected abstract void doOpen() throws Throwable;
protected abstract void doClose() throws Throwable;
}
上面代码多为赋值代码,不需要多讲。我们重点关注 doOpen 抽象方法,该方法需要子类实现。下面回到 NettyServer 中。
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
// 创建 boss 和 worker 线程池
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
// 创建 ServerBootstrap
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
bootstrap.setOption("child.tcpNoDelay", true);
// 设置 PipelineFactory
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// 绑定到指定的 ip 和端口上
channel = bootstrap.bind(getBindAddress());
}
以上就是 NettyServer 创建的过程,dubbo 默认使用的 NettyServer 是基于 netty 3.x 版本实现的,比较老了。因此 Dubbo 另外提供了 netty 4.x 版本的 NettyServer,大家可在使用 Dubbo 的过程中按需进行配置。
5.2.6 服务注册
服务注册操作对于Dubbo来说不是必须的,通过服务直连的方式就可以绕过注册中心。但通常不会这么做,因为不利于服务治理。下面以zookeeper作为注册中心分析服务注册的实现,我们回到RegistryProtocol 的 export 方法:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// ${导出服务}
// 省略其他代码
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 注册服务
register(registryUrl, registeredProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
// 订阅 override 数据
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
// 省略部分代码
}
RegistryProtocol 的 export 方法包含了服务导出,注册,以及数据订阅等逻辑。其中服务导出逻辑上一节已经分析过了,本节将分析服务注册逻辑,相关代码如下:
public void register(URL registryUrl, URL registedProviderUrl) {
// 获取 Registry
Registry registry = registryFactory.getRegistry(registryUrl);
// 注册服务
registry.register(registedProviderUrl);
}
register方法包含两步操作,第一步获取注册中心实例,第二步向注册中心注册服务。
public Registry getRegistry(URL url) {
url = url.setPath(RegistryService.class.getName())
.addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
String key = url.toServiceString();
LOCK.lock();
try {
// 访问缓存
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
// 缓存未命中,创建 Registry 实例
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry...");
}
// 写入缓存
REGISTRIES.put(key, registry);
return registry;
} finally {
LOCK.unlock();
}
}
protected abstract Registry createRegistry(URL url);
如上,getRegistry先访问缓存,缓存没有则调用createRegistry创建registry,然后写入缓存。这里的 createRegistry 是一个模板方法,具体由ZookeeperRegistryFactory实现:
public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
// zookeeperTransporter 由 SPI 在运行时注入,类型为 ZookeeperTransporter$Adaptive
private ZookeeperTransporter zookeeperTransporter;
public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
this.zookeeperTransporter = zookeeperTransporter;
}
@Override
public Registry createRegistry(URL url) {
// 创建 ZookeeperRegistry
return new ZookeeperRegistry(url, zookeeperTransporter);
}
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
// 获取组名,默认为 dubbo
String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(Constants.PATH_SEPARATOR)) {
// group = "/" + group
group = Constants.PATH_SEPARATOR + group;
}
this.root = group;
// 创建 Zookeeper 客户端,默认为 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加状态监听器
zkClient.addStateListener(new StateListener() {
@Override
public void stateChanged(int state) {
if (state == RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
});
}
在上面的代码代码中,我们重点关注 ZookeeperTransporter 的 connect 方法调用,这个方法用于创建 Zookeeper 客户端。创建好 Zookeeper 客户端,意味着注册中心的创建过程就结束了。接下来,再来分析一下 Zookeeper 客户端的创建过程。
前面说过,这里的 zookeeperTransporter 类型为自适应拓展类,因此 connect 方法会在被调用时决定加载什么类型的 ZookeeperTransporter 拓展,默认为 CuratorZookeeperTransporter。下面我们到 CuratorZookeeperTransporter 中看一看。
public ZookeeperClient connect(URL url) {
// 创建 CuratorZookeeperClient
return new CuratorZookeeperClient(url);
}
public class CuratorZookeeperClient extends AbstractZookeeperClient<CuratorWatcher> {
private final CuratorFramework client;
public CuratorZookeeperClient(URL url) {
super(url);
try {
// 创建 CuratorFramework 构造器
CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
.connectString(url.getBackupAddress())
.retryPolicy(new RetryNTimes(1, 1000))
.connectionTimeoutMs(5000);
String authority = url.getAuthority();
if (authority != null && authority.length() > 0) {
builder = builder.authorization("digest", authority.getBytes());
}
// 构建 CuratorFramework 实例
client = builder.build();
// 添加监听器
client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState state) {
if (state == ConnectionState.LOST) {
CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
} else if (state == ConnectionState.CONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
} else if (state == ConnectionState.RECONNECTED) {
CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
}
}
});
// 启动客户端
client.start();
} catch (Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
}
CuratorZookeeperClient 构造方法主要用于创建和启动 CuratorFramework 实例。以上基本上都是 Curator 框架的代码,大家如果对 Curator 框架不是很了解,可以参考 Curator 官方文档。
所以从代码来看,获取注册中心实例,实际上是已经连上注册中心了,接下来看如何向注册中心注册服务,调用的是ZookeeperRegistry的register方法:
public void register(URL url) {
super.register(url);
failedRegistered.remove(url);
failedUnregistered.remove(url);
try {
// 模板方法,由子类实现
doRegister(url);
} catch (Exception e) {
Throwable t = e;
// 获取 check 参数,若 check = true 将会直接抛出异常
boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
&& url.getParameter(Constants.CHECK_KEY, true)
&& !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
boolean skipFailback = t instanceof SkipFailbackWrapperException;
if (check || skipFailback) {
if (skipFailback) {
t = t.getCause();
}
throw new IllegalStateException("Failed to register");
} else {
logger.error("Failed to register");
}
// 记录注册失败的链接
failedRegistered.add(url);
}
}
如上,我们重点关注 doRegister 方法调用即可
protected void doRegister(URL url) {
try {
// 通过 Zookeeper 客户端创建节点,节点路径由 toUrlPath 方法生成,路径格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如
// /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register...");
}
}
如上,ZookeeperRegistry 在 doRegister 中调用了 Zookeeper 客户端创建服务节点。
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
// 如果要创建的节点类型非临时节点,那么这里要检测节点是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 递归创建上一级路径
create(path.substring(0, i), false);
}
// 根据 ephemeral 的值创建临时或持久节点
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
好了,到此关于服务注册的过程就分析完了。整个过程可简单总结为:先创建注册中心实例,之再通过注册中心实例注册服务。
补充一下,服务暴露的同时还会初始化过滤器链:
注意我们使用DubboProtocol进行服务暴露时,DubboProtocol是SPI接口Protocol的实现,该接口还提供了ProtocolListenerWrapper和ProtocolFilterWrapper两个包装扩展,其中ProtocolFilterWrapper会在export方法中触发buildInvokerChain方法进行filter链的构造。