本文是Netty文集中“Netty in action”系列的文章。主要是对Norman Maurer and Marvin Allen Wolfthal 的 《Netty in action》一书简要翻译,同时对重要点加上一些自己补充和扩展。
本章含盖
- 解码器、编码器、编解码器综述
- Netty 的编解码类
Netty提供可以简化各种协议的自定义编解码器创建的组件。
什么是编解码器?
每个网络应用都会定义端之间传输的二进制字节该如何被解析和转换,从发送端到目标程序的数据类型。这个转换逻辑通过编解码器来完成,编解码器包含了一个编码器和一个解码器,每个编解码器将一个字节流从一个格式转换为另一个格式。那么怎么区分它们了?
一个编码器转换消息为一个适当的格式用于传输(大部分情况下是一个字节流);对应的解码器转换网络流为一个程序的消息格式。一个编码器操作一个出站数据(outbound data),一个解码器处理一个入站数据(inbound data)。
Decoders
Decoder 实现了ChannelInboundHandler。
解码器类包含了两个不同的使用场景:
- 解码字节到消息 —— ByteToMessageDecoder 和 ReplayingDecoder
- 解码一种消息类型到另外一种消息类型 —— MessageToMessageDecoder
因为解码器的责任是转换入站数据从一种格式到另一种格式,Netty的解码器实现了ChannelInboundHandler。
因为Netty的ChannelPipeline的设计,你能够链接多个解码器去实现任意复杂的转换逻辑,这是Netty支持代码模块化和重用的一个很好的例子。
ByteToMessageDecoder 抽象类
由于你不知道远端是否会一次性发送一个完整的数据,ByteToMessageDecoder类缓存入站数据直到数据准备好可用于处理。
注意,decodeLast()方法是在当ByteBuf还有可读数据时,默认调用decode()方法。
/**
* Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the
* {@link #channelInactive(ChannelHandlerContext)} was triggered.
*
* By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may
* override this for some special cleanup operation.
*/
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.isReadable()) {
// Only call decode() if there is something left in the buffer to decode.
// See https://github.com/netty/netty/issues/4386
decodeRemovalReentryProtection(ctx, in, out);
}
}
示例:
尽管ByteToMessageDecoder使得此模式实现简单,你可能发现有一点比较烦人,就是在调用readInt()前必须校验input ByteBuf是否有足够的数据。下一章我们将讨论ReplayingDecoder,一个特殊的解码器,它能够消除这个一步骤,只需要消耗很小的性能损耗。
编解码器中的引用计数
正如我们在第五章和第六章所提到的,引用计数是需要特别注意的。在编码器和解码器情况下,这个过程是相当简单的:一旦一个消息被编码或解码,它将自动被释放通过调用ReferenceCountUtil.release(message)。如果你需要持有该引用以便后面使用,你能调用ReferenceCountUtil.retain(message)。该调用会增加引用计数,防止消息被释放。
ReplayingDecoder 抽象类
ReplayingDecoder继承了ByteToMessageDecoder,并将我们从调用readableBytes()中解放。它实现这个通过使用一个自定义ByteBuf的实现(ReplayingDecoderBuffer)来封装入站ByteBuf。ReplayingDecoderBuffer在内部执行时调用。
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
参数S指定用于状态管理的类型,Void表示不使用状态管理。
示例:基于ReplayingDecoder来实现ToIntegerDecoder
请注意ReplayingDecoder的这些方面:
- 不是所有的ByteBuf操作都支持。如果一个不支持的方法被调用了,那么将抛出一个UnsupportedOperationException异常。
- ReplayingDecoder略慢与ByteToMessageDecoder
在现实环境总,在复杂情况下是使用ByteToMessageDecoder还是ReplayingDecoder的区别是很大的。
更多关于解码器
下面的类处理更复杂的使用情况:
- io.netty.handler.codec.LineBasedFrameDecoder —— 这个类用于Netty内部,使用'结束换行'控制字符( \n or \r\n )来解析消息数据
- io.netty.handler.codec.http.HttpObjectDecoder —— 用于Http数据的解码器
MessageToMessageDecoder 抽象类
使用MessageToMessageDecoder进行消息格式间的转换。
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter
参数 “I" 指定了输入消息的类型,作为decode的输入消息类型参数,decode()是你唯一需要实现的方法。
一个更复杂的例子,请看io.netty.handler.codec.http.HttpObjectAggregator,该类间接继承了MessageToMessageDecoder<HttpObject>.
TooLongFrameException 类
因为Netty是一个异步框架,你需要去缓存字节到内存中直到你能够去解析它。所以,你不应该允许你的解码器去缓存足够的数据来耗尽可用内存。为了解决这个共同关心的问题,Netty提供了一个TooLongFrameException,如果一个帧的大小超过了指定大小则抛出该异常。
为了避免内存被耗尽,你能够设置一个最大字节数的阈值,如果超过了这个阈值,将导致一个TooLongFrameException异常抛出( 并被ChannelHandler.exceptionCaught()捕获)。然后由解码器的用户来决定如果处理该异常。一些协议,例如HTTP,允许你返回一个特殊的响应。在其他情况下,唯一的选择可能就是关闭连接。
Encoders
Encoder 实现了ChannelOutboundHandler。
Netty提供了一个集合的类来帮助你写支持如下功能的编码器:
- 将一消息编码为字节
- 将一个消息编码为另一个消息
MessageToByteEncoder 抽象类
你可能已经注意到,这个类只有一个方法,但decoder有两个。这是因为解码器经常需要产生一个最后消息在channel已经关闭前( 因此有了 decodeLast() 方法 )(注意,decodeLast会在channelInactive之前被调用)。我们清楚的知道编码器是没有这种情况的 —— 没有必要在连接断开后去产生一个消息。
示例:MessageToMessageEncoder 抽象类
如果对特化的MessageToMessageEncoder感兴趣,可以查看io.netty.handler.codec.protobuf.ProtobufEncoder类
codec抽象类
Netty的codec抽象类,将一个编码器和解码器捆绑成一对用于同时管理入站和出站消息的转换。codec同时实现了ChannelInboundHandler 和 ChannelOutboundHandler。
为什么我们不是用这个复合类在所有时候,而是更倾向于将解码和编码分开了?因为将这两个功能分开,无论何时都能最大程度上来保持代码的重用性和可扩展性,这是Netty的一个基本理念。
ByteToMessageCodec 抽象类
ByteToMessageCodec 合并了ByteToMessageDecoder 和 MessageToByteEncoder。
任何 请求/响应 协议都适合使用ByteToMessageCodec。
MessageToMessageCodec 抽象类
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN>
INBOUND_IN消息作为写操作发送出去的类型,OUTBOUND_IN消息作为被应用处理的类型。
CombinedChannelDuplexHandler 类
如我们早前说的,合并一个解码器和一个编码器可能会对复用性造成影响。然后,这里提供了一个方式去避免这个损失且不用牺牲配置一个解码器和一个编码器为一个单元的便利性。使用CombinedChannelDuplexHandler来解决这个问题
public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler, O extends ChannelOutboundHandler>
该类扮演一个包含有ChannelInboundHandler和一个ChannelOutboundHandler的容器。通过分别提供一个docoder类和一个encoder类,我们能够实现编解码器而不需要直接继承一个codec抽象类。
也就是说,CombinedChannelDuplexHandler使用组合的方式,复用已经存在的Decoder和Encoder来实现编解码器,这样就保持了代码的重用性和可扩展性。而如果是直接实现一个Codec抽象类的话,则是通过直接实现相关的encode、decode方法来实现编解码器,这使得程序失去了代码的重用性和可扩展性。
示例:
扩展
Q:ReplayingDecoder是如何做到,不用判断字节数是否足够就直接调用readXXX操作并能保证正确的逻辑了?
A:我们来简单看下ReplayingDecoder中的一些实现:
callDecode()是一个写循环实现,每次都会先记录ByteBuf in当前的读索引位置,然后将ByteBuf in封装成一个ReplayingDecoderByteBuf对象(这是一个特殊的ByteBuf实现,它重写了ByteBuf的各种readXXX、getXXX。这些方法在获取真实的数据前会先判断字节是否足够,如果不足够则会抛出一个Signal异常。)。然后将封装好的ReplayingDecoderByteBuf对象传递给decodeRemovalReentryProtection方法(decodeRemovalReentryProtection方法底层会调用decode()方法),这样一来当readXXX操作的时候数据不足的话就会抛出一个Signal异常。在catch{}语块中就会将ByteBuf的readerIndex重置为本次解码前的位置。
但也正是因为如此,在某些情况下ReplayingDecoder可能存在较差的性能。如,在网络很慢且消息格式较复杂的情况下。比如,有个一消息格式为:“消息头”+“消息体”两部分组成一个完整的消息包。我们需要根据消息头获取消息体数据长度以获取我们所需的数据。 但是了,因为网络比较慢的关系,我们读取到的ByteBuf可能不是一个完整的消息格式包(可能包含了消息头以及部分的消息体),本次decode就无法解析出一个消息包(但是我们已经成功解码处理消息头的数据了),那么就会在catch中将ByteBuf的readerIndex重置。那么下次decdoe的时候,又需要重新解析一次消息(即,消息头数据又需要重新进行一次解析)。如果依旧无法获取一个完整的消息包,那么前面的操作将再执行一次。。。
当然,我们也是有办法来解决这个问题的,那就是使用ReplayingDecoderByteBuf的checkpoint(T)方法来管理解码器的状态。嗯,这里举个java doc中的例子来说明checkpoint(T)的使用。
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
首先,我们根据自定协议声明好协议的各个状态。这里就是“READ_LENGTH”和“READ_CONTENT”两个状态。然后在IntegerHeaderFrameDecoder解码器的时候,设置初始状态为“MyDecoderState.READ_LENGTH”。在decode方法中,我们根据不同的状态来进行相应的操作:
一开始state为READ_LENGTH,则先进行消息头部分的数据获取,如果此时ByteBuf中的数据不足以获取到消息头的数据那么就会抛出一个Signal异常,由基类根据上面的逻辑进行readerIndex的重置;如果ByteBuf的数据足以获取到消息头,那么在获取到消息头的值后,执行『checkpoint(MyDecoderState.READ_CONTENT);』这步非常的重要,checkpoint方法会完成两个操作:① 将调用decode方法前记录的readerIndex初始值修改为当前ByteBuf的readerIndex值,② 将state状态修改为MyDecoderState.READ_CONTENT。然后继续state为MyDecoderState.READ_CONTENT情况的处理(注意,这里你会发现switch-case中没有break语句,所以流程会走到下一个状态)。这样一来,当ByteBuf中的数据不足以读取到完整的消息体的内容,基类在重置readerIndex的时候,不再是重置到读取消息头之前的位置了,而是重置到读取完消息头之后的位置。这样,当decode再次被调用时,我们就无需再解码一次消息头了,这时state()方法返回的值已经是MyDecoderState.READ_CONTENT(因为我们上面在解码完消息头后通过checkpoint方法设置了状态值为MyDecoderState.READ_CONTENT),流程也会从解码消息体开始继续进行。
后记
若文章有任何错误,望大家不吝指教:)
参考
《Netty in action》
圣思园《精通并发与Netty》