ByteToMessageDecoder是解码的基类
类的属性介绍
1.Cumulator :累加器,有COMPOSITE_CUMULATOR和MERGE_CUMULATOR,默认采用后者,第一个是混合存储相当于把ByteBuf加入到CompositeByteBuf。第二个通过内存拷贝把数据都放入一个ByteBuf里面
2.构造函数:是禁止在channel之间共享一个ByteToMessageDecoder
3.setSingleDecode方法会使得ByteToMessageDecoder的channelRead 每次只会解码一条消息
channelRead方法如下
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
是基于线程的解码结果列表
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
如果是第一次就直接赋值,否则叠加
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
调用具体的解码器
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
如果cumulation 不可读,则释放
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
//已经连续读取discardAfterReads次数 这个时候字节容器中仍然有未被业务拆包器读取的数据,那就做一次压缩,有效数据段整体移到容器首部
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
channelReadComplete:
是指读取完毕后的操作,如果一次数据读取完毕之后(可能接收端一边收,发送端一边发,这里的读取完毕指的是接收端在某个时间不再接受到数据为止),发现仍然没有拆到一个完整的用户数据包,即使该channel的设置为非自动读取,也会触发一次读取操作 ctx.read(),该操作会重新向selector注册op_read事件,以便于下一次能读到数据之后拼接成一个完整的数据包。
decodeWasNull
来标识本次读取数据是否拆到一个业务数据包,为true 的时候,如果这个channel不是自动read,则调用ctx.read()
insertSinceRecycled
代表当有任何的数据添加或者设置就是true,如果recycle方法被调用了就是false
CodecOutputList
采用类似于threadloacl缓存,对象内部通过数组存储对象,回收的时候通过将数组置空,但是对于CodecOutputList 则保存起来
AbstractNioByteChannel,这个方法是当有数据的时候就去读取
,默认分配1024大小的byteBuf 去读取 然后调用 pipeline.fireChannelRead(byteBuf);最终传到ByteToMessage进行解码。
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
callDecode
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
out如果有值代表,一个业务包被完全解码成功,比如我们传递3250字节,确实收到这么多字节
int outSize = out.size();
if (outSize > 0) {
如果收到一个完整的业务包就传递给其他的channelhandler
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
int oldInputLength = in.readableBytes();
//进行解码判断,其中关键的就是读取字节长度发现小于预期的长度,就等下一次的解码
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
if (ctx.isRemoved()) {
break;
}
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
整个流程:还是不太难的,一般headContext 那边第一次读取数据都是1024读取 后续会增大,然后我们发现能知道我们一个完整的业务包的长度,一直等长度到了 才进行编码,解码出一个完整的业务包我们就发送到下一个channelhandler