注意:本文部分内容摘抄自 SOFABolt 源码解析系列文章(蚂蚁金服将会在最近不断推出 SOFA 系列的博文),大家可以关注微信公众号:金融级分布式架构 来得到通知。部分内容参考自:蚂蚁通信框架实践
类组成
- 编解码器工厂 -
工厂模式
- Codec 是编解码器工厂类接口
- RpcCodec 是针对 Rpc 场景下的编解码器工厂类
附:SOFABolt 高度提取了一些接口,是的我们可以自己去进行扩展,假设我们要做 mq 的编解码,可以模仿实现 RpcCodec 的实现方式- RpcCodec 用于创建 ProtocolCodeBasedEncoder 和 ProtocolCodeBasedDecoder 的子类 RpcProtocolDecoder,二者被设置为 netty 的编解码器 handler
- 编解码模板 -
模板模式
- MessageToByteEncoder 是 Netty 提供的一个编码模板
- AbstractBatchDecoder 是 SOFABolt hack 了 Netty 的 ByteToMessageDecoder 来提供一个解码模板(相较于 Netty 增加了
批量提交
的功能)
- 编解码代理类 -
代理模式
和策略模式
- ProtocolCodeBasedEncoder 是 CommandEncoder 的代理类,通过不同的 protocol 协议,指定使用不同的编码器
- ProtocolCodeBasedDecoder 是 CommandDecoder 的代理类,通过不同的 protocol 协议,指定使用不同的解码器
- RpcProtocolDecoder 是 ProtocolCodeBasedDecoder 的子类,针对于 Rpc 场景定制了 decodeProtocolVersion 方法
- 真正的解码器
- CommandEncoder 提供了编码接口
- CommandDecoder 提供了解码接口
- RpcCommandEncoder 提供了 RpcProtocol 协议数据的编码器
- RpcCommandEncoderV2 提供了 RpcProtocolV2 协议数据的编码器
- RpcCommandDecoder 提供了 RpcProtocol 协议数据的解码器
- RpcCommandDecoderV2 提供了 RpcProtocolV2 协议数据的解码器
一、编码分析
本质:序列化会将业务数据转化为 byte[],编码按照私有协议将 byte[] 写入到 ByteBuf 中
基本流程
- 判断传入的数据是否是 Serializable 类型(该类型由 MessageToByteEncoder 的泛型指定),如果不是,直接传播给 pipeline 中的下一个 handler;否则
- 创建一个 ByteBuf 实例,用于存储最终的编码数据
- 从 channel 的附加属性中获取协议标识 protocolCode,之后从协议管理器中获取相应的 Protocol 对象
- 从 Protocol 对象中获取相应的 CommandEncoder 实现类实例,使用 CommandEncoder 实现类实例按照 SOFABolt 源码分析18 - Protocol 私有协议的设计 所介绍的协议规则将数据写入到第二步创建好的 ByteBuf 实例中
- 如果原始数据是 ReferenceCounted 实现类,则释放原始数据
- 如果 ByteBuf 中有数据了,则传播给 pipeline 中的下一个 handler;否则,释放该 ByteBuf 对象,传递一个空的 ByteBuf 给下一个 handler
源码分析
======================= 编解码工厂 =======================
public class RpcCodec implements Codec {
@Override
public ChannelHandler newEncoder() {
return new ProtocolCodeBasedEncoder(ProtocolCode.fromBytes(RpcProtocolV2.PROTOCOL_CODE));
}
@Override
public ChannelHandler newDecoder() {
return new RpcProtocolDecoder(RpcProtocolManager.DEFAULT_PROTOCOL_CODE_LENGTH);
}
}
======================= 服务端 Netty 设置(客户端类似) =======================
// 编解码工厂
private Codec codec = new RpcCodec();
public class RpcServer {
@Override
protected void doInit() {
...
this.bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
...
// 创建解码器
pipeline.addLast("decoder", codec.newDecoder());
// 创建编码器
pipeline.addLast("encoder", codec.newEncoder());
...
}
}
}
======================= 编码模板 =======================
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 判断是否可编码(SOFABolt 中判断 msg instanceof Serializable)
if (acceptOutboundMessage(msg)) {
I cast = (I) msg;
// 分配 ByteBuffer,默认为 DirectByteBuf
buf = allocateBuffer(ctx, cast, preferDirect);
try {
// 调用子类进行编码
encode(ctx, cast, buf);
} finally {
// 编码结束后,如果原始数据是 ReferenceCounted 实现类,则释放原始数据
ReferenceCountUtil.release(cast);
}
// 如果 ByteBuf 中有数据了,则传播给 pipeline 中的下一个 handler
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
// 如果 ByteBuf 中没有数据,释放该 ByteBuf 对象,传递一个空的 ByteBuf 给下一个 handler
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 如果不可进行编码,直接传播给 pipeline 中的下一个 handler
ctx.write(msg, promise);
}
} finally {
if (buf != null) {
// 释放开始分配的 ByteBuf
buf.release();
}
}
}
private final TypeParameterMatcher matcher = TypeParameterMatcher.get(outboundMessageType); // 只要泛型 I 不是 Object,就是 ReflectiveMatcher 实例;如果 I 是 Object,则全部可编码
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
}
public abstract class TypeParameterMatcher {
private static final TypeParameterMatcher NOOP = new TypeParameterMatcher() {
@Override
public boolean match(Object msg) {
return true;
}
};
// parameterType 是 MessageToByteEncoder 的泛型
public static TypeParameterMatcher get(final Class<?> parameterType) {
...
// 从缓存获取,缓存有直接返回,没有则 new
TypeParameterMatcher matcher = getCache.get(parameterType);
if (matcher == null) {
// 如果泛型是 Object,全部可编码
if (parameterType == Object.class) {
matcher = NOOP;
} else {
// 如果泛型不是 Object,则使用 ReflectiveMatcher
matcher = new ReflectiveMatcher(parameterType);
}
// 设置到缓存
getCache.put(parameterType, matcher);
}
return matcher;
}
private static final class ReflectiveMatcher extends TypeParameterMatcher {
// 判断传递进来的将要编码的数据是不是 MessageToByteEncoder 的泛型类型的数据
@Override
public boolean match(Object msg) {
return type.isInstance(msg);
}
}
}
======================= 编码代理类 =======================
@ChannelHandler.Sharable
public class ProtocolCodeBasedEncoder extends MessageToByteEncoder<Serializable> {
// default protocol code, 默认是 RpcProtocolV2
protected ProtocolCode defaultProtocolCode;
public ProtocolCodeBasedEncoder(ProtocolCode defaultProtocolCode) {
super();
this.defaultProtocolCode = defaultProtocolCode;
}
@Override
protected void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
// 1. 从 Channel 获取 ProtocolCode 附属属性
Attribute<ProtocolCode> att = ctx.channel().attr(Connection.PROTOCOL);
ProtocolCode protocolCode;
// 如果为 null,使用默认协议 RpcProtocolV2;否则使用附属属性
if (att == null || att.get() == null) {
protocolCode = this.defaultProtocolCode;
} else {
protocolCode = att.get();
}
// 2. 从协议管理器中获取 ProtocolCode 相应的 Protocol 对象
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
// 3. 再从协议对象中获取相应的 CommandEncoder 实现类实例
protocol.getEncoder().encode(ctx, msg, out);
}
}
======================= 编码真实类(以简单的 RpcProtocol 为例) =======================
// Encode remoting command into ByteBuf.
public class RpcCommandEncoder implements CommandEncoder {
@Override
public void encode(ChannelHandlerContext ctx, Serializable msg, ByteBuf out) {
if (msg instanceof RpcCommand) {
/*
* ver: version for protocol
* type: request/response/request oneway
* cmdcode: code for remoting command
* ver2:version for remoting command
* requestId: id of request
* codec: code for codec
* (req)timeout: request timeout.
* (resp)respStatus: response status
* classLen: length of request or response class name
* headerLen: length of header
* cotentLen: length of content
* className
* header
* content
*/
RpcCommand cmd = (RpcCommand) msg;
out.writeByte(RpcProtocol.PROTOCOL_CODE); // protocol_code
out.writeByte(cmd.getType()); // RpcCommandType
out.writeShort(((RpcCommand) msg).getCmdCode().value()); // CommandCode
out.writeByte(cmd.getVersion()); // commandVersion
out.writeInt(cmd.getId()); // commandId
out.writeByte(cmd.getSerializer()); // serializer
if (cmd instanceof RequestCommand) {
//timeout
out.writeInt(((RequestCommand) cmd).getTimeout()); // timeout
}
if (cmd instanceof ResponseCommand) {
//response status
ResponseCommand response = (ResponseCommand) cmd;
out.writeShort(response.getResponseStatus().getValue()); // responseStatus
}
out.writeShort(cmd.getClazzLength());
out.writeShort(cmd.getHeaderLength());
out.writeInt(cmd.getContentLength());
if (cmd.getClazzLength() > 0) {
out.writeBytes(cmd.getClazz());
}
if (cmd.getHeaderLength() > 0) {
out.writeBytes(cmd.getHeader());
}
if (cmd.getContentLength() > 0) {
out.writeBytes(cmd.getContent());
}
}
}
}
注意事项
- 由 acceptOutboundMessage 可知,在 SOFABolt 中数据要想经过编码器的处理,必须实现 Serializable 接口。
- 编码器是无状态的,可以标注注解 @ChannelHandler.Sharable,见 ProtocolCodeBasedEncoder
二、解码分析
本质:将 byte[] 按照私有协议转化为中间数据,再通过反序列化将请求信息转化为业务数据
基本流程
- 创建或者从 Netty 的回收池中获取一个 RecyclableArrayList 实例,用于存储最终的解码数据
- 将传入的 ByteBuf 添加到 Cumulator 累加器实例中
- 之后
不断
的从 ByteBuf 中读取数据:首先解码出 protocolCode,之后从协议管理器中获取相应的协议对象,再从协议对象中获取相应的 CommandDecoder 实现类实例 -Netty 的 ByteToMessageDecoder 具备 accumulate 批量解包能力,可以尽可能的从 socket 里读取字节,然后同步调用 decode 方法,解码出业务对象,并组成一个 List
- 使用 CommandDecoder 实现类实例按照上文所介绍的协议规则进行解码,将解码好的数据放到 RecyclableArrayList 实例中,需要注意的是在解码之前必须先记录当前 ByteBuf 的 readerIndex,如果发现数据不够一个整包长度(发生了拆包粘包问题),则将当前 ByteBuf 的 readerIndex 复原到解码之前,然后直接返回,等待读取更多的数据
- 为了防止发送端发送数据太快导致OOM,会清理 Cumulator 累加器实例或者其空间,将已经读取的字节删除,向左压缩 ByteBuf 空间
- 判断 RecyclableArrayList 中的元素个数,如果是1个,则将这个元素单个发送给 pipeline 的下一个 handler;如果元素大于1个,则将整个 RecyclableArrayList 以 List 形式发送给 pipeline 的下一个 handler。- 这就是 SOFABolt 相较于 Netty 改进的地方,提供了
批量提交
的功能(Netty 本身的做法是循环遍历该 List ,依次提交到 ChannelPipeline 进行处理。Bolt 是将提交的内容从单个 command ,改为整个 List 一起提交,如此能减少 pipeline 的执行次数,同时提升吞吐量。这个模式在低并发场景,并没有什么优势,而在高并发场景下对提升吞吐量有不小的性能提升
),批量提交的数据会经过如下的处理:
@Sharable
public class RpcCommandHandler implements CommandHandler {
private void handle(RemotingContext ctx, Object msg) {
// 批量提交来的数据
if (msg instanceof List) {
final Runnable handleTask = new Runnable() {
public void run() {
// 循环处理
for (final Object m : (List<?>) msg) {
RpcCommandHandler.this.process(ctx, m);
}
}
};
// 批量提交的数据是否在 processorManager#defaultExecutor 中执行
// -Dbolt.rpc.dispatch-msg-list-in-default-executor=true
if (RpcConfigManager.dispatch_msg_list_in_default_executor()) {
// If msg is list ,then the batch submission to biz threadpool can save io thread.
processorManager.getDefaultExecutor().execute(handleTask);
} else {
handleTask.run();
}
} else {
// 处理单条数据
process(ctx, msg);
}
}
}
- 回收 RecyclableArrayList 实例
源码分析
======================= 解码模板 =======================
public abstract class AbstractBatchDecoder extends ChannelInboundHandlerAdapter {
/**
* 累加器常量
*/
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
// 将 in 累加到 cumulation
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
// 累加之后的 ByteBuf
ByteBuf buffer;
// 如果 cumulation 放不下 in 了,则进行扩容
// Expand cumulation (by replace it) when either there is not more room in the buffer
// or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
// duplicate().retain().
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
|| cumulation.refCnt() > 1) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
// 将 in 写入 buffer
buffer.writeBytes(in);
// 释放 in
in.release();
// 返回类加后的结果
return buffer;
}
};
// 累加 ByteBuf
ByteBuf cumulation;
// 当前 decoder 实例的累加器
private Cumulator cumulator = MERGE_CUMULATOR;
// 每调用一次 channelRead,只解码一个消息(默认为false,即批量尽可能多的对消息进行解码)
private boolean singleDecode;
private boolean decodeWasNull;
// 是否是第一次将 ByteBuf 添加到累加器
private boolean first;
// 连续调用 16 次 channelRead,而没有调用 channelReadComplete 时,进行 Bytebuf 的压缩,去掉已经读取的 byte,防止 OOM
private int discardAfterReads = 16;
private int numReads;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 如果是 ByteBuf,进行解码
if (msg instanceof ByteBuf) {
// 1. 创建或者从 netty 的回收池中获取一个 RecyclableArrayList 实例
RecyclableArrayList out = RecyclableArrayList.newInstance();
try {
// 2. 将传入的 ByteBuf 添加到 Cumulator 累加器实例中
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
// 将当前的 data 设置为 cumulation
cumulation = data;
} else {
// 将 data 累加到 cumulation 中
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 3 ~ 4
callDecode(ctx, cumulation, out);
} finally {
// 如果解码后的 cumulation 没有可读字节,直接释放
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
// If a remote peer writes fast enough it may take a long time to have fireChannelReadComplete(...) triggered.
// Because of this we need to take special care and ensure we try to discard some bytes if channelRead(...) is called to often in ByteToMessageDecoder.
} else if (++numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
// 查看解码后的消息列表大小
// 如果是一个,则直接传递单条消息
// 如果是多个,则直接传递消息列表
int size = out.size();
if (size == 0) {
decodeWasNull = true;
} else if (size == 1) {
ctx.fireChannelRead(out.get(0));
} else {
ArrayList<Object> ret = new ArrayList<Object>(size);
for (int i = 0; i < size; i++) {
ret.add(out.get(i));
}
ctx.fireChannelRead(ret);
}
// 回收 RecyclableArrayList
out.recycle();
}
} else {
// 如果不是 ByteBuf,直接传递
ctx.fireChannelRead(msg);
}
}
// 每次读取完成之后,进行 ByteBuf 压缩
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
ctx.fireChannelReadComplete();
}
protected final void discardSomeReadBytes() {
if (cumulation != null && !first && cumulation.refCnt() == 1) {
cumulation.discardSomeReadBytes();
}
}
// 将 in 解码到 out 中
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// 循环不断的读取数据,批量解包
while (in.isReadable()) {
// 获取当前的 out 中的消息个数
int outSize = out.size();
//
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
// 如果解码后的 out 中的消息个数与原先的相等,即没有消息可读或者没有完整的消息了,后者继续循环,前者直接退出
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
// 如果一次只解码一个消息
if (isSingleDecode()) {
break;
}
}
}
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
// 获取旧的 cumulation
ByteBuf oldCumulation = cumulation;
// 分配一个正好可以容纳老的 oldCumulation 已有字节 + readable 字节个数的 ByteBuf
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
// 将老的 oldCumulation 写入新分配的 cumulation
cumulation.writeBytes(oldCumulation);
// 释放老的 oldCumulation
oldCumulation.release();
// 返回新的 cumulation
return cumulation;
}
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out);
}
======================= 解码代理类 =======================
public class ProtocolCodeBasedDecoder extends AbstractBatchDecoder {
/** by default, suggest design a single byte for protocol version. */
public static final int DEFAULT_PROTOCOL_VERSION_LENGTH = 1;
/** protocol version should be a positive number, we use -1 to represent illegal */
public static final int DEFAULT_ILLEGAL_PROTOCOL_VERSION_LENGTH = -1;
/** the length of protocol code,默认为 1 */
protected int protocolCodeLength;
protected ProtocolCode decodeProtocolCode(ByteBuf in) {
// 从 in 的 readerIndex 开始读取 protocolCodeBytes.length(默认为 1)个字节
// 即只解码 protocolCode
if (in.readableBytes() >= protocolCodeLength) {
byte[] protocolCodeBytes = new byte[protocolCodeLength];
in.readBytes(protocolCodeBytes);
return ProtocolCode.fromBytes(protocolCodeBytes);
}
return null;
}
// RpcProtocolDecoder#decodeProtocolVersion
protected byte decodeProtocolVersion(ByteBuf in) {
// 恢复到 decode 中开始设置的读指针
in.resetReaderIndex();
if (in.readableBytes() >= protocolCodeLength + 1) {
byte rpcProtocolCodeByte = in.readByte();
// 如果 ProtocolCode>=2,则继续读取 version,否则不读取(即只读取 RpcProtocolV2 及以上的 protocolVersion)
if (rpcProtocolCodeByte >= 2) {
return in.readByte();
}
}
return -1;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 标记当前的读指针
in.markReaderIndex();
// 解码出 protocol_code
ProtocolCode protocolCode = decodeProtocolCode(in);
if (null != protocolCode) {
// 解码出 protocol_version
byte protocolVersion = decodeProtocolVersion(in);
...
// 根据 protocolCode 获取 Protocol
Protocol protocol = ProtocolManager.getProtocol(protocolCode);
// 恢复读指针到解析 protocolCode 之前
in.resetReaderIndex();
// 根据
protocol.getDecoder().decode(ctx, in, out);
}
}
}
======================= 解码真实类(以简单的 RpcProtocol 为例) =======================
public class RpcCommandDecoder implements CommandDecoder {
// 获取 RpcProtocol 协议下的响应头(20)和请求头(22)的长度,即为 20
private int lessLen;
{
lessLen = RpcProtocol.getResponseHeaderLength() < RpcProtocol.getRequestHeaderLength() ? RpcProtocol
.getResponseHeaderLength() : RpcProtocol.getRequestHeaderLength();
}
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// the less length between response header and request header
if (in.readableBytes() >= lessLen) {
// 标记当前读指针的位置
in.markReaderIndex();
// 读取 protocolCode
byte protocol = in.readByte();
// 恢复读指针到读取 protocolCode 之前
in.resetReaderIndex();
if (protocol == RpcProtocol.PROTOCOL_CODE) {
/*
* code: protocolCode
* type: request/response/request oneway
* cmdcode: code for remoting command
* ver2:version for remoting command
* requestId: id of request
* codec: code for codec
* (req)timeout: request timeout
* (resp)respStatus: response status
* classLen: length of request or response class name
* headerLen: length of header
* contentLen: length of content
* className
* header
* content
*/
if (in.readableBytes() > 2) {
in.markReaderIndex();
// 读取 protocol_code
in.readByte();
// 读取 type
byte type = in.readByte();
// 解码请求
if (type == RpcCommandType.REQUEST || type == RpcCommandType.REQUEST_ONEWAY) {
if (in.readableBytes() >= RpcProtocol.getRequestHeaderLength() - 2) {
// CommandCode :请求命令类型,request / response / heartbeat
short cmdCode = in.readShort();
// CommandVersion
byte ver2 = in.readByte();
// 请求ID
int requestId = in.readInt();
// 序列化器
byte serializer = in.readByte();
int timeout = in.readInt();
short classLen = in.readShort();
short headerLen = in.readShort();
int contentLen = in.readInt();
byte[] clazz = null;
byte[] header = null;
byte[] content = null;
if (in.readableBytes() >= classLen + headerLen + contentLen) {
if (classLen > 0) {
clazz = new byte[classLen];
in.readBytes(clazz);
}
if (headerLen > 0) {
header = new byte[headerLen];
in.readBytes(header);
}
// 解码内容,注意此时解码出来的都是一个 byte[],需要序列化才能转化为真正对象
if (contentLen > 0) {
content = new byte[contentLen];
in.readBytes(content);
}
} else {
// 不够一个完整包,返回,等待累加器类架构足够的内容
in.resetReaderIndex();
return;
}
RequestCommand command;
if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
// 如果是心跳请求消息,直接创建一个 HeartbeatCommand
command = new HeartbeatCommand();
} else {
// 如果是正常请求,创建一个 RpcRequestCommand
command = createRequestCommand(cmdCode);
}
command.setType(type);
command.setVersion(ver2);
command.setId(requestId);
command.setSerializer(serializer);
command.setTimeout(timeout);
command.setClazz(clazz);
command.setHeader(header);
command.setContent(content);
out.add(command);
} else {
in.resetReaderIndex();
}
// 解码响应
} else if (type == RpcCommandType.RESPONSE) {
//decode response
if (in.readableBytes() >= RpcProtocol.getResponseHeaderLength() - 2) {
short cmdCode = in.readShort();
byte ver2 = in.readByte();
int requestId = in.readInt();
byte serializer = in.readByte();
short status = in.readShort();
short classLen = in.readShort();
short headerLen = in.readShort();
int contentLen = in.readInt();
byte[] clazz = null;
byte[] header = null;
byte[] content = null;
if (in.readableBytes() >= classLen + headerLen + contentLen) {
if (classLen > 0) {
clazz = new byte[classLen];
in.readBytes(clazz);
}
if (headerLen > 0) {
header = new byte[headerLen];
in.readBytes(header);
}
if (contentLen > 0) {
content = new byte[contentLen];
in.readBytes(content);
}
} else {// not enough data
in.resetReaderIndex();
return;
}
ResponseCommand command;
if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
command = new HeartbeatAckCommand();
} else {
command = createResponseCommand(cmdCode);
}
command.setType(type);
command.setVersion(ver2);
command.setId(requestId);
command.setSerializer(serializer);
command.setResponseStatus(ResponseStatus.valueOf(status));
command.setClazz(clazz);
command.setHeader(header);
command.setContent(content);
command.setResponseTimeMillis(System.currentTimeMillis());
command.setResponseHost((InetSocketAddress) ctx.channel()
.remoteAddress());
out.add(command);
} else {
in.resetReaderIndex();
}
}
}
}
}
}
private ResponseCommand createResponseCommand(short cmdCode) {
ResponseCommand command = new RpcResponseCommand();
command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
return command;
}
private RpcRequestCommand createRequestCommand(short cmdCode) {
RpcRequestCommand command = new RpcRequestCommand();
command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
command.setArriveTime(System.currentTimeMillis());
return command;
}
}
注意事项
- 解码器是有状态的(含有 累加器),不可标注注解 @ChannelHandler.Sharable
- 解码器解决的很重要的一个问题就是 tcp 的拆包粘包问题,最常见的解决方案是
基于变长消息协议:每一个消息分为消息头和消息体两部分,在编码时,将消息体的长度设置到消息头部,在解码的时候,首先解析出消息头部的长度信息,之后拆分或合并出该长度的消息体。