Netty源码分析之pipeline

承接题意,平铺直叙。Netty中每个channel都有一个pipeline,可以看下channel的层次结构:

其实在之前的客户端和服务端初始化的时候已经说过了,在初始化Channel的时候,同时初始化pipeline;

//AbstractChannel
protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}
//DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

从DefaultChannelPipeline即可知道初始化pipeline的时候,head和tail是双向链表。可以看下它们的继承关系,再进行其初始化的观察。

HeadContext:

TailContext:

从继承结构可以看出,HeadContext和TailContext都继承了AbstractChannelHandlerContext和实现了ChannelOutboundHandler/ChannelInboundHandler,说明用户双重属性,既是context同时也是handler,按我的理解意味着其即拥有上下文属性也拥有handler属性(处理业务逻辑)。在文章开始的图里已经说明:每个channel包含一个pipeline,而pipeline又维护了一个双向链表。

TailContext和HeadContext


这是TailContext和HeadContext构造函数,需要注意的是TailContext的inbound为true,outbound为false,HeadContext则相反,这两个参数和netty事件流向有关,具体情况下文说明。

重新分析ChannelInitializer和自定义handler的添加

Bootstrap.connect()-->Bootstrap.doResolveAndConnect()-->AbstractBootstrap.initAndRegister()-->Bootstrap.init()

void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());
    。。。
}

config.handler()获取的就是ChannelInitializer,p.addLast(config.handler());就是把ChannelInitializer加入双向链表,看代码:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        newCtx = newContext(group, filterName(name, handler), handler);
        addLast0(newCtx);
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

让我们来看看其中的关键代码

newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }

DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

由上可知,在加入ChannelInitializer的过程中可以知道,为了添加一个 handler 到pipeline中, 会把此handler包装成ChannelHandlerContext。同时addLast0说明ChannelInitializer是添加在tail之前。这个过程中注意下两个有意思的方法:

private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

从源码中可以看到, 当一个handler实现了ChannelInboundHandler接口, 则 isInbound 返回真; 类似地, 当一个handler实现了ChannelOutboundHandler接口, 则isOutbound就返回真。ChannelInitializer是实现了ChannelInboundHandlerAdapter,所以inbound传入的是true。

自定义handler的添加

addLast方法中的另一条关键代码如下:

callHandlerAdded0(newCtx);

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    。。。    
    ctx.handler().handlerAdded(ctx);
    。。。
}

ctx.handler()取到的自然是ChannelInitializer,而handlerAdded(ctx)都做了什么呢:

handlerAdded()-->boolean initChannel()-->void initChannel()

public void  handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        initChannel(ctx);
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            remove(ctx);
        }
        return true;
    }
    return false;
}

ChannelInitializer在加入双向链表后,调用重写initChannel()方法,在initChannel()方法中加入自定义handler,最后remove(ctx);移除ChannelInitializer。
回过头来看下channel的结构层次图,在初始化channel的时候会构建一个pipeline座位channel的属性(pipeline也有一个channel属性),每个pipeline维护了一个由ChannelHandlerContext 组成的双向链表. 这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext中又关联着一个ChannelHandler。


pipeline的传输机制

从上面的分析,我们知道AbstractChannelHandlerContext中有inbound和outbound两个boolean变量, 分别用于标识Context所对应的handler的类型, 即:

  • inbound为真时, 表示对应的ChannelHandler实现了ChannelInboundHandler方法.
  • outbound 为真时, 表示对应的 ChannelHandler 实现了 ChannelOutboundHandler 方法.
    pipieline的事件传输类型有两种:inbound事件和outbound事件两种:
                                             I/O Request
                                        via Channel or
                                    ChannelHandlerContext
                                                  |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
              |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

这个是netty官方文档,可以很明显地看出:inbound事件和outbound事件的流向相反。 inbound 的传递方式是通过调用相应的ChannelHandlerContext.fireIN_EVT() 方法, 而 outbound 方法的的传递方式是通过调用 ChannelHandlerContext.OUT_EVT() 方法。例如ChannelHandlerContext.fireChannelRegistered()调用会发送一个ChannelRegistered 的inbound给下一个ChannelHandlerContext, 而ChannelHandlerContext.bind调用会发送一个bind的outbound事件给下一个 ChannelHandlerContext。
让我们来看下inbound事件传播的方法有哪些:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead()
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught()
ChannelHandlerContext.fireUserEventTriggered()
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

