SOFABolt 源码分析19 - Codec 编解码设计

注意:本文部分内容摘抄自 SOFABolt 源码解析系列文章(蚂蚁金服将会在最近不断推出 SOFA 系列的博文),大家可以关注微信公众号:金融级分布式架构 来得到通知。部分内容参考自:蚂蚁通信框架实践

image.png

类组成

  • 编解码器工厂 - 工厂模式
  • Codec 是编解码器工厂类接口
  • RpcCodec 是针对 Rpc 场景下的编解码器工厂类
    附:SOFABolt 高度提取了一些接口,是的我们可以自己去进行扩展,假设我们要做 mq 的编解码,可以模仿实现 RpcCodec 的实现方式
  • RpcCodec 用于创建 ProtocolCodeBasedEncoder 和 ProtocolCodeBasedDecoder 的子类 RpcProtocolDecoder,二者被设置为 netty 的编解码器 handler
  • 编解码模板 - 模板模式
  • MessageToByteEncoder 是 Netty 提供的一个编码模板
  • AbstractBatchDecoder 是 SOFABolt hack 了 Netty 的 ByteToMessageDecoder 来提供一个解码模板(相较于 Netty 增加了批量提交的功能)
  • 编解码代理类 - 代理模式策略模式
  • ProtocolCodeBasedEncoder 是 CommandEncoder 的代理类,通过不同的 protocol 协议,指定使用不同的编码器
  • ProtocolCodeBasedDecoder 是 CommandDecoder 的代理类,通过不同的 protocol 协议,指定使用不同的解码器
  • RpcProtocolDecoder 是 ProtocolCodeBasedDecoder 的子类,针对于 Rpc 场景定制了 decodeProtocolVersion 方法
  • 真正的解码器
  • CommandEncoder 提供了编码接口
  • CommandDecoder 提供了解码接口
  • RpcCommandEncoder 提供了 RpcProtocol 协议数据的编码器
  • RpcCommandEncoderV2 提供了 RpcProtocolV2 协议数据的编码器
  • RpcCommandDecoder 提供了 RpcProtocol 协议数据的解码器
  • RpcCommandDecoderV2 提供了 RpcProtocolV2 协议数据的解码器

一、编码分析

本质:序列化会将业务数据转化为 byte[],编码按照私有协议将 byte[] 写入到 ByteBuf 中

基本流程

  1. 判断传入的数据是否是 Serializable 类型(该类型由 MessageToByteEncoder 的泛型指定),如果不是,直接传播给 pipeline 中的下一个 handler;否则
  2. 创建一个 ByteBuf 实例,用于存储最终的编码数据
  3. 从 channel 的附加属性中获取协议标识 protocolCode,之后从协议管理器中获取相应的 Protocol 对象
  4. 从 Protocol 对象中获取相应的 CommandEncoder 实现类实例,使用 CommandEncoder 实现类实例按照 SOFABolt 源码分析18 - Protocol 私有协议的设计 所介绍的协议规则将数据写入到第二步创建好的 ByteBuf 实例中
  5. 如果原始数据是 ReferenceCounted 实现类,则释放原始数据
  6. 如果 ByteBuf 中有数据了,则传播给 pipeline 中的下一个 handler;否则,释放该 ByteBuf 对象,传递一个空的 ByteBuf 给下一个 handler

源码分析

======================= 编解码工厂 =======================
public class RpcCodec implements Codec {
    @Override
    public ChannelHandler newEncoder() {
        return new ProtocolCodeBasedEncoder(ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE));
    }
    @Override
    public ChannelHandler newDecoder() {
        return new RpcProtocolDecoder(RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH);
    }
}

======================= 服务端 Netty 设置(客户端类似) =======================
// 编解码工厂
private Codec codec = new RpcCodec();
public class RpcServer {
    @Override
    protected void doInit() {
        ...
        this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) {
                ...
                // 创建解码器
                pipeline.addLast("decoder", codec.newDecoder());
                // 创建编码器
                pipeline.addLast("encoder", codec.newEncoder());
                ...
            }
    }
}

