Netty源码_编解码器

每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。
这种转换逻辑由编解码器处理,编解码器编码器解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。
它们有什么区别呢?

对于 Netty 来说,编码器是针对出站数据的出站处理器,解码器是针对入站数据的入站处理器。

我们知道netty 中的通道channel 都有一个管道 ChannelPipeline ,管道中可以按照顺序添加各种入站处理器或者出站处理器(也可以是这里的编解码器),就可以按照顺序进行数据类型转换。
按照管道 ChannelPipeline中处理器的流向,我们可以知道:

  • 入站处理器流向是从头到尾,而出站处理器流向是从尾到头。
  • 而且入场处理器刚开始是接收到远端传递来的缓存区ByteBuf类型数据;而出站处理器最后是要将应用数据转换成缓存区ByteBuf类型数据发送到远端。

一. 编码器

Netty中提供了两个方便使用的编码器基类:

  • MessageToByteEncoder 将符合条件的类型数据转成缓存区对象ByteBuf
  • MessageToMessageEncoder 将符合条件的一种类型数据转成另一种类型数据。

1.1 MessageToByteEncoder

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { ...}

继承自 ChannelOutboundHandlerAdapter 类,表示它是一个出站处理器。

1.1.1 write 方法

要进行入站数据的处理,就要复写 write(ChannelHandlerContext, Object, ChannelPromise) 方法就可以了。

  @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ByteBuf buf = null;
        try {
            // 先判断当前这个编码器 接不接受这个数据 msg
            if (acceptOutboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                // 创建缓存区,preferDirect 表示创建直接缓存区
                buf = allocateBuffer(ctx, cast, preferDirect);
                try {
                    encode(ctx, cast, buf);
                } finally {
                    // 因为数据 msg 已经转成 buf,就用释放 msg 的引用
                    ReferenceCountUtil.release(cast);
                }

                // 判断缓存区有没有数据
                if (buf.isReadable()) {
                    // 缓存区有数据,就传递给下一个 出站处理器
                    ctx.write(buf, promise);
                } else {
                    // 缓存区没有数据,先释放这个缓存区
                    buf.release();
                    // 将空缓存区传递给下一个 出站处理器
                    ctx.write(Unpooled.EMPTY_BUFFER, promise);
                }
                buf = null;
            } else {
                // 如果这个编码器不接收这个类型数据 msg,
                // 那就传递给下一个 出站处理器处理。
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable e) {
            throw new EncoderException(e);
        } finally {
            if (buf != null) {
                buf.release();
            }
        }
    }

方法流程:

  • 先通过 acceptOutboundMessage(msg) 判断当前这个编码器处不处理这个类型数据;如果不处理,那么就调用ctx.write(msg, promise) 传递给下一个出站处理器处理。
  • 通过 allocateBuffer(...) 方法,创建缓存区对象 buf
  • 子类需要实现 encode(ctx, cast, buf) 方法,将 cast 存入缓存区中。

1.1.2 子类必须实现方法

    protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

子类必须实现这个方法,将msg 对象的数据存入缓存区 out 中。

1.1.3 简单实现

 public class IntegerEncoder extends MessageToByteEncoder<Integer> {
            @Override
           public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
                   throws Exception {
               out.writeInt(msg);
           }
       }

Integer 类型数据存到缓存区out中。

1.2 MessageToMessageEncoder

public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {...}

同样继承自 ChannelOutboundHandlerAdapter 类,表示它是一个出站处理器。

1.2.1 write 方法

  @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
         // 使用 CodecOutputList ,因为一个类型可以解析成一组另一种类型数据
        CodecOutputList out = null;
        try {
            if (acceptOutboundMessage(msg)) {
                out = CodecOutputList.newInstance();
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    encode(ctx, cast, out);
                } finally {
                    // 释放 cast 引用
                    ReferenceCountUtil.release(cast);
                }

                //
                if (out.isEmpty()) {
                    throw new EncoderException(
                            StringUtil.simpleClassName(this) + " must produce at least one message.");
                }
            } else {
                // 如果当前编码器不接收 msg 类型数据,
                // 交给下一个出站处理器处理
                ctx.write(msg, promise);
            }
        } catch (EncoderException e) {
            throw e;
        } catch (Throwable t) {
            throw new EncoderException(t);
        } finally {
            if (out != null) {
                try {
                    // 因为写操作,要在写完成后,通知 promise;
                    // 因此如果 out 中有多个数据,那么就进行特殊处理,当它们每个都写完之后,才通知promise。
                    final int sizeMinusOne = out.size() - 1;
                    if (sizeMinusOne == 0) {
                        ctx.write(out.getUnsafe(0), promise);
                    } else if (sizeMinusOne > 0) {
                        // Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
                        // See https://github.com/netty/netty/issues/2525
                        if (promise == ctx.voidPromise()) {
                            writeVoidPromise(ctx, out);
                        } else {
                            writePromiseCombiner(ctx, out, promise);
                        }
                    }
                } finally {
                    out.recycle();
                }
            }
        }
    }

