本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。
NioEventLoop
通过前面的学习,我们对NioEventLoop做过如下几点简单的概述:
① NioEventLoop是一个基于JDK NIO的异步事件循环类,它负责处理一个Channel的所有事件在这个Channel的生命周期期间。
② NioEventLoop的整个生命周期只会依赖于一个单一的线程来完成。一个NioEventLoop可以分配给多个Channel,NioEventLoop通过JDK Selector来实现I/O多路复用,以对多个Channel进行管理。
③ 如果调用Channel操作的线程是EventLoop所关联的线程,那么该操作会被立即执行。否则会将该操作封装成任务放入EventLoop的任务队列中。
④ 所有提交到NioEventLoop的任务都会先放入队列中,然后在线程中以有序(FIFO)/连续的方式执行所有提交的任务。
⑤ NioEventLoop的事件循环主要完成了:a)已经注册到Selector的Channel的监控,并在感兴趣的事件可执行时对其进行处理;b)完成任务队列(taskQueue)中的任务,以及对可执行的定时任务和周期性任务的处理(scheduledTaskQueue中的可执行的任务都会先放入taskQueue中后,再从taskQueue中依次取出执行)。
其中几点已经在启动流程的源码分析中做了详细的介绍。本文主要针对NioEventLoop事件循环的流程对NioEventLoop进行更深一步的学习。
JCTools
JCTools:适用于JVM的Java并发工具。该项目旨在提供一些当前JDK缺失的并发数据结构。
SPSC/MPSC/SPMC/MPMC 变种的并发队列:
- SPSC:用于单生产者单消费者模式(无等待,有限长度 和 无限长度)
- MPSC:用于多生产者单消费者模式(无锁的,有限长度 和 无限长度)
- SPMC:用于单生产者多消费者模式(无锁的,有限长度)
- MPMC:用于多生产者多消费模式(无锁的,有限长度)
JCTools提供的队列是一个无锁队列,也就是队列的底层通过无锁的方式实现了线程安全的访问。
MpscUnboundedArrayQueue是由JCTools提供的一个多生产者单个消费者的数组队列。多个生产者同时并发的访问队列是线程安全的,但是同一时刻只允许一个消费者访问队列,这是需要程序控制的,因为MpscQueue的用途即为多个生成者可同时访问队列,但只有一个消费者会访问队列的情况。如果是其他情况你可以使用JCTools提供的其他队列。
Q:为什么说MpscUnboundedArrayQueue的性能高于LinkedBlockingQueue了?
A:① MpscUnboundedArrayQueue底层通过无锁的方式实现了多生产者同时访问队列的线程安全性,而LinkedBlockingQueue是一个多生产者多消费者的模式,它则是用过Lock锁的方式来实现队列的线程安全性。
② Netty的线程模型决定了taskQueue可以用多个生产者线程同时提交任务,但只会有EventLoop所在线程来消费taskQueue队列中的任务。这样JCTools提供的MpscQueue完全符合Netty线程模式的使用场景。而LinkedBlockingQueue会在生产者线程操作队列时以及消费者线程操作队列时都对队列加锁以保证线程安全性。虽然,在Netty的线程模型中程序会控制访问taskQueue的始终都会是EventLoop所在线程,这时会使用偏向锁来降低线程获得锁的代价。
偏向锁:HotSpot的作者经过研究发现,大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID,以后该线程再进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需要简单地测试一下对象头的Mark Word(Mark Word是Java对象头的内容,用于存储对象的hashCode或锁信息等)里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下Mark Word中偏向锁的标识是否设置成1(表示当前是偏向锁):如果没有设置,则使用CAS竞争锁;如果设置了,则尝试使用CAS将对象头的偏向锁指向当前线程。
重要属性
- taskQueue
// 用于存储任务的队列,是一个MpscUnboundedArrayQueue实例。
private final Queue<Runnable> taskQueue;
- tailTasks
// 是一个MpscUnboundedArrayQueue实例。用于存储当前或下一次事件循环(eventloop)迭代结束后需要执行的任务。
private final Queue<Runnable> tailTasks;
- scheduledTaskQueue
// 定时或周期任务队列,是一个PriorityQueue实例。
Queue<ScheduledFutureTask<?>> scheduledTaskQueue;
- SELECTOR_AUTO_REBUILD_THRESHOLD
private static final int MIN_PREMATURE_SELECTOR_RETURNS = 3;
private static final int SELECTOR_AUTO_REBUILD_THRESHOLD;
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
SELECTOR_AUTO_REBUILD_THRESHOLD用于标识Selector空轮询的阈值,当超过这个阈值的话则需要重构Selector。
如果有设置系统属性”io.netty.selectorAutoRebuildThreshold”,并且该属性值大于MIN_PREMATURE_SELECTOR_RETURNS(即,3),那么该属性值就为阈值;如果该属性值小于MIN_PREMATURE_SELECTOR_RETURNS(即,3),那么阈值为0。如果没有设置系统属性”io.netty.selectorAutoRebuildThreshold”,那么阈值为512,即,默认情况下阈值为512。
- selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
selectNow提供器,在事件循环里用于选择策略(selectStrategy)中。
- pendingTasksCallable
private final Callable<Integer> pendingTasksCallable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return NioEventLoop.super.pendingTasks();
}
};
@Override
public int pendingTasks() {
// As we use a MpscQueue we need to ensure pendingTasks() is only executed from within the EventLoop as
// otherwise we may see unexpected behavior (as size() is only allowed to be called by a single consumer).
// See https://github.com/netty/netty/issues/5297
if (inEventLoop()) {
return super.pendingTasks();
} else {
return submit(pendingTasksCallable).syncUninterruptibly().getNow();
}
}
因为pendingTasks()方法的底层就是调用taskQueue.size()方法,而前面我们已经说了taskQueue是一个MpscQueue,所以只能由EventLoop所在的线程来调用这个pendingTasks()方法,如果当前线程不是EventLoop所在线程,那么就将pendingTasks()封装在一个Callable(即,pendingTasksCallable)提交到taskQueue中去执行,并同步的等待执行的结果。
- 静态代码块
// Workaround for JDK NIO bug.
//
// See:
// - http://bugs.sun.com/view_bug.do?bug_id=6427854
// - https://github.com/netty/netty/issues/203
static {
final String key = "sun.nio.ch.bugLevel";
final String buglevel = SystemPropertyUtil.get(key);
if (buglevel == null) {
try {
AccessController.doPrivileged(new PrivilegedAction<Void>() {
@Override
public Void run() {
System.setProperty(key, "");
return null;
}
});
} catch (final SecurityException e) {
logger.debug("Unable to get/set System Property: " + key, e);
}
}
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
}
}
这里的静态代码块,主要做了两件事:
① 解决在java6 中 NIO Selector.open()可能抛出NPE异常的问题。
问题:http://bugs.java.com/view_bug.do?bug_id=6427854
解决:https://github.com/netty/netty/issues/203
这里我们对问题和Netty的解决方案进行一个简单的讲解。
问题描述:
sun.nio.ch.Util中包含👇线程不安全的代码,并可能抛出一个NullPointerException异常。
正是因为
java.security.PrivilegedAction pa =
new GetPropertyAction("sun.nio.ch.bugLevel");
// the next line can reset bugLevel to null
bugLevel = (String)AccessController.doPrivileged(pa);
👆的调用导致bugLevel又被重置为了null。导致了NPE bug的发生。
Netty的解决方案:在开始使用Selector.open()方法之前,先将"sun.nio.ch.bugLevel"系统属性设置为non-null的。即,如果"sun.nio.ch.bugLevel”系统属性值为null,则设置”sun.nio.ch.bugLevel”=“”
② 为了在事件循环时解决JDK NIO类库的epoll bug,先设置好SELECTOR_AUTO_REBUILD_THRESHOLD,即selector空轮询的阈值。具体的赋值流程上面已经详细说明过了。
- 唤醒select标识符
// 一个原子类的Boolean标识用于控制决定一个阻塞着的Selector.select是否应该结束它的选择操作。
private final AtomicBoolean wakenUp = new AtomicBoolean();
- ioRatio
// 在事件循环中期待用于处理I/O操作时间的百分比。默认为50%。
// 也就是说,在事件循环中默认情况下用于处理I/O操作的时间和用于处理任务的时间百分比都为50%,
// 即,用于处理I/O操作的时间和用于处理任务的时间时一样的。用户可以根据实际情况来修改这个比率。
private volatile int ioRatio = 50
方法
构造方法
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
a) 完成成员属性taskQueue、tailQueue的构建;
最终会调用newTaskQueue方法来完成构建:
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// This event loop never calls takeTask()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
参数maxPendingTasks默认为Integer.MAX_VALUE,则会通过“PlatformDependent.<Runnable>newMpscQueue()”返回来构造一个MpscUnboundedArrayQueue实例,其初容量大小为1024,最大容量限制为2048。
b) 设置成员属性addTaskWakesUp为false。
c) 设置成员属性rejectedExecutionHandler值为RejectedExecutionHandlers.reject()方法将返回一个RejectedExecutionHandler实例。
/**
* Similar to {@link java.util.concurrent.RejectedExecutionHandler} but specific to {@link SingleThreadEventExecutor}.
*/
public interface RejectedExecutionHandler {
/**
* Called when someone tried to add a task to {@link SingleThreadEventExecutor} but this failed due capacity
* restrictions.
*/
void rejected(Runnable task, SingleThreadEventExecutor executor);
}
RejectedExecutionHandler接口类似于JDK 的 java.util.concurrent.RejectedExecutionHandler,但是RejectedExecutionHandler只针对于SingleThreadEventExecutor。
该接口中有一个唯一的接口方法rejected,当尝试去添加一个任务到SingleThreadEventExecutor中,但是由于容量的限制添加失败了,那么此时该方法就会被调用。
RejectedExecutionHandlers.reject()返回的是一个RejectedExecutionHandler常量REJECT
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
该RejectedExecutionHandler总是抛出一个RejectedExecutionException异常。
d) final SelectorTuple selectorTuple = openSelector();
开启Selector,构造SelectorTuple实例,SelectorTuple是一个封装了原始selector对象和封装后selector对象(即,SelectedSelectionKeySetSelector对象)的类:
private static final class SelectorTuple {
final Selector unwrappedSelector;
final Selector selector;
这里,成员变量unwrappedSelector就是通过SelectorProvider.provider().openSelector()开启的Selector;而成员变量selector则是一个SelectedSelectionKeySetSelector对象。
SelectedSelectionKeySetSelector中持有unwrappedSelector实例,并作为unwrappedSelector的代理类,提供Selector所需要的方法,而Selector相关的操作底层实际上都是由unwrappedSelector来完成的,只是在操作中增加了对selectionKeys进行相应的设置。SelectedSelectionKeySetSelector中除了持有unwrappedSelector实例外还持有一个SelectedSelectionKeySet对象。该对象是Netty提供的一个可以‘代替’Selector selectedKeys的对象。openSelector()方法中通过反射机制将程序构建的SelectedSelectionKeySet对象给设置到了Selector内部的selectedKeys、publicSelectedKeys属性。这使Selector中所有对selectedKeys、publicSelectedKeys的操作实际上就是对SelectedSelectionKeySet的操作。
SelectedSelectionKeySet类主要通过成员变量SelectionKey[]数组来维护被选择的SelectionKeys,并将扩容操作简单的简化为了’newCapacity为oldCapacity的2倍’来实现。同时不在支持remove、contains、iterator方法。并添加了reset方法来对SelectionKey[]数组进行重置。
SelectedSelectionKeySetSelector中主要是在每次select操作的时候,都会先将selectedKeys进行清除(reset)操作。
e) 设置成员属性selectStrategy的值为DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(),即一个DefaultSelectStrategy实例。
事件循环
NioEventLoop的事件循环主要完成下面几件事:
① 根据当前NioEventLoop中是否有待完成的任务得出select策略,进行相应的select操作
② 处理select操作得到的已经准备好处理的I/O事件,以及处理提交到当前EventLoop的任务(包括定时和周期任务)。
③ 如果NioEventLoop所在线程执行了关闭操作,则执行相关的关闭操作处理。这一块在之前Netty 源码解析 ——— Netty 优雅关闭流程的文章已经做了详细的说明,这里就不再赘述了。
下面我们详细展开每一步
① 根据当前NioEventLoop中是否有待完成的任务得出select策略,进行相应的select操作:
『selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())』
前面我们已经说过selectNowSupplier是一个selectNow提供器:
// NioEventLoop#selectNowSupplier
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
// NioEventLoop#selectNow()
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
// SelectedSelectionKeySetSelector#selectNow()
public int selectNow() throws IOException {
selectionKeys.reset();
return delegate.selectNow();
}
// SelectedSelectionKeySetSelector#wakeup()
public Selector wakeup() {
return delegate.wakeup();
}
selectNowSupplier提供的selectNow()操作是通过封装过的selector(即,SelectedSelectionKeySetSelector对象)来完成的。而SelectedSelectionKeySetSelector的selectorNow()方法处理委托真实的selector完成selectoNow()操作外,还会将selectionKeys清空。
hasTasks()方法用于判断taskQueue或tailTasks中是否有任务。
前面我们也提到过selectStrategy就是一个DefaultSelectStrategy对象:
final class DefaultSelectStrategy implements SelectStrategy {
static final SelectStrategy INSTANCE = new DefaultSelectStrategy();
private DefaultSelectStrategy() { }
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
DefaultSelectStrategy的选择策略就是:
如果当前的EventLoop中有待处理的任务,那么会调用selectSupplier.get()方法,也就是最终会调用Selector.selectNow()方法,并清空selectionKeys。Selector.selectNow()方法不会发生阻塞,如果没有一个channel(即,该channel注册的事件发生了)被选择也会立即返回,否则返回就绪I/O事件的个数。
如果当前的EventLoop中没有待处理的任务,那么返回’SelectStrategy.SELECT(即,-1)’。
如果‘selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())’操作返回的是一个>0的值,则说明有就绪的的I/O事件待处理,则直接进入流程②。否则,如果返回的是’SelectStrategy.SELECT’则进行select(wakenUp.getAndSet(false))操作:
首先先通过自旋锁(自旋 + CAS)方式获得wakenUp当前的标识,并再将wakenUp标识设置为false。将wakenUp作为参数传入select(boolean oldWakenUp)方法中,注意这个select方法不是JDK NIO的Selector.select方法,是NioEventLoop类自己实现的一个方法,只是方法名一样而已。NioEventLoop的这个select方法还做了一件很重要的时,就是解决“JDK NIO类库的epoll bug”问题。
- 解决 JDK NIO 类库的 epool bug
下面我们来对这个“JDK NIO类库的epoll bug”问题已经Netty是如何解决这个问题进行一个说明:
JDK NIO类库最著名的就是 epoll bug了,它会导致Selector空轮询,IO线程CPU 100%,严重影响系统的安全性和可靠性。
SUN在解决该BUG的问题上不给力,只能从NIO框架层面进行问题规避,下面我们看下Netty是如何解决该问题的。
Netty的解决策略:
- 根据该BUG的特征,首先侦测该BUG是否发生;
- 将问题Selector上注册的Channel转移到新建的Selector上;
- 老的问题Selector关闭,使用新建的Selector替换。
下面具体看下代码,首先检测是否发生了该BUG:
红色框中的代码,主要完成了是否发生“epoll-bug”的检测。
『if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos)』返回false,即『time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) < currentTimeNanos』 的意思是:int selectedKeys = selector.select(timeoutMillis)在timeoutMillis时间到期前就返回了,并且selectedKeys==0,则说明selector进行了一次空轮询,这违反了Javadoc中对Selector.select(timeout)方法的描述。epoll-bug会导致无效的状态选择和100%的CPU利用率。也就是Selector不管有无感兴趣的事件发生,select总是不阻塞就返回。这会导致select方法总是无效的被调用然后立即返回,依次不断的进行空轮询,导致CPU的利用率达到了100%。
int selectorAutoRebuildThreshold = SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
if (selectorAutoRebuildThreshold < MIN_PREMATURE_SELECTOR_RETURNS) {
selectorAutoRebuildThreshold = 0;
}
SELECTOR_AUTO_REBUILD_THRESHOLD = selectorAutoRebuildThreshold;
SELECTOR_AUTO_REBUILD_THRESHOLD默认为512,也就是当Selector连续执行了512次空轮询后,Netty就会进行Selector的重建操作,即rebuildSelector()操作。
绿色框中代码主要说明了,当有定时/周期性任务即将到达执行时间(<0.5ms),或者NioEventLoop的线程收到了新提交的任务上来等待着被处理,或者有定时/周期性任务到达了可处理状态等待被处理,那么则退出select方法转而去执行任务。这也说明Netty总是会尽最大努力去保证任务队列中的任务以及定时/周期性任务能得到及时的处理。
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
该段代码会计算scheduledTaskQueue中是否有即将要执行的任务,即在0.5ms内就可执行的scheduledTask,如果有则退出select方法转而去执行任务。
‘selectDeadLineNanos’的初始值通过‘currentTimeNanos + delayNanos(currentTimeNanos);’而来。delayNanos方法会返回最近一个待执行的定时/周期性任务还差多少纳秒就可以执行的时间差(若,scheduledTaskQueue为空,也就是没有任务的定时/周期性任务,则返回1秒)。因此selectDeadLineNanos就表示最近一个待执行的定时/周期性任务的可执行时间。
‘selectDeadLineNanos - currentTimeNanos’就表示:最近一个待执行的定时/周期性任务还差多少纳秒就可以执行的时间差。我们用scheduledTaskDelayNanos来表示该差值。
‘(selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L’表示:(scheduledTaskDelayNanos + 0.5ms) / 1ms。如果该结果大于0,则说明scheduledTaskDelayNanos >= 0.5ms,否则scheduledTaskDelayNanos < 0.5ms。
因此,就有了上面所说的结论,scheduledTaskQueue中有在0.5ms内就可执行的任务,则退出select方法转而去执行任务。
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
在了解👆代码的用意之前,我们先来说下,当有任务提交至EventLoop时的一些细节补充
a) 成员变量addTaskWakesUp为false。
这里,在构造NioEventLoop对象时,通过构造方法传进的参数’addTaskWakesUp’正是false,它会赋值给成员变量addTaskWakesUp。因此该条件满足。
b)当提交上来的任务不是一个NonWakeupRunnable任务
// NioEventLoop#wakeup(boolean inEventLoop)
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
c) 执行提交任务的线程不是EventLoop所在线程
d) 当wakenUp成员变量当前的值为false
// NioEventLoop#wakeup(boolean inEventLoop)
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
}
只有同时满足上面4个条件的情况下,Selector的wakeup()方法才会的以调用。
现在,我们来说明这段代码块的用意
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
如果一个任务在wakenUp值为true的情况下被提交上来,那么这个任务将没有机会去调用Selector.wakeup()(即,此时’d)’条件不满足)。所以我们需要去再次检测任务队列中是否有待执行的任务,在执行Selector.select操作之前。如果我们不这么做,那么任务队列中的任务将等待直到Selector.select操作超时。如果ChannelPipeline中存在IdleStateHandler,那么IdleStateHandler处理器可能会被挂起直到空闲超时。
首先,这段代码是在每次要执行Selector.select(long timeout)之前我们会进行一个判断。我们能够确定的事,如果hasTasks()为true,即发现当前有任务待处理时。wakenUp.compareAndSet(false, true)会返回true,因为在每次调用当前这个select方法时,都会将wakenUp标识设置为false(即,‘wakenUp.getAndSet(false)’这句代码)。而此时,wakenUp已经被置位true了,在此之后有任务提交至EventLoop,那么是无法触发Selector.wakeup()的。所以如果当前有待处理的任务,就不会进行下面的Selector.select(long timeout)操作,而是退出select方法,继而去处理任务。
因为如果不这么做的话,如果当前NioEventLoop线程上已经有任务提交上来,这会使得这些任务可能会需要等待Selector.select(long timeout)操作超时后才能得以执行。再者,假设我们的ChannelPipeline中存在一个IdleStateHandler,那么就可能导致因为Selector.select(long timeout)操作的timeout比IdleStateHandler设置的idle timeout长,而导致IdleStateHandler不能对空闲超时做出即使的处理。
同时,我们注意,在执行‘break’退出select方法前,会执行‘selector.selectNow()’,该方法不会阻塞,它会立即返回,同时它会抵消Selector.wakeup()操作带来的影响(关于NIO 相关的知识点,欢迎参阅关于 NIO 你不得不知道的一些“地雷”)。
所以,① 如有有非NioEventLoop线程提交了一个任务上来,那么这个线程会执行『selector
.wakeup()』方法,那么NioEventLoop在『if (hasTasks() && wakenUp.compareAndSet(false, true))』的后半个条件会返回false,程序会执行到『int selectedKeys = selector.select(timeoutMillis);』,但是此时select不会阻塞,而是直接返回,因为前面已经先执行了『selector.wakeup()』;② 因为提交任务的线程是非NioEventLoop线程,所以也可能是由NioEventLoop线程成功执行了『if (hasTasks() && wakenUp.compareAndSet(false, true))』,退出了select方法转而去执行任务队列中的任务。注意,这是提交任务的非NioEventLoop线程就不会执行『selector.wakeup()』。
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
同时,除了在每次Selector.select(long timeout)操作前进行任务队列的检测外,在每次Selector.select(long timeout)操作后也会检测任务队列是否已经有提交上来的任务待处理,以及是由有定时或周期性任务准备好被执行。如果有,也不会继续“epoll-bug”的检测,转而去执行待处理的任务。
好了,我们在来看下如果经过检测,我们已经确认发生了“epoll-bug”,这时我们就需要进行Selector的重构操作:
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
重构操作主要的流程:首先,通过openSelector()先构造一个新的SelectorTuple。然后,遍历oldSelector中的所有SelectionKey,依次判断其有效性,如果有效则将其重新注册到新的Selector上,并将旧的SelectionKey执行cancel操作,进行相关的数据清理,以便最后oldSelector好进行关闭。在将所有的SelectionKey数据移至新的Selector后,将newSelectorTuple的selector和unwrappedSelector赋值给相应的成员属性。最后,调用oldSelector.close()关闭旧的Selector以进行资源的释放。
接下我们继续讨论NioEventLoop.select操作的流程②
② 处理select操作得到的已经准备好处理的I/O事件,以及处理提交到当前EventLoop的任务(包括定时和周期任务):
a) 首先先将成员变量cancelledKeys和needsToSelectAgain重置,即,cancelledKeys置为0,needsToSelectAgain置为false;
b) 成员变量ioRatio的默认值为50
private volatile int ioRatio = 50;
ioRatio在事件循环中期待用于处理I/O操作时间的百分比。默认为50%。也就是说,在事件循环中默认情况下用于处理I/O操作的时间和用于处理任务的时间百分比都为50%,即,用于处理I/O操作的时间和用于处理任务的时间时一样的。
这里做个简单的证明吧:
当ioRatio不为100%时,我们假设在事件循环中用于处理任务时间的百分比为taskRatio,I/O操作的时间为ioTime,处理任务的时间为taskTime,求taskTime:
ioTime/taskTime = ioRatio/taskRatio; 并且 ioRatio + taskRatio = 100;
带入,ioTime/taskTime = ioRatio/(100-ioRatio); ==> taskTime = ioTime*(100 - ioRatio) / ioRatio;
所以runAllTasks(ioTime * (100 - ioRatio) / ioRatio);传入的参数就为可用于运行任务的时间。
c) processSelectedKeys():处理Selector.select操作返回的待处理的I/O事件。
注意,『selectedKeys.keys[i] = null;』操作相当于我们在NIO编程中在处理已经触发的感兴趣的事件时,要将处理过的事件充selectedKeys集合中移除的步骤。
该方法会从selectedKeys中依次取出准备好被处理的SelectionKey,并对相应的待处理的I/O事件进行处理。
在Netty 源码解析 ——— 服务端启动流程 (下)说到过,再将ServerSocketChannel注册到Selector的时候,是会将其对应的NioServerSocketChannel作为附加属性设置到SelectionKey中。所有这里从k.attachment()获取到的Object对象实际就是NioServerSocketChannel,而NioServerSocketChannel就是一个AbstractNioChannel的实现类。
当SelectionKey.OP_CONNECT(连接事件)准备就绪时,我们执行如下操作:
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
将SelectionKey.OP_CONNECT事件从SelectionKey所感兴趣的事件中移除,这样Selector就不会再去监听该连接的SelectionKey.OP_CONNECT事件了。而SelectionKey.OP_CONNECT连接事件是只需要处理一次的事件,一旦连接建立完成,就可以进行读、写操作了。
unsafe.finishConnect():该方法会调用SocketChannel.finishConnect()来标识连接的完成,如果我们不调用该方法,就去调用read/write方法,则会抛出一个NotYetConnectedException异常。在此之后,触发ChannelActive事件,该事件会在该Channel的ChannelPipeline中传播处理。
具体的关于SelectionKey.OP_CONNECT、SelectionKey.OP_WRITE、SelectionKey.OP_READ、SelectionKey.OP_ACCEPT的处理流程可以参阅Netty 源码解析 ——— 基于 NIO 网络传输模式的 OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE 事件处理流程
d) 处理任务队列中的任务以及定时/周期性任务。
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
首先,这里将执行任务的语句写在了finally块中,这是为了确保即便处理SelectedKeys出现了异常,也要确保任务中的队列总能得到执行的机会。
① 获取系统启动到当前的时间内已经过去的定时任务(即,延迟的时间已经满足或者定时执行任务的时间已经满足的任务)放入到taskQueue中。
从taskQueue中获取任务,如果taskQueue已经没有任务了,则依次执行tailTasks队列里的所有任务。
a) 『fetchFromScheduledTaskQueue()』
// SingleThreadEventExecutor#fetchFromScheduledTaskQueue()
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
获取从系统启动到当前系统的时间间隔。从scheduledTaskQueue中获取在该时间间隔内已经过期的任务(即延迟周期或定时周期已经到时间的任务),将这些任务放入到taskQueue中,如果taskQueue满了无法进入添加新的任务(taskQueue队列的容量限制最大为2048),则将其重新放回到scheduledTaskQueue。
默认情况下,taskQueue是一个MpscUnboundedArrayQueue实例。
// AbstractScheduledEventExecutor#pollScheduledTask(long nanoTime)
protected final Runnable pollScheduledTask(long nanoTime) {
assert inEventLoop();
Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
if (scheduledTask == null) {
return null;
}
if (scheduledTask.deadlineNanos() <= nanoTime) {
scheduledTaskQueue.remove();
return scheduledTask;
}
return null;
}
根据给定的nanoTime返回已经准备好被执行的Runnable。你必须是用AbstractScheduledEventExecutor的nanoTime()方法来检索正确的nanoTime。
scheduledTaskQueue是一个PriorityQueue实例,它根据任务的deadlineNanos属性的升序来维护一个任务队列,每次peek能返回最先该被执行的定时任务。deadlineNanos表示系统启动到该任务应该被执行的时间点的时间差,如果“scheduledTask.deadlineNanos() <= nanoTime”则说明该任务的执行时间已经到了,因此将其从scheduledTaskQueue移除,然后通过该方法返回后放入到taskQueue中等待被执行。
因此,可知每次执行taskQueue前,taskQueue中除了有用户自定义提交的任务,系统逻辑流程提交至该NioEventLoop的任务,还有用户自定义或者系统设置的已经达到运行时间点的定时/周期性任务会一并放入到taskQueue中,而taskQueue的初始化容量为1024,最大长度限制为2048,也就是一次事件循环最多只能处理2048个任务。
b) 然后从taskQueue中获取一个待执行的任务,如果获取的task为null,说明本次事件循环中没有任何待执行的任何,那么就执行“afterRunningAllTasks()”后返回。afterRunningAllTasks()方法会依次执行tailQueue中的任务,tailTasks中是用户自定义的一些列在本次事件循环遍历结束后会执行的任务,你可以通过类似如下的方式来添加tailTask:
((NioEventLoop)ctx.channel().eventLoop()).executeAfterEventLoopIteration(() -> {
// add some task to execute after eventLoop iteration
});
② 通过“系统启动到当前的时间差”+“可用于执行任务的时间”=“系统启动到可用于执行任务时间的时间段(deadline)”。从taskQueue中依次出去任务,如果task为null则说明已经没有待执行的任务,那么退出for循环。否则,同步的执行task,每执行64个任务后,就计算“系统启动到当前的时间”是否大于等于了deadline,如果是则说明已经超过了分配给任务执行的时间,此时就不会继续执行taskQueue中的任务了。
a)『safeExecute(task)』
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
通过直接调用task的run方法来同步的执行任务。
b) 『runTasks & 0x3f』:
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
63的16进制表示为0x3f(二进制表示为’0011 1111’),当已经执行的任务数量小于64时,其与0x3f的位与操作会大于0,当其等于64(64的16进制表示为0x40,二进制表示为’0100 0000’)时,runTasks & 0x3f的结果为0。所以是每执行64个任务后就进行一次时间的判断,以保证执行任务队列的任务不会严重的超过我们所设定的时间。
③ 则依次执行tailTasks队列里的所有任务。赋值全局属性lastExecutionTime为最后一个任务执行完后的时间。
到此为止,整个事件循环的流程就已经分析完了。
后记
若文章有任何错误,望大家不吝指教:)
参考
http://www.infoq.com/cn/articles/netty-reliability
《Java 并发编程的艺术》