======================= 编码模板 =======================
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            // 判断是否可编码(SOFABolt 中判断 msg instanceof Serializable)
            if (acceptOutboundMessage(msg)) {
                I cast = (I) msg;
                // 分配 ByteBuffer,默认为 DirectByteBuf
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    // 调用子类进行编码
                    encode(ctx, cast, buf);
                } finally {
                    // 编码结束后,如果原始数据是 ReferenceCounted 实现类,则释放原始数据
                    ReferenceCountUtil.release(cast);
                }
                // 如果 ByteBuf 中有数据了,则传播给 pipeline 中的下一个 handler
                if (buf.isReadable()) {
                    ctx.write(buf, promise);
                } else {
                    // 如果 ByteBuf 中没有数据,释放该 ByteBuf 对象,传递一个空的 ByteBuf 给下一个 handler
                    buf.release();
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                // 如果不可进行编码,直接传播给 pipeline 中的下一个 handler
                ctx.write(msg, promise);
            }
        } finally {
            if (buf != null) {
                // 释放开始分配的 ByteBuf
                buf.release();
            }
        }
    }
    private final TypeParameterMatcher matcher = TypeParameterMatcher.get(outboundMessageType); // 只要泛型 I 不是 Object,就是 ReflectiveMatcher 实例;如果 I 是 Object,则全部可编码

    public boolean acceptOutboundMessage(Object msg) throws Exception {
        return matcher.match(msg);
    }
}

public abstract class TypeParameterMatcher {
    private static final TypeParameterMatcher NOOP = new TypeParameterMatcher() {
        @Override
        public boolean match(Object msg) {
            return true;
        }
    };
    // parameterType 是 MessageToByteEncoder 的泛型
    public static TypeParameterMatcher get(final Class<?> parameterType) {
        ...
        // 从缓存获取,缓存有直接返回,没有则 new
        TypeParameterMatcher matcher = getCache.get(parameterType);
        if (matcher == null) {
            // 如果泛型是 Object,全部可编码
            if (parameterType == Object.class) {
                matcher = NOOP;
            } else {
                // 如果泛型不是 Object,则使用 ReflectiveMatcher
                matcher = new ReflectiveMatcher(parameterType);
            }
            // 设置到缓存
            getCache.put(parameterType, matcher);
        }
        return matcher;
    }

    private static final class ReflectiveMatcher extends TypeParameterMatcher {
        // 判断传递进来的将要编码的数据是不是 MessageToByteEncoder 的泛型类型的数据
        @Override
        public boolean match(Object msg) {
            return type.isInstance(msg);
        }
    }
}

======================= 编码代理类 =======================
@ChannelHandler.Sharable
public class ProtocolCodeBasedEncoder extends MessageToByteEncoder<Serializable> {

    // default protocol code, 默认是 RpcProtocolV2
    protected ProtocolCode defaultProtocolCode;

    public ProtocolCodeBasedEncoder(ProtocolCode defaultProtocolCode) {
        super();
        this.defaultProtocolCode = defaultProtocolCode;
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
        // 1. 从 Channel 获取 ProtocolCode 附属属性
        Attribute<ProtocolCode> att = ctx.channel().attr(Connection.PROTOCOL);
        ProtocolCode protocolCode;
        // 如果为 null,使用默认协议 RpcProtocolV2;否则使用附属属性
        if (att == null || att.get() == null) {
            protocolCode = this.defaultProtocolCode;
        } else {
            protocolCode = att.get();
        }
        // 2. 从协议管理器中获取 ProtocolCode 相应的 Protocol 对象
        Protocol protocol = ProtocolManager.getProtocol(protocolCode);
        // 3. 再从协议对象中获取相应的 CommandEncoder 实现类实例
        protocol.getEncoder().encode(ctx, msg, out);
    }
}

