Overview
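Netty handles four kinds of IO events: read (OP_READ), write (OP_WRITE), accept (OP_ACCEPT) and connect (OP_CONNECT). Read and write events occur on both the client and the server. Accept events occur only on the server: after startup, the server registers interest in OP_ACCEPT to listen for incoming client connections. Connect events occur only on the client, which connects to the server when it starts. Netty tasks are either plain tasks, submitted with execute(Runnable task), or scheduled tasks, submitted with schedule(Runnable task, long delay, TimeUnit unit). Both IO events and tasks are handled by the thread that belongs to the corresponding NioEventLoop.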
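Plain JDK NIO already shows the split between server-only OP_ACCEPT and client-only OP_CONNECT, because each channel type reports the operations it can be registered for through validOps(). The following minimal sketch uses only java.nio, with no Netty involved. The class name ValidOpsDemo is hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class: asks each JDK channel type which interest ops it supports.
final class ValidOpsDemo {
    // A listening server channel supports only OP_ACCEPT.
    static int serverOps() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            return server.validOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // A client (or accepted) socket channel supports OP_READ, OP_WRITE and OP_CONNECT.
    static int clientOps() {
        try (SocketChannel client = SocketChannel.open()) {
            return client.validOps();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A connection accepted on the server side is itself a SocketChannel, which is why read and write events appear on both sides.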
NioEventLoop creation
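By default, instantiating a NioEventLoopGroup creates 2 × (number of CPU cores) NioEventLoop instances. For the bossGroup, only one of these NioEventLoops is actually used when the server binds a single port. In other words, a single thread loops over events and tasks. The other event loops never start a thread, because each NioEventLoop starts its thread lazily when it receives its first task.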
EventLoopGroup bossGroup = new NioEventLoopGroup();
[Figure: NioEventLoopGroup UML class diagram]
The no-argument NioEventLoopGroup constructor eventually delegates to the following constructor:
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
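/**
* nThreads: number of threads to create; if not specified, it later defaults to 2 × CPU cores
* executor: null by default; initialized later in a NioEventLoopGroup superclass
* selectorProvider: used to create Java NIO Selector objects
* selectStrategyFactory: factory for the select strategy; the value is DefaultSelectStrategyFactory
* RejectedExecutionHandlers.reject(): rejection policy, triggered when a task is added after the task queue is full
*/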
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
The NioEventLoopGroup constructor calls the constructor of its superclass MultithreadEventLoopGroup, which applies the default thread count. The constant DEFAULT_EVENT_LOOP_THREADS equals 2 × CPU cores:
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
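For reference, Netty 4.1 computes DEFAULT_EVENT_LOOP_THREADS, to my knowledge, as the system property io.netty.eventLoopThreads if set, otherwise availableProcessors × 2, with a floor of 1. The sketch below reproduces only that arithmetic and the nThreads == 0 check, using JDK calls. The class and method names are hypothetical.

```java
// Hypothetical helper mirroring the default-thread-count rule described above.
final class EventLoopThreads {
    // 2 x cores unless an override is given; never less than 1.
    static int defaultThreads(int availableProcessors, Integer override) {
        int configured = (override != null) ? override : availableProcessors * 2;
        return Math.max(1, configured);
    }

    // Same choice as MultithreadEventLoopGroup: 0 means "use the default".
    static int resolve(int nThreads, int defaultThreads) {
        return nThreads == 0 ? defaultThreads : nThreads;
    }

    // Reads the (assumed) override property and the real core count of this machine.
    static int defaultThreadsForThisMachine() {
        return defaultThreads(Runtime.getRuntime().availableProcessors(),
                Integer.getInteger("io.netty.eventLoopThreads"));
    }
}
```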
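The MultithreadEventLoopGroup constructor in turn calls the constructor of its superclass MultithreadEventExecutorGroup, which supplies the executor chooser factory. The constant DefaultEventExecutorChooserFactory.INSTANCE is a DefaultEventExecutorChooserFactory, an implementation of EventExecutorChooserFactory: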
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
Finally, the following MultithreadEventExecutorGroup constructor runs and creates the NioEventLoops in a loop:
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (executor == null) {
// Create the thread executor
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// Create the EventExecutor array that holds the NioEventLoops
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
// ...
// Create a NioEventLoop
children[i] = newChild(executor, args);
// ...
}
// Create the chooser that selects among the NioEventLoops
chooser = chooserFactory.newChooser(children);
// ...
}
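The chooser decides which NioEventLoop serves the next registered channel. As far as I know, DefaultEventExecutorChooserFactory hands out children round-robin. It uses a bit mask when the array length is a power of two and a modulo otherwise. The standalone sketch below shows that idea. RoundRobinChooser is a hypothetical name, not Netty's class.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin chooser over a fixed array of children.
final class RoundRobinChooser<T> {
    private final T[] children;
    private final AtomicInteger idx = new AtomicInteger();
    private final boolean powerOfTwo;

    RoundRobinChooser(T[] children) {
        if (children.length == 0) throw new IllegalArgumentException("no children");
        this.children = children;
        // x & -x isolates the lowest set bit; it equals x only for powers of two.
        this.powerOfTwo = (children.length & -children.length) == children.length;
    }

    T next() {
        int i = idx.getAndIncrement();
        // The mask avoids a division for power-of-two sizes;
        // Math.abs keeps the index valid after the counter overflows.
        return powerOfTwo ? children[i & (children.length - 1)]
                          : children[Math.abs(i % children.length)];
    }
}
```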
Creating the thread executor
ThreadPerTaskExecutor#execute uses a ThreadFactory to create and start a thread. That ThreadFactory is the DefaultThreadFactory passed to its constructor. DefaultThreadFactory#newThread creates the thread and sets its attributes, such as the thread name:
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
// Create and start a new thread with the thread factory
threadFactory.newThread(command).start();
}
}
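To see the effect of ThreadPerTaskExecutor in isolation, the sketch below pairs an equivalent executor with a simple naming ThreadFactory. NamingThreadFactory and PerTaskExecutor are hypothetical stand-ins. Netty's DefaultThreadFactory sets more attributes than the name alone.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory that names threads "<prefix>-1", "<prefix>-2", ...
final class NamingThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger next = new AtomicInteger(1);

    NamingThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + "-" + next.getAndIncrement());
    }
}

// Same idea as ThreadPerTaskExecutor: every execute() call starts a brand-new thread.
final class PerTaskExecutor implements Executor {
    private final ThreadFactory factory;

    PerTaskExecutor(ThreadFactory factory) {
        if (factory == null) throw new NullPointerException("threadFactory");
        this.factory = factory;
    }

    @Override
    public void execute(Runnable command) {
        factory.newThread(command).start();
    }

    // Usage: runs `tasks` tasks and returns the names of the threads that ran them.
    static Set<String> threadNamesFor(int tasks) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        Executor executor = new PerTaskExecutor(new NamingThreadFactory("demoLoop"));
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("tasks did not finish");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return names;
    }
}
```

Starting a thread per call is acceptable here because each NioEventLoop calls execute on this executor only once, to start its own long-lived thread.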
Creating the NioEventLoop
NioEventLoopGroup#newChild creates each NioEventLoop:
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
It simply calls the NioEventLoop constructor:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
// Call the superclass constructor, which creates the task queues and initializes inherited fields
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
// Create the IO multiplexer (Selector)
selector = openSelector();
selectStrategy = strategy;
}
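This constructor does two important things. First, it calls the superclass constructor, which creates the task queues among other things. Second, it calls openSelector to create the IO multiplexer. Let's look at openSelector first: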
private Selector openSelector() {
final Selector selector;
try {
// Create the Java NIO Selector
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
// Whether the Selector optimization is disabled (false by default); if so, return the plain Java Selector as is
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
// Create a SelectedSelectionKeySet, an array-backed structure that replaces the selectedKeys set inside the Java Selector
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// Load the Selector implementation class via reflection
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (ClassNotFoundException e) {
// ...
}
}
});
// ...
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
// Look up the selectedKeys and publicSelectedKeys fields
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
// Replace the native key sets with the new structure via reflection
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
return null;
} catch (NoSuchFieldException e) {
// ...
}
}
});
// Keep a reference to selectedKeys so that IO event processing can read the ready keys directly from it
selectedKeys = selectedKeySet;
return selector;
}
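openSelector creates the Java Selector and uses reflection to replace its selectedKeys and publicSelectedKeys fields. This optimizes the native Selector's data structure. The HashSet-based implementation is replaced by Netty's array-based SelectedSelectionKeySet, so adding a ready key becomes a plain array append instead of a hash insertion.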
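With an array, adding a ready key is an append and iterating over the keys is a linear scan. The sketch below shows such an array-backed set in isolation. ArrayKeySet is a hypothetical, simplified stand-in for SelectedSelectionKeySet. It is generic over the element type, so it runs without a real Selector, and it performs no duplicate check.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical append-only set backed by an array: O(1) add, cheap reset.
final class ArrayKeySet<T> extends AbstractSet<T> {
    private Object[] keys = new Object[4];
    private int size;

    @Override
    public boolean add(T key) {
        if (key == null) return false;
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, size << 1); // double the capacity when full
        }
        keys[size++] = key;
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    // Null out used slots (so entries can be GC'ed) and start over for the next select round.
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            private int i;

            @Override
            public boolean hasNext() {
                return i < size;
            }

            @SuppressWarnings("unchecked")
            @Override
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                return (T) keys[i++];
            }
        };
    }
}
```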
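Next, the NioEventLoop constructor calls its superclass constructor to initialize inherited fields. First, here is the NioEventLoop class hierarchy:
[Figure: NioEventLoop UML class diagram]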
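The superclass of NioEventLoop is SingleThreadEventLoop. Its constructor creates a queue via newTaskQueue, which NioEventLoop overrides to return an MpscQueue (multi-producer, single-consumer):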
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// Queue for tail tasks, which run after the regular tasks in each loop iteration
tailTasks = newTaskQueue(maxPendingTasks);
}
The SingleThreadEventLoop constructor calls the constructor of its superclass SingleThreadEventExecutor:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
// Eventually calls the AbstractEventExecutor constructor to store parent; here parent is the NioEventLoopGroup
super(parent);
// false by default; true if and only if addTask(Runnable) wakes up the executing thread
this.addTaskWakesUp = addTaskWakesUp;
// Maximum number of pending tasks: Integer.MAX_VALUE by default, and never less than 16
this.maxPendingTasks = Math.max(16, maxPendingTasks);
// Store the thread executor
this.executor = ObjectUtil.checkNotNull(executor, "executor");
// Create the task queue (an MpscQueue for NioEventLoop)
taskQueue = newTaskQueue(this.maxPendingTasks);
// Store the rejection policy
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
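Many threads write to the task queue, since any thread may call execute, but only the event loop thread reads from it. This is exactly the multi-producer, single-consumer pattern. The JDK has no dedicated MPSC queue, so this sketch uses ConcurrentLinkedQueue as a general-purpose substitute. MpscDemo is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: several producer threads enqueue tasks, one consumer drains them.
final class MpscDemo {
    static int produceAndDrain(int producers, int tasksPerProducer) {
        Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>(); // stand-in for an MPSC queue
        AtomicInteger executed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < tasksPerProducer; i++) {
                    taskQueue.offer(executed::incrementAndGet); // any thread may submit
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // Only this thread polls, which is the access pattern an MPSC queue is optimized for.
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
        return executed.get();
    }
}
```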
Starting the NioEventLoop thread
On the server side, the NioEventLoop thread starts when the channel is registered. Let's review that code:
#AbstractChannel#AbstractUnsafe
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ...
// Check whether the current thread is the IO thread; here it is the main thread and the IO thread does not exist yet, so the else branch runs
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// Run the registration asynchronously on the event loop
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// ...
}
}
}
NioEventLoop#execute is implemented in its superclass SingleThreadEventExecutor:
#SingleThreadEventExecutor
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// We are on the main thread and the IO thread does not exist yet, so inEventLoop() returns false
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
// Start the thread
startThread();
// Add the task
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
// Wake up the thread
wakeup(inEventLoop);
}
}
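execute works in three main steps:
1. startThread creates and starts the thread.
2. The task is added to the task queue.
3. The thread is woken up to run the task.

Let's look at startThread first: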
private void startThread() {
// Has the thread not been started yet?
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
// Atomically switch the state from ST_NOT_STARTED to ST_STARTED
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
// Start the thread
doStartThread();
}
}
}
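The check-then-CAS in startThread guarantees that doStartThread runs at most once, no matter how many threads call execute concurrently. The standalone sketch below reproduces the same state transition with AtomicIntegerFieldUpdater. LazyStarter and its start counter are hypothetical, and the counter stands in for the real doStartThread call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical class reproducing the ST_NOT_STARTED -> ST_STARTED transition.
final class LazyStarter {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;
    private static final AtomicIntegerFieldUpdater<LazyStarter> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(LazyStarter.class, "state");

    private volatile int state = ST_NOT_STARTED;
    final AtomicInteger starts = new AtomicInteger(); // how often the start action ran

    void startThread() {
        // Cheap volatile read first, then a CAS so exactly one caller wins.
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED
                && STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            starts.incrementAndGet(); // placeholder for doStartThread()
        }
    }

    // Usage: release several threads at once against one starter; returns the number of starts.
    static int raceStart(int threads) {
        LazyStarter starter = new LazyStarter();
        CountDownLatch go = new CountDownLatch(1);
        List<Thread> ts = new ArrayList<>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    go.await();
                } catch (InterruptedException e) {
                    return;
                }
                starter.startThread();
            });
            ts.add(t);
            t.start();
        }
        go.countDown();
        for (Thread t : ts) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return starter.starts.get();
    }
}
```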
startThread ultimately calls doStartThread to start the thread:
private void doStartThread() {
assert thread == null;
// Create and start the thread with the thread executor
executor.execute(new Runnable() {
@Override
public void run() {
// Remember the thread that was created
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// Process IO events and asynchronous tasks
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ...
}
}
});
}
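executor#execute creates and starts the thread. The executor is a ThreadPerTaskExecutor. It was created in the superclass MultithreadEventExecutorGroup when the NioEventLoopGroup was instantiated. ThreadPerTaskExecutor#execute creates and starts the thread through the thread factory. Once running, the thread does two things: it stores a reference to itself in the thread field, and it processes IO events and asynchronous tasks (covered later).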
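The following toy event loop puts these pieces together. It starts its thread lazily through an Executor, records that thread, and uses it for inEventLoop(). MiniEventLoop is a hypothetical illustration built on LinkedBlockingQueue. It has no Selector and no shutdown handling.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical mini event loop, not Netty code.
final class MiniEventLoop {
    private final Executor executor;
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    private volatile Thread thread;

    MiniEventLoop(Executor executor) {
        this.executor = executor;
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop() && started.compareAndSet(false, true)) {
            doStartThread(); // first submission from outside starts the loop thread
        }
        taskQueue.offer(task);
    }

    private void doStartThread() {
        executor.execute(() -> {
            thread = Thread.currentThread(); // remember the loop thread
            try {
                for (;;) {
                    taskQueue.take().run(); // blocks while there is no work
                }
            } catch (InterruptedException e) {
                // interrupted: leave the loop
            }
        });
    }

    // Usage: a task submitted from outside runs on the loop thread, where inEventLoop() is true.
    static boolean demo() {
        MiniEventLoop loop = new MiniEventLoop(r -> {
            Thread t = new Thread(r, "mini-event-loop");
            t.setDaemon(true); // do not keep the JVM alive after the demo
            t.start();
        });
        AtomicBoolean insideLoop = new AtomicBoolean();
        CountDownLatch done = new CountDownLatch(1);
        loop.execute(() -> {
            insideLoop.set(loop.inEventLoop());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return insideLoop.get() && !loop.inEventLoop();
    }
}
```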
After startThread returns, execute calls addTask to put the task into the task queue, where it waits for the thread to process it:
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
addTask tries to put the task into taskQueue via offerTask. If that fails, the rejection policy is triggered.
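A bounded JDK queue is enough to sketch the offer-or-reject logic of addTask. BoundedTaskQueue is hypothetical. It assumes that throwing RejectedExecutionException approximates what RejectedExecutionHandlers.reject() does.

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical bounded task queue with a fail-fast rejection policy.
final class BoundedTaskQueue {
    private final Queue<Runnable> taskQueue;

    BoundedTaskQueue(int maxPendingTasks) {
        this.taskQueue = new ArrayBlockingQueue<>(maxPendingTasks);
    }

    // Same shape as addTask(): try to enqueue, hand off to the reject policy on failure.
    void addTask(Runnable task) {
        if (task == null) throw new NullPointerException("task");
        if (!taskQueue.offer(task)) { // offer() returns false instead of blocking when full
            reject(task);
        }
    }

    private void reject(Runnable task) {
        throw new RejectedExecutionException("task queue full");
    }

    int pending() {
        return taskQueue.size();
    }
}
```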
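After the task is added to taskQueue, wakeup is called to wake the thread. The thread may be blocked in Selector#select because it had no work. wakeup makes it return from that blocking call so that it can process the task.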
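The wakeup path relies on a JDK guarantee. Selector#wakeup makes a blocked select() return, and if no select is in progress, the next one returns immediately. The sketch below combines that guarantee with an AtomicBoolean guard in the spirit of the wakenUp field, so that only one of several concurrent callers pays for Selector#wakeup. WakeupDemo is hypothetical. It omits the extra !inEventLoop condition that, as far as I know, NioEventLoop also checks.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of a CAS-guarded Selector wakeup.
final class WakeupDemo implements AutoCloseable {
    private final Selector selector;
    private final AtomicBoolean wakenUp = new AtomicBoolean();
    final AtomicInteger wakeupCalls = new AtomicInteger(); // how often Selector.wakeup() really ran

    WakeupDemo() {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Only the caller that flips wakenUp from false to true calls Selector.wakeup().
    void wakeup() {
        if (wakenUp.compareAndSet(false, true)) {
            wakeupCalls.incrementAndGet();
            selector.wakeup();
        }
    }

    // Blocks in select(); with no channels registered, only wakeup() can end it.
    void blockUntilWoken() {
        try {
            selector.select();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        wakenUp.set(false); // re-arm the guard for the next round
    }

    @Override
    public void close() {
        try {
            selector.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```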
IO event and task processing
Once the NioEventLoop thread has started, it loops forever, processing IO events and asynchronous tasks:
SingleThreadEventExecutor.this.run();
This call dispatches to the run method of the subclass NioEventLoop:
protected void run() {
for (;;) {
try {
// Detect IO events
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
// ioRatio controls the share of time spent on IO events versus non-IO tasks
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
// Process IO events
processSelectedKeys();
} finally {
// Process plain and scheduled tasks
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Compute the time allowed for non-IO tasks from the ratio
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
IO event and task processing consists of three steps:
1. Detect IO events.
2. Process IO events.
3. Process plain and scheduled tasks.
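The time budget in the else branch is plain arithmetic: tasks may run for ioTime × (100 - ioRatio) / ioRatio. Assuming the default ioRatio of 50, 10 ms of IO allows 10 ms of tasks. With an ioRatio of 80, the same 10 ms allows only 2.5 ms. The hypothetical helper below computes the budget.

```java
// Hypothetical helper for the ioRatio time budget used in NioEventLoop#run.
final class IoRatioBudget {
    // Nanoseconds the loop may spend on non-IO tasks after an IO phase of ioTimeNanos.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) throw new IllegalArgumentException("ioRatio: " + ioRatio);
        if (ioRatio == 100) return Long.MAX_VALUE; // run() calls runAllTasks() without a limit here
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }
}
```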
Detecting IO events
SelectStrategy#calculateStrategy decides which strategy to use. The SelectStrategy is created by the select strategy factory DefaultSelectStrategyFactory when the NioEventLoop is constructed:
#DefaultSelectStrategy
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
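hasTasks is true when taskQueue or tailTasks holds tasks. In that case IntSupplier#get is called, and it invokes Selector#selectNow. selectNow is a non-blocking call that returns immediately whether or not any IO events are ready, so pending tasks are not held up by a blocking select. If there are no tasks, SelectStrategy.SELECT is returned directly.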
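The strategy decision can be reproduced with java.util.function.IntSupplier. Netty uses its own IntSupplier interface, whose get() may throw. The constant value below is only illustrative and is assumed to be negative, as in Netty. Because selectNow() returns a key count of zero or more, its result never collides with the negative SELECT constant. It therefore falls through to the switch's default branch in run().

```java
import java.util.function.IntSupplier;

// Hypothetical re-creation of DefaultSelectStrategy#calculateStrategy.
final class StrategySketch {
    static final int SELECT = -1; // illustrative value for "do a blocking select"

    // Pending tasks: poll without blocking. No tasks: ask for a blocking select.
    static int calculateStrategy(IntSupplier selectNow, boolean hasTasks) {
        return hasTasks ? selectNow.getAsInt() : SELECT;
    }
}
```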
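If calculateStrategy returns SelectStrategy.SELECT, wakenUp is atomically reset to false with getAndSet(false), its previous value is passed to select, and select is called. Selector#wakeup is a relatively expensive operation, and both user threads and the IO thread may modify wakenUp. An atomic variable therefore keeps several threads from waking the selector redundantly. Here is the select method: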
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// Deadline of the next scheduled task; if the scheduled task queue is empty, current time + 1s
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
// Time until that deadline, rounded to the nearest millisecond
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
// If less than 0.5 ms remains and select has not blocked yet, call selectNow once, then leave the loop
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If the task queue has tasks and wakenUp can be switched to true, call selectNow and leave the loop
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// Block waiting for IO events
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// Leave the loop if any of these hold: keys were selected, another thread woke us, the task queue has tasks, or the scheduled task queue has tasks
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
if (Thread.interrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
// Rebuild the selector to work around the JDK epoll empty-spin bug
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}
} catch (CancelledKeyException e) {
// ...
}
}
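The two timing rules in select() are easy to check in isolation. First, the gap to the next deadline is rounded to the nearest millisecond, so anything under 0.5 ms becomes 0 and triggers selectNow. Second, each early, empty return from select increments selectCnt, and once it reaches a threshold, rebuildSelector is forced. SelectTiming is hypothetical, and 512 is assumed as the default of io.netty.selectorAutoRebuildThreshold.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helpers mirroring the arithmetic inside NioEventLoop#select.
final class SelectTiming {
    static final int REBUILD_THRESHOLD = 512; // assumed default, see lead-in

    // Gap to the deadline in whole milliseconds, rounded half up (for non-negative gaps).
    static long timeoutMillis(long deadlineNanos, long nowNanos) {
        return (deadlineNanos - nowNanos + 500_000L) / 1_000_000L;
    }

    // True when select(timeoutMillis) really waited for its full timeout.
    static boolean timedOutNormally(long beforeNanos, long afterNanos, long timeoutMillis) {
        return afterNanos - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= beforeNanos;
    }

    // True when enough premature, empty returns have piled up to rebuild the selector.
    static boolean shouldRebuild(int selectCnt) {
        return REBUILD_THRESHOLD > 0 && selectCnt >= REBUILD_THRESHOLD;
    }
}
```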
Processing IO events
processSelectedKeys processes all ready IO events:
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
Netty enables the Selector optimization by default, so IO events are processed by processSelectedKeysOptimized:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
// Get the next ready key
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys[i] = null;
// Get the attachment (the Netty channel that registered the key)
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
// Process the IO event
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
// ...
}
}
Each key is finally handled by processSelectedKey:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// ...
try {
int readyOps = k.readyOps();
// Handle OP_CONNECT
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Handle OP_WRITE
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// Handle OP_READ and OP_ACCEPT (readyOps == 0 is also treated as readable to avoid a possible JDK spin loop)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
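The bitmask tests in processSelectedKey can be exercised without any sockets by passing readyOps values built from the real SelectionKey constants. ReadyOpsDispatch is hypothetical. It only records which Unsafe action would run, in the same order as the code above.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

// Hypothetical decoder for the readyOps checks in processSelectedKey.
final class ReadyOpsDispatch {
    static List<String> actionsFor(int readyOps) {
        List<String> actions = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            actions.add("finishConnect"); // the real code also clears OP_CONNECT from interestOps
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            actions.add("forceFlush");
        }
        // readyOps == 0 is routed to read() as well, as a guard against a spinning selector
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            actions.add("read");
        }
        return actions;
    }
}
```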
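processSelectedKey delegates to the channel's Unsafe. Taking OP_ACCEPT as an example, let's look at its read method. For OP_ACCEPT, read is implemented in NioMessageUnsafe, an inner class of AbstractNioMessageChannel: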
#AbstractNioMessageChannel
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// Handle OP_ACCEPT: accept new connections
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// Fire the channelRead event
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
// Fire the channelReadComplete event
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
NioMessageUnsafe#read does two things. First, it calls NioServerSocketChannel#doReadMessages to handle the event. Second, it fires channelRead and channelReadComplete through the ChannelPipeline.
NioServerSocketChannel#doReadMessages calls accept on the Java ServerSocketChannel to establish the connection, then wraps the resulting SocketChannel in a NioSocketChannel:
#NioServerSocketChannel
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
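The null check in doReadMessages matters because the server channel is in non-blocking mode, where accept() returns null when no connection is pending. The following JDK-only sketch shows both outcomes. AcceptDemo is hypothetical and binds an ephemeral loopback port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo of non-blocking accept(), as used by doReadMessages.
final class AcceptDemo {
    // Returns {acceptedBeforeConnect, acceptedAfterConnect}.
    static boolean[] acceptTwice() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0)); // ephemeral port
            server.configureBlocking(false); // same mode as a channel registered with a Selector
            // Nobody has connected yet, so accept() returns null (doReadMessages would return 0).
            boolean before = server.accept() != null;
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                SocketChannel accepted = null;
                // The connection may need a moment to reach the accept queue; poll briefly.
                for (int i = 0; i < 100 && accepted == null; i++) {
                    accepted = server.accept();
                    if (accepted == null) Thread.sleep(10);
                }
                boolean after = accepted != null;
                if (accepted != null) accepted.close();
                return new boolean[] {before, after};
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```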
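That completes the IO event processing flow. The actual work is done by the different Unsafe implementations, which call the corresponding methods on their channels.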
Task processing
runAllTasks processes plain tasks and scheduled tasks:
protected boolean runAllTasks() {
assert inEventLoop();
boolean fetchedAll;
boolean ranAtLeastOne = false;
do {
// Move scheduled tasks that are due from the scheduled task queue into taskQueue
fetchedAll = fetchFromScheduledTaskQueue();
// Take tasks from taskQueue and run each one's run method
if (runAllTasksFrom(taskQueue)) {
ranAtLeastOne = true;
}
} while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.
if (ranAtLeastOne) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
afterRunningAllTasks();
return ranAtLeastOne;
}
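runAllTasks is fairly easy to follow, and it does two things:
1. It moves the scheduled tasks that are due from scheduledTaskQueue into taskQueue.
2. It takes tasks from taskQueue one by one and runs their run methods.

The do/while loop repeats until every due scheduled task has been fetched, because taskQueue may be bounded and unable to accept all of them at once.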
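A PriorityQueue ordered by deadline is enough to model the fetch-then-run sequence. ScheduledFetchSketch is hypothetical. It records task names instead of running Runnables, takes the current time as a parameter for determinism, and ignores the bounded-queue case that the do/while loop in runAllTasks handles.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;

// Hypothetical miniature of fetchFromScheduledTaskQueue() followed by running taskQueue.
final class ScheduledFetchSketch {
    static final class Scheduled {
        final long deadlineNanos;
        final String name;

        Scheduled(long deadlineNanos, String name) {
            this.deadlineNanos = deadlineNanos;
            this.name = name;
        }
    }

    final PriorityQueue<Scheduled> scheduledTaskQueue =
            new PriorityQueue<>((a, b) -> Long.compare(a.deadlineNanos, b.deadlineNanos));
    final Queue<String> taskQueue = new ArrayDeque<>();

    // Moves every scheduled task whose deadline has passed into taskQueue, earliest first.
    void fetchFromScheduledTaskQueue(long nowNanos) {
        Scheduled head;
        while ((head = scheduledTaskQueue.peek()) != null && head.deadlineNanos <= nowNanos) {
            taskQueue.add(scheduledTaskQueue.poll().name);
        }
    }

    // "Runs" the tasks by recording their names in execution order.
    List<String> runAllTasks(long nowNanos) {
        fetchFromScheduledTaskQueue(nowNanos);
        List<String> ran = new ArrayList<>();
        String task;
        while ((task = taskQueue.poll()) != null) {
            ran.add(task);
        }
        return ran;
    }
}
```

Plain tasks already in taskQueue run before the scheduled tasks that were just moved in, and tasks whose deadline has not arrived stay in scheduledTaskQueue for a later iteration.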