ReplayingDecoder
的原理
-
ReplayingDecoder
继承了ByteToMessageDecoder
,但是使用ReplayingDecoder
的好处在于:ReplayingDecoder
在处理数据时可以认为所有的数据(ByteBuf) 已经接收完毕,而不用判断接收数据的长度。
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
-
ReplayingDecoder
使用了特殊的ByteBuf
:ReplayingDecoderByteBuf
,当数据不够时会抛出一类特殊的错误,然后ReplayingDecoder
会重置readerIndex
并且再次调用decode
方法。 - 泛型
<S>
使用 枚举Enum
来表示状态,其内部存在状态管理。如果是无状态的,则使用Void
。
继承基类ByteToMessageDecoder
的方式
下面是一个用来解码带有长整型(Long
)数据头head的解码器:
public class LongHeaderFrameDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
//总字节数<8,不够Long的长度,返回
if (buf.readableBytes() < 8) {
return;
}
buf.markReaderIndex();
//读取head的值,例如6,说明body的长度是6个字节
int length = buf.readLong();
//body的总字节数不够6,返回
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return;
}
//读取6个长度的body
out.add(buf.readBytes(length));
}
}
从以上代码可以看出,在decode
方法中需要对数据的长度做判断,依据ByteBuf
的readerIndex
来获取真实数据,逻辑比较复杂。
继承基类ReplayingDecoder
的方式
如果以上的例子选择继承ReplayingDecoder
,那逻辑会非常简单。由于不存在状态管理,所以泛型使用Void
。
public class LongHeaderFrameDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
// 读取head的值,例如6,说明body的长度是6个字节
int length = buf.readLong();
// 读取6个长度的body
out.add(buf.readBytes(length));
}
}
状态管理和checkpoint
方法
状态可以使用枚举Enum
来表示,如:
public enum MyDecoderState {
READ_HEAD,
READ_BODY;
}
当调用checkpoint(MyDecoderState state)
时,ReplayingDecoder
会将当前readerIndex
赋值给int
类型的成员变量checkpoint
,在后续数据读取过程中方便重置。
protected void checkpoint(S state) {
checkpoint();
state(state);
}
protected void checkpoint() {
checkpoint = internalBuffer().readerIndex();
}
使用状态管理后的LongHeaderFrameDecoder
:
public class LongHeaderFrameDecoder
extends ReplayingDecoder<MyDecoderState> {
// HEAD的长度
private int length;
public LongHeaderFrameDecoder() {
// 初始状态是读取头部HEAD
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx,
ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_HEAD:
length = buf.readLong();
checkpoint(MyDecoderState.READ_BODY);
case READ_BODY:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_BODY);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}