也是让子类实现 encode(ctx, cast, out) 方法,来进行数据转换。

但是这里转换的时候,为什么使用 CodecOutputList 对象?

  • 那是因为进行类型转换的时候,有可能是一个数据类型转成另一个数据类型,但是也有可能是一个数据类型转成另一组数据类型,而且还需要平铺发送。
  • 这里的意思是由一个数据类型,转换后发送多个另一种类型的数据。
  • 当然如果你只想发送一个集合类型数据,也是没有关系的,直接把集合对象当一个元素添加到 CodecOutputList 中。

1.2.2 子类必须实现方法

protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

子类必须实现这个方法,将 msg 数据转成另一个类型数据,或者多个数据,添加到 out 中就可以了。

1.2.3 简单实现

 public class IntegerToStringEncoder extends
               MessageToMessageEncoder<Integer> {
  
            @Override
           public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out)
                   throws Exception {
               out.add(message.toString());
           }
       }

Integer 类型的数据转成字符串类型。

二. 解码器

Netty中提供了多种解码器基类,先说两个最基础的:

  • MessageToMessageDecoder 将符合条件的一种类型数据转成另一种类型数据。
  • ByteToMessageDecoder 将缓存区数据转成另一种类型数据。

2.1 MessageToMessageDecoder

public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter { ... }

继承自ChannelInboundHandlerAdapter 类,表示它是一个入站处理器。

2.1.1 channelRead 方法

入站数据的处理,就是通过 channelRead(ChannelHandlerContext ctx, Object msg) 方法。

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 使用 CodecOutputList ,因为一个类型可以解析成一组另一种类型数据
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            if (acceptInboundMessage(msg)) {
                @SuppressWarnings("unchecked")
                I cast = (I) msg;
                try {
                    // 通过 decode(...) 方法,将 cast 数据解析,
                    // 解析后的数据添加到 CodecOutputList 中
                    decode(ctx, cast, out);
                } finally {
                    // 释放 cast 引用
                    ReferenceCountUtil.release(cast);
                }
            } else {
                // 如果当前解码器不接收 msg 类型数据,
                // 那么就将这个数据添加到 CodecOutputList 中,
                // 会在 finally 方法中将它传递给下一个入站处理器
                out.add(msg);
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            try {
                int size = out.size();
                for (int i = 0; i < size; i++) {
                    // 遍历 CodecOutputList 中所有数据,发送到下一个入站处理器
                    ctx.fireChannelRead(out.getUnsafe(i));
                }
            } finally {
                // 释放 CodecOutputList
                out.recycle();
            }
        }
    }

也是让子类实现 decode(ctx, cast, out) 方法,来进行数据转换。
同样传递的是 CodecOutputList 对象,也就是说允许转成一组数据。

2.1.2 子类必须实现方法

    protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;

子类必须实现这个方法,将 msg 数据转成另一个类型数据,或者多个数据,添加到 out 中就可以了。

2.1.3 简单实现

 public class StringToIntegerDecoder extends
               MessageToMessageDecoder<String> {
  
            @Override
           public void decode(ChannelHandlerContext ctx, String message,
                              List<Object> out) throws Exception {
               out.add(message.length());
           }
       }

2.2 ByteToMessageDecoder

这个类就比较复杂了,主要是因为 TPC 是一个数据流,会将应用层的数据拆成一个一个帧包发送,不会按照应用层的数据格式进行分割。

也是说应用层发送10条数据,有可能被当成一个帧包发送;或者发送一条数据,被拆成几个帧包发送。

因此入站处理器接读取到远端发送来的缓存区数据时,有可能要接收足够多的数据之后,才能按照协议格式,解析出想要的数据格式的内容。

所以在 ByteToMessageDecoder 中有一个累加器,如果一次读取的帧包数据,不够解析成应用层数据,那么就累加多个帧包内容,进行解析。

2.2.1 Cumulator

    /**
     * Cumulate {@link ByteBuf}s.
     * 累计缓存区接口
     */
    public interface Cumulator {
      ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
    }
  • cumulation 表示之前的累计缓存区。
  • in 表示当前读取到帧的输入缓存区。
  • 返回累加了输入缓存区后的新缓存区。

