#Netty入门——拆粘包与编解码(三)

Netty入门——拆粘包与编解码(三)

回顾一下上一篇文章中MyClientInitializer类中的代码。

        //(1)加入拆包器
        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,0,4,0,4));  
        //(2)加入粘包器
        pipeline.addLast(new LengthFieldPrepender(4));             
        //字符串解码 (3)
        pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8));
        //字符串编码 (4)
        pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));

上面代码中用到了Netty的拆包器、粘包器、编解码器。本文会对Netty 是如何拆包进行分析。

TCP的粘包与拆包

在TCP/IP协议中,数据是以二进制流的方式传播的,数据包映射到数据链路层、IP 层和 TCP 层分别叫Frame、Packet、Segment,我们这边不死磕如何翻译,下面都统一用英文表示。

粘包

粘包是TCP在传输过程中,为了提高有效负载,把多个数据包合并成一个数据包发送的现象。如何理解?比如10字节的数据,每次发送1个字节,需要10次TCP传输,10 ACK 确认。如果合并成一个数据包一起发送,可以提高有效负载,节省带宽(前提是对数据实时性要求不高的场景)。
但是粘包会引发语义级别的message识别问题。比如下面这张图:


image.png

ABC+DEF+GHI分3个message, 也就是3个Frame 发送出去,接收端收到4个Frame,不在是原来的3个message 对应的3 个Frame。这就是TCP的粘包与半包现象。AB、H、I的情况是半包,CDEFG的情况是粘包。虽然顺序是和原来一样,但是分组不再是原来的3个分组,这个时候就需要语义上message识别,即拆包。

拆包

发送端把4个数据包粘成2个就需要接收端把这2个数据包拆分成4个。按照如下步骤进行拆包:


image.png

1、读取数据,根据协议判断是否可以构成一个完整的包
2、如果能够构成一个完整的数据包,那么和之前接收到的数据一起拼接成一个完整的数据包给业务逻辑层,多余的数据等待下一次的拼接。
3、如果不能,那么继续从缓存中读取数据。

那么如何判断是否是一个完整的包?
有两种方式:
方式 1:分隔符。为人熟知的SMTP、POP3、IMAP、Telnet等等。下图显示的是使用“\r\n”分隔符的处理过程。


image.png

图中的数字说明:1、字节流。2、第一帧。3、第二帧
方式 2:固定长度。大家最熟悉的HTTP协议就是这种方式:Header+Content。

  • Header : 协议头部,放置一些Meta信息。
  • Content : 应用之间交互的信息主体。
    在HTTP header中 通过Content-Length告知message有多长,应用层才能识别到这条message。比如下图的HTTP1.1协议。


    image.png

Netty拆包流程

首先在Netty的拆包流程中有两个重要的变量cumulationcumulator。cumulation 是Netty中自定义的ByteBuf,与Java原生的ByteBuf还不一样,这个我们之后再讲,我们就直接理解成一个字节容器,cumulator 是一个累加器。

    ByteBuf cumulation;
    private Cumulator cumulator = MERGE_CUMULATOR;

累加器的代码如下。简单讲就是通过调用API buffer.writeBytes(in); 把in数据通过内存拷贝的方式合并到cumulation中,在合并前判断是否要对cumulation 进行扩容。

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            final ByteBuf buffer;
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
                // Expand cumulation (by replace it) when either there is not more room in the buffer
                // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                // duplicate().retain() or if its read-only.
                //
                // See:
                // - https://github.com/netty/netty/issues/2327
                // - https://github.com/netty/netty/issues/1764
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            buffer.writeBytes(in);
            in.release();
            return buffer;
        }
    };

理解了这两个变量后我们在看ByteToMessageDecoder 中的channelRead方法,该方法是每次从缓冲区读到数据时自动调用。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
               //1、合并数据到字节容器中
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                //2、把字节容器中的数据拆包并添加到业务数据容器out中
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                //3、清理字节容器
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }
                //4、把拆包后的数据交给后面的Handler解码
                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

主要步骤已经在代码中注释。下面我们来具体分析一下。
1、合并数据到字节容器中。先判断字节容器cumulation中是否有数据,没有就直接赋值,有的话,有的话就调用累加器进行累加。
2、把字节容器中的数据拆包并添加到业务数据容器out中。
我们来看一下callDecode

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
                int outSize = out.size();
               ............
               //1、读取字节累加容器中的可读字节数
                int oldInputLength = in.readableBytes();
                //2、交给业务拆包器进行拆包
               decode(ctx, in, out);
              .............
                 //3、还没有解出数据包
                if (outSize == out.size()) {
                   //4、如果字节累加容器中的可读字节数没变跳出循环接着读数据
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                    //5、还没有解出数据包,继续解包
                        continue;
                    }
                }
                //6、解出来了,但是累加容器实际没有读到数据抛异常
                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                            ".decode() did not read anything but decoded a message.");
                }
       ............
    }

3、清理字节容器。如果累加容器不为空并且没有可读数据,那么直接释放掉。否则如果还有未读数据,并且次数大于16,discardAfterReads的默认值为16。调用discardSomeReadBytes,discardSomeReadBytes源码如下表示当读索引超过容量的一半时,进行数据前移。

    @Override
    public ByteBuf discardSomeReadBytes() {
     ......
        if (readerIndex >= capacity() >>> 1) {
            setBytes(0, this, readerIndex, writerIndex - readerIndex);
            writerIndex -= readerIndex;
            adjustMarkers(readerIndex);
            readerIndex = 0;
        }
     ......
        return this;
    }

丢弃前


image.png

丢弃后


image.png

