Netty中解决半包粘包问题

粘包半包产生原因

半包就是收到了半个包,它是由于发送包的大小比TCP发送缓存的容量大,具体说就是客户端发的数据大于套接字缓冲区大小,或者是大于协议的MTU(Maximum Transmission Unit )。这样数据包会分成多个包,通过Socket多次发送到服务端。服务端没法一次接收到整个包的数据,只接收了当前TCP缓存里面的部分,就是半包。所以说半包并不是严格意义上的收到了半个包,收到包的一部分都叫半包。
粘包和半包对应,就是多个包在一个TCP缓存里面一次发送过来了,这样如果服务端接收缓存相当于一次读了多个包进来,这就叫粘包。所以说粘包产生的根本原因在于客户端每次写入数据小于套接字缓冲区大小,还有就是接收方读取套接字缓冲区数据又不够及时,导致累积了多个包一起读。

如何解决

既然半包和粘包产生的原因在于数据包没有一个一个的在服务端读取,那么要解决它就是把这个边界,或者说切分点找出来,把他分成或者是组合成一个个完整的数据包让客户端读取就好了。这个切分,主要采用封装成帧的方式:


image.png

Netty中如何实现

Netty中针对上述封装成帧的办法都做了具体实现:

image.png

FixedLengthFrameDecoder类(固定长度解码):

public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    private final int frameLength;   //指定的帧长度

    public FixedLengthFrameDecoder(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be a positive integer: " + frameLength);
        } else {
            this.frameLength = frameLength;
        }
    }

    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = this.decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }

    }

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        return in.readableBytes() < this.frameLength ? null : in.readRetainedSlice(this.frameLength);
    }
}

DelimiterBasedFrameDecoder类(基于分割符解码):

public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
    private final ByteBuf[] delimiters;
    private final int maxFrameLength;
    private final boolean stripDelimiter;
    private final boolean failFast;
    private boolean discardingTooLongFrame;
    private int tooLongFrameLength;
    private final LineBasedFrameDecoder lineBasedDecoder;

    public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
        this(maxFrameLength, true, delimiter);
    }

    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, ByteBuf delimiter) {
        this(maxFrameLength, stripDelimiter, true, delimiter);
    }

    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf delimiter) {
        this(maxFrameLength, stripDelimiter, failFast, delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes()));
    }

    public DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
        this(maxFrameLength, true, delimiters);
    }

    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, ByteBuf... delimiters) {
        this(maxFrameLength, stripDelimiter, true, delimiters);
    }

    public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
        validateMaxFrameLength(maxFrameLength);
        if (delimiters == null) {
            throw new NullPointerException("delimiters");
        } else if (delimiters.length == 0) {
            throw new IllegalArgumentException("empty delimiters");
        } else {
            if (isLineBased(delimiters) && !this.isSubclass()) {
                this.lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
                this.delimiters = null;
            } else {
                this.delimiters = new ByteBuf[delimiters.length];

                for(int i = 0; i < delimiters.length; ++i) {
                    ByteBuf d = delimiters[i];
                    validateDelimiter(d);
                    this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
                }

                this.lineBasedDecoder = null;
            }

            this.maxFrameLength = maxFrameLength;
            this.stripDelimiter = stripDelimiter;
            this.failFast = failFast;
        }
    }

    private static boolean isLineBased(ByteBuf[] delimiters) {
        if (delimiters.length != 2) {
            return false;
        } else {
            ByteBuf a = delimiters[0];
            ByteBuf b = delimiters[1];
            if (a.capacity() < b.capacity()) {
                a = delimiters[1];
                b = delimiters[0];
            }

            return a.capacity() == 2 && b.capacity() == 1 && a.getByte(0) == 13 && a.getByte(1) == 10 && b.getByte(0) == 10;
        }
    }

    private boolean isSubclass() {
        return this.getClass() != DelimiterBasedFrameDecoder.class;
    }

    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = this.decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }

    }

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        if (this.lineBasedDecoder != null) {
            return this.lineBasedDecoder.decode(ctx, buffer);
        } else {
            int minFrameLength = 2147483647;
            ByteBuf minDelim = null;
            ByteBuf[] var5 = this.delimiters;
            int var6 = var5.length;

            int tooLongFrameLength;
            for(tooLongFrameLength = 0; tooLongFrameLength < var6; ++tooLongFrameLength) {
                ByteBuf delim = var5[tooLongFrameLength];
                int frameLength = indexOf(buffer, delim);
                if (frameLength >= 0 && frameLength < minFrameLength) {
                    minFrameLength = frameLength;
                    minDelim = delim;
                }
            }

            if (minDelim != null) {
                int minDelimLength = minDelim.capacity();
                if (this.discardingTooLongFrame) {
                    this.discardingTooLongFrame = false;
                    buffer.skipBytes(minFrameLength + minDelimLength);
                    tooLongFrameLength = this.tooLongFrameLength;
                    this.tooLongFrameLength = 0;
                    if (!this.failFast) {
                        this.fail((long)tooLongFrameLength);
                    }

                    return null;
                } else if (minFrameLength > this.maxFrameLength) {
                    buffer.skipBytes(minFrameLength + minDelimLength);
                    this.fail((long)minFrameLength);
                    return null;
                } else {
                    ByteBuf frame;
                    if (this.stripDelimiter) {
                        frame = buffer.readRetainedSlice(minFrameLength);
                        buffer.skipBytes(minDelimLength);
                    } else {
                        frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
                    }

                    return frame;
                }
            } else {
                if (!this.discardingTooLongFrame) {
                    if (buffer.readableBytes() > this.maxFrameLength) {
                        this.tooLongFrameLength = buffer.readableBytes();
                        buffer.skipBytes(buffer.readableBytes());
                        this.discardingTooLongFrame = true;
                        if (this.failFast) {
                            this.fail((long)this.tooLongFrameLength);
                        }
                    }
                } else {
                    this.tooLongFrameLength += buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                }

                return null;
            }
        }
    }

    private void fail(long frameLength) {
        if (frameLength > 0L) {
            throw new TooLongFrameException("frame length exceeds " + this.maxFrameLength + ": " + frameLength + " - discarded");
        } else {
            throw new TooLongFrameException("frame length exceeds " + this.maxFrameLength + " - discarding");
        }
    }

    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
        for(int i = haystack.readerIndex(); i < haystack.writerIndex(); ++i) {
            int haystackIndex = i;

            int needleIndex;
            for(needleIndex = 0; needleIndex < needle.capacity() && haystack.getByte(haystackIndex) == needle.getByte(needleIndex); ++needleIndex) {
                ++haystackIndex;
                if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) {
                    return -1;
                }
            }

            if (needleIndex == needle.capacity()) {
                return i - haystack.readerIndex();
            }
        }

        return -1;
    }

    private static void validateDelimiter(ByteBuf delimiter) {
        if (delimiter == null) {
            throw new NullPointerException("delimiter");
        } else if (!delimiter.isReadable()) {
            throw new IllegalArgumentException("empty delimiter");
        }
    }

    private static void validateMaxFrameLength(int maxFrameLength) {
        if (maxFrameLength <= 0) {
            throw new IllegalArgumentException("maxFrameLength must be a positive integer: " + maxFrameLength);
        }
    }
}