默认有两种实现类

  1. MERGE_CUMULATOR

     public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
       @Override
       public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
           if (!cumulation.isReadable() && in.isContiguous()) {
               // If cumulation is empty and input buffer is contiguous, use it directly
               // 如果累计缓存区为空,且输入缓冲区是连续的,则直接使用它
               cumulation.release();
               return in;
           }
           try {
               // 输入缓存区可读字节数
               final int required = in.readableBytes();
               if (required > cumulation.maxWritableBytes() ||
                       (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) ||
                       cumulation.isReadOnly()) {
                   // Expand cumulation (by replacing it) under the following conditions:
                   // - cumulation cannot be resized to accommodate the additional data
                   // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is
                   //   assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe.
                   // 扩展累计缓存区
                   return expandCumulation(alloc, cumulation, in);
               }
               // 将输入缓存区的数据写入到累计缓存区
               cumulation.writeBytes(in, in.readerIndex(), required);
               // 改变输入缓存区的读索引,表示这部分数据已经被读取了。
               in.readerIndex(in.writerIndex());
               return cumulation;
           } finally {
               // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw
               // for whatever release (for example because of OutOfMemoryError)
               in.release();
           }
       }
    };
    

    这个是将输入缓存区in 中的数据直接写入到累计缓存区cumulation 中,涉及到数据的复制,效率低一点。

  2. COMPOSITE_CUMULATOR

     public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() {
        @Override
        public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
            // 如果累计缓存区为空, 那么就直接返回输入缓存区
            if (!cumulation.isReadable()) {
                cumulation.release();
                return in;
            }
            // 累计缓存区是符合缓存区
            CompositeByteBuf composite = null;
            try {
                if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) {
                    composite = (CompositeByteBuf) cumulation;
                    // Writer index must equal capacity if we are going to "write"
                    // new components to the end
                    if (composite.writerIndex() != composite.capacity()) {
                        composite.capacity(composite.writerIndex());
                    }
                } else {
                    composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation);
                }
                // 将输入缓存区添加到累计缓存区
                composite.addFlattenedComponents(true, in);
                in = null;
                return composite;
            } finally {
                if (in != null) {
                    // We must release if the ownership was not transferred as otherwise it may produce a leak
                    in.release();
                    // Also release any new buffer allocated if we're not returning it
                    if (composite != null && composite != cumulation) {
                        composite.release();
                    }
                }
            }
        }
    };
    

    使用组合缓存区 CompositeByteBuf 来组合输入缓存区,这样就不用先将输入缓存区的数据复制到累计缓存区了。

2.2.2 channelRead 方法

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                // 当累计缓存区cumulation 是null 的时候,表示是第一次
                first = cumulation == null;
                // 将接收到缓存区 msg 添加到累计缓存区
                cumulation = cumulator.cumulate(ctx.alloc(),
                        first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
                // 进行解析
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Exception e) {
                throw new DecoderException(e);
            } finally {
                try {
                    if (cumulation != null && !cumulation.isReadable()) {
                        // 如果累计缓存区 cumulation 中没有数据,那么就释放它
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++numReads >= discardAfterReads) {
                        // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                        // See https://github.com/netty/netty/issues/4275
                        numReads = 0;
                        // 读取多次数据了,那么调用缓存区 cumulation 的 discardSomeReadBytes() 方法,
                        // 尝试已经读取的数据区域,增大可写数据区域。
                        discardSomeReadBytes();
                    }

                    int size = out.size();
                    firedChannelRead |= out.insertSinceRecycled();
                    // 将解析完成的数据,发送到下一个入站处理器
                    fireChannelRead(ctx, out, size);
                } finally {
                    out.recycle();
                }
            }
        } else {
            // 如果数据不是 ByteBuf 类型,交给下一个入站处理器处理
            ctx.fireChannelRead(msg);
        }
    }
  • 通过 cumulator.cumulate(...) 方法,累计输入缓存区数据。
  • 调用 callDecode(ctx, cumulation, out) 方法进行解析。

