本文开始分析Netty的源码,由于目标是自顶向下分析,在这一节将分析Netty是如何构建起如上图所示的整体框架。首先将使用一个示例展示怎么使用Bootstarp构建服务端应用,然后将深入源码了解底层机制和原理。
1.使用示例
首先使用Netty构造如图所示的框架,源码如下:
// 指定mainReactor
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 指定subReactor
EventLoopGroup workerGroup = new NioEventLoopGroup();
// 用户自定义的ThreadPool
EventExecutorGroup threadPool = new ThreadPool();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100) // 设置TCP参数
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(threadPool,
new DecoderHandler(), // 解码处理器
new ComputeHandler()); // 计算处理器
new EncoderHandler(), // 编码处理器
}
});
// 绑定到本地端口等待客户端连接
ChannelFuture f = b.bind(PORT).sync();
// 等待接受客户端连接的Channel被关闭
f.channel().closeFuture().sync();
} finally {
// 关闭两个线程组
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
threadPool.shutdown();
}
逐行分析代码,EventLoopGroup
是Netty实现的线程池接口,两个线程池:bossGroup和workerGroup分别对应mainReactor和subReactor,其中boss专门用于接受客户端连接,worker也就是常说的IO线程专门用于处理IO事件。IO事件包括两类,一类如服务端接收到客户端数据的Read事件,另一类如用户线程主动向客户端发送数据的Write事件。在4.0版本中,用户自定义的业务线程池须实现EventExecutorGroup
接口,4.1版本则可以直接使用JAVA自带的线程池。
为了帮助用户快速构建基于Netty的服务,Netty提供了两个启动器ServerBootstrap
和Bootstrap
,分别用于启动服务器端和客户端程序。group(EventLoopGroup...)
方法用于指定一个或两个Reactor,本例中指定为两个。channel(Channel)
方法本质用来指定一个Channel工厂,本例中该工厂生产服务端用于accept客户端连接的Channel,将默认使用Channel的无参构造方法。如果用户需要自定义有参数的Channel,可自定义所需的工厂实现。option(Key, Value)
用于指定TCP相关的参数以及一些Netty自定义的参数。childHandler()
用于指定subReactor中的处理器,类似的,handler()
用于指定mainReactor的处理器,只是默认情况下mainReactor中已经添加了acceptor处理器,所以无需再指定。需要注意的是:这两个方法并不能累积调用而达到增加多个处理器的目的,所以引入了 ChannelInitializer
,它是一个特殊的Handler,功能是初始化多个Handler,如本例中的DecoderHandler
,ComputeHandler
,EncoderHandler
。完成初始化工作后,ChannelInitializer
会从Handler链中删除。至此,如图所示的框架已经构建完毕。
最后临门一脚,bind(int)
方法将服务端Channel绑定到本地端口,成功后将accept客户端的连接,从而是整个框架运行起来。使用sync()
方法是由于Netty中的事件都是异步的,所以需要同步等待结果。准确的说,这个方法在这里使用是有问题的,sync()
完成后只能表明绑定事件运行完毕,但并不能说明绑定成功,虽然失败的可能性微乎其微。
f.channel().closeFuture().sync()
方法仅仅是为了使当前main线程阻塞而不立即执行之后的各种shutdown()
方法,其语义是等到服务端接受客户端连接的Channel被关闭时,才执行后面代码的操作。在实际应用中,这样的代码并不实用,我们可能需要接受诸如kill
命令后,优雅关闭线程组。
一些情况下,我们并不使用如图所示的结构,比如当业务逻辑都很简单,也就是如图所示的decode,compute,encode能在短时间完成(数十毫秒或更少),那么可以不使用业务线程池。代码也很简单,只需要改动ChannelInitializer
即可:
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DecoderHandler()); // 解码处理器
p.addLast(new ComputeHandler()); // 计算处理器
p.addLast(new EncoderHandler()); // 编码处理器
}
});
事实上这是Netty的默认方法,也就是说不在addLast(Handler)
方法中指定线程池,那么将使用默认的subReacor即woker线程池也即IO线程池执行处理器中的业务逻辑代码。
又比如,如开始的例子只让IO线程池处理read,write等IO事件会觉得有点大材小用,于是将decode和encode交给IO线程处理,如果此时的compute查询需要数据库中的数据,那么代码可改动为如下:
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new DecoderHandler()); // 解码处理器
p.addLast(new EncoderHandler()); // 编码处理器
p.addLast(threadPool, new ComputeWithSqlHandler()); // 附带SQL查询的计算
}
});
最佳实践
简单快速的业务逻辑可由IO线程池执行,复杂耗时的业务(如查询数据库,取得网络连接等)使用新的业务逻辑线程池执行。
本文介绍了Bootstrap的使用,如果还想知道背后的原理,可移步后续文章:自顶向下深入分析Netty(三)--Bootstrap源码分析。