开篇
NioEventLoop是Netty框架的Reactor线程;
NioEventLoop负责处理注册在其上面的所有Channel的IO事件,通常情况下一个NioEventLoop会下挂多个Channel;
NioEventLoop同时会负责通过execute方法提交的任务,以及通过schedule方法提交的定时任务;
在接下来几篇文章,我会通过Netty的源码深入讲解NioEventLoop的实现机制。
特别说明:基于4.1.52版本的源码
类继承关系以及重要的成员变量
先来看下NioEventLoop的类关系图和重要的属性,对其有一个整体的感知,便于后面详细分析。
- 类继承关系
可以看到NioEventLoop的继承关系非常复杂,最上层是JDK的Executor
接口,说明它归根到底是一个执行器,是用来执行任务的。另外,它实现了EventLoop
接口、EventExecutorGroup
接口和ScheduledExecutorService
接口,继承了SingleThreadEventExecutor
类,这些接口和类为这个执行器添加了十分繁复的功能特性,要搞清楚NioEventLoop的具体实现机制就要不停的在这些父类和接口中来回跳转。 - 重要的成员变量
private Selector selector;
private SelectedSelectionKeySet selectedKeys;
private volatile Thread thread;
private final EventExecutorGroup parent;
private final Queue<Runnable> taskQueue;
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
private final Queue<Runnable> tailTasks;
-
selector
:作为NIO框架的Reactor线程,NioEventLoop需要处理网络IO事件,因此它需要有一个多路复用器,即Java NIO的Selector对象; -
selectedKeys
:每次select操作选出来的有事件就绪的SelectionKey集合,在NioEventLoop的run
方法中会处理这些事件; -
thread
:即每个NioEventLoop绑定的线程,它们是一对一的关系,一旦绑定,在整个生命周期内都不会改变; -
parent
:即当前的NioEventLoop所属的EventExecutorGroup; -
taskQueue
:NioEventLoop中三大队列之一,用于保存需要被执行的任务。 -
scheduledTaskQueue
:NioEventLoop中三大队列之一,是一个优先级队列(内部其实是一个按照任务的下次执行时间排序的小顶堆),用于保存定时任务,当检测到定时任务需要被执行时,会将任务从scheduledTaskQueue
中取出,放入taskQueue
; -
tailTasks
:NioEventLoop中三大队列之一,用于存储当前或下一次事件循环结束后需要执行的任务;
构造函数
首先来看NioEventLoop的构造函数
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
// 设置parent、executor、addTaskWakesUp(添加任务时是否唤醒select)、创建taskQueue和tailTask队
// 列
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
// selector初始化
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
在构造函数中,会创建任务队列和tailTask队列
private static Queue<Runnable> newTaskQueue(
EventLoopTaskQueueFactory queueFactory) {
if (queueFactory == null) {
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
默认情况下,会创建MPSC,即多生产者单消费者的队列,这里最终会用到JCTools库,这里不过多介绍,感兴趣的可以自己去了解。
构造函数中还会初始化selector和根据配置对selectedKeys进行优化
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// 如果优化选项没有开启,则直接返回
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
if (cause != null) {
return cause;
}
cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
if (cause != null) {
return cause;
}
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
return e;
} catch (IllegalAccessException e) {
return e;
}
}
});
selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
如果设置了优化开关(默认优化选项是开启的),则通过反射的方式从Selector中获取selectedKeys和publicSelectedKeys,将这两个成员设置为可写,通过反射,使用Netty构造的selectedKeySet将原生JDK的selectedKeys替换掉。
我们知道使用Java原生NIO接口时,需要先调Selector的select方法,再调selectedKeys方法才可以获得有IO事件准备好的SelectionKey集合。这里优化过后,只通过一步select调用,就可以从selectedKeySet获得需要的SelectionKey集合。
另外,原生Java的SelectionKey集合是一个HashSet,这里优化过后的SelectedSelectionKeySet底层是一个数组,效率更高。
run方法解析
EventLoop的职责可以用下面这张图形象的表示
EventLoop的
run
方法在一个for死循环中,周而复始的做着三件事:
1、从已注册的Channel监听IO事件;
2、处理IO事件;
3、从任务队列取任务执行。
protected void run() {
int selectCnt = 0;
for (;;) {
int strategy;
try {
// 计算本次循环的执行策略
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT:
// 调用Java NIO的多路复用器,检查注册在NioEventLoop上的Channel的IO状态
strategy = select(curDeadlineNanos);
}
} catch (IOException e) {
}
// 处理IO事件
processSelectedKeys();
// 处理任务队列中的任务
ranTasks = runAllTasks();
...
}
下面详细解析:
- 先来看calculateStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty() || !tailTasks.isEmpty();
}
每次循环,都会检测任务队列和IO事件,如果任务队列中没有任务,则直接返回SelectStrategy.SELECT;如果任务队列中有任务,则会调用非阻塞的selectNow
检测有IO事件准备好的Channel数。
-
阻塞的select
当任务队列中没有任务时,直接进入select分支
case SelectStrategy.SELECT:
// 找到下一个将要执行的定时任务的截止时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) {
// 阻塞调用select
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
nextScheduledTaskDeadlineNanos
方法返回下一个将要被执行的定时任务的截止时间
protected final long nextScheduledTaskDeadlineNanos() {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
return scheduledTask != null ? scheduledTask.deadlineNanos() : -1;
}
final ScheduledFutureTask<?> peekScheduledTask() {
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
return scheduledTaskQueue != null ? scheduledTaskQueue.peek() : null;
}
NioEventLoop的定时任务队列是一个优先级队列,队列中存储的是ScheduledFutureTask对象
PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
if (scheduledTaskQueue == null) {
scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
SCHEDULED_FUTURE_TASK_COMPARATOR,
11);
}
return scheduledTaskQueue;
}
通过ScheduledFutureTask的compareTo
方法可以看出,优先级队列中的元素是以任务的截止时间来排序的,队首元素的截止时间最小,当截止时间相同时,以任务ID排序,ID小的排在前面。
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else {
assert id != that.id;
return 1;
}
}
当定时任务ScheduledFutureTask执行后,会根据periodNanos
的取值决定是否要将任务重新放回队列。从netty的注释可以清晰看到:
当
periodNanos
为0时,表示的是只执行一次的任务,执行完后丢弃就好,不再放回队列;
当periodNanos
大于0时表示的是以固定的频率执行任务,下一次任务执行的开始时间是以上一次任务的开始时间为基准而得来;
当periodNanos
小于0时表示的是以固定的延迟时间执行任务,下一次任务的开始时间是以上一次任务的结束时间为基准而得来。
/* 0 - no repeat, >0 - repeat at fixed rate, <0 - repeat with fixed delay */
private final long periodNanos;
看下ScheduledFutureTask的run
方法
public void run() {
assert executor().inEventLoop();
try {
if (delayNanos() > 0L) {
// 执行时间还未到,则要么移除任务,要么重新加入队列
if (isCancelled()) {
scheduledExecutor().scheduledTaskQueue().removeTyped(this);
} else {
scheduledExecutor().scheduleFromEventLoop(this);
}
return;
}
// 只执行一次的任务,执行完后丢弃就好,不再放回队列
if (periodNanos == 0) {
if (setUncancellableInternal()) {
V result = runTask();
setSuccessInternal(result);
}
} else {
if (!isCancelled()) {
runTask();
if (!executor().isShutdown()) {
// 根据periodNanos ,计算截止时间
if (periodNanos > 0) {
deadlineNanos += periodNanos;
} else {
deadlineNanos = nanoTime() - periodNanos;
}
if (!isCancelled()) {
// 重新加入队列
scheduledExecutor().scheduledTaskQueue().add(this);
}
}
}
}
} catch (Throwable cause) {
setFailureInternal(cause);
}
}
当任务的执行时间还未到,则判断任务是否已经取消,如果已取消则移除任务,否则重新加入队列。对于只执行一次的任务,执行完了不会再放回队列。其他的任务,则根据periodNanos
的类型,重新计算截止时间,重新放回队列,等待下次调度。
定时任务的优先级队列到此介绍完毕,接着看NioEventLoop的run
方法
nextWakeupNanos.set(curDeadlineNanos);
try {
// 再次判断任务队列中是否有任务
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
}
private int select(long deadlineNanos) throws IOException {
// 如果没有定时任务,直接调Java NIO的select,进入阻塞
if (deadlineNanos == NONE) {
return selector.select();
}
// 如果截止时间小于0.5ms,则timeoutMillis 为0,直接调非阻塞的selectNow()方法
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
在调用select
之前,再次调用hasTasks()
判断从上次调用该方法到目前为止是否有任务加入,多做了一层防护,因为调用select
时,可能会阻塞,这时,如果任务队列中有任务就会长时间得不到执行,所以须小心谨慎。
如果任务队列中还是没有任务,则会调用select
方法。在这个方法中会根据入参deadlineNanos
来选择调用NIO的哪个select方法:
如果
deadlineNanos
为NONE,即没有定时任务时,直接调用NIO的无参select
方法,进入永久阻塞,除非检测到Channel的IO事件或者被wakeup;
如果存在定时任务,且定时任务的截止时间小于0.5ms,则timeoutMillis 为0,直接调非阻塞的selectNow
方法,也就是说马上有定时任务需要执行了,不要再进入阻塞了;
其他情况,调用select(timeout)
,进入有超时时间的阻塞。
到这里,可能有人要问了:在上面的方法中,如果调用了Java NIO的无参的select
方法,就会进入阻塞,除非检测到Channel的IO事件,那么在检测到IO事件之前,加入到任务队列中的任务怎么得到执行呢?
好,你想,在检测到IO事件之前,可以退出阻塞的方法是什么?对,调用wakeup
方法。那么我们来搜一下NioEventLoop中有调用Selector的wakeup
方法的地方吗:
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
selector.wakeup();
}
}
还真搜到了,再看一下这个方法被调用的地方
看到SingleThreadEventExecutor的execute
方法了吗,就是说在调execute
方法,向EventLoop提交任务时,会将EventLoop线程从Java NIO的select阻塞中唤醒。
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
...
}
if (!addTaskWakesUp && immediate) {
// 唤醒EventLoop线程,执行任务队列中的任务
wakeup(inEventLoop);
}
}
到这里,NioEventLoop的run方法的职责之一:检测Channel的IO事件就讲解完毕。
至于IO事件的处理以及任务队列中任务的处理会在后面的文章中解析,敬请期待。
总结
在本文中,对Netty的NioEventLoop进行了深入的解读,并且详细讲解了它的三大职责之一:检测Channel的IO事件的机制。
NioEventLoop是Netty最核心的概念,内部运行机制很复杂,在接下来的两篇文章中会继续分析。