======================= 编码真实类(以简单的 RpcProtocol 为例) =======================
// Encode remoting command into ByteBuf.
public class RpcCommandEncoder implements CommandEncoder {
    @Override
    public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
        if (msg instanceof RpcCommand) {
            /*
             * ver: version for protocol
             * type: request/response/request oneway
             * cmdcode: code for remoting command
             * ver2:version for remoting command
             * requestId: id of request
             * codec: code for codec
             * (req)timeout: request timeout.
             * (resp)respStatus: response status
             * classLen: length of request or response class name
             * headerLen: length of header
             * cotentLen: length of content
             * className
             * header
             * content
             */
            RpcCommand cmd = (RpcCommand) msg;
            out.writeByte(RpcProtocol.PROTOCOL_CODE); // protocol_code
            out.writeByte(cmd.getType()); // RpcCommandType
            out.writeShort(((RpcCommand) msg).getCmdCode().value()); // CommandCode
            out.writeByte(cmd.getVersion()); // commandVersion
            out.writeInt(cmd.getId()); // commandId
            out.writeByte(cmd.getSerializer()); // serializer
            if (cmd instanceof RequestCommand) {
                //timeout
                out.writeInt(((RequestCommand) cmd).getTimeout()); // timeout
            }
            if (cmd instanceof ResponseCommand) {
                //response status
                ResponseCommand response = (ResponseCommand) cmd;
                out.writeShort(response.getResponseStatus().getValue()); // responseStatus
            }
            out.writeShort(cmd.getClazzLength());
            out.writeShort(cmd.getHeaderLength());
            out.writeInt(cmd.getContentLength());
            if (cmd.getClazzLength() > 0) {
                out.writeBytes(cmd.getClazz());
            }
            if (cmd.getHeaderLength() > 0) {
                out.writeBytes(cmd.getHeader());
            }
            if (cmd.getContentLength() > 0) {
                out.writeBytes(cmd.getContent());
            }
        }
    }
}

注意事项

  • 由 acceptOutboundMessage 可知,在 SOFABolt 中数据要想经过编码器的处理,必须实现 Serializable 接口。
  • 编码器是无状态的,可以标注注解 @ChannelHandler.Sharable,见 ProtocolCodeBasedEncoder

二、解码分析

本质:将 byte[] 按照私有协议转化为中间数据,再通过反序列化将请求信息转化为业务数据

基本流程

  1. 创建或者从 Netty 的回收池中获取一个 RecyclableArrayList 实例,用于存储最终的解码数据
  2. 将传入的 ByteBuf 添加到 Cumulator 累加器实例中
  3. 之后不断的从 ByteBuf 中读取数据:首先解码出 protocolCode,之后从协议管理器中获取相应的协议对象,再从协议对象中获取相应的 CommandDecoder 实现类实例 - Netty 的 ByteToMessageDecoder 具备 accumulate 批量解包能力,可以尽可能的从 socket 里读取字节,然后同步调用 decode 方法,解码出业务对象,并组成一个 List
  4. 使用 CommandDecoder 实现类实例按照上文所介绍的协议规则进行解码,将解码好的数据放到 RecyclableArrayList 实例中,需要注意的是在解码之前必须先记录当前 ByteBuf 的 readerIndex,如果发现数据不够一个整包长度(发生了拆包粘包问题),则将当前 ByteBuf 的 readerIndex 复原到解码之前,然后直接返回,等待读取更多的数据
  5. 为了防止发送端发送数据太快导致OOM,会清理 Cumulator 累加器实例或者其空间,将已经读取的字节删除,向左压缩 ByteBuf 空间
  6. 判断 RecyclableArrayList 中的元素个数,如果是1个,则将这个元素单个发送给 pipeline 的下一个 handler;如果元素大于1个,则将整个 RecyclableArrayList 以 List 形式发送给 pipeline 的下一个 handler。- 这就是 SOFABolt 相较于 Netty 改进的地方,提供了批量提交的功能(Netty 本身的做法是循环遍历该 List ,依次提交到 ChannelPipeline 进行处理。Bolt 是将提交的内容从单个 command ,改为整个 List 一起提交,如此能减少 pipeline 的执行次数,同时提升吞吐量。这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升),批量提交的数据会经过如下的处理:
@Sharable
public class RpcCommandHandler implements CommandHandler {
    private void handle(RemotingContext ctx, Object msg) {
        // 批量提交来的数据
        if (msg instanceof List) {
            final Runnable handleTask = new Runnable() {
                public void run() {
                    // 循环处理
                    for (final Object m : (List<?>) msg) {
                        RpcCommandHandler.this.process(ctx, m);
                    }
                }
            };
            // 批量提交的数据是否在 processorManager#defaultExecutor 中执行
            // -Dbolt.rpc.dispatch-msg-list-in-default-executor=true
            if (RpcConfigManager.dispatch_msg_list_in_default_executor()) {
                // If msg is list ,then the batch submission to biz threadpool can save io thread.
                processorManager.getDefaultExecutor().execute(handleTask);
            } else {
                handleTask.run();
            }
        } else {
            // 处理单条数据
            process(ctx, msg);
        }
    }
}
  1. 回收 RecyclableArrayList 实例

