Netty原理与基础(五)

1.Decoder原理

1.1什么叫作Netty的解码器呢?

首先,它是一个InBound入站处理器,解码器负责处理“入站数据”。其次,它能将上一站Inbound入站处理器传过来的输入(Input)数据,进行数据的解码或者格式转换,然后输出(Output)到下一站Inbound入站处理器。一个标准的解码器将输入类型为ByteBuf缓冲区的数据进行解码,输出一个一个的Java POJO对象。Netty内置了这个解码器,叫作ByteToMessageDecoder,位在Netty的io.netty.handler.codec包中。


image.png
  • ByteToMessageDecoder仅仅提供了一个流程性质的框架:它仅仅将子类的decode方法解码之后的Object结果,放入自己内部的结果列表List<Object>中,最终,父类会负责将List<Object>中的元素,一个一个地传递给下一个站

1.2代码示例

//解码
public class Byte2IntegerDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in,
                       List<Object> out) {
        while (in.readableBytes() >= 4) {
            int i = in.readInt();
            Logger.info("解码出一个整数: " + i);
            out.add(i);
        }
    }
}
//处理程序
public class IntegerProcessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Integer integer = (Integer) msg;
        Logger.info("打印出一个整数: " + integer);
    }
}
//测试类
public class Byte2IntegerDecoderTester {
    /**
     * 整数解码器的使用实例
     */
    @Test
    public void testByteToIntegerDecoder() {
        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
            protected void initChannel(EmbeddedChannel ch) {
                ch.pipeline().addLast(new Byte2IntegerDecoder());
                ch.pipeline().addLast(new IntegerProcessHandler());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(i);

        for (int j = 0; j < 100; j++) {
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(j);
            channel.writeInbound(buf);
        }

        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  • ByteBuf缓冲区由谁负责进行引用计数和释放管理

基类ByteToMessageDecoder负责解码器的ByteBuf缓冲区的释放工作,它会调用ReferenceCountUtil.release(in)方法,将之前的ByteBuf缓冲区的引用数减1。

1.3 ReplayingDecoder解码器

image.png

ReplayingDecoder类是ByteToMessageDecoder的子类。其作用是:
· 在读取ByteBuf缓冲区的数据之前,需要检查缓冲区是否有足够的字节。
· 若ByteBuf中有足够的字节,则会正常读取;反之,如果没有足够的字节,则会停止解码。

  • ReplayingDecoder进行长度判断的原理,其实很简单:它的内部定义了一个新的二进制缓冲区类,对ByteBuf缓冲区进行了装饰,这个类名为ReplayingDecoderBuffer。该装饰器的特点是:在缓冲区真正读数据之前,首先进行长度的判断:如果长度合格,则读取数据;否则,抛出ReplayError。ReplayingDecoder捕获到ReplayError后,会留着数据,等待下一次IO事件到来时再读取。
  • ReplayingDecoder的作用,远远不止于进行长度判断,它更重要的作用是用于分包传输的应用场景

1.4整数分包解码器

  • 底层通信协议是分包传输的,一份数据可能分几次达到对端。发送端出去的包在传输过程中会进行多次的拆分和组装。接收端所收到的包和发送端所发送的包不是一模一样的


    image.png

    在Java OIO流式传输中,不会出现这样的问题,因为它的策略是:不读到完整的信息,就一直阻塞程序,不向后执行。但是,在Java的NIO中,由于NIO的非阻塞性,就会出现上述情况

可以使用ReplayingDecoder来解决
要完成以上的例子,需要用到ReplayingDecoder一个很重要的属性——state成员属性。该成员属性的作用就是保存当前解码器在解码过程中的当前阶段

  • ReplayingDecoder源码
    protected ReplayingDecoder() {
        this((Object)null);
    }

    protected ReplayingDecoder(S initialState) {
        this.replayable = new ReplayingDecoderByteBuf();
        this.checkpoint = -1;
        this.state = initialState;
    }

    protected void checkpoint() {
        this.checkpoint = this.internalBuffer().readerIndex();
    }

    protected void checkpoint(S state) {
        this.checkpoint();
        this.state(state);
    }
  • checkpoint(Status)方法有两个作用
    (1)设置state属性的值,更新一下当前的状态。
    (2)还有一个非常大的作用,就是设置“读断点指针”。
    (3)“读断点指针”是ReplayingDecoder类的另一个重要的成员,它保存着装饰器内部ReplayingDecoderBuffer成员的起始读指针,有点儿类似于mark标记。当读数据时,一旦可读数据不够,ReplayingDecoderBuffer在抛出ReplayError异常之前,ReplayingDecoder会把读指针的值还原到之前的checkpoint(IntegerAddDecoder.Status)方法设置的“读断点指针”(checkpoint)。于是乎,在ReplayingDecoder下一次读取时,还会从之前设置的断点位置开始。

1.5分包解码器

在原理上,字符串分包解码和整数分包解码是一样的。有所不同的是:整数的长度是固定的,目前在Java中是4个字节;而字符串的长度不是固定的,是可变长度的,这就是一个小小的难题

  • 如何获取字符串的长度信息呢?
    (1)在协议的Head部分放置字符串的字节长度。Head部分可以用一个整型int来描述即可。
    (2)在协议的Content部分,放置的则是字符串的字节数组。
public class StringReplayDecoder
        extends ReplayingDecoder<StringReplayDecoder.Status> {

    enum Status {
        PARSE_1, PARSE_2
    }

    private int length;
    private byte[] inBytes;

    public StringReplayDecoder() {
        //构造函数中,需要初始化父类的state 属性,表示当前阶段
        super(Status.PARSE_1);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
                          List<Object> out) throws Exception {


        switch (state()) {
            case PARSE_1:
                //第一步,从装饰器ByteBuf 中读取长度
                length = in.readInt();
                inBytes = new byte[length];
                // 进入第二步,读取内容
                // 并且设置“读指针断点”为当前的读取位置
                checkpoint(Status.PARSE_2);
                break;
            case PARSE_2:
                //第二步,从装饰器ByteBuf 中读取内容数组
                in.readBytes(inBytes, 0, length);
                out.add(new String(inBytes, "UTF-8"));
                // 第二步解析成功,
                // 进入第一步,读取下一个字符串的长度
                // 并且设置“读指针断点”为当前的读取位置
                checkpoint(Status.PARSE_1);
                break;
            default:
                break;
        }

    }

public class StringProcessHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String s = (String) msg;
        System.out.println("打印: " + s);
    }
}

public class StringReplayDecoderTester {
    static String content = "smallmartial:Netty知识学习";

    /**
     * 字符串解码器的使用实例
     */
    @Test
    public void testStringReplayDecoder() {
        ChannelInitializer i = new ChannelInitializer<EmbeddedChannel>() {
            protected void initChannel(EmbeddedChannel ch) {
                ch.pipeline().addLast(new StringReplayDecoder());
                ch.pipeline().addLast(new StringProcessHandler());
            }
        };
        EmbeddedChannel channel = new EmbeddedChannel(i);
        byte[] bytes = content.getBytes(Charset.forName("utf-8"));
        for (int j = 0; j < 100; j++) {
            //1-3之间的随机数
            int random = RandomUtil.randInMod(3);
            ByteBuf buf = Unpooled.buffer();
            buf.writeInt(bytes.length * random);
            for (int k = 0; k < random; k++) {
                buf.writeBytes(bytes);
            }
            channel.writeInbound(buf);
        }
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
image.png
  • ReplayingDecoder解码器不足
    (1)不是所有的ByteBuf操作都被ReplayingDecoderBuffer装饰类所支持,可能有些ByteBuf操作在ReplayingDecoder子类的decode实现方法中被使用时就会抛出ReplayError异常。
    (2)在数据解析逻辑复杂的应用场景,ReplayingDecoder在解析速度上相对较差。

1.6MessageToMessageDecoder解码器

MessageToMessageDecoder<I>。在继承它的时候,需要明确的泛型实参<I>。这个实参的作用就是指定入站消息JavaPOJO类型。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342