引子:
- 在 dubbo剖析:一 服务发布 中,我们讲到了
RegistryProtocol.export
过程中有一个关键步骤,即调用doLocalExport(final Invoker<T> originInvoker)
生成Exporter
,其最终调用了DubboProtocol
的export()
方法。 -
DubboProtocol
的export()
方法:完成了 "启动并监听网络服务" 的工作,具体是通过HeaderExchanger
的bind()
方法创建了一个HeaderExchangerServer
实现的。 - 本章我们就来介绍
HeaderExchangerServer
的 设计架构 和 功能实现。
一、入口流程
服务发布流程中,RegistryProtocol
会调用到DubboProtocol
的export()
方法,用于完成网络服务的启动和监听。
1.1 入口代码
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
//step1 export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//...省略部分代码...
//step2 创建ExchangeServer
openServer(url);
return exporter;
}
private void openServer(URL url) {
String key = url.getAddress();
if (isServer) {
ExchangeServer server = serverMap.get(key);
if (server == null) {
//调用createServer
serverMap.put(key, createServer(url));
} else {
server.reset(url);
}
}
}
private ExchangeServer createServer(URL url) {
//...省略部分代码,参数解析之类的...
ExchangeServer server;
try {
//关键代码,使用HeaderExchanger.bind创建HeaderExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
return server;
}
1.2 流程图解
DubboProtocol.export()流程图
第一步:生成Exporter:
- 将
AbstractProxyInvoker
作为构造参数,new出一个DubboExporter; -
DubboExporter
维护了AbstractProxyInvoker
的生命周期; - 服务url、
DubboExporter
放入DubboProtocol
的缓存map
供消息接收处理使用;
第二步:创建ExchangeServer:
-
AbstractProxyInvoker
中获取服务url; - 服务url作为入参、同时使用
DubboProtocol
包含的网络事件处理器requestHandler,调用HeaderExchanger
的bind()
方法创建ExchangeServer
; -
HeaderExchanger
中又依赖了NettyTransporter
,使用其bind()
方法创建NettyServer
; -
NettyServer
是启动网络服务的核心类。HeaderExchangerServer
使用NettyServer
作为构造参数,扩展了它的功能;
二、server端网络层结构
server端网络层类图关系说明
2.1 网络传输层
-
EndPoint
为网络端点的抽象接口,定义了获取网络端点地址、连接、及最原始的发送消息的方法。 -
ChannelHandler
为网络事件处理器接口,定义了Server端监听到各种类型的网络事件时的处理方法(connected、disconnected、sent、received、caught),Netty中也有类似定义。 -
Server
为网络服务端的抽象接口,继承了EndPoint
的功能,并扩展了获取与服务端建连的通道Channel
的方法。 -
Transporter
为网络传输层的抽象接口,核心作用就是提供了创建Server
和Client
两个核心接口实现类的方法。
2.2 信息交换层
-
ExchangeHandler
,在ChannelHandler
接口基础上,添加了 响应请求 的方法。 -
ExchangeServer
,在Server
接口基础上,将获取Channel
的方法扩展为获取ExchangeChannel
的方法。 -
Exchanger
为信息交换层的抽象接口,核心作用就是提供了创建ExchangeServer
和ExchangeClient
两个核心接口实现类的方法。
2.3 网络通道Channel
网络通道接口定义
-
Channel
为网络通道的抽象接口,继承了EndPoint
的功能,并扩展了绑定获取属性和获取通道对端地址的方法。 -
ExchangeChannel
,在Channel
接口的基础上,扩展了请求响应模式的功能,并能获取绑定在通道上的网络事件监听器。
三、HeaderExchangeServer & NettyServer实现详解
Server实现层次结构图
3.1 网络层
AbstractPeer
类(网络事件处理器和网络节点的通用实现):
- 定义了属性
ChannelHandler
和URL
,作为构造方法入参注入; - 实现了
ChannelHandler
和EndPoint
接口,ChannelHandler
接口的相关方法依赖其channelHandler
属性完成实现;
AbstractEndPoint
类(加入编解码功能):
- 定义了构造方法,入参包含属性
ChannelHandler
和URL
; - 定义了属性
Codec2
,用于编解码,通过SPI动态注入; - 定义了timeout/connectTimeout相关超时属性,由
URL
解析赋值; - 对外暴露了获取
Codec2
和超时相关属性的方法,供上层依赖调用;
AbstractServer
类(网络服务端通用抽象,抽象出open
、close
、send
的公共流程,并提供了doOpen
和doClose
的实现扩展):
- 定义了构造方法,入参包含属性
ChannelHandler
和URL
,并触发doOpen()
扩展; - 重写
EndPoint
接口的close()
方法,触发doClose()
扩展; - 实现
EndPoint
接口的send()
方法,遍历并调用Channel.send()
;
NettyServer
类(网络服务端Netty实现类,实现了doOpen
、doClose
、getChannels
三个具体扩展):
- 实现了
doOpen()
扩展方法,使用Netty的ServerBootstrap
完成服务启动监听,其网络世界处理器为this
包装成的NettyHandler
; - 实现了
doClose()
扩展方法,调用Netty的boostrap
、channel
完成网络资源释放; - 实现了
getChannels()
方法,channels的值由网络事件处理器在connect、disconnect事件触发时变动维护;
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
bootstrap = new ServerBootstrap(channelFactory);
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
channels = nettyHandler.getChannels();
// https://issues.jboss.org/browse/NETTY-365
// https://issues.jboss.org/browse/NETTY-379
// final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ChannelPipeline pipeline = Channels.pipeline();
/*int idleTimeout = getIdleTimeout();
if (idleTimeout > 10000) {
pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
}*/
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
// bind
channel = bootstrap.bind(getBindAddress());
}
@Override
protected void doClose() throws Throwable {
try {
if (channel != null) {
// unbind.
channel.close();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
Collection<com.alibaba.dubbo.remoting.Channel> channels = getChannels();
if (channels != null && channels.size() > 0) {
for (com.alibaba.dubbo.remoting.Channel channel : channels) {
try {
channel.close();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (bootstrap != null) {
// release external resource.
bootstrap.releaseExternalResources();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
try {
if (channels != null) {
channels.clear();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
3.2 交换层
HeaderExchangeServer
类(交换层服务端,将网络层的Channel
扩展为交换层的ExchangeChannel
,并加入心跳检测功能):
- 定义了构造方法,入参包含属性
Server
,用于实现服务端网络层功能; - 定义了属性
定时任务线程池scheduled
,用于执行“定时心跳收发及心跳超时监测”任务; - 定义了hearbeat/heartbeatTieout相关心跳属性,由
URL
解析赋值; - 构造方法中启动“定时心跳收发及心跳超时监测”任务,超时时“Server断连、Client断连重连”;
- 将网络层
Channel
扩展为交换层ExchangeChannel
,具体实现后续另辟章节;