2.2.3 callDecode 方法

    final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
            throws Exception {
        decodeState = STATE_CALLING_CHILD_DECODE;
        try {
            // 让具体子类来解析数据
            decode(ctx, in, out);
        } finally {
            // STATE_HANDLER_REMOVED_PENDING 在 handlerRemoved(ChannelHandlerContext ctx) 方法中被设置。
            // 也就是当前这个处理器被从管道中移除了,那么不再进行解析操作了
            boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
            decodeState = STATE_INIT;
            if (removePending) {
                fireChannelRead(ctx, out, out.size());
                out.clear();
                handlerRemoved(ctx);
            }
        }
    }

    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        try {
            // 当累计缓存区还有数据,就进行解析
            while (in.isReadable()) {

                final int outSize = out.size();
                // 解析到数据,就发送到下一个入站处理器
                if (outSize > 0) {
                    fireChannelRead(ctx, out, outSize);
                    out.clear();

                    // Check if this handler was removed before continuing with decoding.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/4635
                    // 在继续解码之前,检查此处理程序是否被删除。
                    // 如果它被删除了,那么继续在缓冲区上操作就不安全了。
                    if (ctx.isRemoved()) {
                        break;
                    }
                }

                // 记录解析前,累计缓存区还剩余的可读数据字节数
                int oldInputLength = in.readableBytes();
                decodeRemovalReentryProtection(ctx, in, out);

                // Check if this handler was removed before continuing the loop.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See https://github.com/netty/netty/issues/1664
                // 在继续循环之前,检查该处理程序是否被删除。
                if (ctx.isRemoved()) {
                    break;
                }

                if (out.isEmpty()) {
                    // 没有解析到数据,如果缓存区可读字节数没有变,直接跳出循环
                    if (oldInputLength == in.readableBytes()) {
                        break;
                    } else {
                        // 解析了一点数据,那么就继续循环,读取缓存区数据。
                        continue;
                    }
                }

                if (oldInputLength == in.readableBytes()) {
                    // 如果子类实现的 `decode(...)` 方法,没有消耗累计缓存区数据,
                    // 但是却解析到数据,添加到 out 中,要抛出异常。
                    throw new DecoderException(
                            StringUtil.simpleClassName(getClass()) +
                                    ".decode() did not read anything but decoded a message.");
                }

                // 如果设置只解析一次,那么就跳出循环,默认是false
                if (isSingleDecode()) {
                    break;
                }
            }
        } catch (DecoderException e) {
            throw e;
        } catch (Exception cause) {
            throw new DecoderException(cause);
        }
    }

callDecode 方法逻辑很简单:

  • 调用 decodeRemovalReentryProtection(ctx, in, out) 方法;
  • 进而调用 decode(ctx, in, out) 方法,让子类去解析累计缓存区中的数据;
  • 将解析成功的数据添加到 out 中;
  • 最后将它们发送给下一个入站处理器。

但是我们知道 TCP 流可能会将数据分成几个帧包发送过来,所以当子类实现 decode(ctx, in, out) 进行解析的时候,可能累计缓存区的数据不够。
因此在 callDecode(...) 方法中,针对子类解析数据做了以下判断,主要是看累计缓存区in 数据读取情况和 out 中是否存在解析成功的数据:

  • 如果 out 为空,说明子类的 decode(ctx, in, out)没有成功解析完成数据,分两个情况:
    • 累计缓存区读索引没有变化,说明子类认为当前缓存区数据还不足,那就跳出 callDecode(...) 方法,继续增加累计缓存区数据。
    • 累计缓存区读索引发生变化,说明当前子类解析了一点数据,但是不够拼装成一个应用层数据对象,那就继续循环,再次解析累计缓存区数据。
  • 如果 out 不为空,那么就要检查一下累计缓存区的读索引有没有变化,如果没有变成,就要抛出异常。这个是子类实现 callDecode(...) 方法错误,它解析成功了数据,但是没有消费累计缓存区的数据,这是不允许的。

2.2.4 decode 方法

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception

这个是子类必须实现,用来解析累计缓存区数据的方法。

从上面的分析可以知道,子类实现这个方法时,还是有点要求的:

  • 如果你解析成功数据,将它们添加到 out,记住一定要改变缓存区索引。
  • 如果你能解析部分数据,但是不够拼接成完成对象,添加到out;这时你可以选择改变缓存区读索引,或者不改变。

2.2.5 decodeLast 方法

    protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if (in.isReadable()) {
            // Only call decode() if there is something left in the buffer to decode.
            // See https://github.com/netty/netty/issues/4386
            
            // 当缓存区有数据的时候,调用 decode() 进行解析
            decodeRemovalReentryProtection(ctx, in, out);
        }
    }
  • 当通道由活跃变成不活跃时调用,也就是由 channelInactive(ChannelHandlerContext) 方法触发。
  • 让使用者可以在这种情况下,做特殊处理。默认情况下,只是调用了 decode(...) 方法。

2.2.6 简单实现

 public class SquareDecoder extends ByteToMessageDecoder {
            @Override
           public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                   throws Exception {
               out.add(in.readBytes(in.readableBytes()));
           }
       }

这里只是简单地将累计缓存区中数据读取变成一个 ByteBuf 发送给下一个入站处理器。没有太多实际意义。

2.2.7 重要子类

Netty 中为我们提供了几个比较方便我们使用的ByteToMessageDecoder 子类:

