第十八节 netty源码分析之 pipleline和handler以及pipeline的数据流向02

补充InBond事件

通过在上一篇分析outbound事件已经两者的关系,在来分析inbound就比较简单了,作为outbound的镜像事件它是怎么运行的呢?
Inbound 的特点是它传播方向是 head -> customContext -> tail
方法中的代码片段中我们只分析了doConnect这个方法,而且也已经知道doConnect方法是最后是通过java nio中socket与服务端建立链接,那么再想想,链接成功后呢,是不是要通知客户端
已经链接成功了呢,所以这里返回true表示链接成功,然后通过fulfillConnectPromise传递链接成功的事件,再结合官网的pipeline流转图我们推测这里应该是将连接成功事件绑定到inbound
上进行传播,那么是不是这样的呢?我们结合代码来

AbstractNioChannel中connect方法片段

    //doConnect这里的实现在子类NioSocketChannel中
                if (doConnect(remoteAddress, localAddress)) {
                //fulfillConnectPromise方法再链接后通知事件
                    fulfillConnectPromise(promise, wasActive);
                } else {
//略
                }

下面是AbstractNioChannel的fulfillConnectPromise具体如下,

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
            if (promise == null) {
                // Closed via cancellation and the promise has been notified already.
                return;
            }

            // Get the state as trySuccess() may trigger an ChannelFutureListener that will close the Channel.
            // We still need to ensure we call fireChannelActive() in this case.
            boolean active = isActive();

            // trySuccess() will return false if a user cancelled the connection attempt.
            boolean promiseSet = promise.trySuccess();

            // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
            // because what happened is what happened.
            //翻译九四无论连接尝试是否被取消,都应触发channelActive()事件,因为发生的事情就是发生了什么。
            if (!wasActive && active) {
                pipeline().fireChannelActive();
            }

            // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().
            if (!promiseSet) {
                close(voidPromise());
            }
        }

其中关键就在pipeline().fireChannelActive();这个方法中,

继续fireChannelActive实现,根据前面的文章分析,再创建channel的时候会创建一个DefaultChannelPipeline,所以这里的fireChannelActive就在DefaultChannelPipeline中

@Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }
  • 注意到invokeChannelActive的入参为head,所以从侧面也印证了inbound事件从head开始
    继续查看源码
 static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

这个方法很熟悉有木有,和之前分析outbound一样,不过这里的next就是head了。且invokeChannelActive的实现在AbstractChannelHandlerContext中

private void invokeChannelActive() {
    //判断是否已添加haddler
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }
  • 这里就和之前outbound分析相同了,唯一不同就是获取的handler()方法获取返回的是head

那么我们分析head的channelActive(其实else中的fireChannelActive也会涉及),我们从HeadContext找到该方法

  @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
 }
  • 看到这ctx.fireChannelActive();这里fireChannelActive();和上面else里的是同一个均属于AbstractChannelHandlerContext类下(.channelActive(this)中的this入参就是AbstractChannelHandlerContext本身)
    所以分析AbstractChannelHandlerContext方法下的fireChannelActive就可以了
  @Override
    public ChannelHandlerContext fireChannelActive() {
        invokeChannelActive(findContextInbound());
        return this;
    }
  • 看到invokeChannelActive(findContextInbound());这个方法和之前outbound类似,不过这里区别还是有的,findContextInbound找到一个可用的inbound handler.这里就是
 private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

继续追踪会看执行会从.channelActive(this)->HeadContext的channelActive(ChannelHandlerContext ctx)->AbstractChannelHandlerContext的ChannelHandlerContext fireChannelActive()
->>AbstractChannelHandlerContext的invokeChannelActive(final AbstractChannelHandlerContext next)->再次回到AbstractChannelHandlerContext的 invokeChannelActive()
总而言之就是inbound事件过来从headContext(inboud和outbound接口均实现)沿着inboud流向自定义inboundHandler依次执行channelActive通知。最后到TailContext(实现的是一个空方法channelActive)。

所以我们写个inbound的handler并重写channelActive。当和服务端建立链接成功后,写入数据到服务端。下面的代码是不是就很清楚了

public class EchoClientHandler extends ChannelInboundHandlerAdapter {

    private final ByteBuf firstMessage;

    /**
     * Creates a client-side handler.
     */
    public EchoClientHandler() {
        firstMessage = Unpooled.buffer(EchoClient.SIZE);
        for (int i = 0; i < firstMessage.capacity(); i ++) {
            firstMessage.writeByte((byte) i);
        }
    }
//当和服务端建立链接成功后,写入数据到pipeline(包含这ChannelHandlerContext的链表)
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
  • 如果你怀疑这个writeAndFlush是否传递的是outbound事件,其实可以追踪源码,会找到AbstractChannelHandlerContext next = findContextOutbound();最后是nio的write

就此其他的事件同理可自行分析
最后借用网友的总结两种事件:

对于 Outbound事件:

    Outbound 事件是请求事件(由 Connect 发起一个请求, 并最终由 unsafe 处理这个请求)

    Outbound 事件的发起者是 Channel

    Outbound 事件的处理者是 unsafe

    Outbound 事件在 Pipeline 中的传输方向是 tail -> head.

    在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Hnalder, 则需要调用 ctx.xxx (例如 ctx.connect) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.

    Outbound 事件流: Context.OUT_EVT -> Connect.findContextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT

对于 Inbound 事件:

    Inbound 事件是通知事件, 当某件事情已经就绪后, 通知上层.

    Inbound 事件发起者是 unsafe

    Inbound 事件的处理者是 Channel, 如果用户没有实现自定义的处理方法, 那么Inbound 事件默认的处理者是 TailContext, 并且其处理方法是空实现.

    Inbound 事件在 Pipeline 中传输方向是 head -> tail

    在 ChannelHandler 中处理事件时, 如果这个 Handler 不是最后一个 Hnalder, 则需要调用 ctx.fireIN_EVT (例如 ctx.fireChannelActive) 将此事件继续传播下去. 如果不这样做, 那么此事件的传播会提前终止.

    Inbound 事件流: Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext.fireIN_EVT
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容