源码分析

======================= 解码模板 =======================
public abstract class AbstractBatchDecoder extends ChannelInboundHandlerAdapter {
    /**
     * 累加器常量
     */
    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
        // 将 in 累加到 cumulation
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            // 累加之后的 ByteBuf
            ByteBuf buffer;
            // 如果 cumulation 放不下 in 了,则进行扩容
            // Expand cumulation (by replace it) when either there is not more room in the buffer
            // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
            // duplicate().retain().
            if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                    || cumulation.refCnt() > 1) {
                buffer = expandCumulation(alloc, cumulation, in.readableBytes());
            } else {
                buffer = cumulation;
            }
            // 将 in 写入 buffer
            buffer.writeBytes(in);
            // 释放 in
            in.release();
            // 返回类加后的结果
            return buffer;
        }
    };

    // 累加 ByteBuf
    ByteBuf cumulation;
    // 当前 decoder 实例的累加器
    private Cumulator cumulator = MERGE_CUMULATOR;
    // 每调用一次 channelRead,只解码一个消息(默认为false,即批量尽可能多的对消息进行解码)
    private boolean singleDecode;
    private boolean decodeWasNull;
    // 是否是第一次将 ByteBuf 添加到累加器
    private boolean first;
    // 连续调用 16 次 channelRead,而没有调用 channelReadComplete 时,进行 Bytebuf 的压缩,去掉已经读取的 byte,防止 OOM
    private int discardAfterReads = 16;
    private int numReads;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 如果是 ByteBuf,进行解码
        if (msg instanceof ByteBuf) {
            // 1. 创建或者从 netty 的回收池中获取一个 RecyclableArrayList 实例
            RecyclableArrayList out = RecyclableArrayList.newInstance();
            try {
                // 2. 将传入的 ByteBuf 添加到 Cumulator 累加器实例中
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    // 将当前的 data 设置为 cumulation
                    cumulation = data;
                } else {
                    // 将 data 累加到 cumulation 中
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                // 3 ~ 4
                callDecode(ctx, cumulation, out);
            } finally {
                // 如果解码后的 cumulation 没有可读字节,直接释放
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                    // If a remote peer writes fast enough it may take a long time to have fireChannelReadComplete(...) triggered.
                    // Because of this we need to take special care and ensure we try to discard some bytes if channelRead(...) is called to often in ByteToMessageDecoder.
                } else if (++numReads >= discardAfterReads) {
                    numReads = 0;
                    discardSomeReadBytes();
                }

                // 查看解码后的消息列表大小
                // 如果是一个,则直接传递单条消息
                // 如果是多个,则直接传递消息列表
                int size = out.size();
                if (size == 0) {
                    decodeWasNull = true;
                } else if (size == 1) {
                    ctx.fireChannelRead(out.get(0));
                } else {
                    ArrayList<Object> ret = new ArrayList<Object>(size);
                    for (int i = 0; i < size; i++) {
                        ret.add(out.get(i));
                    }
                    ctx.fireChannelRead(ret);
                }
                // 回收 RecyclableArrayList
                out.recycle();
            }
        } else {
            // 如果不是 ByteBuf,直接传递
            ctx.fireChannelRead(msg);
        }
    }

    // 每次读取完成之后,进行 ByteBuf 压缩
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        numReads = 0;
        discardSomeReadBytes();
        ctx.fireChannelReadComplete();
    }

    protected final void discardSomeReadBytes() {
        if (cumulation != null && !first && cumulation.refCnt() == 1) {
            cumulation.discardSomeReadBytes();
        }
    }

    // 将 in 解码到 out 中
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        // 循环不断的读取数据,批量解包
        while (in.isReadable()) {
            // 获取当前的 out 中的消息个数
            int outSize = out.size();
            //
            int oldInputLength = in.readableBytes();
            decode(ctx, in, out);

            // 如果解码后的 out 中的消息个数与原先的相等,即没有消息可读或者没有完整的消息了,后者继续循环,前者直接退出
            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }

            // 如果一次只解码一个消息
            if (isSingleDecode()) {
                break;
            }
        }
    }

    static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
        // 获取旧的 cumulation
        ByteBuf oldCumulation = cumulation;
        // 分配一个正好可以容纳老的 oldCumulation 已有字节 + readable 字节个数的 ByteBuf 
        cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
        // 将老的 oldCumulation 写入新分配的 cumulation
        cumulation.writeBytes(oldCumulation);
        // 释放老的 oldCumulation
        oldCumulation.release();
        // 返回新的 cumulation
        return cumulation;
    }

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out);
}

