前言
本文将会具体分析NioEventLoop中的thread,它的启动时机,以及所履行的职责。还会分析一些netty的实现细节,比如解决NIO的bug和一些优化等。
thread启动
之前说到NioEventLoop是由一个thread处理I/O事件和提交的任务。先看一下这个thread启动的流程。
execute 简化流程
private void execute(Runnable task, boolean immediate) {
//是当前线程调用,直接加入队列
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
//启动线程
startThread();
// ......
}
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
可以看出启动thread是一个延迟加载的过程,在执行第一个任务的时候才会启动thread。跟进去看startThread()
private void startThread() {
//判断线程状态是否已启动
if (state == ST_NOT_STARTED) {
//CAS设置线程状态为已启动
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
boolean success = false;
try {
//真正去启动线程
doStartThread();
success = true;
} finally {
if (!success) {
STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);
}
}
}
}
}
doStartThread
private void doStartThread() {
assert thread == null;
//调用传入参数的executor的execute方法,
//executor会新建一个线程去执行任务
executor.execute(new Runnable() {
@Override
public void run() {
//将执行该任务的线程赋值给thread
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
//执行任务
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ......
}
}
前文分析了executor为ThreadPerTaskExecutor,执行execute方法时候为新建一个线程去执行任务,NioEventLoop的thread就是在此时赋值。
thread的启动流程简化为,首先thread启动是一个懒加载的过程,在第一次执行任务才会启动。在启动的过程中,会有一个CAS的状态判断当前线程是否已经被启动,如果thread没有启动,则通过传入的executor对象去创建thread对象,并执行SingleThreadEventExecutor.this.run()这个方法。
下面分析SingleThreadEventExecutor.this.run()这个方法,
/**
* Run the tasks in the {@link #taskQueue}
*/
protected abstract void run();
可以看见是一个抽象方法,然后找到文本分析的NioEventLoop对于run的实现,这里做一个将代码做一个简化,只有主要流程
protected void run() {
int selectCnt = 0;
for (; ; ) {
//1、检测IO事件
select();
try {
//2、处理准备就绪的IO事件
processSelectedKeys();
} finally {
// 3、执行队列里的任务
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
}
NioEventLoop的职责只有三个,1、检测IO事件 ;2、处理准备就绪的IO事件;3、执行队列里的任务,用一个死循环去不断执行这三件事情。如之前画的图所示:
接下来就着重分析这三个步骤。
select
select步骤的核心是调用通过NIO中的selector的select()方法,返回selector上所监听到IO事件。
case SelectStrategy.SELECT:
// 获取当前任务队列的延迟时间
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) {
curDeadlineNanos = NONE; // nothing on the calendar
}
nextWakeupNanos.set(curDeadlineNanos);
try {
//当前任务队列为空,监听IO事件
if (!hasTasks()) {
strategy = select(curDeadlineNanos);
}
} finally {
// This update is just to help block unnecessary selector wakeups
// so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
select方法
private int select(long deadlineNanos) throws IOException {
if (deadlineNanos == NONE) {
return selector.select();
}
// Timeout will only be 0 if deadline is within 5 microsecs
long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);
}
流程整体比较简单,如果时间参数deadlineNanos为NONE,就调用selector.select()方法,这个方法会一直阻塞直到有IO事件返回。否则再判断deadlineNanos是否小于等于0,如果是调用selectNow()会立即返回当前selector上准备就绪的IO事件,否则调用selector.select(timeoutMillis)方法,会在指定时间内返回,不管是否有IO事件发生。然后跟select()方法,找到实现类io.netty.channel.nio.SelectedSelectionKeySetSelector,
public int select() throws IOException {
selectionKeys.reset();
return delegate.select();
}
一共有两步操作,第一步是将之前的selectionKeys清空,检测到就绪的IO事件都会放入selectionKeys中,这里表示新的一轮IO循环开始,所以要将之前的清空(selectionKeys后续会在详细介绍)。第二步是调用NIO中的Selector对象的select(),将最后底层的IO实现委托给它。
processSelectedKeys
processSelectedKeys这一步将会处理监测到的IO事件,比如连接、读写的IO操作。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
这里有个细节,处理优化过后的selectedKeys还是处理原生的selectedKeys。所谓优化的selectedKeys就是将原生的selectedKeys的HashSet替换成数组实现,提高空间利用率和遍历的效率,待会儿会详细将到是怎么替换的selectedKeys。
然后跟进去看processSelectedKeysOptimized()的具体实现:
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
整体流程就是在遍历selectedKeys,将绑定在SelectionKey上的Channel取下来,然后做对应的IO操作,最后再判断是否需要重置selectedKeys。下面我会逐步分析里面的细节
第一: selectedKeys.keys[i] = null;
将SelectionKey取出之后把数组这个位置的地方置为null。为什么这么做?https://github.com/netty/netty/issues/2363描述的很清楚,简单来说就是我们并不会去清空selectedKeys数组,这就会导致在Channel关闭之后,依然会保持SelectionKey的强引用。
如上图所示,假如数组原有长度为2,一次高峰期的IO事件导致数组扩容到8,之后新的IO事件的数量又达不到之前数组的位置,为导致上图坐标[6]、[7]位置会长时间持有已经关闭的Channel的引用,所以这里将其置为null,有助于GC。
第二: processSelectedKey
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
首先是将SelectionKey绑定的属性取下来,判断是否是AbstractNioChannel的类型。这里可以追踪一下netty是什么时候将AbstractNioChannel设置进去的。在AbstractNioChannel的doRegister方法
//最后一个参数就是att
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
其Channel注册到底层jdk的组件中,然后将AbstractNioChannel作为参数传递进去,后续轮询出IO事件之后,再将AbstractNioChannel取出做后续操作。
具体处理IO事件
processSelectedKey(SelectionKey k, AbstractNioChannel ch)
这里贴一点核心流程,主要是判断当前Channel的操作类型,是连接还是读、写
int readyOps = k.readyOps();
//连接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
//读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
这里面的内部流程就不具体分析了,大致分为两个部分bossGroup监听的连接事件,将接受到的Channel转交给workGroup,然后workGroup处理读写事件,然后将事件通过ChannelPipeline将事件传播出去。具体细节可以看AbstractNioMessageChannel和AbstractNioByteChannel的read()方法,后续可能会具体分析这里的代码。
第三: needsToSelectAgain
最后一个步骤,重新设置selectedKeys
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
什么时候需要重新select?找到needsToSelectAgain被设置为true的地方,只有唯一的一处cancel
void cancel(SelectionKey key) {
key.cancel();
cancelledKeys ++;
if (cancelledKeys >= CLEANUP_INTERVAL) {
cancelledKeys = 0;
needsToSelectAgain = true;
}
}
然后看cancel被调用的地方doDeregister
protected void doDeregister() throws Exception {
eventLoop().cancel(selectionKey());
}
由上面的两部分代码分析可以知道,channel的关闭是通过移除在selector上的注册实现的,同时会把cancelledKeys加一 。当达到了阈值CLEANUP_INTERVAL(默认256)后将cancelledKeys重置为0、needsToSelectAgain 为true。
当needsToSelectAgain 为true之后,有两个步骤:
1.selectedKeys清空 -> selectedKeys.reset(i + 1);
void reset(int start) {
Arrays.fill(keys, start, size, null);
size = 0;
}
- 再次填充selectedKeys ->selectAgain
private void selectAgain() {
needsToSelectAgain = false;
try {
selector.selectNow();
} catch (Throwable t) {
logger.warn("Failed to update SelectionKeys.", t);
}
}
至于为什么需要重新去填充selectedKeys,可能是需要保持selectedKeys里面的Channel都随时保持的是活跃的。
processSelectedKeys到这就分析完了,总共分为三步
- 遍历selectedKeys
- 处理IO事件
- 是否需要重置selectedKeys
ranTasks
现在分析thread的最后一步工作ranTasks,执行队列里的任务。
1. 任务类型
NioEventLoop里的任务类型分为两部分,一个是由taskQueue(MpscUnboundedArrayQueue)存放普通的任务,还有一个scheduledTaskQueue存放定时任务的队列。之前分析过EventLoop继承自ScheduledExecutorService,所以也需要提供执行定时任务的功能,而这里的定时任务是通过PriorityQueue来实现的。(定时任务的实现方式有很多,优先队列只是其中一种)ranTasks执行的任务其实就是两部分的内容,一个是普通队列中的任务和定时队列中的任务。
2. ioRatio
在分析执行细节之前,在提一个很重要的参数ioRatio,代表设置事件循环中I/O所需时间的百分比,意思就是在一次循环中,处理IO事件的时间与处理队列任务所占时间做一个百分比的分配,范围是1到100,当设置为100时,这个参数就失效了,默认参数为50。下面代码就是对ioRatio的使用
//等于100的时候,参数失效,不再平衡IO事件所占时间的比例
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
//开始执行IO事件的时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// 获得IO执行总共耗时
final long ioTime = System.nanoTime() - ioStartTime;
//按照ioRatio计算出将花费多少时间执行ranTasks
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
3. runAllTasks
protected boolean runAllTasks(long timeoutNanos) {
//将scheduledTaskQueue队列中的任务转移到taskQueue中
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
//任务为空结束
if (task == null) {
afterRunningAllTasks();
return false;
}
//计算本次执行任务最迟的时间
final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
//执行任务
safeExecute(task);
runTasks ++;
//每执行64个任务之后判断时间是否超出,若超出结束循环
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
//没有任务结束循环
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
主要流程为
- scheduledTaskQueue队列中的任务转移到taskQueue中;
- 安全的执行任务(其实就是将任务try catch,以免任务执行发生异常,影响其他任务执行);
protected static void safeExecute(Runnable task) {
try {
task.run();
} catch (Throwable t) {
logger.warn("A task raised an exception. Task: {}", task, t);
}
}
- 每执行64个任务之后判断执行时间是否超出deadline,这里采用64个任务为一个批次,没有每次任务执行去判断,也是对性能的一个优化;
- 执行afterRunningAllTasks方法,其实就是执行tailTasks队列中的任务,然后记录一下最后的执行时间this.lastExecutionTime = lastExecutionTime;
一些细节
selectedKeySet
前面提到过netty将NIO中Selector的selectedKeys替换,这里分析一下为什么需要替换和么去替换的selectedKeys。
- 为什么替换
NIO原生的selectedKeys使用的是HashSet,而NioEventLoop将其替换成了SelectedSelectionKeySet
//SelectorImpl
protected Set<SelectionKey> selectedKeys = new HashSet();
//NioEventLoop
private SelectedSelectionKeySet selectedKeys;
SelectedSelectionKeySet构造函数
SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
SelectedSelectionKeySet使用的是数组存储元素,而HashSet是基于HashMap去存储数据,采用数组使得空间利用率和遍历的效率有所提高。
2.怎么替换
要在运行时替换掉类的属性,很明显是通过反射来做到的。
- 获取sun.nio.ch.SelectorImpl Class对象
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
- 创建selectedKeySet
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
- 设置属性
//获取属性
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
......
//将selectedKeySet设置到属性中
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
NIO空轮询bug
NIO有一个很出名的bug就是epoll空轮询的bug,这会导致CPU占有率到100%,java也并没有修复这个bug,netty采用了一个很巧妙的方法来绕过这个bug。
主要思想就是,通过检测发生空轮询的次数,当超过一定的阈值之后,netty将会重新创建一个selector,并将之前selector上的channel转移到新的selector上。通过重新创建selector的方式来解决NIO空轮询的bug。
unexpectedSelectorWakeup
//空轮询的次数超过阈值,默认为512
if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
//重新构建selector
rebuildSelector();
return true;
}
跟进去找到具体的实现方法rebuildSelector0
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
try {
//创建新的selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
//将旧的selector上的channel全部注册到新的selector上
}
//赋值
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// 关闭旧的selector
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
总结
本文分析NioEventLoop中所对应的唯一的thread,启动是一个懒加载的过程,当第一次任务执行的时候才会初始化。后续thread开始循环处理三件事件
- 检测IO事件 ;
- 处理准备就绪的IO事件;
- 执行队列里的任务
本文也对具体的代码进行了分析,还有一些netty对NIO的优化和bug处理,当然netty的精妙之处远不止本文分析的这些,更多的还需要自己去探索和学习。