netty学习系列五:write&flush

一、ChannelOutboundBuffer

1、定义

是AbstractUnsafe使用的数据结构,用来存储待发送的数据。
在channel.unsafe实例化时,ChannelOutboundBuffer一起被初始化。每个channel都有一个自己的ChannelOutboundBuffer。

2、ChannelOutboundBuffer中的field

    Channel channel           -->    所绑定的Channel
    Entry flushedEntry        -->    表示下一个要被flush的Entry
    Entry unflushedEntry    -->    表示下一次要flush截止的Entry
    Entry tailEntry
    int flushed
    int nioBufferCount
    int nioBufferSize
    long totalPendingSize   -->    已存储的需要被write到socket发送缓存中的byte大小
    int unwritable                 -->    表示当前channel的待发送缓存是否可以继续写入数据

ChannelOutboundBuffer中维护了节点元素为Entry的单向链表。
Entry为待发送数据的抽象,实际待发送数据保存在Entry的Object msg中。

二、write的过程

1、入口

AbstractChannel、DefaultChannelPipeline或AbstractChannelHandlerContext提供的write(Object msg)方法,最终都会由HeadContext.write方法执行,最终交由AbstractUnsafe.write(Object msg,ChannelPromise promise)实现。

        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }

            outboundBuffer.addMessage(msg, size, promise);
        }

2、取得ChannelOutboundBuffer

ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;

3、对要进行write操作的数据msg,如果是ByteBuf类型则统一转换为DirectByteBuf实现

msg = filterOutboundMessage(msg);

内部调用AbstractNioByteChannel.filterOutboundMessage(Object msg)方法,如果msg是ByteBuf类型,则将其转换为DirectByteBuf的实现。(调用ByteBufAllocator.directBuffer(initialCapacity)分配一块直接缓存空间并将原msg中的字节流放入)

4、 计算msg的大小

int size = pipeline.estimatorHandle().size(msg);

对于ByteBuf类型的msg,直接调用readableBytes()方法。

5、将ByteBuf msg存入ChannelOutboundBuffer中

outboundBuffer.addMessage(msg, size, promise);

1)将msg封装成Entry对象,并放入单向链表的尾部tailEntry

Entry entry = Entry.newInstance(msg, size, total(msg), promise);
tailEntry = entry;

netty使用基于thread-local的轻量级对象池Recycler对Entry进行回收,避免多次实例化的垃圾回收和开销。
2)更新ChannelOutboundBuffer的totalPendingSize,累加上本次新增的大小

incrementPendingOutboundBytes(size, false);

若totalPendingSize超过了channel的高水位线:
-将unwritable状态更新为不可写;
-执行pipeline.fileChannelWritabilityChanged()

三、flush过程

1、入口

AbstractChannel、DefaultChannelPipeline或AbstractChannelHandlerContext提供的flush()方法,都会由HeadContext.flush(ChannelHandlerContext ctx)方法执行,最终交由AbstractUnsafe.flush()实现。

        @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }

            outboundBuffer.addFlush();
            flush0();
        }

2、更新ChannelOutboundBuffer中本次将要flush的Entry区间

outboundBuffer.addFlush();

将ChannelOutboundBuffer的unflushedEntry向后不断移动到tailEntry,操作结束后本次要flush的链表区间就是flushedEntry->unflushedEntry。

3、检查ChannelOutboundBuffer是否有待flush的数据,如果没有则直接返回,终止flush过程

4、将ChannelOutboundBuffer的要flush的链表区间数据写入TCP发送缓冲区

NioByteUnsafe.doWrite(ChannelOutboundBuffer in)

对Entry链表区间中的每个Entry.msg(即ByteBuf)执行以下逻辑--->
1)如果当前flushedEntry为空,则将OP_WRITE事件从对应Channel的interestOp中移除,跳出遍历直接到步骤5)

            if (msg == null) {
                // Wrote all messages.
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

2)采用类似自旋锁的逻辑不断调用NioSocketChannel.doWriteBytes(ByteBuf buf),将Entry.msg(即ByteBuf)中的数据写入套接字的发送缓冲区
-内部调用ByteBuf.readBytes(Channel out, int length)接口;
-实际底层最终调用nio.SocketChannel.write(ByteBuffer src)
-如果自旋过程中出现nio.SocketChannel.write(ByteBuffer src)返回结果为0,说明此时TCP发送缓冲队列已满,则退出自旋write并将OP_WRITE添加到ch.selectionKey.interestOps中,等待TCP发送缓冲队列可写时重新出发write操作;
-如果Entry的自旋write达到一定次数还没有将Entry中的数据写完,则直接跳出链表遍历操作,执行最后的incompleteWrite;
3)构造ChannelPromise通知当前已write的数据进度

in.progress(flushedAmount);

4)如果当前flushedEntry中的数据已写完,将Entry从ChannelOutboundBuffer中清理回收

in.remove();

-将当前entry从Entry链表中删除;
-从totalPendingSize中减去entry已write出去的字节数;
-若totalPendingSize小于了channel的低水位线,将unwritable状态更新为可写,并调用pipeline.fileChannelWritabilityChanged()产生ChannelWritabilityChanged事件。
<---对Entry链表区间中的每个Entry.msg(即ByteBuf)执行以上逻辑
5)根据当前socket的可写状态,进行后续操作

incompleteWrite(setOpWrite);

-若当前TCP发送缓冲区已满,则将OP_WRITE添加到ch.selectionKey.interestOps中,等待TCP发送缓冲队列可写时重新触发write操作;
-若当前TCP发送缓冲区未满,构造一个flush()事件,等待EventLoop的下一个循环重新检测ChannelOutboundBuffer中有无待flush的数据。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容