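The previous article analyzed the processing flow of the Netty server. This article looks at how Netty uses ChannelPipeline to handle the SelectionKey.OP_ACCEPT and SelectionKey.OP_READ events and the messages received with them.
1. Class hierarchy of a few typical handlers
There are two kinds of handlers: those that implement ChannelInboundHandler and those that implement ChannelOutboundHandler.
User-defined handlers usually extend ChannelInboundHandlerAdapter or ChannelOutboundHandlerAdapter, so that they do not have to implement every abstract method of the interface themselves.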
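To make the role of the adapter classes concrete, here is a minimal sketch in plain Java that does not depend on Netty. The names Context, InboundHandler, InboundHandlerAdapter and UpperCaseHandler are hypothetical stand-ins chosen for illustration and are not Netty's API. The sketch assumes an adapter whose callbacks simply forward each event, so a custom handler overrides only the callback it cares about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical, simplified stand-ins for Netty's types; not the real API.
class AdapterSketch {

    /** Records every event that a handler forwards to the rest of the chain. */
    static final class Context {
        final List<String> forwarded = new ArrayList<>();
        void fireChannelActive() { forwarded.add("active"); }
        void fireChannelRead(Object msg) { forwarded.add("read:" + msg); }
        void fireChannelInactive() { forwarded.add("inactive"); }
    }

    /** An interface with several callbacks, in the spirit of ChannelInboundHandler. */
    interface InboundHandler {
        void channelActive(Context ctx);
        void channelRead(Context ctx, Object msg);
        void channelInactive(Context ctx);
    }

    /** Adapter: every callback forwards the event unchanged. */
    static class InboundHandlerAdapter implements InboundHandler {
        @Override public void channelActive(Context ctx) { ctx.fireChannelActive(); }
        @Override public void channelRead(Context ctx, Object msg) { ctx.fireChannelRead(msg); }
        @Override public void channelInactive(Context ctx) { ctx.fireChannelInactive(); }
    }

    /** A custom handler that overrides only channelRead. */
    static final class UpperCaseHandler extends InboundHandlerAdapter {
        @Override public void channelRead(Context ctx, Object msg) {
            ctx.fireChannelRead(String.valueOf(msg).toUpperCase(Locale.ROOT));
        }
    }

    /** Drives one handler through three events and reports what it forwarded. */
    static String run(InboundHandler handler) {
        Context ctx = new Context();
        handler.channelActive(ctx);
        handler.channelRead(ctx, "ping");
        handler.channelInactive(ctx);
        return String.join(",", ctx.forwarded);
    }

    public static void main(String[] args) {
        System.out.println(run(new UpperCaseHandler()));
    }
}
```

Without the adapter, UpperCaseHandler would have to spell out all three callbacks even though it changes only one of them.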
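2. The handler chain in the pipeline of NioServerSocketChannel
While the server channel is being initialized, a ServerBootstrapAcceptor is appended to its pipeline: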
ch.eventLoop().execute(new Runnable() {
    @Override
    public void run() {
        pipeline.addLast(new ServerBootstrapAcceptor(
                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
    }
});
Once NioServerSocketChannel has been initialized, its handler chain is:
head ==> ServerBootstrapAcceptor ==> tail
The event is first triggered in the following code:
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
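Here readBuf is a list whose elements are the newly accepted NioSocketChannel instances, and pipeline.fireChannelRead handles each new connection in turn.
Next we follow the execution logic in call order: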
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}
// Step 1: AbstractChannelHandlerContext.invokeChannelRead is a static method
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}
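The thread check in the static invokeChannelRead above can be illustrated without Netty. The following is a minimal sketch, assuming a hypothetical SingleThreadLoop class in place of Netty's EventExecutor. It shows work running inline when the caller is already on the loop thread and being queued otherwise. All class and method names here are invented for the illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the "run inline or hand off" check; not Netty's EventExecutor.
class EventLoopDispatchSketch {

    /** A single-threaded executor that remembers which thread is its loop thread. */
    static final class SingleThreadLoop {
        private volatile Thread loopThread;
        private final ExecutorService delegate = Executors.newSingleThreadExecutor(task -> {
            Thread t = new Thread(task, "sketch-loop");
            t.setDaemon(true);
            loopThread = t;
            return t;
        });

        boolean inEventLoop() { return Thread.currentThread() == loopThread; }
        void execute(Runnable task) { delegate.execute(task); }
        void shutdown() { delegate.shutdown(); }
    }

    /** Same shape as the excerpt: inline on the loop thread, queued otherwise. */
    static void dispatch(SingleThreadLoop loop, Runnable work) {
        if (loop.inEventLoop()) {
            work.run();
        } else {
            loop.execute(work);
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /** Dispatches from the calling thread and returns the name of the thread that ran the work. */
    static String threadThatRanWork() {
        SingleThreadLoop loop = new SingleThreadLoop();
        AtomicReference<String> ranOn = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        dispatch(loop, () -> {
            ranOn.set(Thread.currentThread().getName());
            done.countDown();
        });
        await(done);
        loop.shutdown();
        return ranOn.get();
    }

    /** Dispatches again from inside the loop thread and reports whether the inner work ran inline. */
    static boolean nestedDispatchRunsInline() {
        SingleThreadLoop loop = new SingleThreadLoop();
        AtomicBoolean inline = new AtomicBoolean(false);
        CountDownLatch done = new CountDownLatch(1);
        dispatch(loop, () -> {
            boolean[] ran = {false};
            dispatch(loop, () -> ran[0] = true);
            inline.set(ran[0]); // true only if the inner work has already run
            done.countDown();
        });
        await(done);
        loop.shutdown();
        return inline.get();
    }
}
```

The point of the check is that handler code for one channel always runs on the same thread, whichever thread fired the event.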
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}
// This is the key logic: head (a HeadContext) inherits this method from AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // Find the next handler, then go back to step 1
    invokeChannelRead(findContextInbound(), msg);
    return this;
}
// Find the next inbound handler. This also shows that inbound handlers run from head to tail,
// i.e. in the order in which they were added with addLast
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}
As an aside, outbound handlers run in the opposite order to inbound ones, from tail towards head, as the following code shows:
private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}
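The two lookup loops above can be tried out on a miniature context list. This is a sketch only: the Ctx class and the handler names are hypothetical. Marking the head sentinel as outbound and the tail sentinel as inbound is an assumption made here so that both loops terminate.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the context list; field names follow the excerpts above.
class TraversalOrderSketch {

    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Assumption of this sketch: head is outbound-only and tail is inbound-only.
    private final Ctx head = new Ctx("head", false, true);
    private final Ctx tail = new Ctx("tail", true, false);

    TraversalOrderSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Links a new context just before tail, as addLast does. */
    void addLast(String name, boolean inbound, boolean outbound) {
        Ctx ctx = new Ctx(name, inbound, outbound);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
    }

    static Ctx findContextInbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

    static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    /** Names visited by an inbound event that starts at head. */
    String inboundOrder() {
        List<String> names = new ArrayList<>();
        Ctx ctx = head;
        while (ctx != tail) {
            ctx = findContextInbound(ctx);
            names.add(ctx.name);
        }
        return String.join(" -> ", names);
    }

    /** Names visited by an outbound operation that starts at tail. */
    String outboundOrder() {
        List<String> names = new ArrayList<>();
        Ctx ctx = tail;
        while (ctx != head) {
            ctx = findContextOutbound(ctx);
            names.add(ctx.name);
        }
        return String.join(" -> ", names);
    }

    /** A pipeline with two inbound handlers and one outbound handler between them. */
    static TraversalOrderSketch example() {
        TraversalOrderSketch p = new TraversalOrderSketch();
        p.addLast("decoder", true, false);
        p.addLast("encoder", false, true);
        p.addLast("business", true, false);
        return p;
    }
}
```

In this example an inbound event skips the encoder, and an outbound operation skips both inbound handlers and ends at head.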
3. The handler chain in the pipeline of NioSocketChannel
ServerBootstrapAcceptor initializes every newly established connection:
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    // ...
    childGroup.register(child).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
            }
        }
    });
}
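The childHandler here is the one configured on the bootstrap class at startup with bootstrap.childHandler(new ChannelInitializer).
Once NioSocketChannel has been initialized, its handler chain is:
head ==> ChannelInitializer ==> tail
Following the register method, we eventually arrive at register0 in AbstractChannel (inside its AbstractUnsafe inner class):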
private void register0(ChannelPromise promise) {
    try {
        // ...
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;
        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded();
        // ...
    } catch (Throwable t) {
        // ...
    }
}
Here we only care about pipeline.invokeHandlerAddedIfNeeded(). The comment tells us that this eventually calls handlerAdded(...),
so we keep following the code:
final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;
        // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
        // that were added before the registration was done.
        callHandlerAddedForAllHandlers();
    }
}
private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;
        // This Channel itself was registered.
        registered = true;
        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
        // Null out so it can be GC'ed.
        this.pendingHandlerCallbackHead = null;
    }
    // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
    // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
    // the EventLoop.
    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}
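The deferral in the two methods above (handlers added before registration are notified only once the channel is registered) can be sketched in isolation. This is a simplified, single-threaded model with hypothetical names. It leaves out the synchronized block and the executor hand-off of the real code, and uses strings in place of handlers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of deferred handlerAdded callbacks; not Netty code.
class PendingCallbackSketch {

    /** One queued handlerAdded notification, linked to the next one. */
    private static final class PendingTask {
        final String handlerName;
        PendingTask next;

        PendingTask(String handlerName) {
            this.handlerName = handlerName;
        }
    }

    private final List<String> log = new ArrayList<>();
    private PendingTask pendingHead;
    private boolean registered;
    private boolean firstRegistration = true;

    /** Before registration the callback is queued; afterwards it runs immediately. */
    void addLast(String handlerName) {
        if (!registered) {
            PendingTask task = new PendingTask(handlerName);
            if (pendingHead == null) {
                pendingHead = task;
            } else {
                PendingTask last = pendingHead;
                while (last.next != null) {
                    last = last.next;
                }
                last.next = task;
            }
            log.add("queued:" + handlerName);
        } else {
            handlerAdded(handlerName);
        }
    }

    /** Runs the queued callbacks once, in insertion order; later calls do nothing. */
    void invokeHandlerAddedIfNeeded() {
        if (firstRegistration) {
            firstRegistration = false;
            registered = true;
            PendingTask task = pendingHead;
            pendingHead = null; // drop the reference so the list can be collected
            while (task != null) {
                handlerAdded(task.handlerName);
                task = task.next;
            }
        }
    }

    private void handlerAdded(String handlerName) {
        log.add("added:" + handlerName);
    }

    String log() {
        return String.join(",", log);
    }

    /** Adds one handler before registration and one after it. */
    static String demo() {
        PendingCallbackSketch pipeline = new PendingCallbackSketch();
        pipeline.addLast("initializer");
        pipeline.invokeHandlerAddedIfNeeded();
        pipeline.invokeHandlerAddedIfNeeded(); // second call is a no-op
        pipeline.addLast("decoder");
        return pipeline.log();
    }
}
```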
Next, look at how the tasks stored in this.pendingHandlerCallbackHead are implemented:
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
    @Override
    void execute() {
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            callHandlerAdded0(ctx);
        } else {
            try {
                executor.execute(this);
            } catch (RejectedExecutionException e) {
                remove0(ctx);
                ctx.setRemoved();
            }
        }
    }
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    } catch (Throwable t) {
        boolean removed = false;
        try {
            remove0(ctx);
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }
            removed = true;
        } catch (Throwable t2) {
            if (logger.isWarnEnabled()) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }
        }
    }
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        initChannel(ctx);
    }
}
initChannel runs the logic that we supplied by overriding it on the bootstrap class:
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decode", new MessageDecoder());
        pipeline.addLast("encode", new ClientMessageEncoder());
    }
});
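At this point the handler chain of NioSocketChannel is:
head ==> ChannelInitializer ==> MessageDecoder ==> ClientMessageEncoder ==> tail
ChannelInitializer exists only to add handlers to the chain and takes no part in processing messages, so it should be removed once it has done its job.
That is what remove0 is for: the initializer removes itself, and its context is unlinked from the chain as follows: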
private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next;
    next.prev = prev;
}
The final handler chain of NioSocketChannel is therefore:
head ==> MessageDecoder ==> ClientMessageEncoder ==> tail
With that, we have a basic picture of how ChannelPipeline processes events.
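The three chain states described in this section can be reproduced with a small doubly linked list. This is a sketch under stated assumptions: the Ctx class and the snapshots method are hypothetical, handlers are represented only by their names, and remove0 is copied in shape from the excerpt above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a pipeline as a doubly linked list of named contexts; not Netty code.
class InitializerRemovalSketch {

    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    private final Ctx head = new Ctx("head");
    private final Ctx tail = new Ctx("tail");

    InitializerRemovalSketch() {
        head.next = tail;
        tail.prev = head;
    }

    /** Links a new context just before tail and returns it. */
    Ctx addLast(String name) {
        Ctx ctx = new Ctx(name);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
        return ctx;
    }

    /** Unlinks ctx by pointing its neighbours at each other. */
    static void remove0(Ctx ctx) {
        Ctx prev = ctx.prev;
        Ctx next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

    /** Renders the chain from head to tail. */
    String chain() {
        List<String> names = new ArrayList<>();
        for (Ctx ctx = head; ctx != null; ctx = ctx.next) {
            names.add(ctx.name);
        }
        return String.join(" ==> ", names);
    }

    /** Returns the chain after registration, after initChannel, and after self-removal. */
    static List<String> snapshots() {
        List<String> result = new ArrayList<>();
        InitializerRemovalSketch pipeline = new InitializerRemovalSketch();

        Ctx initializer = pipeline.addLast("ChannelInitializer");
        result.add(pipeline.chain());

        // What the initChannel override in this article does: append the real handlers.
        pipeline.addLast("MessageDecoder");
        pipeline.addLast("ClientMessageEncoder");
        result.add(pipeline.chain());

        // The initializer has done its job, so its context is unlinked.
        remove0(initializer);
        result.add(pipeline.chain());
        return result;
    }

    public static void main(String[] args) {
        snapshots().forEach(System.out::println);
    }
}
```

Because remove0 only rewires the two neighbours, removing a context costs the same wherever it sits in the chain.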