netty4用最简单的协议解决一个半包问题

有时候简化实现别人的代码,有助于你更好的理解代码,不要一味地读源代码。

问题来源

客户端往服务器发送小文件

解决思路

1、使用netty(废话)
2、只是用ByteBuf
3、自定义一种协议,用最小的网络代价完成数据传送

实现

其实netty有很多的定义好的协议来解决各种各样的问题,这篇文章来自《netty权威指南》作者李林峰,详细介绍了netty的编解码框架,以及一些常用的编解码协议。

在解决这个问题的时候,我遇到的一个主要问题就是我在客户端发送一个数据包,这个数据包的大小可以很大,但是如果只用简单的channelRead去读取数据的话得到的数据并不是完整的。具体原因参考netty用户指南中的tcp stream-based传输的问题。

我先做了一个简单的协议设计:
packet = |文件名长度|文件名|文件字节长度|文件字节流|

于是就有了客户端发送的简单代码

            String name = "diagram.png";
            FileInputStream fileInputStream = new FileInputStream(new File("src/main/resources/diagram.png"));
            byte[] bytes = new byte[fileInputStream.available()];
            fileInputStream.read(bytes);

            ByteBuf byteBuf = Unpooled.buffer();

            byteBuf.writeInt("diagram.png".getBytes().length);
            byteBuf.writeBytes("diagram.png".getBytes());

            byteBuf.writeInt(bytes.length);
            byteBuf.writeBytes(bytes);
            channelFuture.channel().writeAndFlush(byteBuf);

这样发送没有问题,因为byteBuf是动态扩展的。但是接受的时候就有问题了。如果我们接受比较小的,比如一个int,我们可以直接这样写

 @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        if(msg instanceof ByteBuf)
        {
            ByteBuf byteBuf = (ByteBuf)msg;
            if(byteBuf.readableBytes() > 4)
            {
                int result = byteBuf.readInt();
            }
        }
}

但是当长度很大的时候,我们就需要解决读半包的问题了。直到读到完整的数据才进行处理。但是每次收到的数据怎么去判断是不是和上一个数据是连续的,如何在没有收集到完整数据时不处理数据而继续接受呢?这是我一直困扰的问题。因为我把每一个byteBuf当成一个message来想了,其实不是的,ByteBuf中有两个指针readIndex和writeIndex,readIndex永远小于writeIndex。大概如下图所示


ByteBuf示例图

在netty的设计中ByteBuf是可以被重用的,所以可能针对这一个ChannelRead一直读取的是同一个ByteBuf。这其中readrIndex之前的是已经读取过的,就是已经被调用readXXX()之后的数据,可以重新去读取,readerIndex和writerIndex之前的是当前的readableBytes,writerIndex到capacity的是writeableBytes,当writerIndex超过capacity时就会扩展。同时为了重用这部分空间,当调用discardBytes时,会把readerIndex和writerIndex拷贝到开头,这样前面废弃的部分就被重用了,也一定程度场避免了扩容,节省了空间。

那如何针对上面的输入写ByteBuf的解码呢?
先看看netty自带的解码器怎么解决这个问题,其中LengthFieldBasedFrameDecoder就是用来解决这一类的问题的。在李林峰的文章中有详细介绍,这里就不赘述了。
我在之前代码的基础上添加了两行代码。

//在服务器的pipeline中添加的这个解码器,然后用4个字节表示整个包的长度,并且废弃掉这四个字节。
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024*1024, 0, 4, 0, 4));

//在发送的byteBuf头部添加真个包的长度
byteBuf.writeInt(4+ name.getBytes().length +4+ bytes.length);

然后我再在ChannelRead中处理剩下的数据
packet = |文件名长度|文件名|文件字节长度|文件字节流|

       if(msg instanceof ByteBuf)
        {
            ByteBuf byteBuf = (ByteBuf)msg;
            int nameSize = byteBuf.readInt();
            String name = new String(byteBuf.readBytes(nameSize).array(), "UTF-8");
            int fileSize = byteBuf.readInt();
            FileOutputStream fileOutputStream = new FileOutputStream(new File(name));
            fileOutputStream.write(byteBuf.readBytes(fileSize).array());
            System.out.println(name + " " + fileSize);
        }

