写在前面
这里只走读基本流程的代码。保证最少的代码走读,了解dubbo的服务发布流程,至于其扩展特性及服务治理相关的特性,在后面的走读具体特性代码的时候,在记录。
开始
这里通过spring自定义标签的方式,来走读代码。
服务发布的配置如下:
<dubbo:application name="demo-provider"/>
<dubbo:registry id="zkRegistry" address="zookeeper://172.16.6.85:2181"/>
<dubbo:protocol name="dubbo" port="20880" threads="10"/>
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" version="1.0.0" group="123" filter="testfilter"
registry="zkRegistry"/>
以上就是一个最基本的服务发布的配置。
我们来看下标签的解析代码:
DubboNamespaceHandler
为标签的解析类。
@Override
public void init() {
//解析application标签成ApplicationConfig类型的bean
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
//解析module标签成ModuleConfig类型的bean
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
//解析registry标签
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
//解析monitor标签
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
// //解析provider标签
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
//解析consumer标签
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
//解析protocol标签
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
//解析service标签
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
// //解析reference标签
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
// //解析annotation标签,该注解意思是通过注解的方式暴露服务。
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
服务的发布和发现都是通过这个handler来处理的。service标签是声明服务发布的,reference标签是服务消费的配置。
这里主要走读service标签的逻辑。
spring自定义标签的解析与bean注册不是这里的重点。我们重点来看serviceBean类。这里是主要的服务发布逻辑。
先看ServiceBean的声明。public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
类ServiceBean继承自ServiceConfig,并实现了spring的一些接口。
该类很长,通过方法的先后顺序来走读。
首先执行ApplicationContextAware
中的setApplicationContext
方法。
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
SpringExtensionFactory.addApplicationContext(applicationContext);
if (applicationContext != null) {
SPRING_CONTEXT = applicationContext;
try {
Method method = applicationContext.getClass().getMethod("addApplicationListener", new Class<?>[]{ApplicationListener.class}); // backward compatibility to spring 2.0.1
method.invoke(applicationContext, new Object[]{this});
supportedApplicationListener = true;
} catch (Throwable t) {
if (applicationContext instanceof AbstractApplicationContext) {
try {
Method method = AbstractApplicationContext.class.getDeclaredMethod("addListener", new Class<?>[]{ApplicationListener.class}); // backward compatibility to spring 2.0.1
if (!method.isAccessible()) {
method.setAccessible(true);
}
method.invoke(applicationContext, new Object[]{this});
supportedApplicationListener = true;
} catch (Throwable t2) {
}
}
}
}
}
这里主要注册了一个ContextRefreshedEvent
事件监听。
接下来执行afterPropertiesSet
方法。这个方法比较长,但是逻辑比较简单,就是注入ServiceBean的一些属性。如application,provider,protocol,registries等。这里就不展示相关代码了。
最后,监听到了事件,具体事件是怎么触发的,可以看spring的时间机制。
这里服务发布逻辑就主要在这个事件处理里。
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
//发布服务
export();
}
}
export方法逻辑:
public synchronized void export() {
if (delay != null && delay > 0) {
// 若是延迟发布,则放入线程池中,等待配置时间后,调doExport方法去发布。
delayExportExecutor.schedule(new Runnable() {
@Override
public void run() {
doExport();
}
}, delay, TimeUnit.MILLISECONDS);
} else {
// 非延迟发布,则直接发布
doExport();
}
}
下面看下doExport
逻辑。该方法的代码也很长,主要还是收集发布服务所需的信息及校验配置。代码如下:
protected synchronized void doExport() {
//是否取消发布
if (unexported) {
throw new IllegalStateException("Already unexported!");
}
//是否已发布,已发布返回,未发布,继续逻辑
if (exported) {
return;
}
exported = true;
// .......都是一些校验相关的代码,这里省略
//是否是泛化实现
if (ref instanceof GenericService) {
interfaceClass = GenericService.class;
if (StringUtils.isEmpty(generic)) {
generic = Boolean.TRUE.toString();
}
} else {
try {
interfaceClass = Class.forName(interfaceName, true, Thread.currentThread()
.getContextClassLoader());
} catch (ClassNotFoundException e) {
throw new IllegalStateException(e.getMessage(), e);
}
//发布逻辑。。。
doExportUrls();
ProviderModel providerModel = new ProviderModel(getUniqueServiceName(), this, ref);
ApplicationModel.initProviderModel(getUniqueServiceName(), providerModel);
}
doExportUrls
方法的代码:
private void doExportUrls() {
//组装注册中心的URL
List<URL> registryURLs = loadRegistries(true);
// 循环的发布每种协议的服务
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
doExportUrlsFor1Protocol
这段代码里主要是通过SPI机制获取Invoker并获取不同协议的代理类,export服务。
主要功能代码如下:
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
proxyFactory
和protocol
都是通过spi机制生成。其代理类代码如下
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getProxy(arg0);
}
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null)
throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
proxyFactory默认返回的是JavassistProxyFactory。这里protocol返回的是RegistryProtocol。
看registryProtocol源码。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
URL registryUrl = getRegistryUrl(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
//to judge to delay publish whether or not
boolean register = registedProviderUrl.getParameter("register", true);
ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registedProviderUrl);
if (register) {
// 向注册中心注册服务
register(registryUrl, registedProviderUrl);
ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
}
// Subscribe the override data
// FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registedProviderUrl);
}
主要的两个操作doLocalExport
和register
;
先看doLocalExport
代码:
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
// 这里是主要的操作。调用dubboProtocol的export方法,发布服务。
exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//初始化socket,等待客户端连接。(默认是netty方式),到这里,默认端口20880就可以接受客户端请求了。
openServer(url);
optimizeSerialization(url);
return exporter;
}
再回到register
方法。register主要是调用了FailbackRegistry的注册方法。最后调用FailbackRegistry子类的doRegister方法。
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
这里正式的向zk上注册了服务了。服务消费者可以在zk上获取服务,从而进行连接20880端口进行远程通信。
至此,dubbo的服务发布相关的基本流程代码就走完了。
接下来会通过服务的消费来走读dubbo的负载均衡,过滤器,编解码,监听器等功能代码。