NettyClient与NettyServer通信流程

NettyClient端的channel为NioSocketChannel,通过writeAndFlush方法将数据发送到NettyServer端。

NioSocketChannel.writeAndFlush->AbstractChannel.writeAndFlush->pipeline.writeAndFlush->tail.writeAndFlush->AbstractChannelHandlerContext.writeAndFlush->AbstractChannelHandlerContext.write

private void write(Object msg, boolean flush, ChannelPromise promise) {
    AbstractChannelHandlerContext next = findContextOutbound();(1)
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(msg, promise);(2)
        } else {
            next.invokeWrite(msg, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, msg, promise);
        }  else {
            task = WriteTask.newInstance(next, msg, promise);
        }
        safeExecute(executor, task, promise, msg);
    }
}

(1)处从TailContext向前找第一个outbounnd context,业务handler一般为inbound,因此会找到HeadContext。
(2)执行HeadContext.invokeWriteAndFlush方法。

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
    if (invokeHandler()) {
        invokeWrite0(msg, promise);
        invokeFlush0();
    } else {
        writeAndFlush(msg, promise);
    }
}

private void invokeWrite0(Object msg, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).write(this, msg, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

private void invokeFlush0() {
    try {
        ((ChannelOutboundHandler) handler()).flush(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

最终会调用HeadContext的write和flush方法,如下:

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        unsafe.flush();
    }

先看unsafe的write方法:

@Override
    public final void write(Object msg, ChannelPromise promise) {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            // If the outboundBuffer is null we know the channel was closed and so
            // need to fail the future right away. If it is not null the handling of the rest
            // will be done in flush0()
            // See https://github.com/netty/netty/issues/2362
            safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
            // release message now to prevent resource-leak
            ReferenceCountUtil.release(msg);
            return;
        }

        int size;
        try {
            msg = filterOutboundMessage(msg);
            /**
             * pipeline.estimatorHandle()会得到DefaultMessageSizeEstimator,再调用size(msg)方法用于计算该msg的可写到网络的字节的数量,即该buf的可读字节数量
             * 如果msg为ByteBuf,那么返回该buf的可读字节数,即可以发送出去的字节数
             */
            size = pipeline.estimatorHandle().size(msg);
            if (size < 0) {
                size = 0;
            }
        } catch (Throwable t) {
            safeSetFailure(promise, t);
            ReferenceCountUtil.release(msg);
            return;
        }

        outboundBuffer.addMessage(msg, size, promise);(3)
    }

(3)处的方法如下,主要是将msg放到ChannelOutboundBuffer的双向链表里:

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        /**
         * 添加第1个msg时,将添加的msg包成entry,作为链表tail
         * 因为entry还没flush,所以flushedEntry设为null
         */
        flushedEntry = null;
        tailEntry = entry;
    } else {
        /**
         * tail不为null,说明已经有msg在之前add进来了
         * 将新add的entry连在之前tail的后面,作为新的tail
         */
        Entry tail = tailEntry;
        tail.next = entry;
        tailEntry = entry;
    }
    if (unflushedEntry == null) {
        /**
         * 第1次add的entry,作为unflushedEntry
         */
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

放入第1个entry1后,双向链表为:

    entry1->null
    指针指向为:
    flushedEntry->null
    unflushedEntry->entry1(本次添加的entry)
    tailEntry->entry1

放入第2个entry2后,双向链表为:

    entry1->entry2->null
    指针指向为:
    flushedEntry->null
    unflushedEntry->entry1
    tailEntry->entry2(本次添加的entry)

再看unsafe的flush方法:

    @Override
    public final void flush() {
        assertEventLoop();

        ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null) {
            return;
        }

        outboundBuffer.addFlush();(4)
        flush0();(5)
    }  

(4)处的方法为:

public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}

执行addFlush前后,双向链表的指针情况如下:

    addFlush前指针指向为:
    flushedEntry->null
    unflushedEntry->entry1
    tailEntry->entry1
    
    addFlush后指针指向为:
    flushedEntry->entry1
    unflushedEntry->null
    tailEntry->entry1

addFlush操作主要是从unflushedEntry开始链表后面扫,直到扫到null,并且置unflushedEntry为null,将扫过的entry的promise的result都置为UNCANCELLABLE。

