导读
原创文章,转载请注明出处。
本文源码地址:netty-source-code-analysis
本文所使用的netty版本4.1.6.Final:带注释的netty源码
Pipeline这个词翻译过来就是“流水线”的意思,读到这里有了解过设计模式的同学应该已经想到了,这里用到的是“责任链模式”。本文我们将以DefaultChannelPipeline为例看一下Pipeline的构造以及其中重要的数据结构。
1 和Pipeline相关的其他组件
1.1 ChannnelHandler
这是ChannelHandler中的注释,翻译过来就是“处理IO事件或者拦截IO操作,并且将其向ChannelPipeline中的下一个handler传递”,说白了就是在责任链中注册的一系列回调方法。
Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
its ChannelPipeline
这里的I/O event就是很多书中提到的“入站事件”,而I/O operation就是很多书中提到的“出站事件”,前面我说过,这里我并不准备这么叫,按我的理解我习惯把这两者称之为“事件”和“命令”。很显然这里event和operation的含义是不一样的,event更多地多表示事件发生了,我们被动地收到,而operation则表示我们主动地发起一个动作或者命令。
1.2 ChannelHandlerContext
每一个ChannelHandler在被添加进ChannelPipeline时会被包装进一个ChannelHandlerContext。有两个特殊的ChannelHandlerContext除外,分别是HeadContext和TailContext,HeadContext继承了ChannelInBoundHandler和ChannelOutBoundHandler,而TailContext继承了ChannelInBoundHandler。
每个ChannelHandlerContext中有两个指针next和prev,这是用来将多个ChannelHandlerContext构成双向链表的。
2 Pipeline的构造方法
我们以DefaultChannelPipeline为例,从它的构造方法开始。这里首先将Channel保存到Pipeline的属性中,又初始化了两个属性succeedFuture和voidPromise。这是两个特殊的可以共享的Promise,这两个Promise不是重点,不理解也没关系。
接下来的tail和head是两个特殊的ChannelHandlerContext,这两个是Pipeline中的重要组件。
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
Pipeline在执行完构造方法以后的结构如下图所示,head和tail构成了最简单的双向链表。

图中蓝色填充的就是ChannelHandlerContext,目前只有HeadContext和TailContext,ChannelHandlerContext中的较窄的矩形表示ChannelHandler,由于HeadContext和TailContext并没有包含ChannelHandler,而是继承ChannelHandler,所以这里我们用虚线表示。上下贯通的ChannelHandler表示既是ChannelInBoundHandler又是ChannelOutBoundHandler,只有上半部分的表示是ChannelInBoundHandler,只有下半部分的表示是ChannelOutBoundHandler。
3 添加ChannelHandler
在ChannelPipeline中有很多以add开头的方法,这些方法就是向ChannelPipeline中添加ChannelHandler的方法。
-
addAfter:向某个ChannelHandler后边添加 -
addBefore:向某个ChannelHandler前面添加 -
addFirst:添加到头部,不能在head的前面,而是紧挨着head,在head的后面 -
addLast:添加到尾部,不能在tail的后面,而是紧挨着tail,在tail的前面
我们以最常用的的addLast方法为例来分析一下Pipeline中添加ChannelHandler的操作。
这里所贴出的addLast方法其实我们已经在“服务端启动流程”这篇文章中打过照面了。方法参数中的EventExecutorGroup意味着我们可以为这个ChannelHandler单独设置Excutor而不使用Channel所绑定的EventLoop,一般情况下我们不这么做,所以group参数为null。
这里先把ChannelHandler包装成ChannelHandlerContext,再添加到尾部,随后调用ChannelHandler的HandlerAdded方法。
在调用HandlerAdded方法时有一点问题,添加ChannelHandler的操作不需要在EventLoop线程中进行,而HandlerAdded方法则必须在EventLoop线程中进行。也就是说存在添加Handler时还未绑定EventLoop的情况,此时则调用newCtx.setAddPending()将当前HandlerContext设置为ADD_PENDING状态,并且调用callHandlerCallbackLater(newCtx, true)将一个异步任务添加到一个单向队链表中,即pendingHandlerCallbackHead这个链表。
如果当前已经绑定了EventLoop,则看当前调用线程是否为EventLoop线程,如果不是则向EventLoop提交一个异步任务调用callHandlerAdded0方法,否则直接调用callHandlerAdded0方法。
下面咱们依次分析一下newContext,callHandlerCallbackLater和callHandlerAdd0方法。
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//先把`handler`包装成`HandlerContext`
newCtx = newContext(group, filterName(name, handler), handler);
//添加到尾部
addLast0(newCtx);
//如果还未绑定`EventLoop`则稍后再发起对`HandlerAdded`方法的调用。
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//如果已经绑定了EventLoop,并且当前线程非EventLoop线程的话就提交一个异步任务,就发起一个异步任务去调用HandlerAdded方法。
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
//如果当前线程是EventLoop线程,就直接调用HandlerAdded方法。
callHandlerAdded0(newCtx);
return this;
}
3.1 newContext
先看来一下newContext方法,这里直接调用了DefaultChannelHandlerContext的构造方法,咱们跟进去看看。
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
在DefaultChannelHandlerContext的构造方法中又调用了父类AbstractChannelHandlerContext的构造方法,保存了handler属性。在调用父类构造方法之前调用了isInboud和isOutbound方法判断当前的Handler是否为ChannelInBoundHandler或者ChannelOutBoundHandler,这两个方法很简单,不再展开。
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
接下来看AbstractChannelHandlerContext的构造方法,这里非常简单,保存了几个属性,咱们看一下ordered这个属性。ordered表示EventExecutor在执行异步任务时是否按添加顺序执行,这里一般情况下executor为null,表示使用Channel所绑定的EventLoop线程,而EventLoop线程都是OrderedEventExecutor的实现类。所以这里我们不考虑ordered为false的情况。
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
上面提到了ChannelHandlerContext可以在构造方法里单独指定EventExecutor,如果没有单独指定的话就使用Channel所绑定的EventLoop,代码在哪里呢,就在AbstractChannelHandlerContext#executor方法,非常简单,如果没有为当前ChannelHandler指定excutor则返回Channel所绑定的EventLoop。
@Override
public EventExecutor executor() {
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
3.2 callHandlerCallbackLater
在添加完ChannelHandler之后将调用ChannledHandler的handlerAdded方法,但是此时有可能还未绑定EventLoop,而handlerAdded方法的调用必须在EventLoop线程内执行,此时就需要调用callHandlerCallbackLater方法在pendingHandlerCallbackHead链表中添加一个PendingHandlerAddedTask。
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
接下来咱们看一下PendingHandlerAddedTask的代码,逻辑在execute方法里,这里直接调用了callHandlerAdded0。
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
super(ctx);
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
}
}
}
}
3.3 callHandlerAdded0
不管是在未绑定EventLoop的情况下延迟调用handlerAdded还是在已经绑定了EventLoop的情况下立即调用HandlerAdded,最终都会调用到callHandlerAdded0方法。这里干了两件事,一是调用ChannelHandler的handlerAdded方法,二是将HandlerContext的状态设置为ADD_COMPLETE状态。
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
}
3.4 添加多个ChannelHandler后的Pipeline
还记得咱们的“Netty整体架构图”吗,在这里咱们把Pipeline部分单独放大拿出来看一下,在添加完多个ChannelHandler之后,Pipeline的结构是这样的。