名字 定义
FixedLengthFrameDecoder 固定长度分割数据的解码器
LineBasedFrameDecoder 使用换行符 \n\r\n 分割数据的解码器
DelimiterBasedFrameDecoder 自定义多种分隔符的解码器
LengthFieldBasedFrameDecoder 根据消息中长度字段的值动态分割数据的解码器

三. FixedLengthFrameDecoder

3.1 介绍

这个是 ByteToMessageDecoder 最简单的实现,简单地使用固定长度分割数据。
例如我们接收到的数据报文如下:

   +---+----+------+----+
   | A | BC | DEFG | HI |
   +---+----+------+----+

如果我们使用一个 FixedLengthFrameDecoder(3) 解码器去解析,那么就会解码成以下三个固定长度的数据包:

   +-----+-----+-----+
   | ABC | DEF | GHI |
   +-----+-----+-----+

3.2 具体实现

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 通过 decode(ctx, in) 方法解析数据,
        // 如果缓存区in中数据不够,那么这个方法返回 null,且不会改变缓存区读索引
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            // 解析完成数据对象,就添加到 out 中
            out.add(decoded);
        }
    }

    protected Object decode(
            @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        // 如果缓存区可读数据不够帧长度,那么返回null
        if (in.readableBytes() < frameLength) {
            return null;
        } else {
            return in.readRetainedSlice(frameLength);
        }
    }

这个实现非常简单,当缓存区可读数据足够帧长度,就解析成数据片段。

四. LineBasedFrameDecoder

4.1 介绍

使用换行符 \n\r\n 分割数据,但是为了防止在传输过程丢失一些数据,比如丢失了换行符,导致大量数据积累在解析器中,没办法进行分割。
因此使用LineBasedFrameDecoder时,必须设置最大帧长度maxLength

如果超过最大帧长度maxLength,仍然没有读取到换行符,那么就丢弃这些数据。