问题解决,但是自己如何实现这个解码器呢?先看看netty怎么实现的。

protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (discardingTooLongFrame) {
            long bytesToDiscard = this.bytesToDiscard;
            int localBytesToDiscard = (int) Math.min(bytesToDiscard, in.readableBytes());
            in.skipBytes(localBytesToDiscard);
            bytesToDiscard -= localBytesToDiscard;
            this.bytesToDiscard = bytesToDiscard;

            failIfNecessary(false);
        }

        if (in.readableBytes() < lengthFieldEndOffset) {
            return null;
        }

        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

        if (frameLength < 0) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "negative pre-adjustment length field: " + frameLength);
        }

        frameLength += lengthAdjustment + lengthFieldEndOffset;

        if (frameLength < lengthFieldEndOffset) {
            in.skipBytes(lengthFieldEndOffset);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than lengthFieldEndOffset: " + lengthFieldEndOffset);
        }

        if (frameLength > maxFrameLength) {
            long discard = frameLength - in.readableBytes();
            tooLongFrameLength = frameLength;

            if (discard < 0) {
                // buffer contains more bytes then the frameLength so we can discard all now
                in.skipBytes((int) frameLength);
            } else {
                // Enter the discard mode and discard everything received so far.
                discardingTooLongFrame = true;
                bytesToDiscard = discard;
                in.skipBytes(in.readableBytes());
            }
            failIfNecessary(true);
            return null;
        }

        // never overflows because it's less than maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < frameLengthInt) {
            return null;
        }

        if (initialBytesToStrip > frameLengthInt) {
            in.skipBytes(frameLengthInt);
            throw new CorruptedFrameException(
                    "Adjusted frame length (" + frameLength + ") is less " +
                    "than initialBytesToStrip: " + initialBytesToStrip);
        }
        in.skipBytes(initialBytesToStrip);

        // extract frame
        int readerIndex = in.readerIndex();
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

好长。。里面对于不合理的协议做了很多假设,并使不合理的输入快速失败。但是让我一个初学者写还是写不出来。所以我假设协议就是我设计的那样,简化这部分代码,便于理解。
变量给一个固定值

    private ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
    private int maxFrameLength = 1024*10;
    private int lengthFieldLength = 4;
    private int initialBytesToStrip = 0;
    private long tooLongFrameLength;
    private long bytesToDiscard;
    private boolean failFast = true;

然后写decode函数,就这么简单。。

 protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {

        int frameLength = (int) in.getUnsignedInt(0);//获取头部
        if(in.readableBytes() < frameLength)//当ByteBuf没有达到长度时,return null
        {
            return null;
        }
        in.skipBytes(4);//舍弃头部
        int index =  in.readerIndex();
        ByteBuf frame = in.slice(index, frameLength).retain();//取出自己定义的packet包返回给ChannelRead

        in.readerIndex(frameLength);//这一步一定要有,不然其实bytebuf的readerIndex没有变,netty会一直从这里开始读取,将readerIndex移动就相当于把前面的数据处理过了废弃掉了。
        return  frame;
    }

所以其实我们只要不处理bytebuf的数据知道可以读的数据达到我们需要的长度在处理就可以了。当然包的顺序不会出错是由底层tcp保证的,不用关心。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容

  • 前奏 https://tech.meituan.com/2016/11/04/nio.html 综述 netty通...
    jiangmo阅读 5,846评论 0 13
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,633评论 18 139
  • 国家电网公司企业标准(Q/GDW)- 面向对象的用电信息数据交换协议 - 报批稿:20170802 前言: 排版 ...
    庭说阅读 10,934评论 6 13
  • 前言 问题 现如今我们使用通用的应用程序或者类库来实现系统之间地互相访问。例如,我们经常使用一个HTTP客户端来从...
    Kohler阅读 768评论 0 2
  • 文/乡土依旧 都知道冬吃萝卜对健康有益,所以当地人们习惯种植晚秋季萝卜,可赶在大地封冻之前收获。 一来供自家食用,...
    小小有梦阅读 235评论 0 2