在之前的服务端启动中我们有看到在创建Channel的时候 pipeline被创建
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
在默认情况下 Pipeline新建时会创建两个节点,一个是tailContext 一个是HeadContext
最后通过head.next和tail.prev 将这两个节点变成一个双向链表的形式
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
public ChannelHandler handler() {
return this;
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }
public void channelActive(ChannelHandlerContext ctx) throws Exception {
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
此处可发现TailContext实现了接口ChannelInboundHandler 说明此节点会传播Inbound事件,此类同时继承了AbstractChannelHandlerContext类 说明此类是个pipeline的数据结构 进入AbstractChannelHandlerContext类
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
implements ChannelHandlerContext, ResourceLeakHint {}
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {
* Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}.
Channel channel();
* Returns the {@link EventExecutor} which is used to execute an arbitrary task.
EventExecutor executor();
* The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
* was added to the {@link ChannelPipeline}. This name can also be used to access the registered
* {@link ChannelHandler} from the {@link ChannelPipeline}.
String name();
* The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}.
ChannelHandler handler();
* Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
* from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
* {@link EventLoop}.
boolean isRemoved();
ChannelHandlerContext fireChannelRegistered();
ChannelHandlerContext fireChannelUnregistered();
ChannelHandlerContext fireChannelActive();
ChannelHandlerContext fireChannelInactive();
ChannelHandlerContext fireExceptionCaught(Throwable cause);
ChannelHandlerContext fireUserEventTriggered(Object evt);
ChannelHandlerContext fireChannelRead(Object msg);
ChannelHandlerContext fireChannelReadComplete();
ChannelHandlerContext fireChannelWritabilityChanged();
ChannelHandlerContext read();
ChannelHandlerContext flush();
* Return the assigned {@link ChannelPipeline}
ChannelPipeline pipeline();
* Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s.
ByteBufAllocator alloc();
* @deprecated Use {@link Channel#attr(AttributeKey)}
<T> Attribute<T> attr(AttributeKey<T> key);
* @deprecated Use {@link Channel#hasAttr(AttributeKey)}
<T> boolean hasAttr(AttributeKey<T> key);
AbstractChannelHandlerContext 此类实现了ChannelHandlerContext方法 而ChannelHandlerContext接口代表了PipeLine的节点数据结构
ChannelHandlerContext继承了AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker
public interface AttributeMap {
* Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
* an {@link Attribute} which does not have a value set yet.
<T> Attribute<T> attr(AttributeKey<T> key);
* Returns {@code} true if and only if the given {@link Attribute} exists in this {@link AttributeMap}.
<T> boolean hasAttr(AttributeKey<T> key);
ChannelInboundInvoker:会触发一些事件,从类名可知 此接口是Inbound事件的传播 主要是传播读事件和注册事件
ChannelOutboundInvoker:同上 outBound事件的传播 主要是传播写事件
public void run()throws Exception{
EventLoopGroup bossGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ObjectDecoder(1024*1024,ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
ch.pipeline().addLast(new ObjectEncoder());
ch.pipeline().addLast(new DubboServerHandler());
ChannelFuture f = b.bind(port).sync();
}finally {
用户Handler添加调用的方法是pipeline中的addLast方法 下面我们进入addLast中看看
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
for (ChannelHandler h: handlers) {
if (h == null) {
addLast(executor, null, h);
return this;
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
newCtx = newContext(group, filterName(name, handler), handler);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
callHandlerCallbackLater(newCtx, true);
return this;
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
public void run() {
return this;
return this;
newContext(group, filterName(name, handler), handler);->创建节点
checkMultiplicity newContext addLast0比较简单 就不列代码了
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
} catch (Throwable t) {
boolean removed = false;
try {
try {
} finally {
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
此处就是通过CAS将状态设置为ADD_COMPLETE 然后回调到用户代码中 最常见的回调就是ChannelInitializer 下面进入ChannelInitializer handlerAdded方法,ChannelInitializer 被添加到pipeline后会调用这个方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
return true;
return false;
此处可以看到 当ChannelInitializer 被添加到pipeline后会调用initChannel方法 initChannel方法就是外面自己定义的那个InitChannel方法 执行用户代码,添加完成后执行remove方法 将自身删除
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
} finally {
public final ChannelPipeline remove(ChannelHandler handler) {
//要么获取context 要么抛出异常
return this;
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we remove the context from the pipeline and add a task that will call
// ChannelHandler.handlerRemoved(...) once the channel is registered.
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
public void run() {
return ctx;
return ctx;
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
// Notify the complete removal.
try {
try {
} finally {
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
此处的操作跟新增操作相同 不进行详解了
接着看看channelpipeline中的事件传播 先看看channelHandler的继承关系
public interface ChannelHandler {
* Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events.
void handlerAdded(ChannelHandlerContext ctx) throws Exception;
* Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
* anymore.
void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
* Gets called if a {@link Throwable} was thrown.
* @deprecated is part of {@link ChannelInboundHandler}
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
* Indicates that the same instance of the annotated {@link ChannelHandler}
* can be added to one or more {@link ChannelPipeline}s multiple times
* without a race condition.
* <p>
* If this annotation is not specified, you have to create a new handler
* instance every time you add it to a pipeline because it has unshared
* state such as member variables.
* <p>
* This annotation is provided for documentation purpose, just like
* <a href="http://www.javaconcurrencyinpractice.com/annotations/doc/">the JCIP annotations</a>.
@interface Sharable {
// no value
handlerAdd、handlerRemove刚才我们说过 是新增或删除时间完成后进行的回调
public interface ChannelInboundHandler extends ChannelHandler {
* The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop}
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
* The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop}
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
* The {@link Channel} of the {@link ChannelHandlerContext} is now active
void channelActive(ChannelHandlerContext ctx) throws Exception;
* The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
* end of lifetime.
void channelInactive(ChannelHandlerContext ctx) throws Exception;
* Invoked when the current {@link Channel} has read a message from the peer.
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
* Invoked when the last message read by the current read operation has been consumed by
* {@link #channelRead(ChannelHandlerContext, Object)}. If {@link ChannelOption#AUTO_READ} is off, no further
* attempt to read an inbound data from the current {@link Channel} will be made until
* {@link ChannelHandlerContext#read()} is called.
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
* Gets called if an user event was triggered.
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
* Gets called once the writable state of a {@link Channel} changed. You can check the state with
* {@link Channel#isWritable()}.
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
* Gets called if a {@link Throwable} was thrown.
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
channelRead如果是服务器的话MSG参数代表一个链接 如果是客户端的话MSG代表消息
Inbound中 事件的传播是从head节点一直传播到tail节点中的 最后在节点中进行内存释放 但是若你在自定义的handler中定义了channelRead方法 并且未向后进行传播 最后未传播到tail节点 的话 内存就不会进行释放 最后会造成内存泄露 Netty中提供了SimpleInBoundHandler若使用了这个类 则会在Read之后自动释放内存 会比自己去释放内存要方便得多,这里不进行介绍
而OutBound中事件的传播是和Inbound相反的从tail节点开始一步步 往前传播 最终传播到Head节点