4.2 具体实现

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 通过 decode(ctx, in) 方法解析数据,
        // 如果缓存区in中数据不够,那么这个方法返回 null,且不会改变缓存区读索引
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            // 解析完成数据对象,就添加到 out 中
            out.add(decoded);
        }
    }

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        // 找到行尾`\n` 的索引
        final int eol = findEndOfLine(buffer);
        // 是否需要丢弃
        if (!discarding) {
            // 还没有到maxLength, 需要进行解析
            if (eol >= 0) {
                // eol >= 0 表示找了行尾,那么就需要将行数据取出
                final ByteBuf frame;
                // 得到行的大小 length
                final int length = eol - buffer.readerIndex();
                // 使用 `\r\n` 结尾还是 '\n' 结尾
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

                if (length > maxLength) {
                    // 超过最大值 maxLength,设置缓存区读索引
                    buffer.readerIndex(eol + delimLength);
                    // 抛出异常
                    fail(ctx, length);
                    return null;
                }

                // stripDelimiter 表示解析出来的数据是否要截断行尾
                if (stripDelimiter) {
                    frame = buffer.readRetainedSlice(length);
                    buffer.skipBytes(delimLength);
                } else {
                    frame = buffer.readRetainedSlice(length + delimLength);
                }

                return frame;
            } else {
                // 没找到行尾字符,就要判断目前缓存区可读字节数是否超过最大值 maxLength
                final int length = buffer.readableBytes();
                if (length > maxLength) {
                    // 超过最大值,就需要丢弃数据了。
                    discardedBytes = length;
                    // 设置缓存区读索引,表示已经读取数据了
                    buffer.readerIndex(buffer.writerIndex());
                    discarding = true;
                    offset = 0;
                    if (failFast) {
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                return null;
            }
        } else {
            // 表示已经超过 maxLength,那么不用解析了,直接丢弃
            if (eol >= 0) {
                final int length = discardedBytes + eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                // 设置缓存区读索引,表示已经读取数据了
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            } else {
                discardedBytes += buffer.readableBytes();
                // 设置缓存区读索引,表示已经读取数据了
                buffer.readerIndex(buffer.writerIndex());
                // We skip everything in the buffer, we need to set the offset to 0 again.
                offset = 0;
            }
            return null;
        }
    }
  • 通过 findEndOfLine(buffer) 方法从缓存区中寻找换行符的索引。
  • 没找到换行符,判断是否超过最大帧,没有超过,返回 null, 继续累计缓存区数据。
  • 通过 failFast 属性,来决定超过最大帧时,是直接抛出异常 TooLongFrameException,还是等待找到换行符之后才抛出异常 TooLongFrameException
  • 通过 stripDelimiter 属性,来决定返回数据是否要截断换行符。

五. DelimiterBasedFrameDecoder

5.1 介绍

自定义多种分隔符的解码器。

其实和 LineBasedFrameDecoder 逻辑很像,只不过它可以自定义分隔符,而不只是换行符,而且可以定义多种分隔符。
当缓冲区中发现多个分隔符,则LineBasedFrameDecoder 会选择产生最短帧的分隔符进行分割。

5.2 具体实现

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // 从缓存区中解析数据
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        // 如果只是以行尾作为分割符,那么就使用 lineBasedDecoder 来解析
        if (lineBasedDecoder != null) {
            return lineBasedDecoder.decode(ctx, buffer);
        }
        // Try all delimiters and choose the delimiter which yields the shortest frame.
        // 最短帧的索引
        int minFrameLength = Integer.MAX_VALUE;
        // 最短帧的分隔符
        ByteBuf minDelim = null;
        // 通过循环,尝试所有的分隔符,并选择产生最短帧的分隔符。
        for (ByteBuf delim: delimiters) {
            int frameLength = indexOf(buffer, delim);
            if (frameLength >= 0 && frameLength < minFrameLength) {
                minFrameLength = frameLength;
                minDelim = delim;
            }
        }

        if (minDelim != null) {
            // 最短帧的长度
            int minDelimLength = minDelim.capacity();
            ByteBuf frame;

            if (discardingTooLongFrame) {
                // 因为 discardingTooLongFrame 为true,我们要丢弃这个超大帧
                discardingTooLongFrame = false;
                // 设置缓存区已经读取这些数据
                buffer.skipBytes(minFrameLength + minDelimLength);

                int tooLongFrameLength = this.tooLongFrameLength;
                this.tooLongFrameLength = 0;
                if (!failFast) {
                    fail(tooLongFrameLength);
                }
                return null;
            }

            if (minFrameLength > maxFrameLength) {
                // 如果超过最大值 maxFrameLength,丢弃这帧
                // 设置缓存区已经读取这些数据
                buffer.skipBytes(minFrameLength + minDelimLength);
                fail(minFrameLength);
                return null;
            }

            if (stripDelimiter) {
                frame = buffer.readRetainedSlice(minFrameLength);
                buffer.skipBytes(minDelimLength);
            } else {
                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
            }

            return frame;
        } else {
            if (!discardingTooLongFrame) {
                if (buffer.readableBytes() > maxFrameLength) {
                    // Discard the content of the buffer until a delimiter is found.

                    //记录超过最大值的帧长度
                    tooLongFrameLength = buffer.readableBytes();
                    // 设置缓存区,表示已经读取这些数据
                    buffer.skipBytes(buffer.readableBytes());
                    discardingTooLongFrame = true;
                    if (failFast) {
                        // 如果 failFast 为true,直接抛出异常,
                        // 否则这个超过最长帧异常,要到找到分隔符的时候,才会抛出
                        fail(tooLongFrameLength);
                    }
                }
            } else {
                // 仍然丢弃缓冲区内容,因为没有找到分隔符。
                // 因为 failFast 是 false,只会在找到分隔符的时候,才会抛出异常
                tooLongFrameLength += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
            }
            return null;
        }
    }
  • 先遍历所有的分隔符,找到产生最短帧的分隔符。
  • 如果没有找到,就要考虑最大帧情况了。
  • 通过 failFast 属性,来决定超过最大帧时,是直接抛出异常 TooLongFrameException,还是等待找到分隔符之后才抛出异常 TooLongFrameException
  • 通过 stripDelimiter 属性,来决定返回数据是否要截断分隔符。

六. LengthFieldBasedFrameDecoder

6.1 介绍

在我们进行数据传输的时候,经常将数据分成两个部分,头和内容体。头中有字段指定整个数据的大小,这样就可以解析完成的数据了。
LengthFieldBasedFrameDecoder 就是为了实现这样的功能。
但是我们会遇到下面情况:

  • 表示长度字段中存储的数据大小,可能是表示整个数据的大小,也有可能仅仅是表示内容体的大小。
  • 头中还有其他字段,长度字段不一定在开头位置。
  • 有可能只需要内容体的数据,要把头数据截取掉。

