在Java NIO相关的组件中,ByteBuffer是除了Selector、Channel之外的另一个很重要的组件,它是直接和Channel打交道的缓冲区,通常场景或是从ByteBuffer写入Channel,或是从Channel读入Buffer;
在Netty中,被精心设计的ByteBuf则是Netty贯穿整个开发过程中的核心缓冲区,其简化了ByteBuffer的操作,同时提供与ByteBuffer互操作api,方便了应用的开发。
关于Java NIO的ByteBuffer请参考:https://www.jianshu.com/p/49d20a7547f6
1、ByteBuf功能说明
1.1、ByteBuffer的缺点
从功能角度而言,ByteBuffer完全可以满足NIO编程的需要,但是由于NIO编程的复杂性,ByteBuffer也有其局限性,它的主要缺点如下:
- ByteBuffer长度固定,一旦分配完成,它的容量不能动态扩展和收缩,当需要编码的POJO对象大于ByteBuffer的容量时,会发生索引越界异常;
- ByteBuffer只有一个标识位置的指针position,读写的时候需要手工调用flip()和rewind()等,使用者必须小心谨慎地处理这些API,否则很容易导致程序处理失败;
- ByteBuffer的API功能有限,一些高级和实用的特性它不支持,需要使用者自己编程实现。
为了弥补这些不足,Netty提供了自己的ByteBuffer实现——ByteBuf。
1.2、ByteBuf原理
ByteBuf通过以下位置指针以简化缓冲区操作:
- readerIndex:读操作指针,标识读起始位置;
- writerIndex:写操作指针,标识写起始位置;
- maxCapacity:缓冲区容量指针,标识缓冲区的容量大小;
- markedReaderIndex:读标记指针,记录readerIndex值;
- markedWriterIndex:读标记指针,记录writterIndex值;
readerIndex和writerIndex的取值一开始都是0,随着数据的写入writerIndex会增加,读取数据会使readerIndex增加,但是它不会超过writerIndex。
在读取之后,0~readerIndex的就被视为discard的,调用discardReadBytes()方法,可以释放这部分空间,它的作用类似ByteBufferd的compact()方法。readerIndex和writerIndex之间的数据是可读取的,等价于ByteBuffer的position和limit之间的数据。writerIndex和capacity之间的空间是可写的,等价于ByteBuffer的limit和capacity之间的可用空间。
由于写操作不修改readerIndex指针,读操作不修改writerIndex指针,因此读写之间不再需要调整位置指针,这极大地简化了缓冲区的读写操作,避免了由于遗漏或者不熟悉flip()操作导致的功能异常。
初始分配的ByteBuf如图:
写入N字节后的ByteBuf如图:
读取M(M<N)字节后的ByteBuf如图:
调用discarReadBytes()之后的ByteBuf如图:
调用clear()操作之后的ByteBuf如图:
1.3、动态扩容
当我们对ByteBuffer进行put操作的时候,如果缓冲区剩余可写空间不够,就会发生 BufferOverflowException异常。为了避免发生这个问题,通常在进行put操作的时候会对剩余可用空间进行校验,如果剩余空间不足,需要重新创建一个新的ByteBuffer,并将之前的ByteBuffer复制到新创建的ByteBuffer中,最后释放老的ByteBuffer。
if(this.buffer.remaining() < needSize) {
int toBeExtSize = needSize < 128 ? needSize : 128;
ByteBuffer tmpBuffer = ByteBuffer.allocate(this.buffer.capacity() + toBeExtSize);
this.buffer.flip();
tmpBuffer.put(this.buffer);
this.buffer = tmpBuffer;
}
从示例代码可以看出,为了防止ByteBuffer溢出,每进行一次put操作,都需要对可用空间进行校验,这导致了代码冗余,稍有不慎,就可能引入其他问题。为了解决这个问题,ByteBuf对write操作进行了封装,由ByteBuf的write操作负责进行剩余可用空间的校验,如果可用缓冲区不足,ByteBuf会自动进行动态扩展,对于使用者而言,不需要关心底层的校验和扩展细节,只要不超过设置的最大缓冲区容量即可。当可用空间不足时,ByteBuf会帮助我们实现自动扩展。
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
ensureWritable0(minWritableBytes);
return this;
}
final void ensureWritable0(int minWritableBytes) {
ensureAccessible();
if (minWritableBytes <= writableBytes()) {
return;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// Adjust to the new capacity.
capacity(newCapacity);
}
通过源码分析,我们发现当进行write操作时会对需要write的字节进行校验,如果可写的字节数小于需要写入的字节数,并且需要写入的字节数小于可写的最大字节数时,对缓冲区进行动态扩展。无论缓冲区是否进行了动态扩展,从功能角度看使用者并不感知,这样就简化了上层的应用。
1.4、ByteBuf操作介绍
1.4.1、顺序读操作(read):
方法名称 | 返回值 | 功能说明 | 抛出异常 |
---|---|---|---|
readBoolean() | boolean | 从readerIndex开始读取1字节的数据 | throwsIndexOutOfBoundsExceptionreadableBytes<1 |
readByte() | byte | 从readerIndex开始读取1字节的数据 | throws IndexOutOfBoundsExceptionreadableBytes<1 |
readUnsignedByte() | short | 从readerIndex开始读取1字节的数据(无符号字节值) | throws IndexOutOfBoundsException:readableBytes<1 |
readShort() | short | 从readerIndex开始读取16位的短整形值 | throws IndexOutOfBoundsException:readableBytes<2 |
readUnsignedShort() | int | 从readerIndex开始读取16位的无符号短整形值 | throws IndexOutOfBoundsException:readableBytes<2 |
readMedium() | int | 从readerIndex开始读取24位的整形值,(该类型并非java基本类型,通常不用) | throws IndexOutOfBoundsException:readableBytes<3 |
readUnsignedMedium() | int | 从readerIndex开始读取24位的无符号整形值,(该类型并非java基本类型,通常不用) | throws IndexOutOfBoundsException:readableBytes<3 |
readInt() | int | 从readerIndex开始读取32位的整形值 | throws IndexOutOfBoundsException:readableBytes<4 |
readUnsignedInt() | long | 从readerIndex开始读取32位的无符号整形值 | throws IndexOutOfBoundsException:readableBytes<4 |
readLong() | long | 从readerIndex开始读取64位的整形值 | throws IndexOutOfBoundsException:readableBytes<8 |
readChar() | char | 从readerIndex开始读取2字节的字符值 | throws IndexOutOfBoundsException:readableBytes<2 |
readFloat() | float | 从readerIndex开始读取32位的浮点值 | throws IndexOutOfBoundsException:readableBytes<4 |
readDouble() | double | 从readerIndex开始读取64位的浮点值 | throws IndexOutOfBoundsException:readableBytes<8 |
readBytes(int length) | ByteBuf | 将当前ByteBuf中的数据读取到新创建的ByteBuf中,从readerIndex开始读取length字节的数据。返回的ByteBuf readerIndex 为0,writeIndex为length。 | throws IndexOutOfBoundsException:readableBytes<length |
readSlice(int length) | ByteBuf | 返回当前ByteBuf新创建的子区域,子区域和原ByteBuf共享缓冲区的内容,但独立维护自己的readerIndex和writeIndex,新创建的子区域readerIndex 为0,writeIndex为length。 | throws IndexOutOfBoundsException:readableBytes<length |
readBytes(ByteBuf dst) | ByteBuf | 将当前ByteBuf中的数据读取到目标ByteBuf (dst)中,从当前ByteBuf readerIndex开始读取,直到目标ByteBuf无可写空间,从目标ByteBuf writeIndex开始写入数据。读取完成后,当前ByteBuf的readerIndex+=读取的字节数。目标ByteBuf的writeIndex+=读取的字节数。 | throws IndexOutOfBoundsException:this.readableBytes<dst.writableBytes |
readBytes(ByteBuf dst, int length) | ByteBuf | 将当前ByteBuf中的数据读取到目标ByteBuf (dst)中,从当前ByteBuf readerIndex开始读取,长度为length,从目标ByteBuf writeIndex开始写入数据。读取完成后,当前ByteBuf的readerIndex+=length,目标ByteBuf的writeIndex+=length | throws IndexOutOfBoundsException:this.readableBytes<length ordst.writableBytes<length |
readBytes(ByteBuf dst, int dstIndex, int length) | ByteBuf | 将当前ByteBuf中的数据读取到目标ByteBuf (dst)中,从readerIndex开始读取,长度为length,从目标ByteBuf dstIndex开始写入数据。读取完成后,当前ByteBuf的readerIndex+=length,目标ByteBuf的writeIndex+=length | throws IndexOutOfBoundsException:dstIndex<0 orthis.readableBytes<length ordst.capacity<dstIndex + length |
readBytes(byte[] dst) | ByteBuf | 将当前ByteBuf中的数据读取到byte数组dst中,从当前ByteBuf readerIndex开始读取,读取长度为dst.length,从byte数组dst索引0处开始写入数据。 | throws IndexOutOfBoundsException:this.readableBytes<dst.length |
readBytes(byte[] dst, int dstIndex, int length) | ByteBuf | 将当前ByteBuf中的数据读取到byte数组dst中,从当前ByteBuf readerIndex开始读取,读取长度为length,从byte数组dst索引dstIndex处开始写入数据。 | throws IndexOutOfBoundsException:dstIndex<0 or this.readableBytes<length or dst.length<dstIndex + length |
readBytes(ByteBuffer dst) | ByteBuf | 将当前ByteBuf中的数据读取到ByteBuffer dst中,从当前ByteBuf readerIndex开始读取,直到dst的位置指针到达ByteBuffer 的limit。读取完成后,当前ByteBuf的readerIndex+=dst.remaining() | throws IndexOutOfBoundsException:this.readableBytes<dst.remaining() |
readBytes(OutputStream out, int length) | ByteBuf | 将当前ByteBuf readerIndex读取数据到输出流OutputStream中,读取的字节长度为length | throws IndexOutOfBoundsException:this.readableBytes<length throws IOException |
readBytes(GatheringByteChannel out, int length) | int | 将当前ByteBuf readerIndex读取数到GatheringByteChannel 中,写入out的最大字节长度为length。GatheringByteChannel为非阻塞Channel,调用其write方法不能够保存将全部需要写入的数据均写入成功,存在半包问题。因此其写入的数据长度为【0,length】,如果操作成功,readerIndex+=实际写入的字节数,返回实际写入的字节数 | throws IndexOutOfBoundsException:this.readableBytes<length throws IOException |
1.4.2、顺序写操作(write):
方法名称 | 返回值 | 功能说明 | 抛出异常 |
---|---|---|---|
writeBoolean(boolean value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=1 | throws IndexOutOfBoundsException:this.writableBytes<1 |
writeByte(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=1 | throws IndexOutOfBoundsException:this.writableBytes<1 |
writeShort(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=2 | throws IndexOutOfBoundsException:this.writableBytes<2 |
writeMedium(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=3 | throws IndexOutOfBoundsException:this.writableBytes<3 |
writeInt(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=4 | throws IndexOutOfBoundsException:this.writableBytes<4 |
writeLong(long value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=8 | throws IndexOutOfBoundsException:this.writableBytes<8 |
writeChar(int value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=2 | Throws IndexOutOfBoundsException:this.writableBytes<2 |
writeFloat(float value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=4 | throws IndexOutOfBoundsException:this.writableBytes<4 |
writeDouble(double value) | ByteBuf | 将value写入到当前ByteBuf中。写入成功,writeIndex+=8 | throws IndexOutOfBoundsException:this.writableBytes<8 |
writeBytes(ByteBuf src) | ByteBuf | 将源ByteBuf src中从readerIndex开始的所有可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=src.readableBytes | throws IndexOutOfBoundsException:this.writableBytes<src.readableBytes |
writeBytes(ByteBuf src, int length) | ByteBuf | 将源ByteBuf src中从readerIndex开始,长度length的可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=length | throws IndexOutOfBoundsException:this.writableBytes<length or src.readableBytes<length |
writeBytes(ByteBuf src, int srcIndex, int length) | ByteBuf | 将源ByteBuf src中从srcIndex开始,长度length的可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=length | throws IndexOutOfBoundsException:srcIndex<0 or this.writableBytes<length or src.capacity<srcIndex + length |
writeBytes(byte[] src) | ByteBuf | 将源字节数组src中所有可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=src.length | throws IndexOutOfBoundsException:this.writableBytes<src.length |
writeBytes(byte[] src, int srcIndex, int length) | ByteBuf | 将源字节数组src中srcIndex开始,长度为length可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=length | throws IndexOutOfBoundsException:srcIndex<0 or this.writableBytes<src.length or src.length<srcIndex + length |
writeBytes(ByteBuffer mignsrc) | ByteBuf | 将源ByteBuffer src中所有可读字节写入到当前ByteBuf。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=src.remaining() | throws IndexOutOfBoundsException:this.writableBytes<src.remaining() |
writeBytes(InputStream in, int length) | int | 将源InputStream in中的内容写入到当前ByteBuf,写入的最大长度为length,实际写入的字节数可能少于length。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=实际写入的字节数。返回实际写入的字节数 | throws IndexOutOfBoundsException:this.writableBytes<length |
writeBytes(ScatteringByteChannel in, int length) | int | 将源ScatteringByteChannel in中的内容写入到当前ByteBuf,写入的最大长度为length,实际写入的字节数可能少于length。从当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=实际写入的字节数。返回实际写入的字节数 | throws IndexOutOfBoundsException:this.writableBytes<length |
writeZero(int length) | ByteBuf | 将当前缓冲区的内容填充为NUL(0x00),当前ByteBuf writeIndex写入数据。写入成功,writeIndex+=length | throws IndexOutOfBoundsException: this.writableBytes<length |
1.4.3、readerIndex 和 writeIndex
ByteBuf提供两个指针用于支持顺序读写操作:readerIndex用于标识读取索引,witterIndex用于标识写入索引。两个位置指针将ByteBuf缓冲区分割成三个区域。
作来重用这部分空间,以节约内存,防止ByteBuf动态扩张。这在私有协议栈消息解码是非常有用,因TCP底层粘包,多个整包消息被TCP粘包作物一个整包发送。通过discardReadBytes()操作可以重用之前已经解码过的缓冲区,从而防止接收缓冲区因容量不足导致的扩展。但discardReadBytes()操作是把双刃剑,不能滥用。
方法名称 | 返回值 | 功能说明 | 抛出异常 |
---|---|---|---|
readerIndex() | int | 返回当前ByteBuf的readerIndex | |
readerIndex(int readerIndex) | ByteBuf | 修改当前ByteBuf的readerIndex | throws IndexOutOfBoundsException this.writerIndex<readerIndex |
writerIndex() | int | 返回当前ByteBuf的writeIndex | |
writerIndex(int writerIndex) | ByteBuf | 修改当前ByteBuf的writeIndex | throws IndexOutOfBoundsException writeIndex<this.readerIndex or this.capacity<writerIndex |
readableBytes() | int | 获取当前ByteBuf的可读字节数 this.writerIndex -this.readerIndex | |
writableBytes() | int | 获取当前ByteBuf的可写字节数 this.capacity - this.writerIndex | |
setIndex(int readerIndex, int writerIndex) | ByteBuf | 快捷设置当前ByteBuf的readerIndex和writerIndex | throws IndexOutOfBoundsException readerIndex<0 or this.writerIndex<readerIndex or this.capacity<writerIndex |
skipBytes(int length) | ByteBuf | 更新当前ByteBuf的readerIndex,更新后将跳过length字节的数据读取。 | throws IndexOutOfBoundsException this.readableBytes<length |
1.4.4、discardReadBytes()和clear操作
相比其他操作,缓冲区分配和释放是比较耗时的操作,因此,我们需要尽量重用它们。而缓冲区的动态扩张需要字节数组的复制,其是个耗时的操作。因此为最大程度提升性能,需要尽最大努力提升缓冲区的重用率。
例如:
缓冲区包含N个整包消息,每个消息长度L,缓冲区可写字节数为R。当读取M个整包消息后,如果不对ByteBuf做压缩或discardReadBytes()操作,则可写缓冲区长度依然为R。如果做discardReadBytes()操作,则可写字节数变为R=(R+M*L),之前已经读取的M个整包的空间会被重用。
需要注意的是,discardReadBytes()操作会发生字节数组的内存复制,故频繁调用会导致性能下降,因此要注意调用场景。
至于clear()操作,正如JDK的ByteBuffer的clear()一样,它并不会清空缓冲区内容本身。其主要通过还原readerIndex和writterIndex的值来达到清空缓冲区的效果。
方法名称 | 返回值 | 功能说明 |
---|---|---|
discardReadBytes() | ByteBuf | 释放0到readerIndex之间已经读取的空间;同时复制readerIndex和writerIndex之间的数据到0到writerIndex-readerIndex之间;修改readerIndex和writerIndex的值。该操作会发生字节数据的内存复制,频繁调用会导致性能下降。此外,相比其他java对象,缓冲区的分配和释放是个耗时的操作,缓冲区的动态扩张需要进行进行字节数据的复制,也是耗时的操作,因此应尽量提高缓冲区的重用率 |
discardSomeReadBytes() | ByteBuf | 功能和discardReadBytes()相似,不同之处在于可定制要释放的空间,依赖于具体实现 |
clear() | ByteBuf | 与JDK 的ByteBuffer clear操作相同,该操作不会清空缓冲区内容本身,其主要是为了操作位置指针,将readerIndex和writerIndex重置为0 |
1.4.5、Readable bytes和Writable bytes
readable bytes区段是数据的实际存储区域,已read和skip开头的任何操作都将从readIndex开始读取或者跳过指定的数据,操作完成之后readIndex增加了读取或跳过的字节数长度。若读取字节数长度大于实际可读的字节数,则抛出IndexOutOfBoundException。当出现分配、包装或复制一个新的ByteBuf对象时,它的readerIndex为0。
writable bytes区段是尚未被使用的空闲空间,任何write开头的操作都会从writerIndex开始向空闲空间写入,操作完成后writeIndex增加写入的字节长度。如果写入字节长度大于可写的字节数,则抛出IndexOutOfBoundException异常。新分配、包装器或复制一个ByteBuf时,其writterIndex为0。
方法名称 | 返回值 | 功能说明 |
---|---|---|
readableBytes() | int | 获取可读的字节数,其等效于(writerIndex - readerIndex) |
writableBytes() | int | 获取可写的字节数,其等效于(capacity - waiterIndex) |
maxWritableBytes() | int | 获取最大可读取字节数,其等效于(maxCapacity - writterIndex) |
1.4.6、Mark和Reset
当对缓冲区进行读写操作时,可能需要对之前的操作进行回滚。ByteBuf可通过调用mark操作将当前的位置指针备份到mark变量中,调用rest操作后,重新将指针的当前位置恢复为备份在mark变量的值。ByteBuf主要有以下相关方法:
- markReaderIndex():将当前的readerIndex备份到markedReaderIndex中;
- resetReaderIndex():将当前的readerIndex重置为markedReaderIndex的值;
- markWriterIndex() :将当前的writerIndex备份到markedWriterIndex中;
- resetWriterIndex():将当前的writerIndex重置为markedWriterIndex的值;
1.4.7、查找操作
ByteBuf提供多种查找方法用于满足不同应用场景,细分如下:
方法名称 | 返回值 | 功能说明 | 抛出异常 |
---|---|---|---|
indexOf(int fromIndex, int toIndex, byte value) | int | 从当前ByteBuf中查找首次出现value的位置,fromIndex<=查找范围<toIndex;查找成功返回位置索引,否则返回-1 | |
bytesBefore(byte value) | int | 从当前ByteBuf中查找首次出现value的位置,readerIndex<=查找范围<writerIndex;查找成功返回位置索引,否则返回-1 | |
bytesBefore(int length, byte value) | int | 从当前ByteBuf中查找首次出现value的位置,readerIndex<=查找范围<readerIndex+length;查找成功返回位置索引,否则返回-1 | IndexOutOfBoundsException:this.readableBytes<length |
bytesBefore(int index, int length, byte value) | int | 从当前ByteBuf中查找首次出现value的位置,index<=查找范围<index+length;查找成功返回位置索引,否则返回-1 | IndexOutOfBoundsException:this.readableBytes<index+length |
forEachByte(ByteBufProcessor processor); | int | 遍历当前ByteBuf的可读字节数组,与ByteBufProcessor中设置的查找条件进行比对,从readerIndex开始遍历直到writerIndex。如果满足条件,返回位置索引,否则返回-1 | |
forEachByte(int index, int length, ByteBufProcessor processor) | void | 遍历当前ByteBuf的可读字节数组,与ByteBufProcessor中设置的查找条件进行比对,从index开始遍历直到index+length。如果满足条件,返回位置索引,否则返回-1 | |
forEachByteDesc(ByteBufProcessor processor) | 逆序遍历当前ByteBuf的可读字节数组,与ByteBufProcessor中设置的查找条件进行比对,从writerIndex-1开始遍历直到readerIndex。如果满足条件,返回位置索引,否则返回-1 | ||
forEachByteDesc(int index, int length, ByteBufProcessor processor) | 逆序遍历当前ByteBuf的可读字节数组,与ByteBufProcessor中设置的查找条件进行比对,从index+length-1开始遍历直到index。如果满足条件,返回位置索引,否则返回-1 |
对应查找字节,存在一些常用值,如回车换行等,Netty在ByteBufProcessor接口中对这些常用查找字符进行了抽象:
ByteBufProcessor | 功能说明 |
---|---|
FIND_NUL | NUL(0x00),查找是否为空字节 |
FIND_NON_NUL | 查找是否为非空字节 |
FIND_CR | CR('\r'),查找是否为回车符 |
FIND_NON_CR | 查找是否为非回车符 |
FIND_LF | LF('\n'),查找是否为换行符 |
FIND_NON_LF | 查找是否为非换行符 |
FIND_CRLF | CR('\r')或LF('\n'),回车或换行 |
FIND_NON_CRLF | |
FIND_LINEAR_WHITESPACE | ' '或'\t',查找是否为空字符或换行符 |
FIND_NON_LINEAR_WHITESPACE |
1.4.8、Buffer视图操作
Derived Buffers类似于数据库视图,ByteBuf提供了多个接口用于创建某个ByteBuf的视图或者复制ByteBuf。
主要操作如下:
方法名称 | 返回值 | 功能说明 |
---|---|---|
duplicate() | ByteBuf | 返回当前ByteBuf的复制对象,复制后的ByteBuf对象与当前ByteBuf对象共享缓冲区的内容,但是维护自己独立的readerIndex和writerIndex。该操作不修改原ByteBuf的readerIndex和writerIndex。 |
copy() | ByteBuf | 从当前ByteBuf复制一个新的ByteBuf对象,复制的新对象缓冲区的内容和索引均是独立的。该操作不修改原ByteBuf的readerIndex和writerIndex。(复制readerIndex到writerIndex之间的内容,其他属性与原ByteBuf相同,如maxCapacity,ByteBufAllocator) |
copy(int index, int length) | ByteBuf | 从当前ByteBuf 指定索引index开始,字节长度为length,复制一个新的ByteBuf对象,复制的新对象缓冲区的内容和索引均是独立的。该操作不修改原ByteBuf的readerIndex和writerIndex。(其他属性与原ByteBuf相同,,如maxCapacity,ByteBufAllocator) |
slice() | ByteBuf | 返回当前ByteBuf的可读子区域,起始位置从readerIndex到writerIndex,返回的ByteBuf对象与当前ByteBuf对象共享缓冲区的内容,但是维护自己独立的readerIndex和writerIndex。该操作不修改原ByteBuf的readerIndex和writerIndex。返回ByteBuf对象的长度为readableBytes() |
slice(int index, int length) | ByteBuf | 返回当前ByteBuf的可读子区域,起始位置从index到index+length,返回的ByteBuf对象与当前ByteBuf对象共享缓冲区的内容,但是维护自己独立的readerIndex和writerIndex。该操作不修改原ByteBuf的readerIndex和writerIndex。返回ByteBuf对象的长度为length |
1.4.9、转换为JDK ByteBuffer
当通过NIO的SocketChannel进行网络读写时,操作的对象为JDK的ByteBuffer,因此须在接口层支持netty ByteBuf到JDK的ByteBuffer的相互转换。
将ByteBuf转换为java.nio.ByteBuffer的方法主要有如下两个:
方法名称 | 返回值 | 功能说明 | 抛出异常 |
---|---|---|---|
nioBuffer() | ByteBuffer | 将当前ByteBuf的可读缓冲区(readerIndex到writerIndex之间的内容)转换为ByteBuffer,两者共享共享缓冲区的内容。对ByteBuffer的读写操作不会影响ByteBuf的读写索引。注意:ByteBuffer无法感知ByteBuf的动态扩展操作。ByteBuffer的长度为readableBytes() | UnsupportedOperationException |
nioBuffer(int index, int length) | ByteBuffer | 将当前ByteBuf的可读缓冲区(index到index+length之间的内容)转换为ByteBuffer,两者共享共享缓冲区的内容。对ByteBuffer的读写操作不会影响ByteBuf的读写索引。注意:ByteBuffer无法感知ByteBuf的动态扩展操作。ByteBuffer的长度为length | UnsupportedOperationException |
1.4.10、随机读写(set和get)
除顺序读写之外,ByteBuf还支持随机读写,其最大的区别在于可随机指定读写的索引位置。关于随机读写的API这里不再详述。无论set或get,执行前都会进行索引和长度的合法性验证,此外,set操作不同于write操作,set不支持动态扩展,所以使用者必须保证当前缓冲区可写的字节数大于需要写入的字节长度,否则会抛出缓冲区越界异常。
随机读操作:
随机写操作:
2、ByteBuf实现类分析
2.1、ByteBuf类继承图
- ReferenceCounted:对象引用计数器,初始化ReferenceCounted对象时,引用数量refCnt为1,调用retain()可增加refCnt,release()用于减少refCnt。refCnt为1时,说明对象实际不可达,release()方法将立即调用deallocate()释放对象。如果refCnt为0,说明对象被错误的引用。在AbstractReferenceCountedByteBuf源码分析小节将详细介绍ReferenceCounted的原理。
- ByteBuf:实现接口ReferenceCounted和Comparable,实现ReferenceCounted使得ByteBuf具备引用计数的能力,方便跟踪ByteBuf对象分配和释放。
ByteBuf直接子类:
- EmptyByteBuf:用于构建空ByteBuf对象,capacity和maxCapacity均为0。
- WrappedByteBuf:用于装饰ByteBuf对象,主要有AdvancedLeakAwareByteBuf、SimpleLeakAwareByteBuf、UnreleasableByteBuf和Component四个子类。这里WrappedByteBuf使用装饰者模式装饰ByteBuf对象,AdvancedLeakAwareByteBuf用于对所有操作记录堆栈信息,方便监控内存泄漏;SimpleLeakAwareByteBuf只记录order(ByteOrder endianness)的堆栈信息;UnreleasableByteBuf用于阻止修改对象引用计数器refCnt的值。
- AbstractByteBuf:提供ByteBuf的默认实现,同时组合ResourceLeakDetector和SwappedByteBuf的能力,ResourceLeakDetector是内存泄漏检测工具,SwappedByteBuf用于字节序不同时转换字节序。
AbstractReferenceCountedByteBuf直接子类:
- CompositeByteBuf:用于将多个ByteBuf组合在一起,形成一个虚拟的ByteBuf对象,支持读写和动态扩展。内部使用List<Component>组合多个ByteBuf。推荐使用ByteBufAllocator的compositeBuffer()方法,Unpooled的工厂方法compositeBuffer()或wrappedBuffer(ByteBuf... buffers)创建CompositeByteBuf对象。
- FixedCompositeByteBuf:用于将多个ByteBuf组合在一起,形成一个虚拟的只读ByteBuf对象,不允许写入和动态扩展。内部使用Object[]将多个ByteBuf组合在一起,一旦FixedCompositeByteBuf对象构建完成,则不会被更改。
- PooledByteBuf<T>:基于内存池的ByteBuf,主要为了重用ByteBuf对象,提升内存的使用效率;适用于高负载,高并发的应用中。主要有PooledDirectByteBuf,PooledHeapByteBuf,PooledUnsafeDirectByteBuf三个子类,PooledDirectByteBuf是在堆外进行内存分配的内存池ByteBuf,PooledHeapByteBuf是基于堆内存分配内存池ByteBuf,PooledUnsafeDirectByteBuf也是在堆外进行内存分配的内存池ByteBuf,区别在于PooledUnsafeDirectByteBuf内部使用基于PlatformDependent相关操作实现ByteBuf,具有平台相关性。
- ReadOnlyByteBufferBuf:只读ByteBuf,内部持有ByteBuffer对象,相关操作委托给ByteBuffer实现,该ByteBuf限内部使用,ReadOnlyByteBufferBuf还有一个子类ReadOnlyUnsafeDirectByteBuf。
- UnpooledDirectByteBuf:在堆外进行内存分配的非内存池ByteBuf,内部持有ByteBuffer对象,相关操作委托给ByteBuffer实现。
- UnpooledHeapByteBuf:基于堆内存分配非内存池ByteBuf,即内部持有byte数组。
- UnpooledUnsafeDirectByteBuf:与UnpooledDirectByteBuf相同,区别在于UnpooledUnsafeDirectByteBuf内部使用基于PlatformDependent相关操作实现ByteBuf,具有平台相关性。
从内存分配角度看,ByteBuf主要分为两类:
- 堆内存(HeapByteBuf)字节缓冲区:特点是内存的分配和回收速度快,可以被JVM自动回收;缺点是进行Socket的I/O读写需要额外进行一次内存复制,即将内存对应的缓冲区复制到内核Channel中,性能会有一定程度下降。
- 直接内存(DirectByteBuf)字节缓冲区:在堆外进行内存分配,相比堆内存,分配和回收速度稍慢。但用于Socket的I/O读写时,少一次内存复制,速度比堆内存字节缓冲区快。
经验表明,在I/O通信线程的读写缓冲区使用DirectByteBuf,后端业务消息的编解码模块使用HeapByteBuf,这样组合可以达到性能最优。
从内存回收角度看,ByteBuf也分为两类:
- 基于内存池的ByteBuf:优点是可以重用ByteBuf对象,通过自己维护一个内存池,可以循环利用创建的ByteBuf,提升内存的使用效率,降低由于高负载导致的频繁GC。适用于高负载,高并发的应用中。推荐使用基于内存池的ByteBuf。
- 非内存池的ByteBuf:优点是管理和维护相对简单。
尽管推荐使用基于内存池的ByteBuf,但内存池的管理和维护更加复杂,使用起来也需要更加谨慎。
2.2、AbstractByteBuf源码分析
AbstractByteBuf是ByteBuf的抽象实现,其提供了一些基础属性和公用方法实现。
2.2.1、主要成员变量
readerIndex:读索引
writerIndex:写索引
markedReaderIndex:读索引标记
markedWriterIndex:写索引标记
maxCapacity:最大容量
SwappedByteBuf swappedBuf:缓冲区包装类
在AbstractByteBuf中并未定义ByteBuf缓冲区的实现,其实现留给具体实现类。
2.2.2、读操作簇
在读取数据数据前,会对缓冲区可读空间进行检验,如果读取长度小于0,则抛出IllegalArgumentException异常提示参数非法;如果需要读取的字节数大于可读长度,则抛出IndexOutOfBoundsException异常。异常中封装了详细信息,使用者可以方便地定位问题。
校验通过后在读取设定长度的数据,其具体实现依赖于子类实现。
2.2.3、写操作簇
首先校验写入长度的合法性;
如果写入的数据长度小于0,则抛出IllegalArgumentException异常;如果写入长度大于可动态扩展的最大长度,则抛出IndexOutOfBoundsException异常;
当写入长度大于缓冲区长度而小于可动态扩展的最大长度,则进行动态扩展;动态扩展根据倍增加阀值的方式;
2.2.4、操作索引
设置和读取相关索引;
2.2.4、重用缓冲区
重用缓冲区是通过discardReadBytes()和discardSomeReadBytes()方法实现。
处理流程如下:
- 如果读索引为0,表示无可重用缓冲区,则直接返回;
- 如果读索引大于0且不等于写索引,说明缓冲区有数据且有可重用空间,调用setBytes(0, this, readerIndex, writerIndex-readerIndex)方法进行缓冲区复制,然后重新设置读索引为0,写索引为(writerIndex - readerIndex);
2.2.5、skipBytes
在解码时,有时需要丢弃非法数据,使用skipBytes()可方便跳过指定长度的数据。
2.3、AbstractReferenceCountedByteBuf
AbstractReferenceCountedByteBuf主要是对引用进行计数,类似JVM内存回收的对象引用计数,用于跟踪对象的分配和销毁。
2.3.1、成员变量
refCntUpdater:类型为AtomicIntegerFieldUpdater,通过原子方式对成员变量进行更新等操作,以实现线程安全,消除锁;
2.3.1、对象引用计数
retain()方法用于引用计数加一,由于可能存在并发场景,所以其通过自旋加CAS操作保证线程安全。
实现源码如下:
private ByteBuf retain0(final int increment) {
int oldRef = refCntUpdater.getAndAdd(this, increment);
if (oldRef <= 0 || oldRef + increment < oldRef) {
// Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
refCntUpdater.getAndAdd(this, -increment);
throw new IllegalReferenceCountException(oldRef, increment);
}
return this;
}
2.4、UnpooledHeapByteBuf源码解析
UnpooledHeapByteBuf是基于堆内存进行内存分配的字节缓冲区,没有基于对象池技术实现,每次调用都会创建一个新的对象;
2.4.1、成员变量
成员变量中定义了一个ByteBufAllocator类型的alloc用来分配堆内存,定义一个byte字节数组用作缓冲区,定义一个ByteBuffer类型的tmpNioBuf用作Netty的ByteBuf到JDK ByteBuffer转换;
private final ByteBufAllocator alloc;
byte[] array;
private ByteBuffer tmpNioBuf;
实际使用ByteBuffer作为缓冲区也是可行,使用Byte数组根本原因是提升性能和更加敏捷地进行位操作。
2.4.2、动态扩展缓冲区
扩容源码如下:
@Override
public ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
int oldCapacity = array.length;
byte[] oldArray = array;
if (newCapacity > oldCapacity) {
byte[] newArray = allocateArray(newCapacity);
System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
setArray(newArray);
freeArray(oldArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = allocateArray(newCapacity);
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
freeArray(oldArray);
}
return this;
}
先判断新的容量是否大于当前缓冲区的容量,如果大于则进行动态扩容;通过byte[] newArray = new byte[newCapacity]创建新容量的字节数组,然后通过System.arrayCopy()方法进行数据复制,最后通过setArray()方法替换旧的缓冲区数组;
2.5、PooledByteBuf源码解析
PooledByteBuf是基于内存池的缓冲区,netty自己实现了一套内存池管理,具体实现类有:PooledHeapByteBuf、PooledDirectByteBuf、PooledUnsafeDirectByteBuf、PooledUnsafeHeapByteBuf。
由于内存池管理等方面涉及内容较多,本处不多赘述,详情请看Netty内存池相关文章:
推荐博客:https://www.jianshu.com/p/4856bd30dd56
3、ByteBuf相关辅助类
netty提供一些ByteBuf相关的辅助类,以简化缓冲区相关的使用;
3.1、ByteBufHolder
ByteBufHolder是ByteBuf的容器,在Netty中,它非常有用,例如HTTP协议的请求消息和应答消息都可以携带消息体,这个消息体在NIO ByteBuffer中就是个ByteBuffer对象,在Netty中就是ByteBuf对象。由于不同的协议消息体可以包含不同的协议字段和功能,因此,需要对ByteBuf进行包装和抽象,不同的子类可以有不同的实现。为了满足这些定制化的需求,Netty抽象出了ByteBufHolder对象,它包含了一个ByteBuf,另外还提供了一些其他实用的方法,使用者继承ByteBufHolder接口后可以按需封装自己的实现。
3.1、ByteBufAllocator
ByteBufAllocator是字节缓冲区分配器,按照Netty的缓冲区实现不同,共有两种不同的分配器:基于内存池的字节缓冲区分配器和普通的字节缓冲区分配器。
类继承图如下:
主要功能列表:
方法名称 | 返回值 | 功能说明 |
---|---|---|
buffer() | ByteBuf | 分配一个字节缓冲区,缓冲区的类型由ByteBufAllocator的实现决定 |
buffer(int initialCapacity) | ByteBuf | 分配一个初始容量initialCapacity的字节缓冲区,缓冲区的类型由ByteBufAllocator的实现类决定 |
buffer(int initialCapacity,int maxCapacity) | ByteBuf | 分配一个初始容量initialCapacity,最大容量为maxCapacity的字节缓冲区,缓冲区的类型由ByteBufAllocator的实现类决定 |
ioBuffer(int initialCapacity,int maxCapacity) | ByteBuf | 分配一个初始容量initialCapacity,最大容量为maxCapacity的direct buffer |
heapBuffer(int initialCapacity,int maxCapacity) | ByteBuf | 分配一个初始容量initialCapacity,最大容量为maxCapacity的heap buffer |
directBuffer(int initialCapacity,int maxCapacity) | ByteBuf | 分配一个初始容量initialCapacity,最大容量为maxCapacity的directbuffer |
compositeBuffer(int maxNumComponent) | CompositeByteBuf | 分配一个最大容量为maxCapacity的CompositeByteBuf,内存类型由ByteBufAllocator的实现类决定 |
isDirectBufferPooled() | boolean | 是否使用了直接内存内存池 |
3.2、CompositeByteBuf
CompositeBytebuf允许将多个Bytebuf的实例组合起来,在内部使用一个Bytebuf进行统一管理。
CompositeBytebuf在某些场景非常有用,例如在协议中一个协议通常会包含多个部分,消息体,消息头等,他们都是Bytebuf对象,如果我们需要对他进行管理,可以把这些Bytebuf对象都放到CompositeBytebuf中进行统一管理
成员变量:
private final ByteBufAllocator alloc;
private final boolean direct;
private final ComponentList components;
private final int maxNumComponents;
private boolean freed;
CompositeBytebuf定义了一个Component类型的集合,Component为ByteBuf的包装器实现类,其聚合了ByteBuf对象,维护了在集合中的位置偏移量信息等;
Component源码实现:
private static final class Component {
final ByteBuf buf;
final int length;
int offset;
int endOffset;
Component(ByteBuf buf) {
this.buf = buf;
length = buf.readableBytes();
}
void freeIfNecessary() {
buf.release(); // We should not get a NPE here. If so, it must be a bug.
}
}
3.2、ByteBufUtil
ByteBufUtil是ByteBuf的一个静态工具类,其提供了一系列静态方法用于操作ByteBuf对象。
操作方法如下:
操作方法很丰富,主要包括:
- 返回ByteBuf中可读字节的十六进制字符串,如hexDump()等;
- 字符串编解码,如encodeString(),decodeString()等;
- ByteBuf字符串比较;
- ByteBuf缓冲区拷贝等;
相关阅读:
Netty源码愫读(二)Channel相关源码学习【https://www.jianshu.com/p/02eac974258e】
Netty源码愫读(三)ChannelPipeline、ChannelHandlerContext相关源码学习【https://www.jianshu.com/p/be82d0fcdbcc】
Netty源码愫读(四)ChannelHandler相关源码学习【https://www.jianshu.com/p/6ee0a3b9d73a】
Netty源码愫读(五)EventLoop与EventLoopGroup相关源码学习【https://www.jianshu.com/p/05096995d296】
Netty源码愫读(六)ServerBootstrap相关源码学习【https://www.jianshu.com/p/a71a9a0291f3】
参考书籍:
李林锋《Netty权威指南》第二版
《Netty 实战(精髓)》
参考博客:
https://www.cnblogs.com/wade-luffy/p/6196481.html
https://my.oschina.net/7001/blog/743240
https://my.oschina.net/7001/blog/742236
https://blog.csdn.net/jeffleo/article/details/69230112
https://www.jianshu.com/p/c4bd37a3555b