4、把拆包后的数据交给后面的Handler解码。通过fireChannelRead实现,同时会设置变量decodeWasNull,用来标识是否解出数据包。该变量用在 channelReadComplete 函数中,该函数的源码如下。

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        numReads = 0;
        discardSomeReadBytes();
        if (decodeWasNull) {
            decodeWasNull = false;
            if (!ctx.channel().config().isAutoRead()) {
                ctx.read();
            }
        }
        ctx.fireChannelReadComplete();
    }

decodeWasNull 为true,表示没有解出数据包,这个时候如果channel 设置成了非自动读取,调用ctx.read()。这个方法有什么作用呢?
AbstractChannelHandlerContext类中找到了read 方法。read()函数会调用invokeRead(),这个方法的源代码如下:

    private void invokeRead() {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).read(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            read();
        }
    }

最终传播到HeadContext的read()方法,最后调用unsafe.beginRead()设置关心底层read事件,从而实现激活后自动读取数据。

        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

unsafe是Netty内部实现底层IO细节的类,beginRead()方法设定底层Selector关心read事件,如果read事件就绪,则会调用unsafe.read()方法读取数据,然后调用channelPipe.fireChannelRead()方法通知用户已读取到数据,可进行业务处理。

自定义协议的拆包

在上一篇文章中采用的是通用拆包器LengthFieldBasedFrameDecoder,基本上所有的基于长度的二进制协议都可以用他进行拆包。关于LengthFieldBasedFrameDecoder如何使用,我这里依然推荐闪电侠同学的这篇文章,LengthFieldBasedFrameDecoder,这里不在赘述。
那么如何对自定义的协议进行拆包?
我曾在简书上看到过关于这方面的文章,比如这篇文章,但是这篇文章中的例子却没有考虑半包问题,所以我这边以另外一个例子作为讲解内容。
我在学习RPC开源框架的时候,偶然发现了张旭大神一个人写的RPC框架——Navi-pbrpc,这个框架基于Netty网络通信和Protobuf的序列化,非常适合学习RPC的同学入门(推荐学习完Navi-pbrpc源码,再去看Dubbo的源码会更好)。在这个框架中,应用层他自定义了一个协议,该协议基于header+body方式,header内含的body length属性来表明二进制数据长度,body采用经过protobuf压缩后的二进制数据。

NsHead是Navi-pbrpc 内部的header。NsHead + protobuf序列化body包结构示意如下


image.png

NsHead结构如下:


image.png

NsHead的固定长度36个字节,Header各字段中可以看到body-length字段,用来标识消息体长度。
了解了协议结构之后,下面我们看一下Navi-pbrpc是如何进行拆包的。
拆包的源码如下:

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 解决半包问题,此时Nshead还没有接收全,channel中留存的字节流不做处理
        if (in.readableBytes() < NsHead.NSHEAD_LEN) {
            return;
        }

        in.markReaderIndex();

        byte[] bytes = new byte[NsHead.NSHEAD_LEN];
        in.readBytes(bytes, 0, NsHead.NSHEAD_LEN);

        NsHead nsHead = new NsHead();
        nsHead.wrap(bytes);

        // 解决半包问题,此时body还没有接收全,channel中留存的字节流不做处理,重置readerIndex
        if (in.readableBytes() < (int) nsHead.getBodyLen()) {
            in.resetReaderIndex();
            return;
        }

        // 此时接受到了足够的一个包,开始处理
        in.markReaderIndex();

        byte[] totalBytes = new byte[(int) nsHead.getBodyLen()];
        in.readBytes(totalBytes, 0, (int) nsHead.getBodyLen());

        PbrpcMsg decoded = PbrpcMsg.of(nsHead).setData(totalBytes);
        ContextHolder.putContext("_logid", nsHead.getLogId()); // TODO

        if (decoded != null) {
            out.add(decoded);
        }
    }

1、可以看到的是先从ByteBuf 中读取可读的字节数,如果没有达到NsHead的大小,就不做处理,如果达到了就读取36个字节到字节数组中,并且封装成一个nsHead,里面包含了上图NsHead结构里的所有字段。
2、拿到NsHead中body-length,,可以知道body的长度,接着读取ByteBuf中可读字段大小,如果小于body长度就返回,否则读取ByteBuf到字节数组中,解决半包问题。
3、最后利用Protobuf客户端SDK反序列化方法拿到消息体。
这里涉及到Netty的ByteBuf,留在以后的文章中再讲。

总结

本文主要通过源码分析介绍了Netty的拆包流程,主要分为了四个步骤:
1、合并数据到字节容器中。
2、把字节容器中的数据拆包并添加到业务数据容器out中
3、清理字节容器
4、把拆包后的数据交给后面的Handler解码
并且通过一个例子介绍了如何自定义协议拆包。其实不管是netty还是自己去拆包,流程无非是文章开头介绍的流程,万变不离其宗吧。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容

  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 综述 netty通...
    jiangmo阅读 5,848评论 0 13
  • 一、粘包与拆包 1、发送时的粘包与拆包 TCP连接维护了一个发送缓存区。将要发送给对端的数据会由socket AP...
    益文的圈阅读 4,270评论 6 14
  • 为什么要粘包拆包 为什么要粘包 首先你得了解一下TCP/IP协议,在用户数据量非常小的情况下,极端情况下,一个字节...
    简书闪电侠阅读 20,629评论 23 77
  • 1. 有些人无需在意。 本来不需要为这样的人多费口舌和心力,不过也算是一种经历 来英国两年终于清晰的感受到了可能是...
    水仙与恶魔Slog阅读 231评论 0 0
  • 这是一个冷笑话,内容是—— 今天,你向首页投稿了吗? 上周三上午十一点主编在群里发了个好消息:今天首页投稿完全开放...
    Jk不二子阅读 1,874评论 75 71