针对上面的情况,LengthFieldBasedFrameDecoder 中提供了几个非常重要的属性:

    // 长度字段开始偏移量
    private final int lengthFieldOffset;
    // 长度字段的长度,1表示UnsignedByte 2表示UnsignedShort
    // 3表示UnsignedMedium 4表示UnsignedInt 8表示UnsignedLong
    private final int lengthFieldLength;
    // 长度字段结束偏移量,就是 lengthFieldOffset + lengthFieldLength
    private final int lengthFieldEndOffset;
    // 调整量
    private final int lengthAdjustment;
    // 截断开始一部分数据,即跳过 initialBytesToStrip 个字节数据
    private final int initialBytesToStrip;
  1. lengthFieldOffset 表示长度字段开始偏移量。
  2. lengthFieldLength 表示长度字段的长度。

    1表示UnsignedByte, 2表示UnsignedShort, 3表示UnsignedMedium, 4表示UnsignedInt, 8表示UnsignedLong

  3. lengthFieldEndOffset 表示长度字段结尾偏移量。

    就是 lengthFieldOffset + lengthFieldLength 结果值。

  4. lengthAdjustment 表示调整量。

    要读取帧的长度,是由 lengthFieldLength 得到帧长度frameLength 再加上长度字段结尾偏移量lengthFieldEndOffset,最后加上这个调整量lengthAdjustment,即 frameLength += lengthAdjustment + lengthFieldEndOffset

  5. initialBytesToStrip 截断开始一部分数据。

6.2 具体实现

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (discardingTooLongFrame) {
            // 需要丢弃帧
            discardingTooLongFrame(in);
        }

        if (in.readableBytes() < lengthFieldEndOffset) {
            // 缓存区的可读内容还不够读取帧长度字段,
            // 那么直接返回 null,继续读取
            return null;
        }

        // 计算帧长度字段的索引 actualLengthFieldOffset
        int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
        // 通过 get 方法获取帧长度,不改变缓存区读索引
        long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);

        if (frameLength < 0) {
            // 帧长度不可能小于 0
            failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
        }

        // 通过 lengthAdjustment 和 lengthFieldEndOffset
        // 来调整最后要读取帧内容的大小。
        // 因为帧长度 frameLength 由用户自定义,有可能包含头长度,有可能不包含,
        // 所以需要灵活调整
        frameLength += lengthAdjustment + lengthFieldEndOffset;

        if (frameLength < lengthFieldEndOffset) {
            failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
        }

        if (frameLength > maxFrameLength) {
            // 超过最大帧内容,需要进行特殊处理,不用读取帧内容了。
            exceededFrameLength(in, frameLength);
            return null;
        }

        // 永远不会溢出,因为它小于maxFrameLength
        int frameLengthInt = (int) frameLength;
        if (in.readableBytes() < frameLengthInt) {
            // 当缓存区可读数据字节数小于 frameLengthInt,
            // 表示还没有获取到足够数据,那么就返回 null,继续让缓存区收集数据。
            return null;
        }

        if (initialBytesToStrip > frameLengthInt) {
            // 跳过的字节数超过 帧大小,这个需要抛出异常
            failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
        }
        // 跳过 initialBytesToStrip 个字节, initialBytesToStrip 的值只会大于或等于0
        in.skipBytes(initialBytesToStrip);

        // extract frame
        int readerIndex = in.readerIndex();
        // 减去跳过的字节数 initialBytesToStrip, 就是最后要获取 帧的内容大小
        int actualFrameLength = frameLengthInt - initialBytesToStrip;
        // 获取内容帧数据
        ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
        // 设置缓存区读索引,表示已经读取数据了
        in.readerIndex(readerIndex + actualFrameLength);
        return frame;
    }

总的处理流程:

  • 先通过 getUnadjustedFrameLength(...) 方法,读取 lengthFieldLength 代表的帧长度frameLength

    这个帧长度可能是整个数据的长度,也可能只是内容体的长度,由用户自定义。

  • frameLength += lengthAdjustment + lengthFieldEndOffset 来灵活调整帧长度。

    通过 lengthAdjustment 来调整帧长度。

  • 通过 in.skipBytes(initialBytesToStrip) 方法跳过一部分内容。

6.3 例子

