五分钟就能看懂pipeline模型 -Netty 源码解析

一、pipeline介绍

1. 什么是pipeline

    
pipeline 有管道,流水线的意思,最早使用在 Unix 操作系统中,可以让不同功能的程序相互通讯,使软件更加”高内聚,低耦合”,它以一种”链式模型”来串起不同的程序或组件,使它们组成一条直线的工作流。

2. Netty的ChannelPipeline

    
ChannelPipeline 是处理或拦截channel的进站事件和出站事件的双向链表,事件在ChannelPipeline中流动和传递,可以增加或删除ChannelHandler来实现对不同业务逻辑的处理。通俗的说,ChannelPipeline是工厂里的流水线,ChannelHandler是流水线上的工人。
    
ChannelPipeline在创建Channel时会自动创建,每个Channel都拥有自己的ChannelPipeline

3. Netty I/O事件的处理过程

image

    
如图所示,入站事件是由I/O线程被动触发,由入站处理器按自下而上的方向处理,在中途可以被拦截丢弃,出站事件由用户handler主动触发,由出站处理器按自上而下的方向处理
image

二、ChannelHandlerContext

1. 什么是ChannelHandlerContext

    
ChannelHandlerContext是将ChannelHandlerChannelPipeline关联起来的上下文环境,每添加一个handler都会创建ChannelHandlerContext实例,管理ChannelHandlerChannelPipeline中的传播流向。

2. ChannelHandlerContext和ChannelPipeline以及ChannelHandler之间的关系

    
ChannelPipeline依赖于Channel的创建而自动创建,保存了channel,将所有handler组织起来,相当于工厂的流水线。
    
ChannelHandler拥有独立功能逻辑,可以注册到多个ChannelPipeline,是不保存channel的,相当于工厂的工人。
    
ChannelHandlerContext是关联ChannelHandlerChannelPipeline的上下文环境,保存了ChannelPipeline,控制ChannelHandlerChannelPipeline中的传播流向,相当于流水线上的小组长。

三、传播Inbound事件

1. Inbound事件有哪些?

image

    
(1) channelRegistered 注册事件,channel注册到EventLoop上后调用,例如服务岗启动时,pipeline.fireChannelRegistered();
    
(2) channelUnregistered 注销事件,channelEventLoop上注销后调用,例如关闭连接成功后,pipeline.fireChannelUnregistered();
    
(3) channelActive 激活事件,绑定端口成功后调用,pipeline.fireChannelActive();
    
(4) channelInactive
非激活事件,连接关闭后调用,pipeline.fireChannelInactive();     
(5) channelRead 读事件,channel有数据时调用,pipeline.fireChannelRead();
    
(6) channelReadComplete 读完事件,channel读完之后调用,pipeline.fireChannelReadComplete();
    
(7) channelWritabilityChanged 可写状态变更事件,当一个Channel的可写的状态发生改变的时候执行,可以保证写的操作不要太快,防止OOMpipeline.fireChannelWritabilityChanged();
    
(8) userEventTriggered 用户事件触发,例如心跳检测,ctx.fireUserEventTriggered(evt);
    
(9) exceptionCaught 异常事件
说明:我们可以看出,Inbound事件都是由I/O线程触发,用户实现部分关注的事件被动调用
    
说明 : 我们可以看出,Inbound事件都是由I/O线程触发,用户实现部分关注的事件被动调用

2. 添加读事件

    
从前面《Netty 源码解析-服务端启动流程解析》《Netty 源码解析-客户端连接接入及读I/O解析》我们知道,当有新连接接入时,我们执行注册流程,注册成功后,会调用channelRegistered,我们从这个方法开始

   public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
          initChannel((C) ctx.channel());
          ctx.pipeline().remove(this);
          ctx.fireChannelRegistered();
}

    
initChannel是在服务启动时配置的参数childHandler重写了父类方法

private class IOChannelInitialize extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("initChannel");
        ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
        ch.pipeline().addLast(new IOHandler());
    }
}

    
我们回忆一下,pipeline是在哪里创建的

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

    
当创建channel时会自动创建pipeline

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

    
在这里会创建两个默认的handler,一个InboundHandler --> TailContext,一个OutboundHandler --> HeadContext
    
再看addLast方法

@Override
public ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}

    
在这里生成一个handler名字,生成规则由handler类名加 ”#0”

  @Override
public ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    …
    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        addLast(executor, generateName(h), h);
    }
    return this;
}
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);
        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }
    return this;
}

    
由于pipeline是线程非安全的,通过加锁来保证并发访问的安全,进行handler名称重复性校验,将handler包装成DefaultChannelHandlerContext,最后再添加到pipeline

