自顶向下深入分析Netty(七)--ChannelPipeline源码实现

7.2 源码分析

7.2.1 ChannelPipeline

首先看ChannelPipeline接口的关键方法,相似方法只列出一个:

    ChannelPipeline addLast(String name, ChannelHandler handler);
    ChannelPipeline remove(ChannelHandler handler);
    ChannelHandler first();
    ChannelHandlerContext firstContext();
    ChannelHandler get(String name);
    ChannelHandlerContext context(ChannelHandler handler);
    Channel channel();
    ChannelPipeline fireChannelRegistered();
    ChannelFuture bind(SocketAddress localAddress);

DefaultChannelPipeline是ChannelPipeline的一个子类,回忆ChannelHandler的事件处理顺序,与双向链表的正向遍历和反向遍历顺序相同,可推知DefaultChannelPipeline使用了双向链表。事实上如此,所不同的是:链表中的节点并不是ChannelHandler而是ChannelHandlerContext。明白了这些,先看其中的字段:

    final AbstractChannelHandlerContext head;   // 双向链表头
    final AbstractChannelHandlerContext tail;   // 双向链表尾
    private final Channel channel;  // 对应Channel
    // 线程池中的线程映射,记住这个映射是为了保证执行任务时使用同一个线程
    private Map<EventExecutorGroup, EventExecutor> childExecutors;
    private MessageSizeEstimator.Handle estimatorHandle;    // 消息大小估算器,内部没有使用
    private boolean firstRegistration = true;   // 对应Channel首次注册到EventLoop
    // ChannelHandler添加任务队列链表头部
    private PendingHandlerCallback pendingHandlerCallbackHead;
    // 注册到EventLoop标记,该值一旦设置为true后不再改变
    private boolean registered; 

此外还需要注意一个static字段:

    private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
         initialValue() -> {  return new WeakHashMap<Class<?>, String>(); };

这是一个Netty内部定义的FastThreadLocal变量,以后会分析它的实现,现在先了解这样的事实:nameCaches是一个线程本地(局部)变量,也就是说每个线程都存有一份该变量,该变量是一个WeakHashMap,其中存放的是ChannelHandler的Class与字符串名称的映射关系。简单说就是每个线程都有一份Handler的Class与字符串名称的映射关系,之所以这样是为了避免使用复杂的CurrentHashMap也能实现并发安全。
首先看我们常用的addLast()方法:

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, 
                                                            ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检查Handler是否重复添加
            checkMultiplicity(handler);
            // 新建一个Context
            newCtx = newContext(group, filterName(name, handler), handler);
            // 实际的双向链表插入操作
            addLast0(newCtx);
            
            if (!registered) {
                // 此时Channel还没注册的EventLoop中,而Netty的原则是事件在同一个EventLoop执行,
                // 所以新增一个任务用于注册后添加
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                // 当前线程不是EventLoop线程
                newCtx.setAddPending();
                executor.execute( () -> { callHandlerAdded0(newCtx); } );
                return this;
            }
        }
        // 当前线程为EventLoop线程且已注册则直接触发HandlerAdd事件
        callHandlerAdded0(newCtx);
        return this;
    }
    
    public final ChannelPipeline addLast(String name, ChannelHandler handler) {
        // 线程池为null则使用channel注册到的EventLoop
        return addLast(null, name, handler);
    }

先看其中的checkMultiplicity()方法,功能是保证ChannelPipeline中至多只有一个同一类型的非共享Handler,代码如下:

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) { // 为什么只对Adapter?
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                // 非共享且已被添加到pipeline中
                throw new ChannelPipelineException("...");
            }
            h.added = true;
        }
    }

filterName()方法对Handler名称进行重复检查,generateName()生成形如:HandlerClassName#0、HandlerClassName#1的Handler字符串名称,checkDuplicateName()检查名称是否已使用,也就是说pipeline中Handler的名称也要求满足唯一性。代码如下:

    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
        checkDuplicateName(name);
        return name;
    }

checkDuplicateName()代码如下:

    private void checkDuplicateName(String name) {
        if (context0(name) != null) {
            throw new IllegalArgumentException("Duplicate handler name: " + name);
        }
    }
    
    // 双向链表中查找是否已有该名称的context
    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }

generateName()代码如下:

    private String generateName(ChannelHandler handler) {
        Map<Class<?>, String> cache = nameCaches.get(); // 获得ThreadLocal变量
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);  // 生成HandlerClassName#0
            cache.put(handlerType, name);
        }
        
        // HandlerClassName#0已有,则末尾编号加1
        if (context0(name) != null) {
            String baseName = name.substring(0, name.length() - 1);
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }
    
    // 生成HandlerClassName#0
    private static String generateName0(Class<?> handlerType) {
        return StringUtil.simpleClassName(handlerType) + "#0";
    }

