系列
Netty源码分析 - Bootstrap服务端
Netty源码分析 - Bootstrap客户端
netty源码分析 - ChannelHandler
netty源码分析 - EventLoop类关系
netty源码分析 - register分析
Netty源码分析 - NioEventLoop事件处理
netty源码分析 - accept过程分析
Netty源码分析 - ByteBuf
Netty源码分析 - 粘包和拆包问题
背景介绍
产生粘包主要原因是,操作系统在发送TCP数据的时候,底层会有一个缓冲区,例如1024个字节大小,如果一次请求发送的数据量比较小,没达到缓冲区大小,TCP则会将多个请求合并为同一个请求进行发送,这就形成了粘包问题。
产生拆包主要原因是,操作系统在发送TCP数据的时候,底层会有一个缓冲区,例如1024个字节大小,如果一次请求发送的数据量比较大,超过了缓冲区大小,TCP就会将其拆分为多次发送,这就是拆包问题。
上图中演示了粘包和拆包的三种情况:
- A和B两个包都刚好满足TCP缓冲区的大小,或者说其等待时间已经达到TCP等待时长,从而还是使用两个独立的包进行发送;
- A和B两次请求间隔时间内较短,并且数据包较小,因而合并为同一个包发送给服务端;
- B包比较大,因而将其拆分为两个包B_1和B_2进行发送,而这里由于拆分后的B_2比较小,其又与A包合并在一起发送。
常见解决方案
对于粘包和拆包问题,常见的解决方案有四种:
客户端在发送数据包的时候,每个包都固定长度,比如1024个字节大小,如果客户端发送的数据长度不足1024个字节,则通过补充空格的方式补全到指定长度;
客户端在每个包的末尾使用固定的分隔符,例如\r\n,如果一个包被拆分了,则等待下一个包发送过来之后找到其中的\r\n,然后对其拆分后的头部部分与前一个包的剩余部分进行合并,这样就得到了一个完整的包;
将消息分为头部和消息体,在头部中保存有当前整个消息的长度,只有在读取到足够长度的消息之后才算是读到了一个完整的消息;
通过自定义协议进行粘包和拆包的处理。
FixedLengthFrameDecoder
/**
* A decoder that splits the received {@link ByteBuf}s by the fixed number
* of bytes. For example, if you received the following four fragmented packets:
* <pre>
* +---+----+------+----+
* | A | BC | DEFG | HI |
* +---+----+------+----+
* </pre>
* A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
* following three packets with the fixed length:
* <pre>
* +-----+-----+-----+
* | ABC | DEF | GHI |
* +-----+-----+-----+
* </pre>
*/
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength;
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0) {
throw new IllegalArgumentException(
"frameLength must be a positive integer: " + frameLength);
}
this.frameLength = frameLength;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(
@SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
// 按照固定长度进行读取
return in.readRetainedSlice(frameLength);
}
}
}
- 该解码器会每次读取固定长度的消息,如果当前读取到的消息不足指定长度,那么就会等待下一个消息到达后进行补足。其使用也比较简单,只需要在构造函数中指定每个消息的长度即可。
LineBasedFrameDecoder
/**
* A decoder that splits the received {@link ByteBuf}s on line endings.
* <p>
* Both {@code "\n"} and {@code "\r\n"} are handled.
* For a more general delimiter-based decoder, see {@link DelimiterBasedFrameDecoder}.
*/
- LineBasedFrameDecoder的工作原理是它依次遍历ByteBuf中的可读字节,判断看是否有“\n”或者“\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。
- 它是以换行符为结束标志的解码器,支持携带结束符或者不携带结束符两种解码方式,同时支持配置单行的最大长度。
- 如果连续读取到最大长度后仍然没有发现换行符,就会抛出异常,同时忽略掉之前读到的异常码流。防止由于数据报没有携带换行符导 致接收到ByteBuf无限制积压,引起系统内存溢出。
public class LineBasedFrameDecoder extends ByteToMessageDecoder {
/** Maximum length of a frame we're willing to decode. */
private final int maxLength;
/** Whether or not to throw an exception as soon as we exceed maxLength. */
private final boolean failFast;
private final boolean stripDelimiter;
// 当前拆包是否属于丢弃模式,用一个成员变量来标识
private boolean discarding;
private int discardedBytes;
public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
this.maxLength = maxLength;
this.failFast = failFast;
this.stripDelimiter = stripDelimiter;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 找到换行符位置
final int eol = findEndOfLine(buffer);
if (!discarding) {
// 非discarding模式下找到行分隔符的处理
if (eol >= 0) {
// 1.计算分隔符和包长度
final ByteBuf frame;
final int length = eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 2.丢弃异常数据
if (length > maxLength) {
buffer.readerIndex(eol + delimLength);
fail(ctx, length);
return null;
}
// 3.取包的时候是否包括分隔符
if (stripDelimiter) {
frame = buffer.readRetainedSlice(length);
buffer.skipBytes(delimLength);
} else {
frame = buffer.readRetainedSlice(length + delimLength);
}
return frame;
} else {
// 非discarding模式下未找到分隔符的处理
final int length = buffer.readableBytes();
// 首先取得当前字节容器的可读字节个数,判断一下是否已经超过可允许的最大长度,
// 如果没有超过直接返回null,字节容器中的数据没有任何改变,
// 如果超过需要进入丢弃模式
if (length > maxLength) {
discardedBytes = length;
buffer.readerIndex(buffer.writerIndex());
discarding = true;
if (failFast) {
fail(ctx, "over " + discardedBytes);
}
}
return null;
}
} else {
if (eol >= 0) {
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 在discarding模式下,如果找到分隔符,那可以将分隔符之前的都丢弃掉
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) {
fail(ctx, length);
}
} else {
// discarding模式下未找到行分隔符,当前读取的数据是丢弃的一部分,所以直接丢弃
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
}
private static int findEndOfLine(final ByteBuf buffer) {
// for循环遍历,找到第一个 \n 的位置,如果\n前面的字符为\r,那就返回\r的位置
// ByteProcessor FIND_LF = new IndexOfProcessor((byte) '\n');
int i = buffer.forEachByte(ByteProcessor.FIND_LF);
if (i > 0 && buffer.getByte(i - 1) == '\r') {
i--;
}
return i;
}
}
非discarding模式下找到行分隔符的处理
- 1.首先,新建一个帧,计算一下当前包的长度和分隔符的长度(因为有两种分隔符)
- 2.然后判断一下需要拆包的长度是否大于该拆包器允许的最大长度(maxLength),这个参数在构造函数中被传递进来,如超出允许的最大长度,就将这段数据抛弃,返回null
- 3.最后,将一个完整的数据包取出,如果构造本解包器的时候指定 stripDelimiter为false,即解析出来的包包含分隔符,默认为不包含分隔符
非discarding模式下未找到分隔符的处理
首先取得当前字节容器的可读字节个数,判断一下是否已经超过可允许的最大长度,如果没有超过,直接返回null,字节容器中的数据没有任何改变,否则,就需要进入丢弃模式
使用一个成员变量
discardedBytes
来表示已经丢弃了多少数据,然后将字节容器的读指针移到写指针,意味着丢弃这一部分数据,设置成员变量discarding
为true表示当前处于丢弃模式。如果设置了failFast
,那么直接抛出异常,默认情况下failFast
为false,即安静得丢弃数据
discarding模式下找到行分隔符
- 在discarding模式下,如果找到分隔符,那可以将分隔符之前的都丢弃掉
discarding模式下未找到行分隔符
- 因为当前还在丢弃模式,没有找到行分隔符意味着当前一个完整的数据包还没丢弃完,当前读取的数据是丢弃的一部分,所以直接丢弃。
DelimiterBasedFrameDecoder
/**
* A decoder that splits the received {@link ByteBuf}s by one or more
* delimiters. It is particularly useful for decoding the frames which ends
* with a delimiter such as {@link Delimiters#nulDelimiter() NUL} or
* {@linkplain Delimiters#lineDelimiter() newline characters}.
*
* <h3>Predefined delimiters</h3>
* <p>
* {@link Delimiters} defines frequently used delimiters for convenience' sake.
*
* <h3>Specifying more than one delimiter</h3>
* <p>
* {@link DelimiterBasedFrameDecoder} allows you to specify more than one
* delimiter. If more than one delimiter is found in the buffer, it chooses
* the delimiter which produces the shortest frame. For example, if you have
* the following data in the buffer:
* <pre>
* +--------------+
* | ABC\nDEF\r\n |
* +--------------+
* </pre>
* a {@link DelimiterBasedFrameDecoder}({@link Delimiters#lineDelimiter() Delimiters.lineDelimiter()})
* will choose {@code '\n'} as the first delimiter and produce two frames:
* <pre>
* +-----+-----+
* | ABC | DEF |
* +-----+-----+
* </pre>
* rather than incorrectly choosing {@code '\r\n'} as the first delimiter:
* <pre>
* +----------+
* | ABC\nDEF |
* +----------+
* </pre>
*/
- DelimiterBasedFrameDecoder与LineBasedFrameDecoder类似,只不过更加通用,允许我们指定任意特殊字符作为分隔符。
- 我们还可以同时指定多个分隔符,如果在请求中发的确有多个分隔符,将会选择内容最短的一个分隔符作为依据。
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
private final ByteBuf[] delimiters;
private final int maxFrameLength;
private final boolean stripDelimiter;
private final boolean failFast;
private boolean discardingTooLongFrame;
private int tooLongFrameLength;
/** Set only when decoding with "\n" and "\r\n" as the delimiter. */
private final LineBasedFrameDecoder lineBasedDecoder;
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
validateMaxFrameLength(maxFrameLength);
if (delimiters == null) {
throw new NullPointerException("delimiters");
}
if (delimiters.length == 0) {
throw new IllegalArgumentException("empty delimiters");
}
if (isLineBased(delimiters) && !isSubclass()) {
lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
this.delimiters = null;
} else {
this.delimiters = new ByteBuf[delimiters.length];
for (int i = 0; i < delimiters.length; i ++) {
ByteBuf d = delimiters[i];
validateDelimiter(d);
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
}
lineBasedDecoder = null;
}
this.maxFrameLength = maxFrameLength;
this.stripDelimiter = stripDelimiter;
this.failFast = failFast;
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 针对有LineBasedFrameDecoder的情况直接走LineBasedFrameDecoder的逻辑
if (lineBasedDecoder != null) {
return lineBasedDecoder.decode(ctx, buffer);
}
// 如果在请求中发的确有多个分隔符,将会选择内容最短的一个分隔符作为依据。
int minFrameLength = Integer.MAX_VALUE;
ByteBuf minDelim = null;
for (ByteBuf delim: delimiters) {
// 从buffer中查找指定分隔符的位置,取分割后内容最短的一个。
int frameLength = indexOf(buffer, delim);
if (frameLength >= 0 && frameLength < minFrameLength) {
minFrameLength = frameLength;
minDelim = delim;
}
}
// 处理有分割符的场景
if (minDelim != null) {
int minDelimLength = minDelim.capacity();
ByteBuf frame;
// 丢弃太长的报文
if (discardingTooLongFrame) {
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
// 丢弃过长的报文
if (minFrameLength > maxFrameLength) {
// Discard read frame.
buffer.skipBytes(minFrameLength + minDelimLength);
fail(minFrameLength);
return null;
}
// 跳过分割符,只获取minFrameLength的报文
if (stripDelimiter) {
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
// 不跳过分割符,获取minFrameLength+分隔符长度的报文
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
return frame;
} else {
if (!discardingTooLongFrame) {
if (buffer.readableBytes() > maxFrameLength) {
// Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
discardingTooLongFrame = true;
if (failFast) {
fail(tooLongFrameLength);
}
}
} else {
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
}
- 从buffer中查找指定分隔符的位置,多个分隔符的情况下取分割后内容最短的一个进行处理。