private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}

    
这里分三步
    
(1)DefaultChannelHandlerContext进行重复性校验,如果DefaultChannelHandlerContext不是可以在多个pipeline中共享的,且已经被添加到pipeline中,则抛出异常
    
(2) 修改pipeline中的指针
       
添加IdleStateHandler之前
         
HeadContext --> IOChannelInitialize --> TailContext

image

       
添加IdleStateHandler之后
         
HeadContext --> IOChannelInitialize --> IdleStateHandler --> TailContext

image

    
(3)handler名和DefaultChannelHandlerContext建立映射关系
    
(4) 回调handler添加完成监听事件
    
最后删除IOChannelInitialize

image

    
最后事件链上的顺序为:
        
HeadContext --> IdleStateHandler --> IOHandler --> TailContext

3. pipeline.fireChannelRead()事件解析

    
在这里我们选一个比较典型的读事件解析,其他事件流程基本类似

 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {    
    …
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
    }
    …
}

    
boss线程监听到读事件,会调用unsafe.read()方法

@Override
public final void read() {
    …
    pipeline.fireChannelRead(byteBuf);
    …
}

    
入站事件从head开始,tail结束

@Override
public ChannelPipeline fireChannelRead(Object msg) {
    head.fireChannelRead(msg);
    return this;
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRead(msg);
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRead(msg);
            }
        });
    }
    return this;
}

    
查找pipeline中下一个Inbound事件

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

image

    
HeadContext的下一个Inbound事件是IdleStateHandler

private void invokeChannelRead(Object msg) {
    try {
        ((ChannelInboundHandler) handler()).channelRead(this, msg);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
        reading = true;
        firstReaderIdleEvent = firstAllIdleEvent = true;
    }
    ctx.fireChannelRead(msg);
}

    
将这个channel读事件标识为true,并传到下一个handler

image
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    super.channelRead(ctx, msg);
    System.out.println(msg.toString());
}

    
这里执行IOHandler重写的channelRead()方法,并调用父类channelRead方法

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

    
继续调用事件链上的下一个handler

image
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        ReferenceCountUtil.release(msg);
    }
}

    
这里会调用TailContextRead方法,释放msg缓存
    
总结:传播Inbound事件是从HeadContext节点往上传播,一直到TailContext节点结束

四、传播Outbound事件

1. Outbound事件有哪些?

image

    
(1) bind 事件,绑定端口
    
(2) close事件,关闭channel
    
(3) connect事件,用于客户端,连接一个远程机器
    
(4) disconnect事件,用于客户端,关闭远程连接
    
(5) deregister事件,用于客户端,在执行断开连接disconnect操作后调用,将channelEventLoop中注销
    
(6) read事件,用于新接入连接时,注册成功多路复用器上后,修改监听为OP_READ操作位
    
(7) write事件,向通道写数据
    
(8) flush事件,将通道排队的数据刷新到远程机器上

2. 解析write事件

    ByteBuf resp = Unpooled.copiedBuffer("hello".getBytes());
    ctx.channel().write(resp);

    
我们在项目中像上面这样直接调用write写数据,并不能直接写进channel,而是写到缓冲区,还要调用flush方法才能将数据刷进channel,或者直接调用writeAndFlush
    
在这里我们选择比较典型的write事件来解析Outbound流程,其他事件流程类似

@Override
public ChannelFuture write(Object msg) {
    return pipeline.write(msg);
}

    
通过上下文绑定的channel直接调用write方法,调用channel相对应的事件链上的handler

@Override
public ChannelFuture write(Object msg) {
    return tail.write(msg);
}

    
写事件是从tailhead调用,和读事件刚好相反

@Override
public ChannelFuture write(Object msg) {
    return write(msg, newPromise());
}
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    ...
     write(msg, false, promise);
    ...
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeWrite(msg, promise);
        if (flush) {
            next.invokeFlush();
        }
    ...
}
...
}

    
经过多次跳转,获取上一个Ounbound事件链的handler

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

image

    
IdleStateHandler既是Inbound事件,又是Outbound事件
    
继续跳转到上一个handler

image

    
上一个是HeadContext处理

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    unsafe.write(msg, promise);
}
@Override
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    ...
    outboundBuffer.addMessage(msg, size, promise);
...
}

    
从这里我们看到,最终是把数据丢到了缓冲区,自此nettypipeline模型我们解析完毕
    
有关inbound事件和outbound事件的传输, 可通过下图进行归纳:

image

觉得对您有帮助请点 "赞"

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容