Netty源码分析系列--13.ReplayingDecoder

ReplayingDecoder的原理

  • ReplayingDecoder继承了ByteToMessageDecoder,但是使用ReplayingDecoder的好处在于:ReplayingDecoder在处理数据时可以认为所有的数据(ByteBuf) 已经接收完毕,而不用判断接收数据的长度。

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder
  • ReplayingDecoder使用了特殊的ByteBufReplayingDecoderByteBuf,当数据不够时会抛出一类特殊的错误,然后ReplayingDecoder会重置readerIndex并且再次调用decode方法。
  • 泛型<S>使用 枚举Enum 来表示状态,其内部存在状态管理。如果是无状态的,则使用 Void

继承基类ByteToMessageDecoder的方式

下面是一个用来解码带有长整型(Long)数据头head的解码器:

public class LongHeaderFrameDecoder extends ByteToMessageDecoder {
  
    @Override
    protected void decode(ChannelHandlerContext ctx,
                         ByteBuf buf, List<Object> out) throws Exception {

        //总字节数<8,不够Long的长度,返回
        if (buf.readableBytes() < 8) {
          return;
        }
        
        buf.markReaderIndex();
        //读取head的值,例如6,说明body的长度是6个字节
        int length = buf.readLong();
        
        //body的总字节数不够6,返回
        if (buf.readableBytes() < length) {
          buf.resetReaderIndex();
          return;
        }
        
        //读取6个长度的body
        out.add(buf.readBytes(length));
    }
}

从以上代码可以看出,在decode方法中需要对数据的长度做判断,依据ByteBufreaderIndex来获取真实数据,逻辑比较复杂。

继承基类ReplayingDecoder的方式

如果以上的例子选择继承ReplayingDecoder,那逻辑会非常简单。由于不存在状态管理,所以泛型使用Void

public class LongHeaderFrameDecoder extends ReplayingDecoder<Void> {
  
    @Override
    protected void decode(ChannelHandlerContext ctx,
                         ByteBuf buf, List<Object> out) throws Exception {
        // 读取head的值,例如6,说明body的长度是6个字节
        int length = buf.readLong();   
        // 读取6个长度的body
        out.add(buf.readBytes(length));
    }
}

状态管理和checkpoint方法

状态可以使用枚举Enum来表示,如:

public enum MyDecoderState {
     READ_HEAD,
     READ_BODY;
}

当调用checkpoint(MyDecoderState state)时,ReplayingDecoder会将当前readerIndex赋值给int类型的成员变量checkpoint,在后续数据读取过程中方便重置。

protected void checkpoint(S state) {
    checkpoint();
    state(state);
}

protected void checkpoint() {
    checkpoint = internalBuffer().readerIndex();
}

使用状态管理后的LongHeaderFrameDecoder:

public class LongHeaderFrameDecoder
        extends ReplayingDecoder<MyDecoderState> {
     // HEAD的长度
     private int length;
    
     public LongHeaderFrameDecoder() {
       // 初始状态是读取头部HEAD
       super(MyDecoderState.READ_LENGTH);
     }
    
      @Override
     protected void decode(ChannelHandlerContext ctx,
                             ByteBuf buf, List<Object> out) throws Exception {
       switch (state()) {
           case READ_HEAD:
             length = buf.readLong();
             checkpoint(MyDecoderState.READ_BODY);
           case READ_BODY:
             ByteBuf frame = buf.readBytes(length);
             checkpoint(MyDecoderState.READ_BODY);
             out.add(frame);
             break;
           default:
             throw new Error("Shouldn't reach here.");
           }
     }
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容