再看newContext()方法,返回一个默认Contenxt,其构造方法需要传入一个EventExecutor用于执行Handler中事件处理代码。childExecutor()正是用来从线程池中分配这个EventExecutor,代码如下:

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

    private EventExecutor childExecutor(EventExecutorGroup group) {
        if (group == null) {
            return null;
        }
        Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
        if (pinEventExecutor != null && !pinEventExecutor) {
            // Channel参数配置非同一个线程处理,不建议开启
            return group.next();
        }
        Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
        if (childExecutors == null) {
            childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
        }
        // 保证pipeline中的事件为同一个EventExecutor处理,可视为将EventExecutor绑定到pipeline
        EventExecutor childExecutor = childExecutors.get(group);
        if (childExecutor == null) {
            childExecutor = group.next();
            childExecutors.put(group, childExecutor);
        }
        return childExecutor;
    }

接着看实际的双向链表插入操作addLast0()操作:

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

DefaultChannelPipeline的双向链表初始化持有头部和尾部节点,这两个节点对用户不可见,也就是说,用户addLast只是将节点插入尾部节点之前,addFirst将节点插入头部节点之后。明白了这些,代码便易于理解。
接着看callHandlerCallbackLater()方法,当我们在Channel注册到之前添加或删除Handler时,此时没有EventExecutor可执行HandlerAdd或HandlerRemove事件,所以Netty为此事件生成一个相应任务等注册完成后在调用执行任务。添加或删除任务可能有很多个,DefaultChannelPipeline使用一个链表存储,链表头部为先前的字段pendingHandlerCallbackHead,代码如下:

    // 参数added为True表示HandlerAdd任务,False表示HandlerRemove任务
    private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
        assert !registered; // 必须非注册
        PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
        PendingHandlerCallback pending = pendingHandlerCallbackHead;
        if (pending == null) {
            pendingHandlerCallbackHead = task;  // 链表头部
        } else {    // 插入到链表尾部
            while (pending.next != null) {
                pending = pending.next;
            }
            pending.next = task;
        }
    }

以HandlerAdd任务为例分析任务部分的代码(HandlerRemove可类比):

    private abstract static class PendingHandlerCallback implements Runnable {
        final AbstractChannelHandlerContext ctx;
        PendingHandlerCallback next;

        PendingHandlerCallback(AbstractChannelHandlerContext ctx) { this.ctx = ctx;}

        abstract void execute();
    }

    private final class PendingHandlerAddedTask extends PendingHandlerCallback {
        PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) { super(ctx);}

        @Override
        public void run() {
            callHandlerAdded0(ctx);
        }

        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                // 当前线程为EventLoop线程,调用HandlerAdd事件
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this); // 否则提交一个任务,任务执行run()方法
                } catch (RejectedExecutionException e) {
                    logger.warn("...");
                    remove0(ctx);   // 异常时,将已添加的Handler删除
                    ctx.setRemoved();
                }
            }
        }
    }

callHandlerAdded0()方法执行实际的调用事件操作,作为addLast()的最后一个方法,代码如下:

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.handler().handlerAdded(ctx);    // 调用事件处理
            ctx.setAddComplete();  
        } catch (Throwable t) {
            boolean removed = false;    // 异常时删除Context,尽量恢复现场
            try {
                remove0(ctx);   // 实际双向链表删除操作
                try {
                    ctx.handler().handlerRemoved(ctx);  // 调用事件处理
                } finally {
                    ctx.setRemoved();
                }
                removed = true;
            } catch (Throwable t2) {
                logger.warn("Failed to remove a handler: " + ctx.name(), t2);
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException("handlerAdded() has thrown an exception; removed."));
            } else {
                fireExceptionCaught(new ChannelPipelineException("handlerAdded() has thrown an exception; also failed to remove."));
            }
        }
    }

终于分析完addLast()方法,我们经常使用的不起眼一行代码,背后的流程却很长。分析完这个方法的代码,其他方法的代码,我们可推断:remove()作为addXXX()的逆方法,其处理过程可推断为:找到对应Context节点,执行实际的双向链表删除操作,如果非注册则新增一个HandlerRemove任务并链接到任务链表尾部,如果已注册但Context需求线程非EventLoop,提交一个调用任务到需求线程,如果已注册且需求线程为EventLoop直接调用事件处理。first()返回的是链表头部的下一个Handler即用户可见的首个Handler,last()返回链表尾部的前一个Handler即用户可见的最后一个Handler,firstContext()和lastContext()同理,只是返回Context。get(handlerName)返回相应名称的Handler,context(handerlNmae)返回相应名称的Context,操作都是从双向链表头部进行遍历查找。
再看一下fireXXX方法和bind等事件触发方法的代码:

    @Override
    public final ChannelPipeline fireChannelRegistered() {
        // 入站事件从双向链表头部处理
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
    }
    
    @Override
    public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        // 出站事件从双向链表尾部处理
        return tail.bind(localAddress, promise);
    }

