netty源码分析(19)- 添加ChannelHandler过程

上一节学习了pipeline初始化的过程。初始化了HeadContextTailContext,并构建了pipeline双向链表,每个节点存储ChannelHandlerContext

本节研究添加ChannelHandler的过程。在学习之前先整理一些之前学到的内容。

  • 服务端channel初始化channle的过程中,bossGroup服务端channelpipeline添加了一个特殊的ChannelHandler:ServerBootstrapAcceptor,如下这幅图。
image.png
  • 客户端channel所在的workGroup处理所有childHandler,这里的Handler属于客户端channel

  • 因此在bossGroup在初始化channel的过程中,会调用addLast()增加handler,促发HandlerAdded事件,而引导配置的时候触发的是ChannelInitializer#handlerAdded()

入口

在引导的过程当中,重写了ChannelInitializer#initChannel(),其中获取了pipeline并调用了addLast()方法,同事将ChannelHandler作为参数传入addLast方法中。

class DataServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ch.pipeline()
                .addLast(new DefaultEventExecutorGroup(2),
                        new DataServerHandler(),
                        new DataServerOutHandler2(),
                        new DataServerOutHandler(),
                        new DataServerHandler2());
    }
}
  • DefaultChannelPipeline#addLast()
    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        //循环添加
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }  
具体的addLast方法做了以下几件事
  1. 检查Handler是否重复添加
  2. 创建数节点ChannelHandlerContext
  3. 添加数据节点
  4. 触发handlerAdded事件,执行对应Handler的回调函数
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //检查是否重复添加
            checkMultiplicity(handler);

            //group:异步线程
            //创建数据节点
            //filterName:从头节点开始遍历检查名字是否重复,如果没有传入name则生成一个name
            newCtx = newContext(group, filterName(name, handler), handler);
            //添加数据节点
            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            //触发事件回调函数:handlerAdded
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        //触发事件回调函数:handlerAdded
        callHandlerAdded0(newCtx);
        return this;
    }
  • checkMultiplicity(handler);检查Handler是否重复添加
    逻辑主要时判断是否有@Sharable注解,从而判断是否允许从夫,其次通过成员变量added属性判断是否被添加过。如果重复的话则排除异常。
    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //判断是否是共享 同时  是否时被添加过的
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }

    public boolean isSharable() {
        /**
         * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
         * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
         * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
         * {@link Thread}s are quite limited anyway.
         *
         * See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
         */
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }

  • newCtx = newContext(group, filterName(name, handler), handler);,创建数据节点
    实例化DefaultChannelHandlerContext的过程中将handler作为成员变量持有,并且标记了handlerinbound还是outbound,并将一些重要的组件pipeline,executor保存起来。
    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        //实例化DefaultChannelHandlerContext
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
    private final ChannelHandler handler;

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        //标记类型
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        //持有handler
        this.handler = handler;
    }

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                                  boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

指的注意的是childExecutor,该方法处理添加handler时,传入的EventExecutorGroup。该Executor可用于用户在回调方法中处理异步计算。

    private EventExecutor childExecutor(EventExecutorGroup group) {
        if (group == null) {
            return null;
        }
        Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
        if (pinEventExecutor != null && !pinEventExecutor) {
            return group.next();
        }
        //存储异步线程的容器:管理异步线程
        Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
        if (childExecutors == null) {
            // Use size of 4 as most people only use one extra EventExecutor.
            //新建一个Map存储异步线程
            childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
        }
        // Pin one of the child executors once and remember it so that the same child executor
        // is used to fire events for the same channel.
        //存储异步线程
        EventExecutor childExecutor = childExecutors.get(group);
        if (childExecutor == null) {
            childExecutor = group.next();
            childExecutors.put(group, childExecutor);
        }
        //返回异步线程
        return childExecutor;
    }

用户配置异步线程,处理异步计算。


class DataServerInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel ch) throws Exception {

        ch.pipeline()
                //配置异步线程处理异步计算
                .addLast(new DefaultEventExecutorGroup(2),
                        new DataServerHandler(),
                        new DataServerOutHandler2(),
                        new DataServerOutHandler(),
                        new DataServerHandler2());
    }
}

class DataServerHandler extends SimpleChannelInboundHandler<Object> {
    public DataServerHandler() {
        super(false);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println(MessageFormat.format("---> receive client msg = \"{0}\"", byteBuf.toString(CharsetUtil.UTF_8)));

        System.out.println("<--- send to client msg = server msg");
        ByteBuf serverMsg = Unpooled.copiedBuffer("server msg", CharsetUtil.UTF_8);

        ctx.write(serverMsg);

        ctx.executor().execute(new Runnable() {
            public void run() {
                try {
                    TimeUnit.SECONDS.sleep(20);
                    System.out.println("io async end ");
                } catch (InterruptedException e) {
                    //ignore
                }
            }
        });

        System.out.println("<--- send to client channel msg = server channel msg");
        ByteBuf serverChannelMsg = Unpooled.copiedBuffer("server channel msg", CharsetUtil.UTF_8);
        //tail.write
        ctx.channel().write(serverChannelMsg);

        System.out.println("<--- send to client msg = server msg2");
        ByteBuf serverMsg2 = Unpooled.copiedBuffer("server msg2", CharsetUtil.UTF_8);
        ctx.write(serverMsg2);

        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }
}

结合初始化NioEventLoopGroup的过程实际上这里保存的executor就是ThreadPerTaskExecutor,因此同样时新增了一个EventLoop数组并初始化一个chooser,当group.next()的时候调用chooser选择一个EventLoop执行操作。

    protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
        this(nThreads, threadFactory == null ? null : new ThreadPerTaskExecutor(threadFactory), args);
    }
  • 添加数据节点addLast0(newCtx);
    其逻辑主要是链表的操作:将新的数据节点添加到尾节点(TailContext)之前
    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

  • callHandlerAdded0(newCtx);:触发handlerAdded事件
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            //调用回调函数
            ctx.callHandlerAdded();
        } catch (Throwable t) {
           //省略代码
    }

    final void callHandlerAdded() throws Exception {
        // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
        // any pipeline events ctx.handler() will miss them because the state will not allow it.
        if (setAddComplete()) {
            //获取handler调用handlerAdded触发事件
            handler().handlerAdded(this);
        }
    }
整理以下整个添加handler的过程
  • bossGroup初始化的时候将对服务端channelpipeline进行了添加ChannelInitializer的操作,促发用户initChannel回调方法,添加具体的handler,之后删除ChannelInitializer

  • 其中bossGroup添加具体的handler,包括两个:引导调用handler()配置的,另一个是ServerBootstrapAcceptor

  • 当新连接接入时候,会创建客户端channel并初始化,当处理读取事件的时候触发ServerBootstrap.ServerBootstrapAcceptor#channelRead(),调用如下代码

            //添加childHandler
            child.pipeline().addLast(childHandler);

childHandler也是引导配置的时候调用childHandler()方法,传入ChannelInitializer实例作为参数。因此同样的逻辑。不一样的是,childHandler的处理逻辑已经交给了childGroup进行处理了,脱离了bossGroup,查看如下代码。

                 //选择NioEventLoop并注册Selector
                childGroup.register(child)
                        .addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,635评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,628评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,971评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,986评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,006评论 6 394
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,784评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,475评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,364评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,860评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,008评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,152评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,829评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,490评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,035评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,428评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,127评论 2 356

推荐阅读更多精彩内容