Dubbo 服务暴露过程中,主要有Dubbo的xml标签解析器、装配ServiceBean、服务暴露、订阅与通知、元数据注册等步骤,其中服务暴露又分为本地暴露和远程暴露,远程暴露中主要有将invoker转换为exporter、启动netty、注册zookeeper等,下面进行详细解析。
2.1 Dubbo自定义xml解析器
Dubbo定义了DubboNamespaceHandler、DubboBeanDefinitionParser 来处理xml的BeanDefinition,并将BeanDefinition转换成相应的对象。
2.1.1 DubboNamespaceHandler 添加过程
当Spring Boot启动时,会通过 @ImportResource("classpath:spring/applicationContext.xml")
来加载xml配置文件,dubbo的xml配置文件就是通过 applicationContext.xml
文件引入的,然后通过Spring 的loadBeanDefinitions(beanFactory)
来获取Xml的空间命名处理器,具体代码如下:
protected int doLoadBeanDefinitions(InputSource inputSource, Resource resource)
throws BeanDefinitionStoreException {
try {
Document doc = doLoadDocument(inputSource, resource);
return registerBeanDefinitions(doc, resource);
}
...
}
public int registerBeanDefinitions(Document doc, Resource resource) throws BeanDefinitionStoreException {
BeanDefinitionDocumentReader documentReader = createBeanDefinitionDocumentReader();
int countBefore = getRegistry().getBeanDefinitionCount();
documentReader.registerBeanDefinitions(doc, createReaderContext(resource));
return getRegistry().getBeanDefinitionCount() - countBefore;
}
public XmlReaderContext createReaderContext(Resource resource) {
return new XmlReaderContext(resource, this.problemReporter, this.eventListener,
this.sourceExtractor, this, getNamespaceHandlerResolver());
}
代码中,将xml转换成为到Document,创建 XmlReaderContext,通过 getNamespaceHandlerResolver -> createDefaultNamespaceHandlerResolver -> getHandlerMappings
,获取命名空间处理器的。具体的代码如下:
private Map<String, Object> getHandlerMappings() {
if (this.handlerMappings == null) {
synchronized (this) {
if (this.handlerMappings == null) {
try {
Properties mappings = PropertiesLoaderUtils.loadAllProperties(this.handlerMappingsLocation, this.classLoader);
Map<String, Object> handlerMappings = new ConcurrentHashMap<String, Object>(mappings.size());
CollectionUtils.mergePropertiesIntoMap(mappings, handlerMappings);
this.handlerMappings = handlerMappings;
}
...
}
}
}
return this.handlerMappings;
}
注:Double-Check的方式避免非线程安全问题。这里的getHanndlerMappings()
在debug时,是通过IDEA自动启动的线程执行的,因为IDEA在Debug时,会自动重启线程执行类的 toString()
方法,而这里的 toString()
方法如下:
public String toString() {
return "NamespaceHandlerResolver using mappings " + getHandlerMappings();
}
不在Debug状态时,是在对元素解析时,进行获取的,parseCustomElement -> resolve
,代码如下:
public BeanDefinition parseCustomElement(Element ele, BeanDefinition containingBd) {
String namespaceUri = getNamespaceURI(ele);
NamespaceHandler handler = this.readerContext.getNamespaceHandlerResolver().resolve(namespaceUri);
...
return handler.parse(ele, new ParserContext(this.readerContext, this, containingBd));
}
public NamespaceHandler resolve(String namespaceUri) {
Map<String, Object> handlerMappings = getHandlerMappings();
Object handlerOrClassName = handlerMappings.get(namespaceUri);
...
NamespaceHandler namespaceHandler = (NamespaceHandler) BeanUtils.instantiateClass(handlerClass);
namespaceHandler.init();
handlerMappings.put(namespaceUri, namespaceHandler);
...
}
}
2.1.2 DubboBeanDefinitionParser 添加过程
从上面代码中可以看出,resolve -> init
,对命名空间处理器进行了初始化,初始化时注册了标签的解析器,代码如下:
public void init() {
registerBeanDefinitionParser("application",
new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module",
new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry",
new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center",
new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report",
new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor",
new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider",
new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer",
new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol",
new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service",
new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference",
new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
2.1.3 DubboBeanDefinitionParser 解析过程
添加完 DubboBeanDefinitionParser 解析器后,就是对标签的解析过程:
public BeanDefinition parseCustomElement(Element ele, BeanDefinition containingBd) {
...
return handler.parse(ele, new ParserContext(this.readerContext, this, containingBd));
}
2.2 装配 ServiceBean
完成了对xml标签的解析后,Spring会通过finishBeanFactoryInitialization(beanFactory);
对ServiceBean进行初始化,具体调用步骤如下:
AbstractApplicationContext#finishBeanFactoryInitialization
|- AbstractBeanFactory#createBean
|- AbstractAutowireCapableBeanFactory#doCreateBean
|- AbstractAutowireCapableBeanFactory#createBeanInstance
|- AbstractAutowireCapableBeanFactory#instantiateBean
|- SimpleInstantiationStrategy#instantiate
|- BeanUtils.instantiateClass(constructorToUse);
从上面调用步骤可以看出,最后使用JDK的方式初始化ServiceBean的。
2.2.1 初始化静态成员
看看ServiceBean的静态成员变量都有哪些?
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
根据初始化的顺序,会先对静态成员变量初始化,然后才调用构造函数。根据SPI的原理,初始化这两个类时的都加载了哪些类?
META-INF/dubbo/internal/org.apache.dubbo.rpc.ProxyFactory:
stub=org.apache.dubbo.rpc.proxy.wrapper.StubProxyFactoryWrapper
jdk=org.apache.dubbo.rpc.proxy.jdk.JdkProxyFactory
javassist=org.apache.dubbo.rpc.proxy.javassist.JavassistProxyFactory
META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol:
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=org.apache.dubbo.rpc.support.MockProtocol
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=org.apache.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=org.apache.dubbo.rpc.protocol.hessian.HessianProtocol
http=org.apache.dubbo.rpc.protocol.http.HttpProtocol
org.apache.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=org.apache.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=org.apache.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=org.apache.dubbo.rpc.protocol.redis.RedisProtocol
rest=org.apache.dubbo.rpc.protocol.rest.RestProtocol
registry=org.apache.dubbo.registry.integration.RegistryProtocol
qos=org.apache.dubbo.qos.protocol.QosProtocolWrapper
没有@Adaptive注解,所以会动态创建类,动态类的字节码如下:
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("...");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("...");
}
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
...
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
...
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
...
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
package org.apache.dubbo.rpc;
import org.apache.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements org.apache.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
...
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public java.lang.Object getProxy(org.apache.dubbo.rpc.Invoker arg0, boolean arg1) throws org.apache.dubbo.rpc.RpcException {
...
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0, arg1);
}
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
org.apache.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
2.2.2 构造函数初始化
public ServiceBean() {
super();
this.service = null;
}
可以看到没有做其他操作。
2.2.3 ApplicationContext 与 afterPropertiesSet 初始化
构造函数初始化后,会调用初始化函数,具体调用入口如下:
AbstractAutowireCapableBeanFactory#initializeBean:
protected Object initializeBean(final String beanName, final Object bean, RootBeanDefinition mbd) {
Object wrappedBean = bean;
if (mbd == null || !mbd.isSynthetic()) {
wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
}
try {
invokeInitMethods(beanName, wrappedBean, mbd);
}
if (mbd == null || !mbd.isSynthetic()) {
wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
}
return wrappedBean;
}
从上面可以看到,调用顺序为:
applyBeanPostProcessorsBeforeInitialization
|- invokeInitMethods
|- ((InitializingBean) bean).afterPropertiesSet();
|- invokeCustomInitMethod(beanName, bean, mbd);
|- applyBeanPostProcessorsAfterInitialization
其中,applyBeanPostProcessorsBeforeInitialization
中又做了哪些初始化呢?
private void invokeAwareInterfaces(Object bean) {
if (bean instanceof Aware) {
if (bean instanceof EnvironmentAware) {
bean.setEnvironment(this.applicationContext.getEnvironment());
}
if (bean instanceof EmbeddedValueResolverAware) {
bean.setEmbeddedValueResolver(this.embeddedValueResolver);
}
if (bean instanceof ResourceLoaderAware) {
((ResourceLoaderAware) bean).setResourceLoader(this.applicationContext);
}
if (bean instanceof ApplicationEventPublisherAware) {
bean.setApplicationEventPublisher(this.applicationContext);
}
if (bean instanceof MessageSourceAware) {
((MessageSourceAware) bean).setMessageSource(this.applicationContext);
}
if (bean instanceof ApplicationContextAware) {
bean.setApplicationContext(this.applicationContext);
}
}
}
发现调用了 setApplicationContext
,ServiceBean的这个函数又做了什么呢?
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
SpringExtensionFactory.addApplicationContext(applicationContext);
supportedApplicationListener = addApplicationListener(applicationContext, this);
}
做了两件事,其一,将applicationContext
设置到SpringExtensionFactory
中,用于后续从SpringExtensionFactory
中获取Bean,其二,调用方法addApplicationListener
,之后将当前类(因为当前类监听了ContextRefreshedEvent事件)加入spring的监听器列表。
然后调用 ServiceBean 的 afterPropertiesSet, 初始化变量,判断延迟的事件是否大于0,如果是,执行export(),进行服务暴露,如果不是,结束(这种情况下服务暴露,会发生在发布上下文刷新事件的时候)
2.2.4 Spring上下文刷新时事件监听
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
...
export();
}
}
当服务没有暴露,并且没有反注册时,进行服务暴露。
2.3 服务暴露
服务暴露分为本地暴露和远程暴露,整个暴露的调用顺序如下:
ServiceBean#onApplicationEvent
|- ServiceConfig#export
|- ServiceConfig#checkAndUpdateSubConfigs
|- ServiceConfig#doExport
|- ServiceConfig#doExportUrls
|- AbstractInterfaceConfig#loadRegistries
|- ServiceConfig#doExportUrlsFor1Protocol
|- ServiceConfig#exportLocal
|- proxyFactory.getInvoker
|- protocol#export
|- proxyFactory.getInvoker
|- protocol#export
在本地暴露和远程暴露之前,会checkAndUpdateSubConfigs
检查和更新一下ServiceBean的配置,然后loadRegistries
获取要暴露的服务注册信息,即拼接一下URL,只有在远程暴露时才用到。另外,还做一些初始化工作,ProviderModel
、 ApplicationModel
,如下:
private void doExportUrls() {
List<URL> registryURLs = loadRegistries(true);
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
最终的暴露对象都存放在ServiceConfig中,如下:
private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();
另外,在服务暴露中比较重要角色之一协议(Protocol),实现的功能如下:
@SPI("dubbo")
public interface Protocol {
/**
* 获取缺省端口,当用户没有配置端口时使用。
*
* @return 缺省端口
*/
int getDefaultPort();
/**
* 暴露远程服务:<br>
* 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().
* setRemoteAddress();<br>
* 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
* 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
*
* @param <T> 服务的类型
* @param invoker 服务的执行体
* @return exporter 暴露服务的引用,用于取消暴露
* @throws RpcException 当暴露服务出错时抛出,比如端口已占用
*/
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
/**
* 引用远程服务:<br>
* 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端
* export()传入的Invoker对象的invoke()方法。<br>
* 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
* 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
*
* @param <T> 服务的类型
* @param type 服务的类型
* @param url 远程服务的URL地址
* @return invoker 服务的本地代理
* @throws RpcException 当连接服务提供方失败时抛出
*/
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
/**
* 释放协议:<br>
* 1. 取消该协议所有已经暴露和引用的服务。<br>
* 2. 释放协议所占用的所有资源,比如连接和端口。<br>
* 3. 协议在释放后,依然能暴露和引用新的服务。<br>
*/
void destroy();
}
ServiceBean都是对外提供服务,所以用的都是export,而ReferenceBean是调用服务,所以都是用refer。
2.3.1 拼凑URL
拼接URL分为两部分,一部分为远程暴露用的,一部分是为本地暴露用的。
-
远程暴露URL拼接
拼接的URL主要包括 address、application、registry-config、path、protocol等信息
protected List<URL> loadRegistries(boolean provider) { // check && override if necessary List<URL> registryList = new ArrayList<URL>(); if (CollectionUtils.isNotEmpty(registries)) { for (RegistryConfig config : registries) { String address = config.getAddress(); if (StringUtils.isEmpty(address)) { address = Constants.ANYHOST_VALUE; } if (!RegistryConfig.NO_AVAILABLE.equalsIgnoreCase(address)) { Map<String, String> map = new HashMap<String, String>(); appendParameters(map, application); appendParameters(map, config); map.put(Constants.PATH_KEY, RegistryService.class.getName()); appendRuntimeParameters(map); if (!map.containsKey(Constants.PROTOCOL_KEY)) { map.put(Constants.PROTOCOL_KEY, Constants.DUBBO_PROTOCOL); } List<URL> urls = UrlUtils.parseURLs(address, map); for (URL url : urls) { url = URLBuilder.from(url) .addParameter(Constants.REGISTRY_KEY, url.getProtocol()) .setProtocol(Constants.REGISTRY_PROTOCOL) .build(); if ((provider && url.getParameter(Constants.REGISTER_KEY, true)) || (!provider && url.getParameter(Constants.SUBSCRIBE_KEY, true))) { registryList.add(url); } } } } } return registryList; }
最终得到的url串如下:
dubbo://10.216.128.81:20880/com.gmr.dubbo.remote.facade.DemoFacade?anyhost=true&application=dubbo-demo-provider&bean.name=com.gmr.dubbo.remote.facade.DemoFacade&bind.ip=10.216.128.81&bind.port=20880&default.deprecated=false&default.dynamic=true&default.group=test&default.register=true&default.version=1.0&deprecated=false&dubbo=2.0.2&dynamic=true&environment=test&generic=false&interface=com.gmr.dubbo.remote.facade.DemoFacade&logger=slf4j&methods=sayHello&organization=decision&owner=hzguomeiran&pid=25468®ister=true&release=2.7.1&side=provider×tamp=1559268057411
-
本地暴露URL拼接
代码中可以看到拼接的URL 主要包括side、application、module、provider、protocol、method、revision、token、host、port 等信息。代码如下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) { Map<String, String> map = new HashMap<String, String>(); map.put(Constants.SIDE_KEY, Constants.PROVIDER_SIDE); appendRuntimeParameters(map); appendParameters(map, application); appendParameters(map, module); appendParameters(map, provider, Constants.DEFAULT_KEY); appendParameters(map, protocolConfig); appendParameters(map, this); if (CollectionUtils.isNotEmpty(methods)) { for (MethodConfig method : methods) { ... } // end of methods for } if (ProtocolUtils.isGeneric(generic)) { map.put(Constants.GENERIC_KEY, generic); map.put(Constants.METHODS_KEY, Constants.ANY_VALUE); } else { String revision = Version.getVersion(interfaceClass, version); if (revision != null && revision.length() > 0) { map.put("revision", revision); } ... } if (!ConfigUtils.isEmpty(token)) { if (ConfigUtils.isDefault(token)) { map.put(Constants.TOKEN_KEY, UUID.randomUUID().toString()); } else { map.put(Constants.TOKEN_KEY, token); } } // export service String host = this.findConfigedHosts(protocolConfig, registryURLs, map); Integer port = this.findConfigedPorts(protocolConfig, name, map); URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map); if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .hasExtension(url.getProtocol())) { url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class) .getExtension(url.getProtocol()).getConfigurator(url).configure(url); } String scope = url.getParameter(Constants.SCOPE_KEY); // don't export when none is configured if (!Constants.SCOPE_NONE.equalsIgnoreCase(scope)) { // export to local if the config is not remote (export to remote only when config is remote) if (!Constants.SCOPE_REMOTE.equalsIgnoreCase(scope)) { exportLocal(url); } ... } ... }
最终得到url串如下:
injvm://127.0.0.1/com.gmr.dubbo.remote.facade.DemoFacade?anyhost=true&application=dubbo-demo-provider&bean.name=com.gmr.dubbo.remote.facade.DemoFacade&bind.ip=10.216.128.81&bind.port=20880&default.deprecated=false&default.dynamic=true&default.group=test&default.register=true&default.version=1.0&deprecated=false&dubbo=2.0.2&dynamic=true&environment=test&generic=false&interface=com.gmr.dubbo.remote.facade.DemoFacade&logger=slf4j&methods=sayHello&organization=decision&owner=hzguomeiran&pid=25468®ister=true&release=2.7.1&side=provider×tamp=1559268057411
2.3.2 本地暴露
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URLBuilder.from(url)
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(LOCALHOST_VALUE)
.setPort(0)
.build();
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
}
}
从代码中,可以看到先通过代理工厂,ProxyFactory获取Invoker,这里的ProxyFactory是通过动态代理生成的对象ProxyFactory$Adaptive,看看getInvoker具体做了什么?
public org.apache.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, org.apache.dubbo.common.URL arg2) throws org.apache.dubbo.rpc.RpcException {
org.apache.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
org.apache.dubbo.rpc.ProxyFactory extension = (org.apache.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
可以看到通过spi获取了javassist的代理工厂,JavassistProxyFactory,getInvoker
方法新建了一个Invoker对象,代码如下:
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
下面看看protocol.export
做了哪些工作:
public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
...
org.apache.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
...
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
[图片上传失败...(image-727c73-1614659196687)]
从图中,可以看到获取的protocol是InjvmProtocol,创建了Exporter对象,具体方法是:
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
最后将InjvmExporter添加到 exporters 中。
2.3.3 远程暴露
远程和本地暴露基本相同,具体操作如下:
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker =
new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
-
RegistryProtocol
用DelegateProviderMetaDataInvoker将invoker进行了封装。除此之外,用到的protocol不同,是RegistryProtocol,产生的暴露器类型不同,如下图:
[图片上传失败...(image-158efa-1614659196687)]
具体代码如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { URL registryUrl = getRegistryUrl(originInvoker); // url to export locally URL providerUrl = getProviderUrl(originInvoker); final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener); //服务暴露 final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl); ... }
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> { Invoker<?> invokerDelegete = new InvokerDelegate<>(originInvoker, providerUrl); return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegete), originInvoker); }); }
-
DubboProtocol
这里先从缓存中看有没有注册过,如果没有再调用
protocol.export(invokerDelegete)
进行服务暴露,这里用的暴露协议是DubboProtocol,DubboProtocol协议在服务暴露时又做了哪些工作呢?public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap); exporterMap.put(key, exporter); // 启动服务 openServer(url); optimizeSerialization(url); return exporter; }
DubboProtocol#openServer |- DubboProtocol#createServer |- Exchangers.bind(url, requestHandler) |- Exchangers#getExchanger |- HeaderExchanger#bind |- Transporters#bind |- NettyServer#doOpen
Transporters 根据SPI获取默认Transporter,为NettyTransporter,bind实现的功能是创建一个NettyServer,具体创建过程如下:
protected void doOpen() throws Throwable { bootstrap = new ServerBootstrap(); bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true)); workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS), new DefaultThreadFactory("NettyServerWorker", true)); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this); channels = nettyServerHandler.getChannels(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE) .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { // FIXME: should we use getTimeout()? int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this); ch.pipeline() .addLast("decoder", adapter.getDecoder()) .addLast("encoder", adapter.getEncoder()) .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)) .addLast("handler", nettyServerHandler); } }); // bind ChannelFuture channelFuture = bootstrap.bind(getBindAddress()); channelFuture.syncUninterruptibly(); channel = channelFuture.channel(); }
这里启动一个nettyServer,是一个主从多线程模型。
-
注册到zk
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { ... // url to registry final Registry registry = getRegistry(originInvoker); final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl); ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl); //to judge if we need to delay publish boolean register = registeredProviderUrl.getParameter("register", true); if (register) { register(registryUrl, registeredProviderUrl); providerInvokerWrapper.setReg(true); } ... }
调用顺序如下:
RegistryProtocol#export |- RegistryProtocol#register |- RegistryService#register |- FailbackRegistry#register |- FailbackRegistry#doRegister |- ZookeeperRegistry#doRegister |- AbstractZookeeperClient#create
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
这行代码可以看出是创建的节点是否是临时节点,虽然给了默认值是true,但是在AbstractServiceConfig,给了默认值是false,protected Boolean dynamic = false;
所以在dubbo2.7版本中,默认创建不是临时节点。public void create(String path, boolean ephemeral) { if (!ephemeral) { if (checkExists(path)) { return; } } int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); } if (ephemeral) { createEphemeral(path); } else { createPersistent(path); } }
这是个递归创建过程,由于临时节点下面不能再创建临时节点,所以只有叶子节点才能创建临时节点。当服务提供者断开连接时,仅仅删除的是叶子节点。
-
订阅处理
注册完服务之后,还需要实现服务的订阅功能,具体如下:
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException { ... final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl); final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker); overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener); ... // Deprecated! Subscribe to override rules in 2.6.x or before. registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener); exporter.setRegisterUrl(registeredProviderUrl); exporter.setSubscribeUrl(overrideSubscribeUrl); ... }
具体订阅操作如下:
public void doSubscribe(final URL url, final NotifyListener listener) { try { ... for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))); zkListener = listeners.get(listener); } ... zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); ... } notify(url, listener, urls); } ... }
订阅Zookeeper节点是通过创建ChildListener来实现的具体调用的方法是
addChildListener()
,具体调用顺序如下:FailbackRegistry#subscribe |- ZookeeperRegistry#doSubscribe |- AbstractZookeeperClient#addChildListener |- CuratorZookeeperClient#addTargetChildListener |- client.getChildren().usingWatcher(listener).forPath(path); |- GetChildrenBuilderImpl#usingWatcher
client.getChildren().usingWatcher(listener).forPath(path)
这行代码是具体的监听订阅执行操作。public List<String> addChildListener(String path, final ChildListener listener) { ... TargetChildListener targetListener = listeners.get(listener); if (targetListener == null) { listeners.putIfAbsent(listener, createTargetChildListener(path, listener)); targetListener = listeners.get(listener); } return addTargetChildListener(path, targetListener); }
具体做了以下工作:
- 根据
ChildListener
获取TargetChildListener
- 创建一个真正的用来执行当path节点的子节点发生变化时的逻辑
- 将刚刚创建出来的子节点监听器订阅path的变化,这样之后,path的子节点发生了变化时,
TargetChildListener
才会执行相应的逻辑。而实际上TargetChildListener
又会调用ChildListener
的实现类的childChanged(String parentPath, List<String> currentChilds)
方法,而该实现类,正好是ZookeeperRegistry中实现的匿名内部类,在该匿名内部类的childChanged(String parentPath, List<String> currentChilds)
方法中,调用了ZookeeperRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
方法
createTargetChildListener
会创建一个CuratorWatcher
来做具体的监听工作。static class CuratorWatcherImpl implements CuratorWatcher, TreeCacheListener { private CuratorFramework client; private volatile ChildListener childListener; private volatile DataListener dataListener; public CuratorWatcherImpl(CuratorFramework client, ChildListener listener) { this.client = client; this.childListener = listener; } ... @Override public void process(WatchedEvent event) throws Exception { if (childListener != null) { String path = event.getPath() == null ? "" : event.getPath(); childListener.childChanged(path, StringUtils.isNotEmpty(path) ? client.getChildren().usingWatcher(this).forPath(path) : Collections.<String>emptyList()); } }
- 根据
-
通知处理
通知处理的具体方法是
AbstractRegistry#notify
,主要做了以下工作:- 将urls按照category进行分类
- 进行properties设置和文件保存
- 调用传入放入listener的notify()方法
接着会调用
OverrideListener#notify
方法,具体实现如下:public synchronized void notify(List<URL> urls) { logger.debug("original override urls: " + urls); List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY, CONFIGURATORS_CATEGORY)); ... doOverrideIfNecessary(); } public synchronized void doOverrideIfNecessary() { ... //The current, may have been merged many times URL currentUrl = exporter.getInvoker().getUrl(); //Merged with this configuration URL newUrl = getConfigedInvokerUrl(configurators, originUrl); newUrl = getConfigedInvokerUrl(serviceConfigurationListeners .get(originUrl.getServiceKey()) .getConfigurators(), newUrl); newUrl = getConfigedInvokerUrl(providerConfigurationListener .getConfigurators(), newUrl); if (!currentUrl.equals(newUrl)) { RegistryProtocol.this.reExport(originInvoker, newUrl); ... } }
这里发现,如果url发生变化,会重新暴露服务。总之,当前的provider订阅了/dubbo/.../configurators,当其下的子节点发生变化时,如果其下的子节点的url或者当前的providerUrl发生了变化,需要重新暴露。
2.3.4 元数据注册
当远程暴露、订阅和通知完成后,会进行元数据注册,代码如下:
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
...
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
...
}
位置如下图: