netty的编解码器

编解码器指的是转换两种不同格式的数据,在网络编程中几乎是必不可少的。比如将String转成ByteBuf,将Student转成String等等。

netty中,编码器处理的是出站消息,因而会继承ChannelOutboundHandler,解码器处理的是入站消息,因而会继承CHannelnboundHandler。

编码器的抽象类MessageToMessageDecoder<I>,其中I是要对什么类型的消息编码。解码器的抽象类MessageToMessageDecoder<I>,其中I是要对什么类型的消息进行解码。还有一个抽象类MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>,表示既可以对消息进行编码,也可以进行解码。

在应用程序中,自定义的编码器或者解码器一般直接继承上述三者即可。比如netty内置的StringEncoder是将String类型转换为ByteBuf;StringDecoder是将ByteBuf转换为String。

下面用两个例子来演示下编解码器的处理方式

  • 编码器的例子
    应用程序将往channel写入一个Student对象,我们先定义一个编码器,将Student转换成String,再用netty内置的StringEncoder,将String转换成ByteBuf,再写入channel。


    编码器.png
        EmbeddedChannel channel = new EmbeddedChannel(
//记录日志
                new LoggingHandler(LogLevel.DEBUG),
//字符串编码器,将String编码成ByteBuf
                new StringEncoder(StandardCharsets.UTF_8),
//Student编码器,将Student转成String
                new MessageToMessageEncoder<Student>() {
                    @Override
                    protected void encode(ChannelHandlerContext ctx, Student msg, List<Object> out) throws Exception {
                        out.add(new Gson().toJson(msg));
                    }
                }
        );
        Student student = new Student(10, "zhangsan");
        channel.writeOutbound(student);
  • 解码器的例子
    应用程序从channel接收数据后,先用netty内置的StringDecoder,将ByteBuf转换为String,再用自定义的解码器,将String转换为Student,这样应用程序的其他handler就可以直接对Student对象处理了


    decoder.png
EmbeddedChannel channel = new EmbeddedChannel(
        new LoggingHandler(LogLevel.DEBUG),
        new StringDecoder(StandardCharsets.UTF_8),
        new MessageToMessageDecoder<String>() {
            @Override
            protected void decode(ChannelHandlerContext ctx, String msg, List<Object> out) throws Exception {
                out.add(new Gson().fromJson(msg, Student.class));
            }
        },
        new SimpleChannelInboundHandler<Student>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, Student msg) throws Exception {
                System.out.println("receive new msg " + msg);
            }
        }
);
Student student = new Student(10, "zhangsan");
String studentStr = new Gson().toJson(student);
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes(studentStr.getBytes(StandardCharsets.UTF_8)));

接下来,我们来看下编码器的源码

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
    CodecOutputList out = null;
    try {
//判断msg的类型编码器是否支持
        if (acceptOutboundMessage(msg)) {
            out = CodecOutputList.newInstance();
            @SuppressWarnings("unchecked")
//类型强转,因为在声明中已经明确指定I类型了
            I cast = (I) msg;
            try {
//调用具体实现,对消息进行编码,并放在out中
                encode(ctx, cast, out);
            } finally {
//cast如果是ReferenceType的需要将引用计数-1,因为传递给下一个handler已经不是cast对象了
                ReferenceCountUtil.release(cast);
            }

            if (out.isEmpty()) {
                throw new EncoderException(
                        StringUtil.simpleClassName(this) + " must produce at least one message.");
            }
//如果当前msg的类型,编码器不支持对其进行编码,那么直接传递给下一个handler来处理
        } else {
            ctx.write(msg, promise);
        }
    } catch (EncoderException e) {
        throw e;
    } catch (Throwable t) {
        throw new EncoderException(t);
    } finally {
        if (out != null) {
            try {
                final int sizeMinusOne = out.size() - 1;
                if (sizeMinusOne == 0) {
//这里将编码后的对象传递给下一个handler处理
                    ctx.write(out.getUnsafe(0), promise);
                } else if (sizeMinusOne > 0) {
                    // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                    // See https://github.com/netty/netty/issues/2525
                    if (promise == ctx.voidPromise()) {
                        writeVoidPromise(ctx, out);
                    } else {
                        writePromiseCombiner(ctx, out, promise);
                    }
                }
            } finally {
                out.recycle();
            }
        }
    }
}

编码器对out的处理,有个地方不太理解,就是当out元素个数大于1的时候。
下面是解码器的源码

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    CodecOutputList out = CodecOutputList.newInstance();
    try {
        if (acceptInboundMessage(msg)) {
            @SuppressWarnings("unchecked")
            I cast = (I) msg;
            try {
                decode(ctx, cast, out);
            } finally {
                ReferenceCountUtil.release(cast);
            }
        } else {
            out.add(msg);
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception e) {
        throw new DecoderException(e);
    } finally {
        try {
            int size = out.size();
            for (int i = 0; i < size; i++) {
                ctx.fireChannelRead(out.getUnsafe(i));
            }
        } finally {
            out.recycle();
        }
    }
}

解码器与编码器的逻辑大致相同,先判断要解码的类型当前的解码器是否支持,若支持的话,进行类型的转换。若不支持,直接将msg添加到out结果列表中,最后再对out列表循环,将列表元素传递给下一个handler处理。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容