4 删除ChannelHandler
在Pipeline中有几个以remove开头的方法,这些方法的作用就是删除ChannelHandler。
-
remove(ChannelHandler handler):从head向tail查找,用==判断是否为同一实例,只删除第1个。 -
remove(Class<T> handlerType):从head向tail查找,用isAssignableFrom方法判断是否为符合条件的类型,只删除第1个。 -
remove(String name):从head向tail查找,用name精确匹配查找,只删除第1个,因为name不能重复,所以这里删除第1个也是唯一的1个。 -
removeFirst:删除head的后一个,不能删除tail。 -
removeLast:删除tail的前一个,不能删除head。
上述无论哪种删除方式在查找到对应的HandlerContext后都会调用到remove(final AbstractChannelHandlerContext ctx)方法,查找过程比较简单,咱们不再展开,直接看remove(final AbstractChannelHandlerContext ctx)方法。
看看这个方法的实现,是不是和addLast(EventExecutorGroup group, String name, ChannelHandler handler)很相似,非常相似。首先从双向链表中删除ChannelHandlerContext,再调用callHandlerRemoved0方法,callHandlerRemoved0方法内会调用handlerRemoved方法,这个调用必须在EventLoop线程内进行。如果删除时还未绑定EventLoop则添加一个异步任务到链表pendingHandlerCallbackHead中。
如果已经绑定了EventLoop并且当前线程非EventLoop线程则向EventLoop提交一个异步任务,否则直接调用callHandlerRemoved0方法。
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
synchronized (this) {
//从双向链表中删除`ChannelHandlerContext`
remove0(ctx);
//如果还未绑定`EventLoop`,则稍后调用`handlerRemoved`方法
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
//如果已经绑定了`EventLoop`,但是当前线程非`EventLoop`线程的话,就发起一个异步任务调用callHandlerRemoved0方法
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
//如果当前线程就是`EventLoop`线程,则直接调用callHandlerRemoved0方法。
callHandlerRemoved0(ctx);
return ctx;
}
callHandlerCallbackLater方法咱们前面已经分析过,和添加ChannelHandler时不同的是,这里向链表添加的是PendingHandlerRemovedTask,这个类也很简单,不再展开。
这里咱们只看一下callHandlerRemoved0方法。这个方法很简单,调用handlerRemoved方法,再把ChannelHandlerContext的状态设置为REMOVE_COMPLETE。
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
4 pendingHandlerCallbackHead链表中的任务什么时候调用
在AbstractUnsafe的register0方法中,在绑定EventLoop以后,会调用pipeline.invokeHandlerAddedIfNeeded()方法,我们看一下pipeline.invokeHandlerAddedIfNeeded()方法。
private void register0(ChannelPromise promise) {
try {
// 去完成那些在绑定EventLoop之前触发的添加handler操作,这些操作被放在pipeline中的pendingHandlerCallbackHead中,是个链表
pipeline.invokeHandlerAddedIfNeeded();
}
invokeHandlerAddedIfNeeded方法调用了callHandlerAddedForAllHandlers方法,咱们接着看下去。
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
callHandlerAddedForAllHandlers方法的逻辑咱就不再展开来说了,非常简单,就是遍历pendingHandlerCallbackHead这个单向链表,依次调用每个元素的execute方法,并且清空这个单向链表。
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
5 总结
Pipeline中的最重要的数据结构就是由多个ChannelHandlerContext组成的双向链表,而每个ChannelHandlerContext中包含一个ChannelHandler,ChannelHandler既可以添加也可以删除。在Pipeline中有两个特殊的ChannelHandlerContext分别是HeadContext及TailContext,这两个ChannelHandlerContext中不包含ChannelHandler,而是采用继承的方式。HeadContext实现了ChannelOutBoundHandler和ChannelInBoundHandler,而TailContext实现了ChannelInBoundHandler。
关于作者
王建新,转转架构部资深Java工程师,主要负责服务治理、RPC框架、分布式调用跟踪、监控系统等。爱技术、爱学习,欢迎联系交流。
原创文章,码字不易,点赞分享,手有余香。