NioEventLoop是实现Reactor模型的非常重要的一个类。它是一个Loop循环线程,Loop的核心可以看下它的run()
方法:1)执行Selector的select;2)执行一些task。下边的这张图片可以简单的描述Loop循环的操作。
1.私有变量及构造函数
简单看一下NioEventLoop的私有变量以及它的构造函数:
public final class NioEventLoop extends SingleThreadEventLoop {
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
具体分析一下,selector
和unwrappedSelector
、selectedKeys
通过在构造函数中调用openSelector()
产生,简单说一下这会跟netty的select优化有关系,后边会单独分析一下。provider
提供的是select的方式,类型是java.nio.channels.spi.SelectorProvider
,实现方式有KQueueSelectorProvider /PollSelectorProvider等。selectStrategy
和ioRatio
会在下边讲解run()
方法时进行介绍。
下面摘抄了一下openSelector()
方法的实现。首先可以看到:用户可以通过io.netty.noKeySetOptimization开关决定是否启用该优化项,默认是关闭的,此时selector和unwrappedSelector 都指向同一个Selector对象。如果开启优化,那么会在该方法中实例化selector
和unwrappedSelector
、`selectedKeys这些实例变量。
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
//如果开启了优化开关,需要通过反射的方式从Selector实例中获取selectedKeys和publicSelectedKeys,
//将上述两个成员变量设置为可写,通过反射的方式使用Netty构造的selectedKeys包装类selectedKeySet将原JDK的selectedKeys替换掉。
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
......
return new SelectorTuple(unwrappedSelector);
}
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
......
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (Exception e) {
return e;
}
}
});
......
selectedKeys = selectedKeySet;
return new SelectorTuple(unwrappedSelector,
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
}
2.run()方法
首先贴一下run()
方法的核心代码,后边我们会结合着代码一步一步分析其到底做了哪些工作。总括一下:
1.ioEventLoop执行的任务分为两大类:IO任务和非IO任务。IO任务即selectionKey中ready的事件,譬如accept、connect、read、write等;非IO任务则为添加到taskQueue中的任务,譬如register0、bind、channelActive等任务
2.两类任务的执行先后顺序为:IO任务->非IO任务。
3.两类任务的执行时间比由变量ioRatio控制,譬如:ioRatio=50(该值为默认值),则表示允许非IO任务执行的时间与IO任务的执行时间相等。
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
2.1selectStrategy
首先关注下selectStrategy做了什么事情。下边的代码标注了selectNowSupplier
的实例化方法以及DefaultSelectStrategy
的实现逻辑:如果taskQueue中有元素,执行 selectNow()
方法,最终执行selector.selectNow()
,该方法会立即返回。如果taskQueue没有元素,执行 select(oldWakenUp)
方法。
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
final class DefaultSelectStrategy implements SelectStrategy {
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
}
public interface SelectStrategy {
/**
* Indicates a blocking select should follow.
*/
int SELECT = -1;
/**
* Indicates the IO loop should be retried, no blocking select to follow directly.
*/
int CONTINUE = -2;
/**
* The {@link SelectStrategy} can be used to steer the outcome of a potential select
* call.
*
* @param selectSupplier The supplier with the result of a select result.
* @param hasTasks true if tasks are waiting to be processed.
* @return {@link #SELECT} if the next step should be blocking select {@link #CONTINUE} if
* the next step should be to not select but rather jump back to the IO loop and try
* again. Any value >= 0 is treated as an indicator that work needs to be done.
*/
int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception;
}
那么 selectNow()
和 select(oldWakenUp)
之间有什么区别呢? 首先来看一下selectNow()
的源码。方法中:首先调用了 selector.selectNow()
方法检查当前是否有就绪的 IO 事件, 如果有, 则返回就绪 IO 事件的个数; 如果没有, 则返回0. 注意, selectNow() 是立即返回的, 不会阻塞当前线程. 当selectNow()
调用后, finally 语句块中会检查 wakenUp 变量是否为 true, 当为 true 时, 调用 selector.wakeup()
唤醒 select()
的阻塞调用.
int selectNow() throws IOException {
try {
return selector.selectNow();
} finally {
// restore wakeup state if needed
if (wakenUp.get()) {
selector.wakeup();
}
}
}
我们再来看下另外的分支select(oldWakenUp)
方法.其实select(oldWakenUp)
方法的处理逻辑比较复杂, 这里就不用列出,简单指出一下在这个 select 方法中, 调用了 selector.select(timeoutMillis)
, 而这个调用是会阻塞住当前线程的,重要的是Netty这个方法解决了Nio中臭名昭著的bug:selector的select方法导致cpu100%。
2.2I/O任务
在NioEventLoop的run()
方法中执行I/O任务的主要函数就是processSelectedKeys()
,会根据是否采用Selector优化选择执行不同的方法。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
首先看一下没有做优化的process处理流程.大致流程是遍历唤醒的SelectionKey,然后取出对应key注册的attachment。在这里attachment可能会有两种类型:1是AbstractNioChannel
,说明对应的Channel有相应的状态变更,接下来会根据SelectionKey的readyOps值进行相应的accpet、read、write操作;2是NioTask<SelectableChannel>
,说明是一个NioTask,也会进行相应的操作。
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
关于做过优化的processSelectedKeysOptimized的处理流程与上边的大致是一致的,只是要处理的SelectedKeys对象有所不同,processSelectedKeysOptimized处理的SelectedKeys对象是构造函数初始化的时候通过openSelector实例化的selectedKeys。
2.3非I/O任务
如果 ioRatio 不为100时,方法runAllTasks的执行时间只能为ioTime * (100 - ioRatio) / ioRatio,其中ioTime 是方法processSelectedKeys的执行时间。
方法fetchFromScheduledTaskQueue把scheduledTaskQueue中已经超过延迟执行时间的任务移到taskQueue中等待被执行。
依次从taskQueue任务task执行,每执行64个任务,进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}