
In the earlier walkthrough of server startup, we saw that the pipeline is created at the same time as the Channel.

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
}

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        // Link the two sentinel nodes into a doubly linked list.
        head.next = tail;
        tail.prev = head;
}

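By default, a newly created pipeline contains two nodes: a TailContext and a HeadContext.
The constructor then links them through head.next and tail.prev, so the two nodes form a doubly linked list.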
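To make the initial shape of the list concrete, here is a minimal sketch in plain Java. SketchNode and SketchPipeline are hypothetical names invented for this illustration; they are not Netty classes and leave out everything except the two sentinel nodes and their links.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a pipeline node; not Netty's AbstractChannelHandlerContext.
final class SketchNode {
    final String name;
    SketchNode prev;
    SketchNode next;

    SketchNode(String name) {
        this.name = name;
    }
}

// Hypothetical stand-in for the linking done in DefaultChannelPipeline's constructor.
final class SketchPipeline {
    final SketchNode head = new SketchNode("head");
    final SketchNode tail = new SketchNode("tail");

    SketchPipeline() {
        // The same two assignments as in the constructor excerpt.
        head.next = tail;
        tail.prev = head;
    }

    // Walks from head to tail and collects the node names.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (SketchNode n = head; n != null; n = n.next) {
            out.add(n.name);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(new SketchPipeline().names());
    }
}
```

Because head and tail always exist, later insertions and removals never have to handle an empty list.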

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, true, false);
        }

        public ChannelHandler handler() {
            return this;
        }

        public void channelRegistered(ChannelHandlerContext ctx) throws Exception { }

        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { }

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // ... (body not shown in this excerpt; the same applies to every "// ..." below)
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            // ...
        }

        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            // ...
        }

        public void handlerAdded(ChannelHandlerContext ctx) throws Exception { }

        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            // ...
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // ...
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // ...
        }

        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // ...
        }
}

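TailContext implements the ChannelInboundHandler interface, which means this node takes part in propagating inbound events. It also extends AbstractChannelHandlerContext, which means it is itself a node in the pipeline's data structure. Next, look at AbstractChannelHandlerContext.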

abstract class AbstractChannelHandlerContext extends DefaultAttributeMap
        implements ChannelHandlerContext, ResourceLeakHint {}

public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker {

    /** Return the {@link Channel} which is bound to the {@link ChannelHandlerContext}. */
    Channel channel();

    /** Returns the {@link EventExecutor} which is used to execute an arbitrary task. */
    EventExecutor executor();

    /**
     * The unique name of the {@link ChannelHandlerContext}.The name was used when then {@link ChannelHandler}
     * was added to the {@link ChannelPipeline}. This name can also be used to access the registered
     * {@link ChannelHandler} from the {@link ChannelPipeline}.
     */
    String name();

    /** The {@link ChannelHandler} that is bound this {@link ChannelHandlerContext}. */
    ChannelHandler handler();

    /**
     * Return {@code true} if the {@link ChannelHandler} which belongs to this context was removed
     * from the {@link ChannelPipeline}. Note that this method is only meant to be called from with in the
     * {@link EventLoop}.
     */
    boolean isRemoved();

    ChannelHandlerContext fireChannelRegistered();

    ChannelHandlerContext fireChannelUnregistered();

    ChannelHandlerContext fireChannelActive();

    ChannelHandlerContext fireChannelInactive();

    ChannelHandlerContext fireExceptionCaught(Throwable cause);

    ChannelHandlerContext fireUserEventTriggered(Object evt);

    ChannelHandlerContext fireChannelRead(Object msg);

    ChannelHandlerContext fireChannelReadComplete();

    ChannelHandlerContext fireChannelWritabilityChanged();

    ChannelHandlerContext read();

    ChannelHandlerContext flush();

    /** Return the assigned {@link ChannelPipeline} */
    ChannelPipeline pipeline();

    /** Return the assigned {@link ByteBufAllocator} which will be used to allocate {@link ByteBuf}s. */
    ByteBufAllocator alloc();

    /** @deprecated Use {@link Channel#attr(AttributeKey)} */
    <T> Attribute<T> attr(AttributeKey<T> key);

    /** @deprecated Use {@link Channel#hasAttr(AttributeKey)} */
    <T> boolean hasAttr(AttributeKey<T> key);
}

AbstractChannelHandlerContext implements the ChannelHandlerContext interface, and ChannelHandlerContext is the type that represents a node in the pipeline.
ChannelHandlerContext in turn extends AttributeMap, ChannelInboundInvoker, and ChannelOutboundInvoker.

public interface AttributeMap {
    /**
     * Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
     * an {@link Attribute} which does not have a value set yet.
     */
    <T> Attribute<T> attr(AttributeKey<T> key);

    /** Returns {@code true} if and only if the given {@link Attribute} exists in this {@link AttributeMap}. */
    <T> boolean hasAttr(AttributeKey<T> key);
}

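ChannelInboundInvoker: triggers events. As the name suggests, this interface propagates inbound events, mainly read events and registration events.
ChannelOutboundInvoker: the outbound counterpart. It propagates outbound events, mainly write events.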

public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            // Assumption: the group(...) and channel(...) calls are missing from this excerpt; a typical setup is shown.
            b.group(bossGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new ObjectDecoder(1024*1024,ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader())));
                            ch.pipeline().addLast(new ObjectEncoder());
                            ch.pipeline().addLast(new DubboServerHandler());
                        }
                    });
            ChannelFuture f = b.bind(port).sync();
        } finally {
            // ... (cleanup not shown in this excerpt)
        }
}

User handlers are added through the pipeline's addLast method. Let's step into addLast.

public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
}

public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
}

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
}

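newContext(group, filterName(name, handler), handler) creates the node.
checkMultiplicity, newContext, and addLast0 are fairly simple, so their code is not listed here. In short, checkMultiplicity rejects a handler that is not marked @Sharable if it has already been added somewhere, and addLast0 links the new node in directly before tail.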
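Since addLast0 is not listed, the following sketch shows the kind of pointer updates it has to perform. AddLastSketch and its nested Ctx class are hypothetical names for this illustration only; the sketch ignores executors, handler state, and the registered check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the pipeline's linked list; not Netty code.
final class AddLastSketch {
    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    private final Ctx head = new Ctx("head");
    private final Ctx tail = new Ctx("tail");

    AddLastSketch() {
        head.next = tail;
        tail.prev = head;
    }

    // The new node always goes directly before tail, so tail stays last.
    synchronized AddLastSketch addLast(String name) {
        Ctx newCtx = new Ctx(name);
        Ctx prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
        return this;
    }

    // Walks from head to tail and collects the node names.
    synchronized List<String> names() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head; c != null; c = c.next) {
            out.add(c.name);
        }
        return out;
    }

    public static void main(String[] args) {
        AddLastSketch p = new AddLastSketch()
                .addLast("decoder")
                .addLast("encoder")
                .addLast("handler");
        System.out.println(p.names());
    }
}
```

The method is synchronized for the same reason the real addLast takes the pipeline lock: several threads may modify the list at once.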

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
            // any pipeline events ctx.handler() will miss them because the state will not allow it.
            ctx.setAddComplete();
            ctx.handler().handlerAdded(ctx);
        } catch (Throwable t) {
            boolean removed = false;
            try {
                remove0(ctx);
                try {
                    ctx.handler().handlerRemoved(ctx);
                } finally {
                    ctx.setRemoved();
                }
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
}

final void setAddComplete() {
        for (;;) {
            int oldState = handlerState;
            // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
            // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
            // exposing ordering guarantees.
            if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
}

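Here a CAS operation sets the state to ADD_COMPLETE, and then control calls back into user code. The most common callback is ChannelInitializer. Next, look at ChannelInitializer's handlerAdded method, which is invoked once the ChannelInitializer has been added to the pipeline.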
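The CAS loop can be tried out in isolation. The sketch below uses the JDK's AtomicIntegerFieldUpdater; the class name HandlerStateSketch, the field name state, and the numeric constant values are assumptions made for this illustration rather than Netty's definitions.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Hypothetical model of a handler-state field updated with CAS; not Netty code.
final class HandlerStateSketch {
    // Illustrative values only.
    static final int INIT = 0;
    static final int ADD_PENDING = 1;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<HandlerStateSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HandlerStateSketch.class, "state");

    // The updater requires a volatile int field.
    private volatile int state = INIT;

    // Retries until the CAS succeeds, but never overwrites REMOVE_COMPLETE.
    void setAddComplete() {
        for (;;) {
            int oldState = state;
            if (oldState == REMOVE_COMPLETE
                    || STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }

    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }

    public static void main(String[] args) {
        HandlerStateSketch s = new HandlerStateSketch();
        s.setAddComplete();
        System.out.println(s.state() == ADD_COMPLETE);
    }
}
```

The loop matters when another thread changes the state between the read and the compareAndSet: the CAS fails, the state is read again, and a node that has already been removed is left as it is.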

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            initChannel(ctx);
        }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }
            return true;
        }
        return false;
}

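As shown here, once the ChannelInitializer has been added to the pipeline, its initChannel method is called. That is the initChannel method we defined ourselves, so this is where user code runs. After the handlers have been added, remove is executed and the ChannelInitializer deletes itself from the pipeline.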
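The "run user code, then remove yourself" pattern can be reduced to a few lines. Everything in this sketch (InitializerSketch, MiniPipeline, Handler, Initializer) is a hypothetical miniature written for illustration; it is not Netty's API and omits the re-entrance guard and exception handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature of a self-removing initializer; not Netty's API.
final class InitializerSketch {
    interface Handler {
        // Called once the handler has been placed in the pipeline.
        default void handlerAdded(MiniPipeline pipeline) { }
    }

    static final class MiniPipeline {
        private final List<Handler> handlers = new ArrayList<>();

        MiniPipeline addLast(Handler h) {
            handlers.add(h);
            h.handlerAdded(this);
            return this;
        }

        void remove(Handler h) {
            handlers.remove(h);
        }

        List<Handler> handlers() {
            return handlers;
        }
    }

    static final class Initializer implements Handler {
        private final Consumer<MiniPipeline> initChannel;

        Initializer(Consumer<MiniPipeline> initChannel) {
            this.initChannel = initChannel;
        }

        @Override
        public void handlerAdded(MiniPipeline pipeline) {
            try {
                initChannel.accept(pipeline); // run the user's setup code
            } finally {
                pipeline.remove(this);        // the initializer has done its job
            }
        }
    }

    public static void main(String[] args) {
        Handler decoder = new Handler() { };
        Handler business = new Handler() { };
        MiniPipeline p = new MiniPipeline();
        p.addLast(new Initializer(pl -> pl.addLast(decoder).addLast(business)));
        System.out.println(p.handlers().size());
    }
}
```

After addLast returns, only the handlers registered by the callback remain. The initializer is a one-shot setup hook rather than a permanent member of the pipeline.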

private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
}

public final ChannelPipeline remove(ChannelHandler handler) {
        // Either obtain the context or throw an exception.
        remove(getContextOrDie(handler));
        return this;
}

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
        assert ctx != head && ctx != tail;

        synchronized (this) {
            remove0(ctx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we remove the context from the pipeline and add a task that will call
            // ChannelHandler.handlerRemoved(...) once the channel is registered.
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
}

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        // Notify the complete removal.
        try {
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
}

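The removal flow mirrors the add flow (unlink the node, then invoke the callback on the right thread), so it is not explained in detail here.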
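For completeness, the unlinking step can be exercised on its own. RemoveSketch and its helper methods link and describe are hypothetical names for this illustration; only the four lines inside remove0 correspond to the excerpt.

```java
// Hypothetical, simplified model of unlinking a node; not Netty code.
final class RemoveSketch {
    static final class Ctx {
        final String name;
        Ctx prev;
        Ctx next;

        Ctx(String name) {
            this.name = name;
        }
    }

    // Builds a doubly linked chain from the given names and returns its first node.
    static Ctx link(String... names) {
        Ctx first = new Ctx(names[0]);
        Ctx cur = first;
        for (int i = 1; i < names.length; i++) {
            Ctx n = new Ctx(names[i]);
            cur.next = n;
            n.prev = cur;
            cur = n;
        }
        return first;
    }

    // Makes the neighbours point past ctx; ctx's own links are left untouched.
    static void remove0(Ctx ctx) {
        Ctx prev = ctx.prev;
        Ctx next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

    // Renders the chain starting at the given node, following next pointers.
    static String describe(Ctx first) {
        StringBuilder sb = new StringBuilder();
        for (Ctx c = first; c != null; c = c.next) {
            if (sb.length() > 0) {
                sb.append(" -> ");
            }
            sb.append(c.name);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        Ctx head = link("head", "initializer", "handler", "tail");
        remove0(head.next);
        System.out.println(describe(head));
    }
}
```

Because the sentinels are never removed (note the assert in the excerpt), prev and next are always non-null for any node passed to remove0.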
Next, let's look at event propagation in the ChannelPipeline, starting with the inheritance hierarchy of ChannelHandler.



public interface ChannelHandler {

    /** Gets called after the {@link ChannelHandler} was added to the actual context and it's ready to handle events. */
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called after the {@link ChannelHandler} was removed from the actual context and it doesn't handle events
     * anymore.
     */
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;

    /**
     * Gets called if a {@link Throwable} was thrown.
     *
     * @deprecated is part of {@link ChannelInboundHandler}
     */
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;

    /**
     * Indicates that the same instance of the annotated {@link ChannelHandler}
     * can be added to one or more {@link ChannelPipeline}s multiple times
     * without a race condition.
     * <p>
     * If this annotation is not specified, you have to create a new handler
     * instance every time you add it to a pipeline because it has unshared
     * state such as member variables.
     * <p>
     * This annotation is provided for documentation purpose, just like
     * the JCIP annotations.
     */
    @interface Sharable {
        // no value
    }
}

As discussed above, handlerAdded and handlerRemoved are callbacks invoked after a handler has been added to or removed from the pipeline.

public interface ChannelInboundHandler extends ChannelHandler {

    /** The {@link Channel} of the {@link ChannelHandlerContext} was registered with its {@link EventLoop} */
    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    /** The {@link Channel} of the {@link ChannelHandlerContext} was unregistered from its {@link EventLoop} */
    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    /** The {@link Channel} of the {@link ChannelHandlerContext} is now active */
    void channelActive(ChannelHandlerContext ctx) throws Exception;

    /**
     * The {@link Channel} of the {@link ChannelHandlerContext} was registered is now inactive and reached its
     * end of lifetime.
     */
    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    /** Invoked when the current {@link Channel} has read a message from the peer. */
    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    /**
     * Invoked when the last message read by the current read operation has been consumed by
     * {@link #channelRead(ChannelHandlerContext, Object)}.  If {@link ChannelOption#AUTO_READ} is off, no further
     * attempt to read an inbound data from the current {@link Channel} will be made until
     * {@link ChannelHandlerContext#read()} is called.
     */
    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    /** Gets called if an user event was triggered. */
    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    /**
     * Gets called once the writable state of a {@link Channel} changed. You can check the state with
     * {@link Channel#isWritable()}.
     */
    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    /** Gets called if a {@link Throwable} was thrown. */
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}

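In channelRead, the msg parameter represents a connection on the server side and a message on the client side. More precisely, on the server's accepting channel msg is the newly accepted connection, while on a channel that carries data msg is the data that was read.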
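Inbound events propagate from the head node all the way to the tail node, and the tail node finally releases the message's memory. If a custom handler overrides channelRead and does not pass the event on, the message never reaches the tail node, its memory is never released, and the result is a memory leak. Netty provides SimpleChannelInboundHandler for this case: a handler that extends it has the message released automatically after it has been read, which is much more convenient than releasing it by hand. That class is not covered here.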
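The leak scenario can be modelled without Netty. In the sketch below, Msg is a hypothetical reference-counted message, fireChannelRead plays the role of the pipeline with an implicit tail that releases whatever reaches the end, and autoReleasing is a hypothetical helper written in the spirit of SimpleChannelInboundHandler. None of these names are Netty APIs.

```java
import java.util.List;

// Hypothetical model of reference-counted inbound messages; not Netty code.
final class ReleaseSketch {
    static final class Msg {
        private int refCnt = 1;

        void release() {
            refCnt--;
        }

        boolean isReleased() {
            return refCnt == 0;
        }
    }

    interface InboundHandler {
        // Returns true to pass the message on to the next handler.
        boolean channelRead(Msg msg);
    }

    // Runs the handlers in order; the end of the loop stands in for the tail node.
    static void fireChannelRead(List<InboundHandler> handlers, Msg msg) {
        for (InboundHandler h : handlers) {
            if (!h.channelRead(msg)) {
                return; // propagation stopped, so the tail never sees the message
            }
        }
        msg.release(); // reached the tail: release
    }

    // Wraps user logic so the message is released even though it is not passed on.
    static InboundHandler autoReleasing(Runnable body) {
        return msg -> {
            try {
                body.run();
            } finally {
                msg.release();
            }
            return false;
        };
    }
}
```

A handler that returns false without releasing leaves isReleased() false, which is the leak described above. Wrapping the same logic with autoReleasing releases the message in a finally block.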
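Outbound events propagate in the opposite direction to inbound events: they start at the tail node and move toward the front step by step until they reach the head node.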
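The two directions can be compared side by side in a small model. DirectionSketch is a hypothetical class; the inbound and outbound flags on each node are an assumption modelled on the two boolean arguments TailContext passes to its superclass constructor, and the flags chosen for head are likewise assumed for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of inbound vs. outbound traversal order; not Netty code.
final class DirectionSketch {
    static final class Ctx {
        final String name;
        final boolean inbound;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean inbound, boolean outbound) {
            this.name = name;
            this.inbound = inbound;
            this.outbound = outbound;
        }
    }

    // Assumed flags: head handles outbound only, tail handles inbound only.
    private final Ctx head = new Ctx("head", false, true);
    private final Ctx tail = new Ctx("tail", true, false);

    DirectionSketch() {
        head.next = tail;
        tail.prev = head;
    }

    DirectionSketch addLast(String name, boolean inbound, boolean outbound) {
        Ctx ctx = new Ctx(name, inbound, outbound);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
        return this;
    }

    // Inbound: start after head, follow next, visit only inbound nodes.
    List<String> inboundOrder() {
        List<String> out = new ArrayList<>();
        for (Ctx c = head.next; c != null; c = c.next) {
            if (c.inbound) {
                out.add(c.name);
            }
        }
        return out;
    }

    // Outbound: start before tail, follow prev, visit only outbound nodes.
    List<String> outboundOrder() {
        List<String> out = new ArrayList<>();
        for (Ctx c = tail.prev; c != null; c = c.prev) {
            if (c.outbound) {
                out.add(c.name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        DirectionSketch p = new DirectionSketch()
                .addLast("decoder", true, false)
                .addLast("encoder", false, true)
                .addLast("handler", true, false);
        System.out.println(p.inboundOrder());
        System.out.println(p.outboundOrder());
    }
}
```

With a decoder, an encoder, and a business handler added in that order, a read passes through the decoder and then the handler before ending at tail, while a write passes through the encoder and ends at head.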