6.3.1 头只有长度字段

   lengthFieldOffset   = 0
   lengthFieldLength   = 2
   lengthAdjustment    = 0
   initialBytesToStrip = 0 (= do not strip header)
  
   BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
   +--------+----------------+      +--------+----------------+
   | Length | Actual Content |----->| Length | Actual Content |
   | 0x000C | "HELLO, WORLD" |      | 0x000C | "HELLO, WORLD" |
   +--------+----------------+      +--------+----------------+
  • lengthFieldLength 代表的长度是12(0x000C)。
  • frameLength += lengthAdjustment + lengthFieldEndOffset 的值就是 14lengthAdjustment = 0lengthFieldEndOffset = 2
  • initialBytesToStrip = 0,那么最终读取就是 14 个字节数据。
   lengthFieldOffset   = 0
   lengthFieldLength   = 2
   lengthAdjustment    = 0
   initialBytesToStrip = 2 (= the length of the Length field)
  
   BEFORE DECODE (14 bytes)         AFTER DECODE (12 bytes)
   +--------+----------------+      +----------------+
   | Length | Actual Content |----->| Actual Content |
   | 0x000C | "HELLO, WORLD" |      | "HELLO, WORLD" |
   +--------+----------------+      +----------------+
  • lengthFieldLength 代表的长度是12(0x000C)。
  • frameLength += lengthAdjustment + lengthFieldEndOffset 的值就是 14lengthAdjustment = 0lengthFieldEndOffset = 2
  • initialBytesToStrip = 2,那么最终读取就是跳过开头两个字节的 12 个字节数据。
   lengthFieldOffset   =  0
   lengthFieldLength   =  2
   lengthAdjustment    = -2 (= the length of the Length field)
   initialBytesToStrip =  0
  
   BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
   +--------+----------------+      +--------+----------------+
   | Length | Actual Content |----->| Length | Actual Content |
   | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
   +--------+----------------+      +--------+----------------+
  • lengthFieldLength 代表的长度是14(0x000E)
  • frameLength += lengthAdjustment + lengthFieldEndOffset 的值就是 14lengthAdjustment = -2lengthFieldEndOffset = 2。通过 lengthAdjustment 来调整帧长度。
  • initialBytesToStrip = 0,那么最终读取就是 14 个字节数据。

6.3.2 头中有两个字段

   lengthFieldOffset   = 2 (= the length of Header 1)
   lengthFieldLength   = 3
   lengthAdjustment    = 0
   initialBytesToStrip = 0
  
   BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
   +----------+----------+----------------+      +----------+----------+----------------+
   | Header 1 |  Length  | Actual Content |----->| Header 1 |  Length  | Actual Content |
   |  0xCAFE  | 0x00000C | "HELLO, WORLD" |      |  0xCAFE  | 0x00000C | "HELLO, WORLD" |
   +----------+----------+----------------+      +----------+----------+----------------+
  • lengthFieldLength 代表的长度是12(0x000C)
  • frameLength += lengthAdjustment + lengthFieldEndOffset 的值就是 17, lengthAdjustment = 0lengthFieldEndOffset = 5
  • initialBytesToStrip = 0,那么最终读取就是 17 个字节数据。
   lengthFieldOffset   = 0
   lengthFieldLength   = 3
   lengthAdjustment    = 2 (= the length of Header 1)
   initialBytesToStrip = 0
  
   BEFORE DECODE (17 bytes)                      AFTER DECODE (17 bytes)
   +----------+----------+----------------+      +----------+----------+----------------+
   |  Length  | Header 1 | Actual Content |----->|  Length  | Header 1 | Actual Content |
   | 0x00000C |  0xCAFE  | "HELLO, WORLD" |      | 0x00000C |  0xCAFE  | "HELLO, WORLD" |
   +----------+----------+----------------+      +----------+----------+----------------+
  • lengthFieldLength 代表的长度是12(0x000C)
  • frameLength += lengthAdjustment + lengthFieldEndOffset 的值就是 17, lengthAdjustment = 2lengthFieldEndOffset = 3
  • initialBytesToStrip = 0,那么最终读取就是 17 个字节数据。

6.3.3 头中有多个字段

   lengthFieldOffset   = 1 (= the length of HDR1)
   lengthFieldLength   = 2
   lengthAdjustment    = 1 (= the length of HDR2)
   initialBytesToStrip = 3 (= the length of HDR1 + LEN)
  
   BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
   +------+--------+------+----------------+      +------+----------------+
   | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
   | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
   +------+--------+------+----------------+      +------+----------------+
  • lengthFieldLength 代表的长度是12(0x000C)
  • frameLength += lengthAdjustment + lengthFieldEndOffset 的值就是 16, lengthAdjustment = 1lengthFieldEndOffset = 3
  • initialBytesToStrip = 3,那么最终读取就是跳过开头三个字节的 13 个字节数据。
   lengthFieldOffset   =  1
   lengthFieldLength   =  2
   lengthAdjustment    = -3 (= the length of HDR1 + LEN, negative)
   initialBytesToStrip =  3
  
   BEFORE DECODE (16 bytes)                       AFTER DECODE (13 bytes)
   +------+--------+------+----------------+      +------+----------------+
   | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
   | 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" |      | 0xFE | "HELLO, WORLD" |
   +------+--------+------+----------------+      +------+----------------+
  • lengthFieldLength 代表的长度是16(0x0010)
  • frameLength += lengthAdjustment + lengthFieldEndOffset 的值就是 16, lengthAdjustment = -3lengthFieldEndOffset = 3
  • initialBytesToStrip = 3,那么最终读取就是跳过开头三个字节的 13 个字节数据。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352