如果说Netty的核心是它的reactor模式,那么EventLoop就是reactor的核心。通过EventLoop的轮询,netty能够高效的在任务中切换。前面几节都讲的Nio相关类,这里就以NioEventLoop为核心分析下,它是如何执行的。
NioEventLoop的继承关系
本质上,EventLoop还是一个Executor,既然是Executor,那么就从execute()函数看起
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//判断eventLoop线程是否启动
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();//启动eventLoop线程
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
private void startThread() {
//判断线程状态
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
...
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();//当前线程作为整个EventLoop的主线程
...
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
...
} finally {
...
try {
// Run all remaining tasks and shutdown hooks.
// 执行剩余的所有任务
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
...
}
}
}
}
});
}
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
...
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
//轮询完SelectKey后执行task
//ioRatio用于设置执行task时间,其语义是io执行时间与任务执行时间之比,如果ioRatio是50(默认),则表示任务执行时间是io执行时间的一半
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
...
}
}
整个run函数可以抽象为select-> processSelectedKeys->runTasks->select...
再看select()做了什么:
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;//计数器
long currentTimeNanos = System.nanoTime();
//最近一个任务的截止时间
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {//如果截止时间小于0.5ms
if (selectCnt == 0) {//并且一次select都没有执行过,那么执行一次select并中断
selector.selectNow();
selectCnt = 1;
}
break;
}
//如果有新增任务,则执行select并中断
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
//阻塞式select,超时时间为最近一个任务截止时间
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
//如果有事件上报或有任务/定时任务则退出
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
...
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
...
//用于解决nio空轮询bug
//如果select次数超过阈值则重新生成selector
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
...
} catch (CancelledKeyException e) {
...
}
}
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
processSelectedKey用于具体事件操作
int readyOps = k.readyOps();
//connect事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
//写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
//读事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
runAllTasks用于执行所有可执行任务,包括定时任务
//这里说明runAllTasks(long timeoutNanos)多了一个截止时间,如果运行超过该时间自动中断
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
//找到所有定时任务中可以执行的
fetchedAll = fetchFromScheduledTaskQueue();
//执行所有任务
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll);
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
//把所有可执行的定时任务放到非定时任务的队列中
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {//如果放入taskQueue失败,则重新放回定时任务队列,返回false中断,等下次循环继续执行
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
总结下
1,EventLoop本身是个线程池,拥有线程池属性。EventLoop可以包含定时任务和非定制任务2种
2,EventLoop本身也是一个循环线程,该线程流程是
1)捕捉是否有io事件上报
2)处理这些事件
3)处理线程池中业务逻辑任务
4)返回第一步