======================= 解码代理类 =======================
public class ProtocolCodeBasedDecoder extends AbstractBatchDecoder {
    /** by default, suggest design a single byte for protocol version. */
    public static final int DEFAULT_PROTOCOL_VERSION_LENGTH         = 1;
    /** protocol version should be a positive number, we use -1 to represent illegal */
    public static final int DEFAULT_ILLEGAL_PROTOCOL_VERSION_LENGTH = -1;

    /** the length of protocol code,默认为 1 */
    protected int           protocolCodeLength;

    protected ProtocolCode decodeProtocolCode(ByteBuf in) {
        // 从 in 的 readerIndex 开始读取 protocolCodeBytes.length(默认为 1)个字节
        // 即只解码 protocolCode
        if (in.readableBytes() >= protocolCodeLength) {
            byte[] protocolCodeBytes = new byte[protocolCodeLength];
            in.readBytes(protocolCodeBytes);
            return ProtocolCode.fromBytes(protocolCodeBytes);
        }
        return null;
    }

    // RpcProtocolDecoder#decodeProtocolVersion
    protected byte decodeProtocolVersion(ByteBuf in) {
        // 恢复到 decode 中开始设置的读指针
        in.resetReaderIndex();
        if (in.readableBytes() >= protocolCodeLength + 1) {
            byte rpcProtocolCodeByte = in.readByte();
            // 如果 ProtocolCode>=2,则继续读取 version,否则不读取(即只读取 RpcProtocolV2 及以上的 protocolVersion)
            if (rpcProtocolCodeByte >= 2) {
                return in.readByte();
            }
        }
        return -1;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 标记当前的读指针
        in.markReaderIndex();
        // 解码出 protocol_code
        ProtocolCode protocolCode = decodeProtocolCode(in); 
        if (null != protocolCode) {
            // 解码出 protocol_version
            byte protocolVersion = decodeProtocolVersion(in); 
            ...
            // 根据 protocolCode 获取 Protocol
            Protocol protocol = ProtocolManager.getProtocol(protocolCode);
            // 恢复读指针到解析 protocolCode 之前
            in.resetReaderIndex();
            // 根据 
            protocol.getDecoder().decode(ctx, in, out);
        }
    }
}