outbound事件传播的方法有:

ChannelHandlerContext.bind()
ChannelHandlerContext.connect()
ChannelHandlerContext.write()
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect()
ChannelHandlerContext.close()

让我们具体来看看这两类事件

outbound事件

outbound事件是请求事件,inbound事件是通知事件,这个要区分清楚。请求事件就是请求某件事即将发生,然后outbound事件进行通知。outbound事件的流向是:

tail -> customContext -> head

让我们用connect事件代码来证明:
当调用Bootstrap.connect()的时候,会触发一个outbound事件。以下是调用链

Bootstrap.connect -> Bootstrap.doConnect -> AbstractChannel.connect
让我们看看AbstractChannel.connect

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}
//pipeline.connect的实现如下:
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

当 outbound 事件(这里是 connect 事件)传递到 Pipeline 后, 它其实是以 tail 为起点开始传播的。而 tail.connect 其实调用的是AbstractChannelHandlerContext.connect 方法。继续跟进,在AbstractChannelHandlerContext中connect方法:

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    。。。
    final AbstractChannelHandlerContext next = findContextOutbound()
    next.invokeConnect(remoteAddress, localAddress, promise);
    。。。
}

让我们看下其中的关键代码:

private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

顾名思义,findContextOutbound就是找出以this context(tail)为基本节点,找出第一个outbound为true的context,然后通过ctx调用invokeConnect方法,如果

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}

从tail往head方向获取handler并且调用其connect,如果用户没有从写这个方法,那么会调用ChannelOutboundHandlerAdapter实现的方法:

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}

connect又会调用AbstractChannelHandlerContext中connect方法找到下一个outbound为true的handler调用其connect,这样的循环中,直到connect事件传递到DefaultChannelPipeline的双向链表的头节点, 即 head 中(head的outbound设置为true)。

Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect

outbound事件传到head后,因为head本身也是handler,handler()返回的的就它本身,让我们看看它connect方法的实现:

public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

到这边outbound事件就结束了。

inbound事件

inbound是通知事件,就是说某件事情已经发生了,然后利用inbound事件进行通知。inbound事件的传输方向和outbound刚好相反:

head -> customcontext -> tail

沿着connect继续走,在之后会有inbound事件,我们就以这个为例子进行inbound事件讲解。
承接上文,之前看到head的connect方法:

public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

这里unsafe.connect调用的是AbstractNioChannel.connect(),关键代码如下:

@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        ···
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } 
        ···
}

在doConnect完成连接之后调用了fulfillConnectPromise,

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    if (promise == null) {
        return;
    }
    boolean active = isActive();
    boolean promiseSet = promise.trySuccess();
    if (!wasActive && active) {
        pipeline().fireChannelActive();
    }
    if (!promiseSet) {
        close(voidPromise());
    }
}

让我们看pipeline().fireChannelActive();pipeline().fireChannelActive()将通道激活的消息(即 Socket 连接成功)发送出去。这里就是inbound事件的起点,往下走看这个过程是怎么样的:

public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

很明显,以head(HeadContext)为起点,让我们看下在invokeChannelActive做了什么

static void invokeChannelInactive(final AbstractChannelHandlerContext next) {
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelInactive();
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelInactive();
            }
        });
    }
}
//next.invokeChannelInactive()实现
private void invokeChannelInactive() {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelInactive(this);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelInactive();
    }
}

这个方法和 Outbound 的对应方法(例如 invokeConnect) 如出一辙. 同Outbound一样, 如果用户没有重写channelActive方法, 那么会调用ChannelInboundHandlerAdapter 的 channelActive 方法:

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
}

public ChannelHandlerContext fireChannelActive() {
    invokeChannelActive(findContextInbound());
    return this;
}

和outbound事件一样,一样的循环,最后事件传输到tail。tail 本身既实现了ChannelInboundHandler接口, 又实现了ChannelHandlerContext接口,因此当channelActive消息传递到tail后,会将消息转递到对应的ChannelHandler中处理,tail的handler()返回的就是tail本身,最后的channelActive即是tail中的。
inbound事件到这里也就结束了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容