每个网络应用程序都必须定义如何解析在两个节点之间来回传输的原始字节,以及如何将其和目标应用程序的数据格式做相互转换。
这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式。
它们有什么区别呢?
对于
Netty
来说,编码器是针对出站数据的出站处理器,解码器是针对入站数据的入站处理器。
我们知道netty
中的通道channel
都有一个管道 ChannelPipeline
,管道中可以按照顺序添加各种入站处理器或者出站处理器(也可以是这里的编解码器),就可以按照顺序进行数据类型转换。
按照管道 ChannelPipeline
中处理器的流向,我们可以知道:
- 入站处理器流向是从头到尾,而出站处理器流向是从尾到头。
- 而且入场处理器刚开始是接收到远端传递来的缓存区
ByteBuf
类型数据;而出站处理器最后是要将应用数据转换成缓存区ByteBuf
类型数据发送到远端。
一. 编码器
在Netty
中提供了两个方便使用的编码器基类:
-
MessageToByteEncoder
将符合条件的类型数据转成缓存区对象ByteBuf
。 -
MessageToMessageEncoder
将符合条件的一种类型数据转成另一种类型数据。
1.1 MessageToByteEncoder
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter { ...}
继承自 ChannelOutboundHandlerAdapter
类,表示它是一个出站处理器。
1.1.1 write
方法
要进行入站数据的处理,就要复写 write(ChannelHandlerContext, Object, ChannelPromise)
方法就可以了。
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
// 先判断当前这个编码器 接不接受这个数据 msg
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
// 创建缓存区,preferDirect 表示创建直接缓存区
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf);
} finally {
// 因为数据 msg 已经转成 buf,就用释放 msg 的引用
ReferenceCountUtil.release(cast);
}
// 判断缓存区有没有数据
if (buf.isReadable()) {
// 缓存区有数据,就传递给下一个 出站处理器
ctx.write(buf, promise);
} else {
// 缓存区没有数据,先释放这个缓存区
buf.release();
// 将空缓存区传递给下一个 出站处理器
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
// 如果这个编码器不接收这个类型数据 msg,
// 那就传递给下一个 出站处理器处理。
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
方法流程:
- 先通过
acceptOutboundMessage(msg)
判断当前这个编码器处不处理这个类型数据;如果不处理,那么就调用ctx.write(msg, promise)
传递给下一个出站处理器处理。 - 通过
allocateBuffer(...)
方法,创建缓存区对象buf
。 - 子类需要实现
encode(ctx, cast, buf)
方法,将cast
存入缓存区中。
1.1.2 子类必须实现方法
protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;
子类必须实现这个方法,将
msg
对象的数据存入缓存区out
中。
1.1.3 简单实现
public class IntegerEncoder extends MessageToByteEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out)
throws Exception {
out.writeInt(msg);
}
}
将
Integer
类型数据存到缓存区out
中。
1.2 MessageToMessageEncoder
public abstract class MessageToMessageEncoder<I> extends ChannelOutboundHandlerAdapter {...}
同样继承自 ChannelOutboundHandlerAdapter
类,表示它是一个出站处理器。
1.2.1 write
方法
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
// 使用 CodecOutputList ,因为一个类型可以解析成一组另一种类型数据
CodecOutputList out = null;
try {
if (acceptOutboundMessage(msg)) {
out = CodecOutputList.newInstance();
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
encode(ctx, cast, out);
} finally {
// 释放 cast 引用
ReferenceCountUtil.release(cast);
}
//
if (out.isEmpty()) {
throw new EncoderException(
StringUtil.simpleClassName(this) + " must produce at least one message.");
}
} else {
// 如果当前编码器不接收 msg 类型数据,
// 交给下一个出站处理器处理
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable t) {
throw new EncoderException(t);
} finally {
if (out != null) {
try {
// 因为写操作,要在写完成后,通知 promise;
// 因此如果 out 中有多个数据,那么就进行特殊处理,当它们每个都写完之后,才通知promise。
final int sizeMinusOne = out.size() - 1;
if (sizeMinusOne == 0) {
ctx.write(out.getUnsafe(0), promise);
} else if (sizeMinusOne > 0) {
// Check if we can use a voidPromise for our extra writes to reduce GC-Pressure
// See https://github.com/netty/netty/issues/2525
if (promise == ctx.voidPromise()) {
writeVoidPromise(ctx, out);
} else {
writePromiseCombiner(ctx, out, promise);
}
}
} finally {
out.recycle();
}
}
}
}
也是让子类实现
encode(ctx, cast, out)
方法,来进行数据转换。
但是这里转换的时候,为什么使用 CodecOutputList
对象?
- 那是因为进行类型转换的时候,有可能是一个数据类型转成另一个数据类型,但是也有可能是一个数据类型转成另一组数据类型,而且还需要平铺发送。
- 这里的意思是由一个数据类型,转换后发送多个另一种类型的数据。
- 当然如果你只想发送一个集合类型数据,也是没有关系的,直接把集合对象当一个元素添加到
CodecOutputList
中。
1.2.2 子类必须实现方法
protected abstract void encode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
子类必须实现这个方法,将
msg
数据转成另一个类型数据,或者多个数据,添加到out
中就可以了。
1.2.3 简单实现
public class IntegerToStringEncoder extends
MessageToMessageEncoder<Integer> {
@Override
public void encode(ChannelHandlerContext ctx, Integer message, List<Object> out)
throws Exception {
out.add(message.toString());
}
}
将
Integer
类型的数据转成字符串类型。
二. 解码器
在Netty
中提供了多种解码器基类,先说两个最基础的:
-
MessageToMessageDecoder
将符合条件的一种类型数据转成另一种类型数据。 -
ByteToMessageDecoder
将缓存区数据转成另一种类型数据。
2.1 MessageToMessageDecoder
public abstract class MessageToMessageDecoder<I> extends ChannelInboundHandlerAdapter { ... }
继承自ChannelInboundHandlerAdapter
类,表示它是一个入站处理器。
2.1.1 channelRead
方法
入站数据的处理,就是通过 channelRead(ChannelHandlerContext ctx, Object msg)
方法。
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 使用 CodecOutputList ,因为一个类型可以解析成一组另一种类型数据
CodecOutputList out = CodecOutputList.newInstance();
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
try {
// 通过 decode(...) 方法,将 cast 数据解析,
// 解析后的数据添加到 CodecOutputList 中
decode(ctx, cast, out);
} finally {
// 释放 cast 引用
ReferenceCountUtil.release(cast);
}
} else {
// 如果当前解码器不接收 msg 类型数据,
// 那么就将这个数据添加到 CodecOutputList 中,
// 会在 finally 方法中将它传递给下一个入站处理器
out.add(msg);
}
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
int size = out.size();
for (int i = 0; i < size; i++) {
// 遍历 CodecOutputList 中所有数据,发送到下一个入站处理器
ctx.fireChannelRead(out.getUnsafe(i));
}
} finally {
// 释放 CodecOutputList
out.recycle();
}
}
}
也是让子类实现
decode(ctx, cast, out)
方法,来进行数据转换。
同样传递的是CodecOutputList
对象,也就是说允许转成一组数据。
2.1.2 子类必须实现方法
protected abstract void decode(ChannelHandlerContext ctx, I msg, List<Object> out) throws Exception;
子类必须实现这个方法,将
msg
数据转成另一个类型数据,或者多个数据,添加到out
中就可以了。
2.1.3 简单实现
public class StringToIntegerDecoder extends
MessageToMessageDecoder<String> {
@Override
public void decode(ChannelHandlerContext ctx, String message,
List<Object> out) throws Exception {
out.add(message.length());
}
}
2.2 ByteToMessageDecoder
这个类就比较复杂了,主要是因为 TPC
是一个数据流,会将应用层的数据拆成一个一个帧包发送,不会按照应用层的数据格式进行分割。
也是说应用层发送
10
条数据,有可能被当成一个帧包发送;或者发送一条数据,被拆成几个帧包发送。
因此入站处理器接读取到远端发送来的缓存区数据时,有可能要接收足够多的数据之后,才能按照协议格式,解析出想要的数据格式的内容。
所以在
ByteToMessageDecoder
中有一个累加器,如果一次读取的帧包数据,不够解析成应用层数据,那么就累加多个帧包内容,进行解析。
2.2.1 Cumulator
/**
* Cumulate {@link ByteBuf}s.
* 累计缓存区接口
*/
public interface Cumulator {
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
}
-
cumulation
表示之前的累计缓存区。 -
in
表示当前读取到帧的输入缓存区。 - 返回累加了输入缓存区后的新缓存区。
默认有两种实现类
-
MERGE_CUMULATOR
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { if (!cumulation.isReadable() && in.isContiguous()) { // If cumulation is empty and input buffer is contiguous, use it directly // 如果累计缓存区为空,且输入缓冲区是连续的,则直接使用它 cumulation.release(); return in; } try { // 输入缓存区可读字节数 final int required = in.readableBytes(); if (required > cumulation.maxWritableBytes() || (required > cumulation.maxFastWritableBytes() && cumulation.refCnt() > 1) || cumulation.isReadOnly()) { // Expand cumulation (by replacing it) under the following conditions: // - cumulation cannot be resized to accommodate the additional data // - cumulation can be expanded with a reallocation operation to accommodate but the buffer is // assumed to be shared (e.g. refCnt() > 1) and the reallocation may not be safe. // 扩展累计缓存区 return expandCumulation(alloc, cumulation, in); } // 将输入缓存区的数据写入到累计缓存区 cumulation.writeBytes(in, in.readerIndex(), required); // 改变输入缓存区的读索引,表示这部分数据已经被读取了。 in.readerIndex(in.writerIndex()); return cumulation; } finally { // We must release in in all cases as otherwise it may produce a leak if writeBytes(...) throw // for whatever release (for example because of OutOfMemoryError) in.release(); } } };
这个是将输入缓存区
in
中的数据直接写入到累计缓存区cumulation
中,涉及到数据的复制,效率低一点。 -
COMPOSITE_CUMULATOR
public static final Cumulator COMPOSITE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { // 如果累计缓存区为空, 那么就直接返回输入缓存区 if (!cumulation.isReadable()) { cumulation.release(); return in; } // 累计缓存区是符合缓存区 CompositeByteBuf composite = null; try { if (cumulation instanceof CompositeByteBuf && cumulation.refCnt() == 1) { composite = (CompositeByteBuf) cumulation; // Writer index must equal capacity if we are going to "write" // new components to the end if (composite.writerIndex() != composite.capacity()) { composite.capacity(composite.writerIndex()); } } else { composite = alloc.compositeBuffer(Integer.MAX_VALUE).addFlattenedComponents(true, cumulation); } // 将输入缓存区添加到累计缓存区 composite.addFlattenedComponents(true, in); in = null; return composite; } finally { if (in != null) { // We must release if the ownership was not transferred as otherwise it may produce a leak in.release(); // Also release any new buffer allocated if we're not returning it if (composite != null && composite != cumulation) { composite.release(); } } } } };
使用组合缓存区
CompositeByteBuf
来组合输入缓存区,这样就不用先将输入缓存区的数据复制到累计缓存区了。
2.2.2 channelRead
方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
// 当累计缓存区cumulation 是null 的时候,表示是第一次
first = cumulation == null;
// 将接收到缓存区 msg 添加到累计缓存区
cumulation = cumulator.cumulate(ctx.alloc(),
first ? Unpooled.EMPTY_BUFFER : cumulation, (ByteBuf) msg);
// 进行解析
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
try {
if (cumulation != null && !cumulation.isReadable()) {
// 如果累计缓存区 cumulation 中没有数据,那么就释放它
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++numReads >= discardAfterReads) {
// We did enough reads already try to discard some bytes so we not risk to see a OOME.
// See https://github.com/netty/netty/issues/4275
numReads = 0;
// 读取多次数据了,那么调用缓存区 cumulation 的 discardSomeReadBytes() 方法,
// 尝试已经读取的数据区域,增大可写数据区域。
discardSomeReadBytes();
}
int size = out.size();
firedChannelRead |= out.insertSinceRecycled();
// 将解析完成的数据,发送到下一个入站处理器
fireChannelRead(ctx, out, size);
} finally {
out.recycle();
}
}
} else {
// 如果数据不是 ByteBuf 类型,交给下一个入站处理器处理
ctx.fireChannelRead(msg);
}
}
- 通过
cumulator.cumulate(...)
方法,累计输入缓存区数据。- 调用
callDecode(ctx, cumulation, out)
方法进行解析。
2.2.3 callDecode
方法
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 让具体子类来解析数据
decode(ctx, in, out);
} finally {
// STATE_HANDLER_REMOVED_PENDING 在 handlerRemoved(ChannelHandlerContext ctx) 方法中被设置。
// 也就是当前这个处理器被从管道中移除了,那么不再进行解析操作了
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT;
if (removePending) {
fireChannelRead(ctx, out, out.size());
out.clear();
handlerRemoved(ctx);
}
}
}
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 当累计缓存区还有数据,就进行解析
while (in.isReadable()) {
final int outSize = out.size();
// 解析到数据,就发送到下一个入站处理器
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
// Check if this handler was removed before continuing with decoding.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See:
// - https://github.com/netty/netty/issues/4635
// 在继续解码之前,检查此处理程序是否被删除。
// 如果它被删除了,那么继续在缓冲区上操作就不安全了。
if (ctx.isRemoved()) {
break;
}
}
// 记录解析前,累计缓存区还剩余的可读数据字节数
int oldInputLength = in.readableBytes();
decodeRemovalReentryProtection(ctx, in, out);
// Check if this handler was removed before continuing the loop.
// If it was removed, it is not safe to continue to operate on the buffer.
//
// See https://github.com/netty/netty/issues/1664
// 在继续循环之前,检查该处理程序是否被删除。
if (ctx.isRemoved()) {
break;
}
if (out.isEmpty()) {
// 没有解析到数据,如果缓存区可读字节数没有变,直接跳出循环
if (oldInputLength == in.readableBytes()) {
break;
} else {
// 解析了一点数据,那么就继续循环,读取缓存区数据。
continue;
}
}
if (oldInputLength == in.readableBytes()) {
// 如果子类实现的 `decode(...)` 方法,没有消耗累计缓存区数据,
// 但是却解析到数据,添加到 out 中,要抛出异常。
throw new DecoderException(
StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
}
// 如果设置只解析一次,那么就跳出循环,默认是false
if (isSingleDecode()) {
break;
}
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
callDecode
方法逻辑很简单:
- 调用
decodeRemovalReentryProtection(ctx, in, out)
方法;- 进而调用
decode(ctx, in, out)
方法,让子类去解析累计缓存区中的数据;- 将解析成功的数据添加到
out
中;- 最后将它们发送给下一个入站处理器。
但是我们知道 TCP
流可能会将数据分成几个帧包发送过来,所以当子类实现 decode(ctx, in, out)
进行解析的时候,可能累计缓存区的数据不够。
因此在 callDecode(...)
方法中,针对子类解析数据做了以下判断,主要是看累计缓存区in
数据读取情况和 out
中是否存在解析成功的数据:
- 如果
out
为空,说明子类的decode(ctx, in, out)
没有成功解析完成数据,分两个情况:
- 累计缓存区读索引没有变化,说明子类认为当前缓存区数据还不足,那就跳出
callDecode(...)
方法,继续增加累计缓存区数据。- 累计缓存区读索引发生变化,说明当前子类解析了一点数据,但是不够拼装成一个应用层数据对象,那就继续循环,再次解析累计缓存区数据。
- 如果
out
不为空,那么就要检查一下累计缓存区的读索引有没有变化,如果没有变成,就要抛出异常。这个是子类实现callDecode(...)
方法错误,它解析成功了数据,但是没有消费累计缓存区的数据,这是不允许的。
2.2.4 decode
方法
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception
这个是子类必须实现,用来解析累计缓存区数据的方法。
从上面的分析可以知道,子类实现这个方法时,还是有点要求的:
- 如果你解析成功数据,将它们添加到
out
,记住一定要改变缓存区索引。- 如果你能解析部分数据,但是不够拼接成完成对象,添加到
out
;这时你可以选择改变缓存区读索引,或者不改变。
2.2.5 decodeLast
方法
protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (in.isReadable()) {
// Only call decode() if there is something left in the buffer to decode.
// See https://github.com/netty/netty/issues/4386
// 当缓存区有数据的时候,调用 decode() 进行解析
decodeRemovalReentryProtection(ctx, in, out);
}
}
- 当通道由活跃变成不活跃时调用,也就是由
channelInactive(ChannelHandlerContext)
方法触发。- 让使用者可以在这种情况下,做特殊处理。默认情况下,只是调用了
decode(...)
方法。
2.2.6 简单实现
public class SquareDecoder extends ByteToMessageDecoder {
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
out.add(in.readBytes(in.readableBytes()));
}
}
这里只是简单地将累计缓存区中数据读取变成一个 ByteBuf
发送给下一个入站处理器。没有太多实际意义。
2.2.7 重要子类
在 Netty
中为我们提供了几个比较方便我们使用的ByteToMessageDecoder
子类:
名字 | 定义 |
---|---|
FixedLengthFrameDecoder |
固定长度分割数据的解码器 |
LineBasedFrameDecoder |
使用换行符 \n 或 \r\n 分割数据的解码器 |
DelimiterBasedFrameDecoder |
自定义多种分隔符的解码器 |
LengthFieldBasedFrameDecoder |
根据消息中长度字段的值动态分割数据的解码器 |
三. FixedLengthFrameDecoder
3.1 介绍
这个是 ByteToMessageDecoder
最简单的实现,简单地使用固定长度分割数据。
例如我们接收到的数据报文如下:
+---+----+------+----+
| A | BC | DEFG | HI |
+---+----+------+----+
如果我们使用一个 FixedLengthFrameDecoder(3)
解码器去解析,那么就会解码成以下三个固定长度的数据包:
+-----+-----+-----+
| ABC | DEF | GHI |
+-----+-----+-----+
3.2 具体实现
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 通过 decode(ctx, in) 方法解析数据,
// 如果缓存区in中数据不够,那么这个方法返回 null,且不会改变缓存区读索引
Object decoded = decode(ctx, in);
if (decoded != null) {
// 解析完成数据对象,就添加到 out 中
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
// 如果缓存区可读数据不够帧长度,那么返回null
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readRetainedSlice(frameLength);
}
}
这个实现非常简单,当缓存区可读数据足够帧长度,就解析成数据片段。
四. LineBasedFrameDecoder
4.1 介绍
使用换行符 \n
或 \r\n
分割数据,但是为了防止在传输过程丢失一些数据,比如丢失了换行符,导致大量数据积累在解析器中,没办法进行分割。
因此使用LineBasedFrameDecoder
时,必须设置最大帧长度maxLength
。
如果超过最大帧长度
maxLength
,仍然没有读取到换行符,那么就丢弃这些数据。
4.2 具体实现
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 通过 decode(ctx, in) 方法解析数据,
// 如果缓存区in中数据不够,那么这个方法返回 null,且不会改变缓存区读索引
Object decoded = decode(ctx, in);
if (decoded != null) {
// 解析完成数据对象,就添加到 out 中
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 找到行尾`\n` 的索引
final int eol = findEndOfLine(buffer);
// 是否需要丢弃
if (!discarding) {
// 还没有到maxLength, 需要进行解析
if (eol >= 0) {
// eol >= 0 表示找了行尾,那么就需要将行数据取出
final ByteBuf frame;
// 得到行的大小 length
final int length = eol - buffer.readerIndex();
// 使用 `\r\n` 结尾还是 '\n' 结尾
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
if (length > maxLength) {
// 超过最大值 maxLength,设置缓存区读索引
buffer.readerIndex(eol + delimLength);
// 抛出异常
fail(ctx, length);
return null;
}
// stripDelimiter 表示解析出来的数据是否要截断行尾
if (stripDelimiter) {
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
} else {
// 没找到行尾字符,就要判断目前缓存区可读字节数是否超过最大值 maxLength
final int length = buffer.readableBytes();
if (length > maxLength) {
// 超过最大值,就需要丢弃数据了。
discardedBytes = length;
// 设置缓存区读索引,表示已经读取数据了
buffer.readerIndex(buffer.writerIndex());
discarding = true;
offset = 0;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
}
} else {
// 表示已经超过 maxLength,那么不用解析了,直接丢弃
if (eol >= 0) {
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 设置缓存区读索引,表示已经读取数据了
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
discardedBytes += buffer.readableBytes();
// 设置缓存区读索引,表示已经读取数据了
buffer.readerIndex(buffer.writerIndex());
// We skip everything in the buffer, we need to set the offset to 0 again.
offset = 0;
}
return null;
}
}
- 通过
findEndOfLine(buffer)
方法从缓存区中寻找换行符的索引。- 没找到换行符,判断是否超过最大帧,没有超过,返回
null
, 继续累计缓存区数据。- 通过
failFast
属性,来决定超过最大帧时,是直接抛出异常TooLongFrameException
,还是等待找到换行符之后才抛出异常TooLongFrameException
。- 通过
stripDelimiter
属性,来决定返回数据是否要截断换行符。
五. DelimiterBasedFrameDecoder
5.1 介绍
自定义多种分隔符的解码器。
其实和
LineBasedFrameDecoder
逻辑很像,只不过它可以自定义分隔符,而不只是换行符,而且可以定义多种分隔符。
当缓冲区中发现多个分隔符,则LineBasedFrameDecoder
会选择产生最短帧的分隔符进行分割。
5.2 具体实现
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 从缓存区中解析数据
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 如果只是以行尾作为分割符,那么就使用 lineBasedDecoder 来解析
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
// 最短帧的索引
int minFrameLength = Integer.MAX_VALUE;
// 最短帧的分隔符
ByteBuf minDelim = null;
// 通过循环,尝试所有的分隔符,并选择产生最短帧的分隔符。
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
if (minDelim != null) {
// 最短帧的长度
int minDelimLength = minDelim.capacity();
ByteBuf frame;
if (discardingTooLongFrame) {
// 因为 discardingTooLongFrame 为true,我们要丢弃这个超大帧
discardingTooLongFrame = false;
// 设置缓存区已经读取这些数据
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
if (minFrameLength > maxFrameLength) {
// 如果超过最大值 maxFrameLength,丢弃这帧
// 设置缓存区已经读取这些数据
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
if (stripDelimiter) {
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
// Discard the content of the buffer until a delimiter is found.
//记录超过最大值的帧长度
tooLongFrameLength = buffer.readableBytes();
// 设置缓存区,表示已经读取这些数据
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true;
if (failFast) {
// 如果 failFast 为true,直接抛出异常,
// 否则这个超过最长帧异常,要到找到分隔符的时候,才会抛出
fail(tooLongFrameLength);
}
}
} else {
// 仍然丢弃缓冲区内容,因为没有找到分隔符。
// 因为 failFast 是 false,只会在找到分隔符的时候,才会抛出异常
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
- 先遍历所有的分隔符,找到产生最短帧的分隔符。
- 如果没有找到,就要考虑最大帧情况了。
- 通过
failFast
属性,来决定超过最大帧时,是直接抛出异常TooLongFrameException
,还是等待找到分隔符之后才抛出异常TooLongFrameException
。- 通过
stripDelimiter
属性,来决定返回数据是否要截断分隔符。
六. LengthFieldBasedFrameDecoder
6.1 介绍
在我们进行数据传输的时候,经常将数据分成两个部分,头和内容体。头中有字段指定整个数据的大小,这样就可以解析完成的数据了。
而 LengthFieldBasedFrameDecoder
就是为了实现这样的功能。
但是我们会遇到下面情况:
- 表示长度字段中存储的数据大小,可能是表示整个数据的大小,也有可能仅仅是表示内容体的大小。
- 头中还有其他字段,长度字段不一定在开头位置。
- 有可能只需要内容体的数据,要把头数据截取掉。
针对上面的情况,LengthFieldBasedFrameDecoder
中提供了几个非常重要的属性:
// 长度字段开始偏移量
private final int lengthFieldOffset;
// 长度字段的长度,1表示UnsignedByte 2表示UnsignedShort
// 3表示UnsignedMedium 4表示UnsignedInt 8表示UnsignedLong
private final int lengthFieldLength;
// 长度字段结束偏移量,就是 lengthFieldOffset + lengthFieldLength
private final int lengthFieldEndOffset;
// 调整量
private final int lengthAdjustment;
// 截断开始一部分数据,即跳过 initialBytesToStrip 个字节数据
private final int initialBytesToStrip;
-
lengthFieldOffset
表示长度字段开始偏移量。 -
lengthFieldLength
表示长度字段的长度。1
表示UnsignedByte
,2
表示UnsignedShort
,3
表示UnsignedMedium
,4
表示UnsignedInt
,8
表示UnsignedLong
。 -
lengthFieldEndOffset
表示长度字段结尾偏移量。就是
lengthFieldOffset + lengthFieldLength
结果值。 -
lengthAdjustment
表示调整量。要读取帧的长度,是由
lengthFieldLength
得到帧长度frameLength
再加上长度字段结尾偏移量lengthFieldEndOffset
,最后加上这个调整量lengthAdjustment
,即frameLength += lengthAdjustment + lengthFieldEndOffset
。 -
initialBytesToStrip
截断开始一部分数据。
6.2 具体实现
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (discardingTooLongFrame) {
// 需要丢弃帧
discardingTooLongFrame(in);
}
if (in.readableBytes() < lengthFieldEndOffset) {
// 缓存区的可读内容还不够读取帧长度字段,
// 那么直接返回 null,继续读取
return null;
}
// 计算帧长度字段的索引 actualLengthFieldOffset
int actualLengthFieldOffset = in.readerIndex() + lengthFieldOffset;
// 通过 get 方法获取帧长度,不改变缓存区读索引
long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
if (frameLength < 0) {
// 帧长度不可能小于 0
failOnNegativeLengthField(in, frameLength, lengthFieldEndOffset);
}
// 通过 lengthAdjustment 和 lengthFieldEndOffset
// 来调整最后要读取帧内容的大小。
// 因为帧长度 frameLength 由用户自定义,有可能包含头长度,有可能不包含,
// 所以需要灵活调整
frameLength += lengthAdjustment + lengthFieldEndOffset;
if (frameLength < lengthFieldEndOffset) {
failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
}
if (frameLength > maxFrameLength) {
// 超过最大帧内容,需要进行特殊处理,不用读取帧内容了。
exceededFrameLength(in, frameLength);
return null;
}
// 永远不会溢出,因为它小于maxFrameLength
int frameLengthInt = (int) frameLength;
if (in.readableBytes() < frameLengthInt) {
// 当缓存区可读数据字节数小于 frameLengthInt,
// 表示还没有获取到足够数据,那么就返回 null,继续让缓存区收集数据。
return null;
}
if (initialBytesToStrip > frameLengthInt) {
// 跳过的字节数超过 帧大小,这个需要抛出异常
failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, initialBytesToStrip);
}
// 跳过 initialBytesToStrip 个字节, initialBytesToStrip 的值只会大于或等于0
in.skipBytes(initialBytesToStrip);
// extract frame
int readerIndex = in.readerIndex();
// 减去跳过的字节数 initialBytesToStrip, 就是最后要获取 帧的内容大小
int actualFrameLength = frameLengthInt - initialBytesToStrip;
// 获取内容帧数据
ByteBuf frame = extractFrame(ctx, in, readerIndex, actualFrameLength);
// 设置缓存区读索引,表示已经读取数据了
in.readerIndex(readerIndex + actualFrameLength);
return frame;
}
总的处理流程:
- 先通过
getUnadjustedFrameLength(...)
方法,读取lengthFieldLength
代表的帧长度frameLength
。这个帧长度可能是整个数据的长度,也可能只是内容体的长度,由用户自定义。
-
frameLength += lengthAdjustment + lengthFieldEndOffset
来灵活调整帧长度。通过
lengthAdjustment
来调整帧长度。 - 通过
in.skipBytes(initialBytesToStrip)
方法跳过一部分内容。
6.3 例子
6.3.1 头只有长度字段
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
lengthFieldLength
代表的长度是12
(0x000C
)。frameLength += lengthAdjustment + lengthFieldEndOffset
的值就是14
,lengthAdjustment = 0
和lengthFieldEndOffset = 2
。initialBytesToStrip = 0
,那么最终读取就是14
个字节数据。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
initialBytesToStrip = 2 (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
lengthFieldLength
代表的长度是12
(0x000C
)。frameLength += lengthAdjustment + lengthFieldEndOffset
的值就是14
,lengthAdjustment = 0
和lengthFieldEndOffset = 2
。initialBytesToStrip = 2
,那么最终读取就是跳过开头两个字节的12
个字节数据。
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = -2 (= the length of the Length field)
initialBytesToStrip = 0
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
lengthFieldLength
代表的长度是14
(0x000E
)frameLength += lengthAdjustment + lengthFieldEndOffset
的值就是14
,lengthAdjustment = -2
和lengthFieldEndOffset = 2
。通过lengthAdjustment
来调整帧长度。initialBytesToStrip = 0
,那么最终读取就是14
个字节数据。
6.3.2 头中有两个字段
lengthFieldOffset = 2 (= the length of Header 1)
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
lengthFieldLength
代表的长度是12
(0x000C
)frameLength += lengthAdjustment + lengthFieldEndOffset
的值就是17
,lengthAdjustment = 0
和lengthFieldEndOffset = 5
。initialBytesToStrip = 0
,那么最终读取就是17
个字节数据。
lengthFieldOffset = 0
lengthFieldLength = 3
lengthAdjustment = 2 (= the length of Header 1)
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
lengthFieldLength
代表的长度是12
(0x000C
)frameLength += lengthAdjustment + lengthFieldEndOffset
的值就是17
,lengthAdjustment = 2
和lengthFieldEndOffset = 3
。initialBytesToStrip = 0
,那么最终读取就是17
个字节数据。
6.3.3 头中有多个字段
lengthFieldOffset = 1 (= the length of HDR1)
lengthFieldLength = 2
lengthAdjustment = 1 (= the length of HDR2)
initialBytesToStrip = 3 (= the length of HDR1 + LEN)
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
lengthFieldLength
代表的长度是12
(0x000C
)frameLength += lengthAdjustment + lengthFieldEndOffset
的值就是16
,lengthAdjustment = 1
和lengthFieldEndOffset = 3
。initialBytesToStrip = 3
,那么最终读取就是跳过开头三个字节的13
个字节数据。
lengthFieldOffset = 1
lengthFieldLength = 2
lengthAdjustment = -3 (= the length of HDR1 + LEN, negative)
initialBytesToStrip = 3
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
lengthFieldLength
代表的长度是16
(0x0010
)frameLength += lengthAdjustment + lengthFieldEndOffset
的值就是16
,lengthAdjustment = -3
和lengthFieldEndOffset = 3
。initialBytesToStrip = 3
,那么最终读取就是跳过开头三个字节的13
个字节数据。