Channel的注册到EventLoop
前文中介绍了服务端ServerBootStrap
绑定端口号时,很重要的一个方法是initAndRegister
,当Channel
初始化完成后,会进行注册:
ChannelFuture regFuture = config().group().register(channel);
group
方法返回NioEventLoopGroup
即bossGroup
。NioEventLoopGroup
继承了MultithreadEventLoopGroup
, 其中有register
方法。
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
进入子类SingleThreadEventLoop
:
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
可以看到,Channel
被包装为ChannelPromise
,它持有Channel
和EventLoop
。
最终进入AbstractUnsafe
的register
方法:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//1.
doRegister();
neverRegistered = false;
registered = true;
//2.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//3.
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
//4.
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
-
AbstractUnsafe
中的doRegister()
方法为空,进入它的子类AbstractNioChannel
:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// java NIO
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
-
javaChannel
方法返回一个SelectableChannel
对象,然后调用register
方法,注册到Selector
。 -
eventLoop().unwrappedSelector()
的类型是Selector
。 - 可以看到
Channel
的注册流程最终使用Java NIO实现。
- 注册完成后,依次调用Handler的回调方法。
Channel、EventLoop的关系
EventLoop
继承了EventExecutor
,可以提交和执行Runnable
任务一个
EventLoopGroup
中会有多个EventLoop
NioEventLoop
的父类SingleThreadEventExecutor
中持有Thread
对象,它负责处理EventLoop
中所有的I/O事件一个
Channel
只会注册到一个EventLoop
中一个
EventLoop
上可以注册多个Channel
由于
EventLoop
由单线程处理I/O事件,因此在Channel的Handler的回调方法中不能执行耗时较长的任务,否则会阻塞其他Channel中事件的处理。-
因此通常在Channel的Handler的回调方法中新启线程池,在新的线程中执行耗时任务。或者在Handler添加到ChannelPipeline时,指定事件执行组EventExecutorGroup(Netty源码分析系列--8. Channel和ChannelPipeline中有介绍)
如下Channel
注册时的代码示例:
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
当方法inEventLoop()
返回true
时,直接执行调用register0
,否则作为任务Task
提交到EventLoop
中稍后执行。
方法inEventLoop()
的实现在NioEventLoop
的父类SingleThreadEventExecutor
中,如下:
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
SingleThreadEventExecutor
的父类AbstractEventExecutor
把当前线程对象传入:
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
综上可以看到,inEventLoop()
的逻辑就是判断当前线程是否是NioEventLoop
(SingleThreadEventExecutor
)所持有的那个Thread
对象。如果不是,就提交给NioEventLoop
,让任务稍后由持有的那个Thread
对象执行。