(5)处的方法为:

    protected void flush0() {
        if (inFlush0) {
            // Avoid re-entrance
            return;
        }

        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        if (outboundBuffer == null || outboundBuffer.isEmpty()) {
            return;
        }

        inFlush0 = true;

        // Mark all pending write requests as failure if the channel is inactive.
        if (!isActive()) {
            try {
                if (isOpen()) {
                    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                } else {
                    // Do not trigger channelWritabilityChanged because the channel is closed already.
                    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                }
            } finally {
                inFlush0 = false;
            }
            return;
        }

        try {
            doWrite(outboundBuffer);(6)
        } catch (Throwable t) {
            if (t instanceof IOException && config().isAutoClose()) {
                /**
                 * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                 * failing all flushed messages and also ensure the actual close of the underlying transport
                 * will happen before the promises are notified.
                 *
                 * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                 * may still return {@code true} even if the channel should be closed as result of the exception.
                 */
                close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
            } else {
                outboundBuffer.failFlushed(t, true);
            }
        } finally {
            inFlush0 = false;
        }
    }

(6)处的方法为AbstractNioByteChannel的doWrite方法:

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    /**
     * 当发送缓冲区已经满,没能将完整数据包发送出去时,需要设置setOpWrite为true,
     * 这样在因为缓冲区满不能将完整数据包发送出去提前退出循环后重新设置写感兴趣事件,
     * 待下次可写事件发生时继续把剩下的数据包发送出去。
     */
    boolean setOpWrite = false;
    for (;;) {
        Object msg = in.current();
        if (msg == null) {
            // Wrote all messages.
            clearOpWrite();
            // Directly return here so incompleteWrite(...) is not called.
            return;
        }

        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            int readableBytes = buf.readableBytes();
            if (readableBytes == 0) {
                in.remove();
                continue;
            }

            boolean done = false;
            long flushedAmount = 0;
            if (writeSpinCount == -1) {
                writeSpinCount = config().getWriteSpinCount();
            }
            for (int i = writeSpinCount - 1; i >= 0; i --) {
                /**
                 * 这里的localFlushedAmount是真正写入到channel的字节数
                 */
                int localFlushedAmount = doWriteBytes(buf);
                if (localFlushedAmount == 0) {
                    /**
                     * 写入的字节数为0,说明channel的写缓冲区已满,不能再写入
                     */
                    setOpWrite = true;
                    break;
                }

                flushedAmount += localFlushedAmount;
                /**
                 * buf不可读说明写入buf的数据已经全部发送到channel
                 */
                if (!buf.isReadable()) {
                    done = true;
                    break;
                }
            }

            in.progress(flushedAmount);

            if (done) {
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else if (msg instanceof FileRegion) {
            FileRegion region = (FileRegion) msg;
            boolean done = region.transfered() >= region.count();

            if (!done) {
                long flushedAmount = 0;
                if (writeSpinCount == -1) {
                    writeSpinCount = config().getWriteSpinCount();
                }

                for (int i = writeSpinCount - 1; i >= 0; i--) {
                    long localFlushedAmount = doWriteFileRegion(region);
                    if (localFlushedAmount == 0) {
                        setOpWrite = true;
                        break;
                    }

                    flushedAmount += localFlushedAmount;
                    if (region.transfered() >= region.count()) {
                        done = true;
                        break;
                    }
                }

                in.progress(flushedAmount);
            }

            if (done) {
                in.remove();
            } else {
                // Break the loop and so incompleteWrite(...) is called.
                break;
            }
        } else {
            // Should not reach here.
            throw new Error();
        }
    }
    incompleteWrite(setOpWrite);
}

doWrite方法执行的操作主要为循环从ChannelOutboundBuffer中取出msg写到channel的socket发送缓冲区,直到写完ChannelOutboundBuffer的所有msg,但有时因为socket发送缓冲区满了不能及时发出去,那么设置setOpWrite为true,标识写半包情况,退出循环后要注册channel的write感兴趣事件,等待下次可写了继续发。

对于server端,在NioEventLoop select循环中获取到读事件,解析好client发送来的数据并响应给client,同样是执行NioSocketChannel.writeAndFlush操作,与client端一样,不在详细阐述。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,463评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,868评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,213评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,666评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,759评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,725评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,716评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,484评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,928评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,233评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,393评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,073评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,718评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,308评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,538评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,338评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,260评论 2 352

推荐阅读更多精彩内容