======================= 解码真实类(以简单的 RpcProtocol 为例) =======================
public class RpcCommandDecoder implements CommandDecoder {
    // 获取 RpcProtocol 协议下的响应头(20)和请求头(22)的长度,即为 20
    private int lessLen;
    {
        lessLen = RpcProtocol.getResponseHeaderLength() < RpcProtocol.getRequestHeaderLength() ? RpcProtocol
            .getResponseHeaderLength() : RpcProtocol.getRequestHeaderLength();
    }

    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // the less length between response header and request header
        if (in.readableBytes() >= lessLen) {
            // 标记当前读指针的位置
            in.markReaderIndex();
            // 读取 protocolCode
            byte protocol = in.readByte();
            // 恢复读指针到读取 protocolCode 之前
            in.resetReaderIndex();
            if (protocol == RpcProtocol.PROTOCOL_CODE) {
                /*
                 * code: protocolCode
                 * type: request/response/request oneway
                 * cmdcode: code for remoting command
                 * ver2:version for remoting command
                 * requestId: id of request
                 * codec: code for codec
                 * (req)timeout: request timeout
                 * (resp)respStatus: response status
                 * classLen: length of request or response class name
                 * headerLen: length of header
                 * contentLen: length of content
                 * className
                 * header
                 * content
                 */
                if (in.readableBytes() > 2) {
                    in.markReaderIndex();
                    // 读取 protocol_code
                    in.readByte();
                    // 读取 type
                    byte type = in.readByte();
                    // 解码请求
                    if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) {
                        if (in.readableBytes() >= RpcProtocol.getRequestHeaderLength() - 2) {
                            // CommandCode :请求命令类型,request / response / heartbeat
                            short cmdCode = in.readShort();
                            // CommandVersion
                            byte ver2 = in.readByte();
                            // 请求ID
                            int requestId = in.readInt();
                            // 序列化器
                            byte serializer = in.readByte();
                            int timeout = in.readInt();
                            short classLen = in.readShort();
                            short headerLen = in.readShort();
                            int contentLen = in.readInt();
                            byte[] clazz = null;
                            byte[] header = null;
                            byte[] content = null;
                            if (in.readableBytes() >= classLen + headerLen + contentLen) {
                                if (classLen > 0) {
                                    clazz = new byte[classLen];
                                    in.readBytes(clazz);
                                }
                                if (headerLen > 0) {
                                    header = new byte[headerLen];
                                    in.readBytes(header);
                                }
                                // 解码内容,注意此时解码出来的都是一个 byte[],需要序列化才能转化为真正对象
                                if (contentLen > 0) {
                                    content = new byte[contentLen];
                                    in.readBytes(content);
                                }
                            } else {
                                // 不够一个完整包,返回,等待累加器类架构足够的内容
                                in.resetReaderIndex();
                                return;
                            }
                            RequestCommand command;
                            if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
                                // 如果是心跳请求消息,直接创建一个 HeartbeatCommand
                                command = new HeartbeatCommand();
                            } else {
                                // 如果是正常请求,创建一个 RpcRequestCommand
                                command = createRequestCommand(cmdCode);
                            }
                            command.setType(type);
                            command.setVersion(ver2);
                            command.setId(requestId);
                            command.setSerializer(serializer);
                            command.setTimeout(timeout);
                            command.setClazz(clazz);
                            command.setHeader(header);
                            command.setContent(content);
                            out.add(command);

                        } else {
                            in.resetReaderIndex();
                        }
                    // 解码响应
                    } else if (type == RpcCommandType.RESPONSE) {
                        //decode response
                        if (in.readableBytes() >= RpcProtocol.getResponseHeaderLength() - 2) {
                            short cmdCode = in.readShort();
                            byte ver2 = in.readByte();
                            int requestId = in.readInt();
                            byte serializer = in.readByte();
                            short status = in.readShort();
                            short classLen = in.readShort();
                            short headerLen = in.readShort();
                            int contentLen = in.readInt();
                            byte[] clazz = null;
                            byte[] header = null;
                            byte[] content = null;
                            if (in.readableBytes() >= classLen + headerLen + contentLen) {
                                if (classLen > 0) {
                                    clazz = new byte[classLen];
                                    in.readBytes(clazz);
                                }
                                if (headerLen > 0) {
                                    header = new byte[headerLen];
                                    in.readBytes(header);
                                }
                                if (contentLen > 0) {
                                    content = new byte[contentLen];
                                    in.readBytes(content);
                                }
                            } else {// not enough data
                                in.resetReaderIndex();
                                return;
                            }
                            ResponseCommand command;
                            if (cmdCode == CommandCode.HEARTBEAT_VALUE) {

                                command = new HeartbeatAckCommand();
                            } else {
                                command = createResponseCommand(cmdCode);
                            }
                            command.setType(type);
                            command.setVersion(ver2);
                            command.setId(requestId);
                            command.setSerializer(serializer);
                            command.setResponseStatus(ResponseStatus.valueOf(status));
                            command.setClazz(clazz);
                            command.setHeader(header);
                            command.setContent(content);
                            command.setResponseTimeMillis(System.currentTimeMillis());
                            command.setResponseHost((InetSocketAddress) ctx.channel()
                                .remoteAddress());
                            out.add(command);
                        } else {
                            in.resetReaderIndex();
                        }
                    }
                }
            }
        }
    }

    private ResponseCommand createResponseCommand(short cmdCode) {
        ResponseCommand command = new RpcResponseCommand();
        command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
        return command;
    }

    private RpcRequestCommand createRequestCommand(short cmdCode) {
        RpcRequestCommand command = new RpcRequestCommand();
        command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
        command.setArriveTime(System.currentTimeMillis());
        return command;
    }

}

注意事项

  • 解码器是有状态的(含有 累加器),不可标注注解 @ChannelHandler.Sharable
  • 解码器解决的很重要的一个问题就是 tcp 的拆包粘包问题,最常见的解决方案是

基于变长消息协议:每一个消息分为消息头和消息体两部分,在编码时,将消息体的长度设置到消息头部,在解码的时候,首先解析出消息头部的长度信息,之后拆分或合并出该长度的消息体。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容