Provider启动流程
通过dubbo的启动日志分析dubbo的服务发布原理
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" />
1. 暴露本地服务
[DUBBO] Export dubbo service com.alibaba.dubbo.demo.DemoService to local registry, dubbo version: 2.0.0, current host: 169.254.23.23
2. 暴露远程服务
[DUBBO] Export dubbo service com.alibaba.dubbo.demo.DemoService to url dubbo://169.254.23.23:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&bind.ip=169.254.23.23&bind.port=20881&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&....., dubbo version: 2.0.0, current host: 169.254.23.23
[DUBBO] Register dubbo service com.alibaba.dubbo.demo.DemoService url dubbo://169.254.23.23:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&..... to registry registry://47.94.102.25:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&dubbo=2.0.0&owner=wliliam&pid=10948&qos.port=22222®istry=zookeeper×tamp=1527390445496, dubbo version: 2.0.0, current host: 169.254.23.23
3. 启动netty
[DUBBO] Start NettyServer bind /0.0.0.0:20881, export /169.254.23.23:20881, dubbo version: 2.0.0, current host: 169.254.23.23
4. 打开连接zk
INFO zookeeper.ClientCnxn: Opening socket connection to server 47.94.102.25/47.94.102.25:2181. Will not attempt to authenticate using SASL (unknown error)
5. 到zk注册
[DUBBO] Register: dubbo://169.254.23.23:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=wliliam&pid=10948&side=provider×tamp=1527390445523, dubbo version: 2.0.0, current host: 169.254.23.23
6. 监听zk的configurators节点
[DUBBO] Subscribe: provider://169.254.23.23:20881/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&loadbalance=roundrobin&methods=sayHello&owner=wliliam&pid=10948&side=provider×tamp=1527390445523, dubbo version: 2.0.0, current host: 169.254.23.23
重要概念
- Protocol
1.export:暴露远程服务(用于服务端),就是将proxyFactory.getInvoker创建的代理类 invoker对象,通过协议暴露给外部。
2.refer:引用远程服务(用于客户端), 通过proxyFactory.getProxy来创建远程的动态代理类,例如DemoService的远程动态接口。 - exporter:维护invoder的生命周期。
- exchanger:信息交换层,封装请求响应模式,同步转异步。
-
transporter:网络传输层,用来抽象netty和mina的统一接口。
image.png
<dubbo:service/>标签解析
我们发布一个服务使用的是类似这样的配置
<dubbo:service interface="com.alibaba.dubbo.demo.DemoService" ref="demoService" protocol="dubbo1" registry="aliZk"/>
这个自定义schema标签也是服务发布的入口。dubbo命名空间的解析类是DubboNamespaceHandler,在这个类中可以看到给service标签注册的标签解析类为new DubboBeanDefinitionParser(ServiceBean.class, true)
。
public class DubboNamespaceHandler extends NamespaceHandlerSupport {
static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
}
public class DubboBeanDefinitionParser implements BeanDefinitionParser {
private final Class<?> beanClass;
private final boolean required;
public BeanDefinition parse(Element element, ParserContext parserContext) {
return parse(element, parserContext, beanClass, required);
}
@SuppressWarnings("unchecked")
private static BeanDefinition parse(Element element, ParserContext parserContext, Class<?> beanClass, boolean required) {
RootBeanDefinition beanDefinition = new RootBeanDefinition();
//指定生成bean的class
beanDefinition.setBeanClass(beanClass);
beanDefinition.setLazyInit(false);
....
return beanDefinition;
}
解析service标签完成后会生成一个ServiceBean.class的实例并加入spring容器,由于ServiceBean.class实现了ApplicationListener接口,所以当spring容器加载完成后会回调ServiceBean的onApplicationEvent(...)方法,这就是服务暴露的起点。
package com.alibaba.dubbo.config.spring;
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, BeanNameAware {
......
public void onApplicationEvent(ContextRefreshedEvent event) {
if (isDelay() && !isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
......
}
服务发布
ServiceBean继承自ServiceConfig,ServiceConfig实际就是存储该service的配置
代码执行流程
ServiceBean.onApplicationEvent
-->export
-->ServiceConfig.export
-->doExport
-->doExportUrls//里面有一个for循环,代表一个服务可以有多个通信协议,例如tcp http协议,默认是tcp协议
-->loadRegistries(true) //从dubbo.properties里面组装registry的url信息
-->doExportUrlsFor1Protocol
#com.alibaba.dubbo.config.ServiceConfig#doExportUrls
private void doExportUrls() {
//读取配置中注册中心的配置信息
List<URL> registryURLs = loadRegistries(true);
//一个服务可能有多个通信协议,例如tcp协议和http协议,默认是tcp协议
//<dubbo:protocol name="dubbo" threads="5" port="20880" dispather="execution" threadpool="cached" dispatcher="execution" id="dubbo" />
for (ProtocolConfig protocolConfig : protocols) {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
...
//配置为none不暴露
if (! Constants.SCOPE_NONE.toString().equalsIgnoreCase(scope)) {
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
//本地服务暴露
exportLocal(url);
}
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
url = url.addParameterIfAbsent("dynamic", registryURL.getParameter("dynamic"));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(Constants.MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//远程服务暴露
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
...
}
暴露本地服务和暴露远程服务的区别是什么?
- 暴露本地服务
指暴露在一个JVM里面,不用通过调用ZK来进行远程通信。例如:在同一个服务,在自己调用自己的接口,就没必要进行网络IP连接来通信; - 暴露远程服务
指暴露给远程客户端的IP和端口号,通过网络来实现通信。
本地服务暴露
- 代码执行流程
ServiceConfig.doExportUrlsFor1Protocol
-->exportLocal
-->proxyFactory.getInvoker(ref, (Class) interfaceClass, local)
-->ProxyFactory$Adaptive.getInvoker
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension("javassist");
-->StubProxyFactoryWrapper.getInvoker(proxy, type, url);
-->proxyFactory.getInvoker(proxy, type, url);
-->JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
-->Wrapper.getWrapper(com.alibaba.dubbo.demo.provider.DemoServiceImpl.class);
-->makeWrapper(Class<?> c)
-->new AbstractProxyInvoker<T>(proxy, type, url)
-->protocol.export
-->Protocol$Adaptive.export
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension("injvm")
-->extension.export(arg0);
-->ProtocolFilterWrapper.buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER) //创建8个Filter
-->ProtocolFilterWrapper.export
-->InjvmProtocol.export
-->return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
-->目的:exporterMap.put(key, this); //key="com.alibaba.dubbo.demo.DemoService" this = InjvmExporter
代码实在太多没法一一分析>.<,其实跟着上面这个思路debug几遍流程就能很清晰的把握本地服务暴露的脉络。
#com.alibaba.dubbo.config.ServiceConfig#exportLocal
private void exportLocal(URL url) {
if (!Constants.LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
URL local = URL.valueOf(url.toFullString())
.setProtocol(Constants.LOCAL_PROTOCOL)
.setHost(NetUtils.LOCALHOST)
.setPort(0);
//都是动态编译生成的 protocol$Adaptive proxyFactory$Adaptive
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() +" to local registry");
}
}
proxyFactory:就是为了获取一个接口的代理类,例如获取一个远程接口的代理。它有2个方法,代表2个作用
- a.getInvoker:针对server端,将服务对象,如DemoServiceImpl包装成一个Invoker对象。
- b.getProxy :针对client端,创建接口的代理对象,例如DemoService的接口。
#com.alibaba.dubbo.config.ServiceConfig#proxyFactory
private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
#com.alibaba.dubbo.rpc.ProxyFactory
@SPI("javassist")
public interface ProxyFactory {
@Adaptive({Constants.PROXY_KEY})
<T> T getProxy(Invoker<T> invoker) throws RpcException;
@Adaptive({Constants.PROXY_KEY})
<T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;
}
动态编译生成的proxyFactory$Adaptive,修改log4j的配置文件查看动态编译的adaptive类
<root>
<level value="DEBUG" />
<appender-ref ref="CONSOLE" />
</root>
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory {
public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws com.alibaba.dubbo.rpc.RpcException {
if (arg2 == null) throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg2;
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
.getExtension(extName);
return extension.getInvoker(arg0, arg1, arg2);
}
public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = url.getParameter("proxy", "javassist");
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])");
com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class)
.getExtension(extName);
return extension.getProxy(arg0);
}
}
- 创建Invoker
proxyFactory.getInvoker创建Invoker的时候最终调用了JavassistProxyFactory.getInvoker(T proxy, Class<T> type, URL url)
方法(proxy实际上是服务的实现类),最终返回的是AbstractProxyInvoker,并由他实现了doInvoke方法。调用的时候会有这样一个调用链
Invoker.doInvoke>AbstractProxyInvoker.doInvoke>wrapper.invokeMethod
。
/**
* JavaassistRpcProxyFactory
*/
public class JavassistProxyFactory extends AbstractProxyFactory {
/**
* @Author pengyunlong
* @Description 服务引用生成动态代理
*/
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
/**
* @Author pengyunlong
* @Description 服务暴露不生成动态代理生成wrapper直接调用本地的方法
* @param proxy 服务实例
* @param type 服务接口
*/
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
}
- Invoker
它是一个可执行的对象,能够根据方法的名称、参数得到相应的执行结果。它里面有一个很重要的方法 Result invoke(Invocation invocation),
Invocation是包含了需要执行的方法和参数等重要信息,目前它只有2个实现类RpcInvocation、MockInvocation。它有3种类型的Invoker:
- 本地执行类的Invoker
server端:要执行 demoService.sayHello,就通过InjvmExporter来进行反射执行demoService.sayHello就可以了。 - 远程通信类的Invoker
client端:要执行 demoService.sayHello,它封装了DubboInvoker进行远程通信,发送要执行的接口给server端。
server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client. - 多个远程通信执行类的Invoker聚合成集群版的Invoker
client端:要执行 demoService.sayHello,就要通过AbstractClusterInvoker来进行负载均衡,DubboInvoker进行远程通信,发送要执行的接口给server端。
server端:采用了AbstractProxyInvoker执行了DemoServiceImpl.sayHello,然后将执行结果返回发送给client.
- Wrapper wrapper = Wrapper.getWrapper
也许大家跟我一样会很好奇这个Wrapper wrapper = Wrapper.getWrapper(com.alibaba.dubbo.demo.provider.DemoServiceImpl)
得到的wrapper 到底是什么?它又是怎么实现本地方法调用的?
下面将动态生成的Wrapper导出并反编译才发现了真相:实际上就是依次判断接口的各个方法名和参数中传入的方法名是否匹配,如果匹配就直接调用实例的该方法。其实它类似spring的BeanWrapper,它就是包装了一个接口或一个类,可以通过wrapper对实例对象进行赋值 取值以及制定方法的调用。
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package com.alibaba.dubbo.common.bytecode;
public class Wrapper0 extends Wrapper implements DC {
......
public Object invokeMethod(Object var1, String var2, Class[] var3, Object[] var4) throws InvocationTargetException {
DemoService var5;
try {
var5 = (DemoService)var1;
} catch (Throwable var8) {
throw new IllegalArgumentException(var8);
}
try {
if ("sayHello".equals(var2) && var3.length == 1) {
return var5.sayHello((String)var4[0]);
}
} catch (Throwable var9) {
throw new InvocationTargetException(var9);
}
throw new NoSuchMethodException("Not found method \"" + var2 + "\" in class com.alibaba.dubbo.demo.DemoService.");
}
......
}
- 使用injvm暴露服务
package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}
public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );
if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)
ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
.getExtension(extName);
return extension.export(arg0);
}
public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {......
}
}
- 过滤器链
extension.export(arg0);
ProtocolFilterWrapper包装了,所以被在服务暴露的时候还给Invoker包装了一个过滤器链,总共应该是8个过滤器;在方法调用的时候会依次调用这个链上所有的filter.invoke
#com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
Invoker<T> last = invoker;
List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker<T> next = last;
last = new Invoker<T>() {
.....
public Result invoke(Invocation invocation) throws RpcException {
return filter.invoke(next, invocation);
}
......
};
}
}
return last;
}
- 暴露完成加入缓存
#com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
//本地暴露
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
#com.alibaba.dubbo.rpc.protocol.injvm.InjvmExporter
InjvmExporter(Invoker<T> invoker, String key, Map<String, Exporter<?>> exporterMap){
super(invoker);
this.key = key;
this.exporterMap = exporterMap;
//key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter
exporterMap.put(key, this);
}
实际是将key=com.alibaba.dubbo.demo.DemoService, this=InjvmExporter存储在com.alibaba.dubbo.rpc.protocol.AbstractProtocol#exporterMap
里面
同时com.alibaba.dubbo.config.ServiceConfig#exporters
也会存储一份
3. 远程服务暴露
ServiceConfig.doExportUrlsFor1Protocol
-->proxyFactory.getInvoker //执行过程和本地暴露一样
-->protocol.export(wrapperInvoker)
-->Protocol$Adaptive.export(Invoker arg0)
-->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("registry")
-->extension.export(arg0);
-->ProtocolFilterWrapper.export
-->ProtocolListenerWrapper.export //没有添加过滤器与监听器
-->RegistryProtocol.export
-->doLocalExport(originInvoker)
-->getCacheKey(originInvoker)//读取缓存【key】
-->protocol.export(invokerDelegete)
-->Protocol$Adaptive.export
-->ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("dubbo")
-->extension.export(arg0)
-->ProtocolFilterWrapper.export
-->buildInvokerChain //创建8个filter
-->ProtocolListenerWrapper.export
1.netty服务暴露的开始---- -->DubboProtocol.export
-->serviceKey(url)//组装key=com.alibaba.dubbo.demo.DemoService:20881
-->目的:exporterMap.put(key, this); //key="com.alibaba.dubbo.demo.DemoService:20881" this = DubboExporter 本地暴露只有接口名远程暴露有接口名和端口号
-->openServer(url)//打开服务
-->createServer
2.信息交换层的开始--- -->Exchangers.bind(url, requestHandler)//exchanger是一个信息交换层
-->getExchanger(url)
-->getExchanger("header")
-->ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension("header")
-->HeaderExchanger.bind
-->Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))
-->new HeaderExchangeHandler(handler) //仅仅是this.handler = handler;
-->new DecodeHandler
-->AbstractChannelHandlerDelegate//this.handler = handler;
3.网络传输层-------- -->Transporters.bind
-->getTransporter()
-->ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()
-->Transporter$Adaptive.bind
-->ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.Transporter.class).getExtension("netty")
-->NettyTransporter.bind
-->new NettyServer(url, listener)
-->AbstractPeer //this.url = url; this.handler = handler;
-->AbstractEndpoint //codec timeout=1000 connectTimeout=3000
4.打开端口暴露服务--- --doOpen()
-->设置NioServerSocketChannelFactory boss worker线程个数为3
-->设置编解码handler
-->bootstrap.bind(getBindAddress())
-->new HeaderExchangeServer
-->this.server = NettyServer
-->this.heartbeat=60000
-->heartbeatTimeout=180000
-->startHeatbeatTimer//这是一个心跳定时器,采用了线程池,如果断开就心跳重连
//配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
if (!Constants.SCOPE_REMOTE.toString().equalsIgnoreCase(scope)) {
exportLocal(url);
}
//如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露本地服务)
if (! Constants.SCOPE_LOCAL.toString().equalsIgnoreCase(scope) ){
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (registryURLs != null && registryURLs.size() > 0
&& url.getParameter("register", true)) {
for (URL registryURL : registryURLs) {
//获取调用本地实例的invoker对象
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(Constants.EXPORT_KEY, url.toFullString()));
//远程暴露
Exporter<?> exporter = protocol.export(invoker);
exporters.add(exporter);
}
}
Exporter<?> exporter = protocol.export(invoker);
本地暴露使用的扩展点是injvm而远程暴露使用了registry,但是流程差不多只是包装类暴露时不再包装过滤器链
#com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper#export
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
//远程暴露
return protocol.export(invoker);
}
//injvm等
return protocol.export(buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER));
}
最终进入目标类RegistryProtocol
#com.alibaba.dubbo.registry.integration.RegistryProtocol#export
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
//export invoker
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker);
//registry provider
final Registry registry = getRegistry(originInvoker);
final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
registry.register(registedProviderUrl);
// 订阅override数据
// FIXME 提供者订阅时,会影响同一JVM即暴露服务,又引用同一服务的的场景,因为subscribed以服务名为缓存的key,导致订阅信息覆盖。
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
//保证每次export都返回一个新的exporter实例
return new Exporter<T>() {
public Invoker<T> getInvoker() {
return exporter.getInvoker();
}
public void unexport() {
try {
exporter.unexport();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
registry.unregister(registedProviderUrl);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
try {
overrideListeners.remove(overrideSubscribeUrl);
registry.unsubscribe(overrideSubscribeUrl, overrideSubscribeListener);
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
};
}
@SuppressWarnings("unchecked")
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker){
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
if (exporter == null) {
final Invoker<?> invokerDelegete = new InvokerDelegete<T>(originInvoker, getProviderUrl(originInvoker));
//进入dubbo的protocol扩展
exporter = new ExporterChangeableWrapper<T>((Exporter<T>)protocol.export(invokerDelegete), originInvoker);
bounds.put(key, exporter);
}
}
}
return (ExporterChangeableWrapper<T>) exporter;
}
进入DubboProtocol.export,将export存储在exporterMap中,本地暴露key是接口名,远程暴露为接口名:端口号com.alibaba.dubbo.demo.DemoService:20880
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispaching event
Boolean isStubSupportEvent = url.getParameter(Constants.STUB_EVENT_KEY,Constants.DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(Constants.IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice){
String stubServiceMethods = url.getParameter(Constants.STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0 ){
if (logger.isWarnEnabled()){
logger.warn(new IllegalStateException("consumer [" +url.getParameter(Constants.INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
openServer(url);
return exporter;
}
最后进入NettyServer开启服务
#com.alibaba.dubbo.remoting.transport.netty.NettyServer#doOpen
@Override
protected void doOpen() throws Throwable {
//参考netty的demo
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
//线程个数workerCount=3
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
//解码
pipeline.addLast("decoder", adapter.getDecoder());
//编码
pipeline.addLast("encoder", adapter.getEncoder());
//逻辑处理类
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
总结
- Protocol
1.export:暴露远程服务(用于服务端),就是将proxyFactory.getInvoker创建的代理类 invoker对象,通过协议暴露给外部。
2.refer:引用远程服务(用于客户端), 通过proxyFactory.getProxy来创建远程的动态代理类,例如DemoService的远程动态接口。 - exporter:维护invoder的生命周期。
- exchanger:信息交换层,封装请求响应模式,同步转异步。
-
transporter:网络传输层,用来抽象netty和mina的统一接口。
image.png