在本文中主要是深入了解EventLoop,以便对netty的线程模型有更好的了解。Netty是Reactor模型的一个实现, 那么首先从Reactor的线程模型开始吧。reactor线程模型分为单线程模型、多线程模型、主从多线程模型。
多线程模型:
Reactor 的多线程模型与单线程模型的区别就是acceptor是一个单独的线程处理, 并且有一组特定的NIO线程来负责各个客户端连接的 IO 操作。Reactor 多线程模型 有如下特点:
- 有专门一个线程, 即 Acceptor 线程用于监听客户端的TCP连接请求。
- 客户端连接的IO操作都是由一个特定的NIO线程池负责。每个客户端连接都与一个特定的NIO线程绑定, 因此在这个客户端连接中的所有IO操作都是在同一个线程中完成的。
-
客户端连接有很多, 但是 NIO 线程数是比较少的, 因此一个 NIO 线程可以同时绑定到多个客户端连接中。
多线程主从模型:
如果服务器需要同时处理大量的客户端连接请求或在进行客户单连接时,进行一些权限的检查,那么单线程的Acceptor很有可能处理不过来,造成大量的客户端不能连接到服务器。所以在Reactor模型中,服务器端接收客户端的连接请求不在是一个线程,而是由一个独立的线程池组成的。
NioEventLoopGroup与Reactor线程模型的对应
netty是通过不同设置来实现reactor线程模式的。
单线程模式:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
...
在之前分析服务端启动的时候,我们其实是设置了两个EventLoopGroup:bossGroup和workGroup。我们这里只设置了一个EventLoopGroup入参设置为1表示服务端只有一个线程来处理,代表的是监听客户端和IO操作只有一个线程来处理,对应reactor单线程模型。可以看下b.group(bossGroup)
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
可以看到bossGroup和workerGroup 就是同一个NioEventLoopGroup了。那么后续Netty中的acceptor和后续的所有客户端连接的IO操作都是在一个线程中处理的。那么对应到Reactor的线程模型中, 我们这样设置NioEventLoopGroup时,就相当于Reactor单线程模型。
多线程模式:
对netty单线程模型有了解,那么多线程模型很自然能推倒而出。
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
···
bossGroup是监听客户端连接的,对应acceptor,单线程,workGroup是cpu核心数*2,负责处理io操作。很明显这就是reactor的多线程模型。
多线程主从模型
Netty的服务器端的acceptor阶段, 没有使用到多线程,因此主从多线程模型在Netty的服务器端是不存在的。
NioEventLoop
让我们来看看NioEventLoop的类继续关系:关注其中比较重要的继承线:
NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> AbstractScheduledEventExecutor
NioEventLoop继承于SingleThreadEventLoop,而 SingleThreadEventLoop又继承于 SingleThreadEventExecutor。SingleThreadEventExecutor 是Netty中对本地线程的抽象,它内部有一个Thread thread属性, 存储了一个本地Java线程. 因此我们可以认为,一个 NioEventLoop其实和一个特定的线程绑定,并且在其生命周期内, 绑定的线程都不会再改变。
通常来说,NioEventLoop 肩负着两种任务, 第一个是作为IO线程, 执行与Channel相关的IO操作, 包括调用select等待就绪的IO事件、读写数据与数据的处理等;而第二个任务是作为任务队列, 执行taskQueue中的任务,例如用户调用eventLoop.schedule提交的定时任务也是这个线程执行的。
1、NioEventLoop实例化
NioEventLoop的实例化过程其实在之前已经说明了,是在实例化EventLoopGroup时候就实例化了NioEventLoop,一个EventLoopGroup可以有有个NioEventLoop,nThreads个NioEventLoop。SingleThreadEventExecutor 有一个名为 thread 的 Thread 类型字段, 这个字段就代表了与SingleThreadEventExecutor 关联的本地线程。
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
}
。。。
});
}
executor.execute其实是ThreadPerTaskExecutor.execute,即新建一个线程,而这个线程的主要工作就是:SingleThreadEventExecutor.this.run();而因为 NioEventLoop 实现了这个方法, 因此根据多态性, 其实调用的是 NioEventLoop.run() 方法。thread = Thread.currentThread();代表executor.execute新建的这个线程,NioEventLoop.run()是在这个线程中执行,即表示该线程就是NioEventLoop绑定的线程。
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
2、EventLoop的启动
在前面我们已经知道了,NioEventLoop 本身就是一个SingleThreadEventExecutor,因此NioEventLoop的启动,其实就是NioEventLoop所绑定的本地Java线程的启动。然后让我们重温下AbstractBootstrap.initAndRegister(),
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
在跟踪register(channel);方法直到AbstractChannel.register():
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
。。。
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
一路从 Bootstrap.bind 方法跟踪到 AbstractChannel#AbstractUnsafe.register 方法, 整个代码都是在主线程中运行的, 因此上面的 eventLoop.inEventLoop() 就为 false, 于是进入到 else 分支, 在这个分支中调用了 eventLoop.execute. eventLoop 是一个 NioEventLoop 的实例, 而 NioEventLoop 没有实现 execute 方法, 因此调用的是 SingleThreadEventExecutor.execute,在execute中最后会调用到doStartThread方法会启动NioEventLoop绑定的java本地线程。总的来说,当EventLoop.execute第一次被调用时,就会触发doStartThread的调用,进而导致了EventLoop所对应的Java线程的启动。
3、netty的IO事件的循环处理
回忆下nio中selector的使用流程(https://www.jianshu.com/p/a61a19eb390f):
1、通过 Selector.open() 打开一个 Selector.
2、将 Channel 注册到 Selector 中, 并设置需要监听的事件(interest set)
3、循环做以下流程:
1)、调用 select() 方法
2)、调用 selector.selectedKeys() 获取 selected keys
3)、迭代每个 selected key:
1)、从 selected key 中获取 对应的 Channel 和附加信息(如果有的话)
2)、判断是哪些 IO 事件已经就绪了, 然后处理它们. 如果是 OP_ACCEPT 事件, 则调用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 获取 SocketChannel, 并将它设置为 非阻塞的, 然后将这个 Channel 注册到 Selector 中.
3)、根据需要更改 selected key 的监听事件.
4)、将已经处理过的 key 从 selected keys 集合中删除.
第一步打开selector在NioEventLoop初始化的过程中已经实现:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
第二步channel注册到select中在Bootstrap.initAndRegister或者ServerBootstrap.initAndRegister中也已经实现;
关键就是第三步的实现:
3.1、NioEventLoop中的run循环
在之前已经提到当EventLoop.execute第一次被调用时, 就会触发SingleThreadEventExecutor .doStartThread的调用, 进而导致了 EventLoop 所对应的 Java 线程的启动. doStartThread方法中的run方法主要工作就是调用了 SingleThreadEventExecutor.this.run() 方法. 而SingleThreadEventExecutor.run() 是一个抽象方法, 它的实现在 NioEventLoop 中。那么重点自然是在NioEventLoop的run方法中:
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
可以看到for (;;)是个死循环,就是NioEventLoop事件循环的秘密所在。selector第三步就在这里实现。去除细枝节,接下来看select()方法的调用是在哪里:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
...
int selectedKeys = selector.select(timeoutMillis);
...
}
这边说明下:select()方法会一直阻塞直到有数据ready,selectNow()则会立即返回,selector.select(timeoutMillis), 而这个调用是会阻塞住当前线程的, timeoutMillis 是阻塞的超时时间。这里其实也比较好理解,当hasTask()为false时走SelectStrategy.SELECT分支,没有任务的话,可以阻塞等待IO就绪事件。等到有事件就绪后,就是需要获取selected keys,然后针对每一种key进行事件处理。沿着代码路径看:
run-->processSelectedKeys()-->processSelectedKeysOptimized()
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
在这个方法中通过selectedKeys.keys[i]获取到获取selected keys,根据key处理事件则在processSelectedKey方法中。
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
return;
}
if (eventLoop != this || eventLoop == null) {
return;
}
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
processSelectedKey 中处理了三个事件, 分别是:
- OP_READ, 可读事件, 即 Channel 中收到了新数据可供上层读取.
- OP_WRITE, 可写事件, 即上层可以向 Channel 写入数据.
- OP_CONNECT, 连接建立事件, 即 TCP 连接已经建立, Channel 处于 active 状态.
很建议研读OP_READ事件中的unsafe.read()源码,对理解EventLoop很有帮助。
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
重点代码是int localRead = doReadMessages(readBuf);和pipeline.fireChannelRead(readBuf.get(i));
read()中实现了:
- 分配 ByteBuf
- 从 SocketChannel 中读取数据
- 调用 pipeline.fireChannelRead 发送一个 inbound 事件.
4、任务队列机制
在Netty 中,一个 NioEventLoop 通常需要肩负起两种任务,第一个是作为IO线程,处理 IO 操作;第二个就是作为任务线程,处理taskQueue中的任务。这一节的重点就是分析一下 NioEventLoop 的任务队列机制的。任务队列机制分为两部分:任务的添加和任务的执行。
4.1、任务的添加
NioEventLoop 继承于 SingleThreadEventExecutor, 而SingleThreadEventExecutor 中有一个 Queue<Runnable> taskQueue 字段, 用于存放添加的 Task. 在 Netty 中, 每个 Task 都使用一个实现了 Runnable 接口的实例来表示。例如当我们需要将一个 Runnable 添加到 taskQueue 中时, 我们可以进行类似如下操作:
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
然后调用SingleThreadEventExecutor的execute方法:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
addTask即添加任务。除了这类普通任务,还可以通过调用eventLoop.scheduleXXX 之类的方法来添加一个定时任务。EventLoop 中实现任务队列的功能在超类 SingleThreadEventExecutor 实现的, 而 schedule 功能的实现是在 SingleThreadEventExecutor 的父类, 即 AbstractScheduledEventExecutor 中实现的。在 AbstractScheduledEventExecutor 中, 有以 scheduledTaskQueue 字段:
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
通过AbstractScheduledEventExecutor.schedule方法实现任务的添加。
4.2、任务的执行
当一个任务被添加到 taskQueue 后, 它是怎么被 EventLoop 执行的呢?让我们回到 NioEventLoop.run() 方法中, 在这个方法里, 会分别调用processSelectedKeys() 和 runAllTasks() 方法, 来进行 IO 事件的处理和 task 的处理. processSelectedKeys() 方法我们已经分析过了, 下面我们来看一下 runAllTasks() :
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
fetchedAll = fetchFromScheduledTaskQueue();
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll);
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
fetchFromScheduledTaskQueue()其实就是将 scheduledTaskQueue中已经可以执行的(即定时时间已到的 schedule 任务) 拿出来并添加到 taskQueue中, 作为可执行的 task 等待被调度执行,然后runAllTasksFrom方法就会不断调用 task = pollTask() 从 taskQueue 中获取一个可执行的 task, 然后调用它的 run() 方法来运行此 task。