本文以常见的NioEventLoop为切入点分析Netty的EventLoop,NioEventLoop的类层次结构如下图所示,下面将按照类层次结构自底向上依次分析。
AbstractEventExecutor类
AbstractEventExecutor类实现了EventExecutor接口,比较重要的是parent成员变量,用于引用该EventExecutor所属的EventExecutorGroup,以schedule开头的调度方法都抛出了UnsupportedOperationException,子类需要覆盖这些方法。
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
private final EventExecutorGroup parent;
// 省略一些代码
protected AbstractEventExecutor() {
this(null);
}
protected AbstractEventExecutor(EventExecutorGroup parent) {
this.parent = parent;
}
@Override
public EventExecutorGroup parent() {
return parent;
}
@Override
public EventExecutor next() {
return this;
}
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay,
TimeUnit unit) {
throw new UnsupportedOperationException();
}
// 省略一些代码
}
AbstractScheduledEventExecutor类
AbstractScheduledEventExecutor类继承了AbstractEventExecutor类,实现了调度方法以支持任务调度。
SingleThreadEventExecutor类
SingleThreadEventExecutor类继承了AbstractScheduledEventExecutor类,从类名可以看出该EventExecutor是单线程的。
成员变量和构造函数
该类重要的成员变量和构造函数如下所示:
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16,
SystemPropertyUtil.getInt("io.netty.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
private final Queue<Runnable> taskQueue;
private volatile Thread thread;
private volatile ThreadProperties threadProperties;
private final Executor executor;
private volatile boolean interrupted;
// 省略一些代码
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
}
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
// 省略一些代码
}
- taskQueue是任务队列,用来存放需要调度执行的任务;
- thread用来引用支撑该EventExecutor的线程,用来处理I/O事件和执行任务,叫支撑线程或者I/O线程均可;
- executor用来引用线程池,thread所引用的线程即来自这里。
inEventLoop方法
其父类AbstractEventExecutor类中的inEventLoop()实现调用了inEventLoop(Thread.currentThread()),以下是其实现:
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
可以看到是用该EventExecutor所绑定的线程与参数去做比较,所以inEventLoop()的语义较为明晰:如果当前运行的线程是EventExecutor的支撑线程则返回true,否则返回false。这一方法在DefaultChannelPipeline和AbstractChannelHandlerContext类中意义重大,如《Netty实战》7.4.1节所述:
Netty线程模型的卓越性能取决于对于当前执行线程的身份的确定,也就是说,确定它是否是分配给当前Channel以及它的EventLoop的那一个线程。
如果(当前)调用线程正是支撑EventLoop的线程,那么所提交的代码块将会被(直接)执行。否则,EventLoop将调度该任务以便稍后执行,并将它放入到内部队列中。当EventLoop下次处理它的事件时,它会执行队列中的那些任务。这也就解释了任何的线程是如何与Channel直接交互而无需在ChannelHandler中进行额外同步的。
任务执行与线程绑定
execute方法实现了Java并发包Executor的接口方法,用来执行任务(如通道注册等):
- addTask将任务添加到队列中,如果不能添加则拒绝;
- 如果当前运行的线程不是所绑定的线程则调用startThread方法为本EventExecutor绑定支撑线程。
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
从下面的部分代码可以看到,如果EventExecutor的状态为ST_NOT_STARTED,那么先修改状态然后调用doStartThread方法为本EventExecutor绑定线程,断言指出此时thread必须为null,表明当前EventExecutor还未绑定任何线程。绑定任务交由线程池调度执行,线程池中执行该任务的线程被绑定到EventExecutor上,绑定的线程会运行SingleThreadEventExecutor的run抽象方法。
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 省略一些代码
}
}
});
}
protected abstract void run();
从上述线程绑定过程不难理解《Netty实战》3.1.2所述:
一个EventLoop在它的生命周期内只和一个线程绑定;
所有由EventLoop处理的I/O事件都将在它专有的线程上被处理。
SingleThreadEventLoop类
SingleThreadEventLoop类继承了SingleThreadEventExecutor类并实现了EventLoop接口,增加了与通道注册有关的register等方法,这正是前文末尾提到的将通道注册到EventLoop上的关键。
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
register方法会调用Channel接口的内部接口Unsafe的register方法,以服务端的NioServerSocketChannel为例,它的unsafe方法会返回AbstractNioMessageChannel类的内部类NioMessageUnsafe,其register方法定义在其父类AbstractUnsafe中:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// 省略一些代码
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
// 省略一些代码
}
}
}
AbstractChannel.this.eventLoop = eventLoop; 这一行将Channel注册到EventLoop。此时不难理解《Netty实战》3.1.2节所述:
一个Channel在它的生命周期内只注册于一个EventLoop;
一个EventLoop可能会被分配给一个或多个Channel。
《Netty实战》4.2节提到了Channel的线程安全性,这是因为对Channel的I/O操作都在Channel注册的EventLoop上运行且一个EventLoop只和一个线程绑定:
Netty的Channel是线程安全的,因此你可以存储一个到Channel的引用。
《Netty实战》7.4.2节提到了一个潜在的陷阱:
需要注意的是,EventLoop的分配方式对ThreadLocal的使用的影响。因为一个EventLoop通常会被用于支撑多个Channel,所以对于所有相关的Channel来说,ThreadLocal都将是一样的。
通道注册
将通道注册到EventLoop后接着会调用register0方法完成通道注册过程,AbstractChannel的内部类AbstractUnsafe的register0方法如下:
private void register0(ChannelPromise promise) {
try {
// 省略一些代码
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// 省略一些代码
} catch (Throwable t) {
// 省略一些代码
}
}
- doRegister是AbstractChannel的方法,由子类覆盖实现;
- NioServerSocketChannel的doRegister方法在父类AbstractNioChannel中定义如下,通道利用该方法注册到NioEventLoop的Selector上并将通道自身关联到结果键上。
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { // 省略一些代码 } } }
NioServerSocketChannel的感兴趣集
由上面的方法可以看到,注册时提供的感兴趣集是0,但NioServerSocketChannel构造函数传入的参数明明是SelectionKey.OP_ACCEPT,那么可连接是如何被设置到Selector的感兴趣集上的呢?答案是在通道绑定本地地址时,这个问题留在后面的文章分析。
NioEventLoop类
NioEventLoop类继承了SingleThreadEventLoop类,添加了与NIO和Selector相关的代码,以实现将通道注册到Selector上。
成员变量与构造函数
NioEventLoop类重要的成员变量和构造函数如下所示:
public final class NioEventLoop extends SingleThreadEventLoop {
private static final boolean DISABLE_KEYSET_OPTIMIZATION =
SystemPropertyUtil.getBoolean("io.netty.noKeySetOptimization", false);
// 省略一些代码
/**
* The NIO {@link Selector}.
*/
private Selector selector;
private Selector unwrappedSelector;
private SelectedSelectionKeySet selectedKeys;
private final SelectorProvider provider;
/**
* Boolean that controls determines if a blocked Selector.select should
* break out of its selection process. In our case we use a timeout for
* the select method and the select method will block for that time unless
* waken up.
*/
private final AtomicBoolean wakenUp = new AtomicBoolean();
private final SelectStrategy selectStrategy;
private volatile int ioRatio = 50;
private int cancelledKeys;
private boolean needsToSelectAgain;
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
}
- selector和unwrappedSelector分别表示优化过的Selector和未优化过的Selector,selectedKeys表示优化过的SelectionKey。Netty在该类中对Java NIO的Selector做了优化,可以通过设置系统属性io.netty.noKeySetOptimization进行修改,设置为true、yes或者1关闭优化,设置为false、no或者0开启优化,默认开启优化。
- 如果启用优化,那么selector和unwrappedSelector相同,selectedKeys为null;
- 如果关闭优化,那么selector和unwrappedSelector不同,selectedKeys不为null。
- 在构造函数的参数中,parent即为创建NioEventLoop的NioEventLoopGroup实例,其余均由NioEventLoopGroup的newChild方法提供,newChild提供的参数则是来自于NioEventLoopGroup的构造函数。
多路复用
上文提到通道利用AbstractNioChannel类的doRegister方法注册到NioEventLoop的Selector上并将通道自身关联到结果键上,那么NioEventLoop的Selector是如何处理通道的呢?在分析SingleThreadEventExecutor类时,我们知道绑定的线程会运行SingleThreadEventExecutor的run抽象方法,NioEventLoop恰恰重写了SingleThreadEventExecutor类的抽象方法run:
@Override
protected void run() {
for (;;) {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// 省略一些代码
}
}
- 死循环会使支撑线程(I/O线程)一直运行run方法;
- select(wakenUp.getAndSet(false)); 执行真正的Java NIO select操作;
- processSelectedKeys方法处理select返回的已选择键。
processSelectedKeys方法代码如下所示。该方法判断是否启用了Selector优化,如果启用则交由processSelectedKeysOptimized方法,否则交由processSelectedKeysPlain方法。
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
以processSelectedKeysPlain为例,上文提到通道会将自己关联到结果键上,用处是在这里用SelectionKey的attachment方法取出通道:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
通道对象被取出后便可以根据按位与执行相应的操作,当通道可读或者可连接时,调用Unsafe的read方法,其他操作类似:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略一些代码
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
实例 NioServerSocketChannel
以NioServerSocketChannel为例,当可连接时会调用AbstractNioMessageChannel的内部类NioMessageUnsafe的read方法:
@Override
public void read() {
// 省略一些代码
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
// 省略一些代码
} finally {
// 省略一些代码
}
}
doReadMessages是AbstractNioMessageChannel类的抽象方法,NioServerSocketChannel重写了该方法:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
// 省略一些代码
}
return 0;
}
doReadMessages方法首先接受了连接,然后将已连接套接字通道封装成消息加到了buf中,返回了数量1。在上文的read方法中,pipeline.fireChannelRead(readBuf.get(i)) 触发了入站可读事件,这也就可以解释之前提到的ServerBootstrapAcceptor的channelRead方法的消息参数为什么是一个通道的问题了。