最近一直在看Netty方面的资料,发现在某宝上面卖得最好的关于Netty的书是《Netty实战》,恕我直言,这本翻译自《Netty in Action》的中文版对于想入门的同学来说真的不太好。看完之后都不知道在讲些什么,让人摸不着头脑。还是建议大家去看英文原版。我个人推荐通过几个简单的Demo,模仿别人的代码多造“轮子”,分析Netty的源码,最后结合《Netty精髓》,才能更好地入门,当然如果想精通还是要运用到实际项目中。
什么是Netty?为什么要用Netty?怎么安装Netty?这些问题大家可以在网上搜索到答案,这边不再多说了。这篇文章主要介绍如何写一个支持Http协议的Server端并对源码进行分析。
服务端代码
public class TestServer {
public static void main(String[] args) {
//(1)创建两个NIO线程组。bossGroup负责接收客户端的连接,workerGroup负责网络读写
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
//(2) serverBootstrap 是NIO服务端的辅助启动类,可以降低服务端的开发难度
ServerBootstrap serverBootstrap = new ServerBootstrap();
//(3) group将线程组作为参数加入到serverBootstrap,设置channel类似JDK中的ServerSocketChannel类
// 绑定channel初始化类TestServerInitializer
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).
childHandler(new TestServerInitializer());
try {
//(4) server启动类配置完成之后 调用bind绑定监听端口,sync同步等待绑定操作完成
// 绑定成功之后返回ChannelFuture类似JDK中的Future,用于异步操作回调通知
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
//(5) 等待服务端链路关闭之后main函数退出
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//(6) 优雅的关闭两个线程池,释放线程资源
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
上面是一个简单的Server代码,每行代码都有注释,如果不理解这些注释,没关系下面对代码中的主要类源码进行分析。
EventLoopGroup源码分析
EventLoopGroup 主要有两个作用:注册channel 以及 执行Runnable任务。
EventLoopGroup的源代码如下
下面是EventLoopGroup的整体类图,可以看到NioEventLoopGroup继承自MultithreadEventLoopGroup,而MultithreadEventLoopGroup实现了EventLoopGroup。
继续看MultithreadEventLoopGroup源码发现他继承MultithreadEventExecutorGroup。该类的构造方法如下
可以看到MultithreadEventLoopGroup 主要负责封装线程数组,子类中newChild
负责具体的初始化。
再看一下子类NioEventLoopGroup中的newChild方法如下。
经过上述EventLoopGroup源代码分析我们可以得下面的EventLoopGroup与EventLoop的关系。
EventLoop源码分析
EventLoop负责监听Channel读写事件以及注册Channel。我们知道Linux下有三种事件监控方法:select、poll、epoll。对应到Netty中有两种类:NioEventLoop 和 EpollEventLoop。前者使用的是JDK Selector接口实现Channel事件检测(poll方式),而后者是Netty自己实现epoll对Channel事件检测。
下面重点介绍介绍一个NioEventLoop。
NioEventLoop继承自SingleThreadEventExecutor。SingleThreadEventExecutor有如下代码:
private final Queue<Runnable> taskQueue;
private volatile Thread thread;
可以看到SingleThreadEventExecutor包含了一个线程和一个队列。NioEventLoop 既要执行Selector 带过来IO事件,还要执行队列中的非IO事件。
下面看一下NioEventLoop中线程执行逻辑。
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
1、 首先switch
判断是否有需要执行select
过程,select过程之后讨论。
2、根据ioRatio
(比较IO任务的时间占比)来执行IO任务和非IO任务。如果ioRatio=100,表示执行全部的IO任务。默认ioRatio=50,表示一半时间执行IO任务,另外一半时间执行非IO任务。如何控制非IO任务的时间呢?
在runAllTasks
函数中每执行64个任务会判断是否超时。
接着我们再看select
函数做了什么
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// 1.定时任务截至事时间快到了,中断本次轮询
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
//2.轮询过程中发现有任务加入,中断本次轮询
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 3.阻塞式select操作
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
// 4.解决jdk的nio bug
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
}
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}
1、计算select过程可以执行到的时间点selectDeadLineNanos
2、把时间点转化成毫秒,如果时间很短,即timeoutMillis<=0,并且是第一次执行 selectCnt==0,那么执行selectNow,该函数返回已经准备好IO操作的select key 集合
3、如果有任务需要执行,那么就跳出for 循环
4、执行selector.select(timeoutMillis)
,如果有事件那么跳出for 循环,如果没有事件发生,那么会进入下一个循环直到timeoutMillis小于0,跳出循环。 这一步同时会对selectCnt
变量加1操作,SELECTOR_AUTO_REBUILD_THRESHOLD
默认为512,当selectCnt超过这个阀值时就会,重新建立新的Selector,把原来的Channel迁移到新的Selector 上。为什么要设立一个selectCnt变量呢?因为JDK nio可能会有Bug,具体可以参照http://bugs.java.com/bugdatabase/view_bug.do?bug_id=6595055,这个bug会导致select 空轮询,在执行select操作时如果没有事件发生,直接返回,没有等待timeoutMillis, 这是一次空轮询,selectCnt加1。
下图是NioEventLoop的工作示意图。
bind过程分析
ServerBootstrap 继承自AbstractBootstrap,AbstractBootstrap
的bind过程如下图。
其中doBind的代码如下:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
1、initAndRegister返回ChannelFuture实例regFuture,以此来判断initAndRegister 是否执行完毕。
- 如果执行完毕那么调用doBind0进行socket绑定
- 否则添加listener监听器,当initAndRegister完成时,再调用doBind0进行绑定
2、initAndRegister函数会创建NioServerSocketChannel,主要做一些初始化的工作,包括pipeline添加handler,把channel注册到seletor上。
3、回到ServerBootstrap类,ServerBootstrap的init(Channel channel)方法,会添加handler到channel的pipeline中该handler就是ServerBootstrapAcceptor。代码如下:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
4、如何把channel注册到selector上呢?EventLoopGroup bossGroup会选择内部的一个EventLoop来执行实际的注册行为然后开始将该Channel注册到上述EventLoopGroup bossGroup。注册成功后会执行上述的initChannel方法。
bind过程结束,当Selector检测到NioServerSocketChannel有新的连接事件时,就会交给NioServerSocketChannel的ChannelPipeline中的ServerBootstrapAcceptor处理。
ServerBootstrapAcceptor做两件事:
- 为新的Channel的ChannelPipeline配置我们上述代码中的childHandler指定的ChannelHandler
- 将新的Channel注册到了上述EventLoopGroup workerGroup中
sync介绍
bind方法是异步方法,返回ChannelFuture,sync可以等待该异步过程结束。每一个ChannelFuture都是和一个Channel绑定的,而每一个ChannelFuture又有一个closeFuture方法,然后调用sync方法等待ChannelFuture的结束,只有当Channel 关闭,closeFuture才会被调用,一般正常情况下不会被调用,所以主线程会一直阻塞在sync方法上。
注意点
Reactor模型中可以通过多个Acceptor线程加快accept操作,我们是否可以增大bossGroup线程数来加快accept呢?答案是否,没有效果因为bossGroup中多线程是为了绑定多个端口,ServerSocketChannel创建后会绑定到bossGroup中的一个Eventloop, 由他来负责accept操作。
Tomocat 6开始也支持NIO模型,可以开启多个Acceptor线程,可以参照这篇文章
参考文章:
https://www.jianshu.com/p/e577803f0fb8
https://my.oschina.net/pingpangkuangmo/blog/742929