本文分析NioServerSocketChannel构造函数传入的兴趣集SelectionKey.OP_ACCEPT参数是在何处使用的,以完善对NioEventLoop的分析。
构造函数
NioServerSocketChannel类层次结构如下图所示,下面自顶向下分析各类的构造函数。
- NioServerSocketChannel构造函数如下所示,可见其调用了父类构造函数。
public NioServerSocketChannel() { this(newSocket(DEFAULT_SELECTOR_PROVIDER)); } public NioServerSocketChannel(SelectorProvider provider) { this(newSocket(provider)); } public NioServerSocketChannel(ServerSocketChannel channel) { super(null, channel, SelectionKey.OP_ACCEPT); config = new NioServerSocketChannelConfig(this, javaChannel().socket()); }
- NioServerSocketChannel的父类AbstractNioMessageChannel只有一个构造函数,也是简单地调用了它的父类的构造函数。
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent, ch, readInterestOp); }
- AbstractNioChannel是AbstractNioMessageChannel的父类,构造函数中为成员变量赋值并将通道设置成非阻塞模式。
public abstract class AbstractNioChannel extends AbstractChannel { private final SelectableChannel ch; protected final int readInterestOp; volatile SelectionKey selectionKey; private ChannelPromise connectPromise; private ScheduledFuture<?> connectTimeoutFuture; private SocketAddress requestedRemoteAddress; protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) { super(parent); this.ch = ch; this.readInterestOp = readInterestOp; try { ch.configureBlocking(false); } catch (IOException e) { // 省略一些代码 } } // 省略一些代码 }
- AbstractNioChannel类的构造函数接着调用了其父类AbstractChannel的构造函数,新建了Unsafe实例和流水线。
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }
引导过程
让我们回顾引导绑定的过程,以下代码为AbstractBootstrap类的doBind方法,该方法主要做了如下工作:
- initAndRegister()方法初始化通道,并将通道注册到EventLoopGroup的一个EventLoop上,在这个过程中会调用AbstractChannel的doRegister()抽象方法,NioServerSocketChannel的doRegister()方法定义在AbstractNioChannel类中:
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
- 将通道绑定到配置的本地地址上:
private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }
- 在AbstractBootstrap绑定本地地址的过程中,if分支表示注册恰好结束马上就调用了doBind0方法,而else分支则是在注册结束后调用doBind0方法。doBind0方法会在与通道绑定的EventLoop中调用Channel的bind方法,接下来就与引导没有直接关系了。
private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }
为通道绑定本地地址
上文调用Channel的bind方法实际上调用的是AbstractChannel的bind方法,该方法接着调用了DefaultChannelPipeline的bind方法:
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
以下代码为DefaultChannelPipeline的bind方法,可以看到是从流水线的尾节点执行绑定操作。
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
流水线的尾节点是TailContext类型,bind方法在它的父类AbstractChannelHandlerContext中定义如下,从前面的文章可知findContextOutbound()是找到该上下文前面的第一个出站处理器(出站的前面是沿着流水线的双向链表往左找),因此final AbstractChannelHandlerContext next = findContextOutbound(); 这一句中的next变量指向了流水线的头节点。
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
头节点类型是HeadContext,调用invokeBind方法,其handler()方法调用返回其自己,紧接着调用了HeadContext类的bind方法。
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
HeadContext类的bind方法如下所示,unsafe引用通道的unsafe。
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
NioServerSocketChannel的unsafe是AbstractUnsafe类型,其bind方法如下所示。isActive方法在doBind方法执行之前会返回false,执行之后则会返回true,因此触发通道激活事件。
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// 省略一些代码
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
DefaultChannelPipeline的fireChannelActive()方法如下所示,激活事件从头节点开始传播,接着会调用头节点自身的handler()方法返回自己,然后执行HeadContext类的channelActive回调函数。
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
头节点的激活事件回调函数如下所示,自动读被默认配置成开启,因此会执行对应通道的读操作。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
AbstractChannel的读方法如下所示,委托给了流水线DefaultChannelPipeline:
@Override
public Channel read() {
pipeline.read();
return this;
}
而DefaultChannelPipeline的读操作代码表明读从尾节点开始:
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
尾节点的读操作定义在其父类AbstractChannelHandlerContext中,与前文类似,findContextOutbound()往前找到第一个出站处理器,因此next变量指向头节点,头节点调用handler()返回自己,触发读操作read:
@Override
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this;
}
private void invokeRead() {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
read();
}
}
头节点HeadContext的读操作如下所示,依然是委托给了unsafe:
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
AbstractUnsafe类的beginRead()方法如下,其中doBeginRead()是AbstractChannel类的抽象方法。
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
NioServerSocketChannel的doBeginRead()方法定义在其父类AbstractNioChannel中,readInterestOp成员变量正是构造函数传入的SelectionKey.OP_ACCEPT,调用带参数的interestOps方法为通道重新设置了兴趣集,即对可接受事件感兴趣。
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}