由于头部和尾部节点都是ChannelHandlerContext,具体的事件触发处理都委托给head和tail处理,将在之后一节进行分析。至此,接口中的方法已分析完毕,是不是还差点什么?仔细回想一下,在addXXX()方法中有待执行的HandlerAdd和HandlerRemove任务,它们怎么执行的呢?DefaultChannelPipeline提供了invokeHandlerAddedIfNeeded()方法:

    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            // 至此,channel已注册到EventLoop,可以执行任务
            callHandlerAddedForAllHandlers();
        }
    }
    
    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered; // 必须为非注册
            registered = true;  // 至此则说明已注册

            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            this.pendingHandlerCallbackHead = null;  // 帮助垃圾回收
        }

        // 用一个局部变量保存任务链表头部是因为以下代码如果在synchronized块内,则当用户在
        // 非EventLoop中执行HandlerAdd()方法而该方法中又新增一个handler时不会发生死锁
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute(); // 遍历链表依次执行
            task = task.next;
        }
    }

invokeHandlerAddedIfNeeded()方法在以下两种情况被调用:(1).AbstractUnsafe的register事件框架,当Channel注册到EventLoop之前会被调用,确保异步注册操作一旦完成就触发HandlerAdd事件;(2).双向链表头部节点的channelRegistered()方法(为什么此时调用,双重保护?)。
DefaultChannelPipeline还有最后一个方法destroy(),将pipeline中的所有节点销毁,顺序由尾部向头部并触发HandlerRemove事件,代码如下:

    private synchronized void destroy() {
        destroyUp(head.next, false);
    }
    
    // 参数inEventLoop应理解为是否直接执行本段代码的for循环部分,也就是说为true时不需要提交
    // 一个destroyUp任务,为False时则需要判断Handler的执行线程是否为EventLoop线程
    private void destroyUp(AbstractChannelHandlerContext ctx, boolean inEventLoop) {
        final Thread currentThread = Thread.currentThread();
        final AbstractChannelHandlerContext tail = this.tail;
        for (;;) {
            if (ctx == tail) {
                destroyDown(currentThread, tail.prev, inEventLoop);
                break;
            }

            final EventExecutor executor = ctx.executor();
            if (!inEventLoop && !executor.inEventLoop(currentThread)) {
                final AbstractChannelHandlerContext finalCtx = ctx;
                // destroyUp()的for循环部分需在executor内执行,所以置True
                executor.execute( () -> { destroyUp(finalCtx, true); } );
                break;
            }

            ctx = ctx.next;
            inEventLoop = false; // 每次都悲观的认为下一个Handler的处理线程会是另外一个线程
        }
    }
    
    
    private void destroyDown(Thread currentThread, AbstractChannelHandlerContext ctx, boolean inEventLoop) {
        // 至此,已经到达双向链表尾部,可确定入站事件已在删除操作进行之前传播完毕
        final AbstractChannelHandlerContext head = this.head;
        for (;;) {
            if (ctx == head) {
                break;
            }

            // 这部分代码实质与up部分一致,采用两种表现形式容易引起困惑
            // 本质上 (!a  && !b) == (a || b)
            final EventExecutor executor = ctx.executor();
            if (inEventLoop || executor.inEventLoop(currentThread)) {
                synchronized (this) {
                    remove0(ctx);
                }
                callHandlerRemoved0(ctx);
            } else {
                final AbstractChannelHandlerContext finalCtx = ctx;
                executor.execute(() -> { destroyDown(Thread.currentThread(), finalCtx, true); });
                break;
            }

            ctx = ctx.prev;
            inEventLoop = false;
        }
    }

这部分代码晦涩难懂,考虑这样一种情况,当我们由尾部向头部删除节点时,有一个入站事件正从头部向尾部传播,由于从尾部开始删除了某些节点,入站事件的处理流程被破坏。这部分代码正是为了处理这种情况,所以首先从头部向尾部遍历,确保没有入站事件,此时才从尾部向头部进行删除销毁操作。
这部分代码还为了保证事件在正确的线程中执行,假设有如下pipeline:

    HEAD --> [E1] H1 --> [E2] H2 --> TAIL

其中E1和E2为两个线程,则必须保证Handler1中的事件在E1执行,Handler2中的事件在E2执行,而Head和Tail的事件在Channel注册到的EventLoop中执行。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容