1.概述
RPC
作为分布式系统中不可或缺的中间件,在业界已经具有相当成熟的技术实现,其中Dubbo
应用得特别广泛,本文将对Dubbo
服务暴露的流程进行介绍。在正式进入Dubbo
原理探究之前,需要先弄清楚RPC
的基本模型:
consumer
代表服务调用方,provider
代表服务提供方,registry
代表注册中心。当服务提供方启动时会将自己的信息(服务ip,port等)记录在注册中心,这样在调用方调用的时候,会先从注册中心获取到提供方的基本信息,然后发送网络请求给provider
完成调用;同时consumer
在启动的时候,会向注册中心订阅消息,这样就能在provider
发生变更的时候获取到最新的信息,保证请求路由到正确的provider
。
2.Dubbo
服务暴露概览
Dubbo
服务暴露就是指provider
向registry
注册的过程,以zookeeper作为注册中心,netty作为通讯框架,本文基于2.6.x
版本代码分析。使用Dubbo
官方提供的demo运行,dubbo-demo-provider
配置文件如下:
<!-- provider's application name, used for tracing dependency relationship -->
<dubbo:application name="demo-provider"/>
<!-- use multicast registry center to export service -->
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<!-- use dubbo protocol to export service on port 20880 -->
<dubbo:protocol name="dubbo" port="20880"/>
<!-- service implementation, as same as regular local bean -->
<bean id="demoService" class="com.alibaba.dubbo.demo.provider.DemoServiceImpl"/>
<!-- declare the service interface to be exported -->
<dubbo:service proxy="jdk" interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
服务暴露的整个过程大致可以分为四个阶段:
- 创建invoker对象并对其加工
- 开启netty服务,用于后续接收消费方发起的网络请求
- 获取zookeeper连接并将服务注册成为zookeeper上的节点(服务引用的时候通过获取节点则能够找到provider然后建立网络连接)
- 返回
Exporter
对象并放入ServiceConfig
中的exporters
变量中存储
3.创建invoker对象并对其加工
在ServiceConfig
类中会先调用exportLocal
完成本地暴露,然后调用proxyFactory.getInvoker
获取invoker对象,先关注下该类中的静态变量proxyFactory
,它会调用ExtensionLoader
中的getAdaptiveExtension
方法完成初始化,该类是Dubbo
框架自身实现的一种SPI
机制,能够根据配置文件灵活选择接口的具体实现。
package com.alibaba.dubbo.config;
/**
* ServiceConfig
*
* @export
*/
public class ServiceConfig<T> extends AbstractServiceConfig {
private static final long serialVersionUID = 3033787999037024738L;
/**根据dubbo spi机制加载**/
private static final Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
/**根据dubbo spi机制加载**/
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
private final List<Exporter<?>> exporters = new ArrayList<Exporter<?>>();
private Class<?> interfaceClass;
// don't export when none is configured
if (!Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
// export to local if the config is not remote (export to remote only when config is remote)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
/**本地暴露**/
exportLocal(url);
}
// export to remote if the config is not local (export to local only when config is local)
if (!Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope)) {
if (registryURLs != null && !registryURLs.isEmpty()) {
for (URL registryURL : registryURLs) {
/**获取invoker对象**/
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this)
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
}
}
}
}
proxyFactory
在初始化的时候会被赋值为ProxyFactory$Adaptive
,该类是通过字节码增强实现的
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adaptive implements com.alibaba.dubbo.rpc.ProxyFactory {
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
/**获取url中的proxy参数,默认为javassist**/
String extName = url.getParameter("proxy", "javassist");
if (extName == null)
throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
/**根据proxy参数获取对应的ProxyFactory接口的实现**/
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
}
Dubbo
针对每一个需要根据配置决定具体实现的接口都会在运行时生成以$Adaptive
后缀结尾的类,目的是能够根据配置文件,在运行时灵活地选择接口的具体实现,例如:String extName = url.getParameter("proxy", "javassist")
这行代码,如果url
中的proxy
属性为空即没有在配置文件中指定,则默认使用javassist
作为extName
的值,加载出JavassistProxyFactory
类作为ProxyFactory
接口的缺省实现。因为这里配置<dubbo:service proxy="jdk" interface="com.alibaba.dubbo.demo.DemoService" ref="demoService"/>
,所以会加载JdkProxyFactory
。但通过debug发现,com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName)
返回的是StubProxyFactoryWrapper
,原因是ProxyFactory
接口默认有一个包装类实现,无论是JavassistProxyFactory
还是JdkProxyFactory
都是在StubProxyFactoryWrapper
中通过构造函数完成proxyFactory
属性的设置,当ProxyFactory$Adaptive
中执行extension.getInvoker(arg0, arg1, arg2)
的时候,先会调用到StubProxyFactoryWrapper
的getInvoker
方法,该方法的实现为proxyFactory.getInvoker(proxy, type, url)
,所以最后会调用到JdkProxyFactory
的getInvoker
方法。对于这种有包装类实现的接口,$Adaptive
会优先加载出包装类,而根据配置所对应的具体实现则是通过构造函数的形式作为包装类的属性被注入,在调用的时候先调用包装类从而间接调用到配置所对应的实现,这里使用了装饰器模式,可以在调用之间增加额外的处理。整个流程可以归纳如下:
-
ProxyFactory$Adaptive
获取extension
- 初始化
StubProxyFactoryWrapper
,构造函数设置proxyFactory
属性为JdkProxyFactory
public class StubProxyFactoryWrapper implements ProxyFactory {
private final ProxyFactory proxyFactory;
/**构造函数中设置proxyFactory值为配置所对应的接口实现**/
public StubProxyFactoryWrapper(ProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
}
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException {
return proxyFactory.getInvoker(proxy, type, url);
}
}
-
ProxyFactory$Adaptive
调用extension.getInvoker
等同于StubProxyFactoryWrapper
调用getInvoker
对应时序图如下所示:
返回invoker
对象后,进入到protocol.export
方法,protocol
初始化为Protocol$Adaptive
,Protocol
接口有两个包装类ProtocolListenerWrapper
与ProtocolFilterWrapper
,顾名思义监听器与过滤器,该过程会先调用ProtocolListenerWrapper
,该类的protocol
属性在初始化的时候会被设置为ProtocolFilterWrapper
,在ServiceConfig
中调用protocol.export
时会直接进入到ProtocolListenerWrapper
中的protocol.export(invoker)
方法,接着进入ProtocolFilterWrapper
中的protocol.export(invoker)
方法,ProtocolFilterWrapper
初始化时会设置protocol
属性为RegistryProtocol
,因此该过程最终会调用到RegistryProtocol
的export
方法
/**
* ListenerProtocol
*/
public class ProtocolListenerWrapper implements Protocol {
private final Protocol protocol;
public ProtocolListenerWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
/**ServiceConfig调用protocol.export直接进入到该方法**/
return protocol.export(invoker);
}
/**RegistryProtocol调用protocol.export时候进入**/
return new ListenerExporterWrapper<T>(protocol.export(invoker),
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(ExporterListener.class)
.getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
}
}
根据ProtocolFilterWrapper
类的命名,可以看出它拥有与Spring Filter类似的功能,本质上是一个过滤器
/**
* ListenerProtocol
*/
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
/**ServiceConfig调用protocol.export直接进入到该方法**/
return protocol.export(invoker);
}
/**RegistryProtocol调用protocol.export时候进入**/
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
}
因为设置<dubbo:protocol name="dubbo" port="20880"/>
,RegistryProtocol
中的protocol
会被设置为DubboRegistry
,当调用protocol.export(invokerDelegete)
的时候,会再次进入到ProtocolListenerWrapper
包装类中,这次会执行new ListenerExporterWrapper<T>
/**
* ListenerExporter
*/
public class ListenerExporterWrapper<T> implements Exporter<T> {
private static final Logger logger = LoggerFactory.getLogger(ListenerExporterWrapper.class);
private final Exporter<T> exporter;
private final List<ExporterListener> listeners;
public ListenerExporterWrapper(Exporter<T> exporter, List<ExporterListener> listeners) {
if (exporter == null) {
throw new IllegalArgumentException("exporter == null");
}
this.exporter = exporter;
this.listeners = listeners;
if (listeners != null && !listeners.isEmpty()) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
/**调用listener的exported方法,作为hook函数**/
listener.exported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
@Override
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
@Override
public void unexport() {
try {
exporter.unexport();
} finally {
if (listeners != null && !listeners.isEmpty()) {
RuntimeException exception = null;
for (ExporterListener listener : listeners) {
if (listener != null) {
try {
listener.unexported(this);
} catch (RuntimeException t) {
logger.error(t.getMessage(), t);
exception = t;
}
}
}
if (exception != null) {
throw exception;
}
}
}
}
}
重点关注ListenerExporterWrapper
的构造函数,该函数主要完成两件事,一是赋值exporter
变量,将外部传入的Exporter
对象保存,二是遍历外部传入的ExporterListener
列表并调用其exported
方法。此处为扩展点,Dubbo
对于ExporterListener
接口只给出了ExporterListenerAdapter
这一实现且exported
方法实现为空,开发者可以自己实现该方法以达到扩展的目的,当然也可以自己实现ExporterListener
接口,依据Dubbo SPI机制加载执行自定义的业务逻辑。除构造函数以外,unexport
也拥有类似的功能,当调用unexport
的时候会遍历之前保存的listeners
并调用其unexport
方法。
继续回到ProtocolFilterWrapper
,当调用return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER))
时,会触发buildInvokerChain
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
@Override
public Class<T> getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
/**包装invoker对象**/
return filter.invoke(next, invocation);
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
遍历通过SPI加载出来的Filter
来包装invoker
对象,当invoker
调用invoke
方法时,就会触发Filter
中的实现逻辑,此处使用责任链设计模式,当然开发者也可以实现自己的Filter
来对此处逻辑进行扩展。整个流程时序图如下所示:
4.开启netty服务
RegistryProtocol
中调用exporter = new ExporterChangeableWrapper<T>((Exporter<T>) protocol.export(invokerDelegete), originInvoker)
进入到DubboProtocol
类中的export
方法,
/**
* dubbo protocol support.
*/
public class DubboProtocol extends AbstractProtocol {
public static final String NAME = "dubbo";
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
/**获取key,com.alibaba.dubbo.demo.DemoService:20880**/
String key = serviceKey(url);
/**创建exporter对象**/
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
/**保存exporter对象**/
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
/**创建netty server**/
openServer(url);
/**初始化序列化器**/
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
/**ip+port**/
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(Constants.IS_SERVER_KEY, true);
if (isServer) {
/**从缓存中获取server对象**/
ExchangeServer server = serverMap.get(key);
if (server == null) {
/**缓存中没有则创建server**/
serverMap.put(key, createServer(url));
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
// send readonly event when server closes, it's enabled by default
url = url.addParameterIfAbsent(Constants.CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString());
// enable heartbeat by default
url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT));
String str = url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str))
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
/**设置传输协议为dubbo**/
url = url.addParameter(Constants.CODEC_KEY, DubboCodec.NAME);
ExchangeServer server;
try {
/**netty Server初始化**/
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(Constants.CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
}
首先会创建DubboExporter
对象,并把该对象保存到exporterMap
中,其中类:端口
会被作为key,例如:com.alibaba.dubbo.demo.DemoService:20880,openServer
中是获取netty Server的具体逻辑,serverMap
中会存放创建好的server
,key是ip:port
,如果在serverMap
中没有存储server
,则调用createServer
方法创建。
5.获取zookeeper连接并将服务注册成为zookeeper上的节点
完成Netty Server启动后,通过RegistryProtocol
中的getRegistry
方法创建ZookeeperRegistry
对象,该对象的构造函数会进行zookeeper的连接。
private Registry getRegistry(final Invoker<?> originInvoker) {
URL registryUrl = getRegistryUrl(originInvoker);
return registryFactory.getRegistry(registryUrl);
}
通过返回的ZookeeperRegistry
对象,调用subscribe
方法便将服务注册成为了zookeeper的节点,/dubbo/com.alibaba.dubbo.demo.DemoService/providers
,值为dubbo://ip:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bean.name=com.alibaba.dubbo.demo.DemoService&dubbo=2.0.2&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1644&proxy=jdk&side=provider×tamp=1603026176171
,里面包含服务的ip、端口、接口名、接口中包含的方法、代理模式等等。
6.总结
整个服务暴露的过程就是服务向注册中心注册的过程,除了基本的实现以外,Dubbo
在该过程中还提供了Listener
和Filter
这两个扩展点方便开发者进行定制化的实现。