LengthFieldBasedFrameDecoder(固定长度字段内容)类:

public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
    private final ByteOrder byteOrder;
    private final int maxFrameLength;
    private final int lengthFieldOffset;
    private final int lengthFieldLength;
    private final int lengthFieldEndOffset;
    private final int lengthAdjustment;
    private final int initialBytesToStrip;
    private final boolean failFast;
    private boolean discardingTooLongFrame;
    private long tooLongFrameLength;
    private long bytesToDiscard;

    public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        this(maxFrameLength, lengthFieldOffset, lengthFieldLength, 0, 0);
    }

    public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
        this(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, true);
    }

    public LengthFieldBasedFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        this(ByteOrder.BIG_ENDIAN, maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast);
    }

    public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        if (byteOrder == null) {
            throw new NullPointerException("byteOrder");
        } else if (maxFrameLength <= 0) {
            throw new IllegalArgumentException("maxFrameLength must be a positive integer: " + maxFrameLength);
        } else if (lengthFieldOffset < 0) {
            throw new IllegalArgumentException("lengthFieldOffset must be a non-negative integer: " + lengthFieldOffset);
        } else if (initialBytesToStrip < 0) {
            throw new IllegalArgumentException("initialBytesToStrip must be a non-negative integer: " + initialBytesToStrip);
        } else if (lengthFieldOffset > maxFrameLength - lengthFieldLength) {
            throw new IllegalArgumentException("maxFrameLength (" + maxFrameLength + ") must be equal to or greater than lengthFieldOffset (" + lengthFieldOffset + ") + lengthFieldLength (" + lengthFieldLength + ").");
        } else {
            this.byteOrder = byteOrder;
            this.maxFrameLength = maxFrameLength;
            this.lengthFieldOffset = lengthFieldOffset;
            this.lengthFieldLength = lengthFieldLength;
            this.lengthAdjustment = lengthAdjustment;
            this.lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
            this.initialBytesToStrip = initialBytesToStrip;
            this.failFast = failFast;
        }
    }

    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = this.decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }

    }

    private void discardingTooLongFrame(ByteBuf in) {
        long bytesToDiscard = this.bytesToDiscard;
        int localBytesToDiscard = (int)Math.min(bytesToDiscard, (long)in.readableBytes());
        in.skipBytes(localBytesToDiscard);
        bytesToDiscard -= (long)localBytesToDiscard;
        this.bytesToDiscard = bytesToDiscard;
        this.failIfNecessary(false);
    }

    private static void failOnNegativeLengthField(ByteBuf in, long frameLength, int lengthFieldEndOffset) {
        in.skipBytes(lengthFieldEndOffset);
        throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength);
    }

    private static void failOnFrameLengthLessThanLengthFieldEndOffset(ByteBuf in, long frameLength, int lengthFieldEndOffset) {
        in.skipBytes(lengthFieldEndOffset);
        throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less than lengthFieldEndOffset: " + lengthFieldEndOffset);
    }

    private void exceededFrameLength(ByteBuf in, long frameLength) {
        long discard = frameLength - (long)in.readableBytes();
        this.tooLongFrameLength = frameLength;
        if (discard < 0L) {
            in.skipBytes((int)frameLength);
        } else {
            this.discardingTooLongFrame = true;
            this.bytesToDiscard = discard;
            in.skipBytes(in.readableBytes());
        }

        this.failIfNecessary(true);
    }

    private static void failOnFrameLengthLessThanInitialBytesToStrip(ByteBuf in, long frameLength, int initialBytesToStrip) {
        in.skipBytes((int)frameLength);
        throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less than initialBytesToStrip: " + initialBytesToStrip);
    }

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (this.discardingTooLongFrame) {
            this.discardingTooLongFrame(in);
        }

        if (in.readableBytes() < this.lengthFieldEndOffset) {
            return null;
        } else {
            int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset;
            long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder);
            if (frameLength < 0L) {
                failOnNegativeLengthField(in, frameLength, this.lengthFieldEndOffset);
            }

            frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
            if (frameLength < (long)this.lengthFieldEndOffset) {
                failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, this.lengthFieldEndOffset);
            }

            if (frameLength > (long)this.maxFrameLength) {
                this.exceededFrameLength(in, frameLength);
                return null;
            } else {
                int frameLengthInt = (int)frameLength;
                if (in.readableBytes() < frameLengthInt) {
                    return null;
                } else {
                    if (this.initialBytesToStrip > frameLengthInt) {
                        failOnFrameLengthLessThanInitialBytesToStrip(in, frameLength, this.initialBytesToStrip);
                    }

                    in.skipBytes(this.initialBytesToStrip);
                    int readerIndex = in.readerIndex();
                    int actualFrameLength = frameLengthInt - this.initialBytesToStrip;
                    ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength);
                    in.readerIndex(readerIndex + actualFrameLength);
                    return frame;
                }
            }
        }
    }

    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        buf = buf.order(order);
        long frameLength;
        switch(length) {
        case 1:
            frameLength = (long)buf.getUnsignedByte(offset);
            break;
        case 2:
            frameLength = (long)buf.getUnsignedShort(offset);
            break;
        case 3:
            frameLength = (long)buf.getUnsignedMedium(offset);
            break;
        case 4:
            frameLength = buf.getUnsignedInt(offset);
            break;
        case 5:
        case 6:
        case 7:
        default:
            throw new DecoderException("unsupported lengthFieldLength: " + this.lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)");
        case 8:
            frameLength = buf.getLong(offset);
        }

        return frameLength;
    }

    private void failIfNecessary(boolean firstDetectionOfTooLongFrame) {
        if (this.bytesToDiscard == 0L) {
            long tooLongFrameLength = this.tooLongFrameLength;
            this.tooLongFrameLength = 0L;
            this.discardingTooLongFrame = false;
            if (!this.failFast || firstDetectionOfTooLongFrame) {
                this.fail(tooLongFrameLength);
            }
        } else if (this.failFast && firstDetectionOfTooLongFrame) {
            this.fail(this.tooLongFrameLength);
        }

    }

    protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) {
        return buffer.retainedSlice(index, length);
    }

    private void fail(long frameLength) {
        if (frameLength > 0L) {
            throw new TooLongFrameException("Adjusted frame length exceeds " + this.maxFrameLength + ": " + frameLength + " - discarded");
        } else {
            throw new TooLongFrameException("Adjusted frame length exceeds " + this.maxFrameLength + " - discarding");
        }
    }
}

参考资料:https://www.cnblogs.com/-qilin/p/11686539.h

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。