Netty-ByteToMessageDecoder

ByteToMessageDecoder是解码的基类

类的属性介绍

1.Cumulator :累加器,有COMPOSITE_CUMULATOR和MERGE_CUMULATOR,默认采用后者,第一个是混合存储相当于把ByteBuf加入到CompositeByteBuf。第二个通过内存拷贝把数据都放入一个ByteBuf里面
2.构造函数:是禁止在channel之间共享一个ByteToMessageDecoder
3.setSingleDecode方法会使得ByteToMessageDecoder的channelRead 每次只会解码一条消息

channelRead方法如下
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
是基于线程的解码结果列表
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
    如果是第一次就直接赋值,否则叠加
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
调用具体的解码器
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
如果cumulation 不可读,则释放
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    //已经连续读取discardAfterReads次数 这个时候字节容器中仍然有未被业务拆包器读取的数据,那就做一次压缩,有效数据段整体移到容器首部
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }
channelReadComplete:

是指读取完毕后的操作,如果一次数据读取完毕之后(可能接收端一边收,发送端一边发,这里的读取完毕指的是接收端在某个时间不再接受到数据为止),发现仍然没有拆到一个完整的用户数据包,即使该channel的设置为非自动读取,也会触发一次读取操作 ctx.read(),该操作会重新向selector注册op_read事件,以便于下一次能读到数据之后拼接成一个完整的数据包。

decodeWasNull

来标识本次读取数据是否拆到一个业务数据包,为true 的时候,如果这个channel不是自动read,则调用ctx.read()

insertSinceRecycled

代表当有任何的数据添加或者设置就是true,如果recycle方法被调用了就是false

CodecOutputList

采用类似于threadloacl缓存,对象内部通过数组存储对象,回收的时候通过将数组置空,但是对于CodecOutputList 则保存起来

AbstractNioByteChannel,这个方法是当有数据的时候就去读取

,默认分配1024大小的byteBuf 去读取 然后调用 pipeline.fireChannelRead(byteBuf);最终传到ByteToMessage进行解码。

  byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
callDecode
 protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            while (in.isReadable()) {
out如果有值代表,一个业务包被完全解码成功,比如我们传递3250字节,确实收到这么多字节
                int outSize = out.size();

                if (outSize > 0) {
如果收到一个完整的业务包就传递给其他的channelhandler
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    if (ctx.isRemoved()) {
                        break;
                    }
                    outSize = 0;
                }

                int oldInputLength = in.readableBytes();
//进行解码判断,其中关键的就是读取字节长度发现小于预期的长度,就等下一次的解码
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                if (ctx.isRemoved()) {
                    break;
                }

                if (outSize == out.size()) {
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

整个流程:还是不太难的,一般headContext 那边第一次读取数据都是1024读取 后续会增大,然后我们发现能知道我们一个完整的业务包的长度,一直等长度到了 才进行编码,解码出一个完整的业务包我们就发送到下一个channelhandler

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容