类图
缓冲区介绍
当我们进行数据传输的时候,往往需要缓冲区。java NIO 中自带的提供的就是java.nio.Buffer
但是由于java自带的过于复杂,而且自身也有一定的缺陷(定长,一个标识位position等)。Netty便提供的自己的缓冲ByteBuf
Nio ByteBuffer 和 Netty ByteBuf 对比
1.指针问题
public class Test2 {
public static void main(String[] args) {
String content = "hello,world";
ByteBuffer byteBuffer = ByteBuffer.allocate(256);
byteBuffer.put(content.getBytes());
byteBuffer.flip();
byte[] bufferValue = new byte[byteBuffer.remaining()];
byteBuffer.get(bufferValue);
System.out.println(new String(bufferValue));
}
}
示例中就是一种比较常见的NIO操作,比较关键的代码 byteBuffer.flip();它会把limit设置为position的位置。否则读取到的将会是错误的内容。
ByteBuf通过2个索引来维护缓冲区的读写操作。读操作通过readerIndex,写操作通过writeIndex。
他们的初始值都为0,数据的写入将导致writeIndex增加,数据的读取将会导致readerIndex增加。但是它不会操作writeIndex。读取之后在0和readIndex范围称之为discard。调用discardReadBytes方法。可以释放这部分空间。readIndex和writeIndex之间的数据为可读数据。writeIndex和limit之间的数据为可写的空间。由于读写由不同的指针来维护,这样就可以避免NIO中显示的调用flip()来切换不同的操作了。
2.定长问题
操作NIO的时候,当我们对缓冲区put的时候,如果缓冲区空间不够,将会抛出异常。为了避免这个问题。Netty在write数据的时候,首先会对数据的长度和可写空间做个校验。如果不足,就会创建一个新的ByteBuf,并把之前的复制到新建的这个ByteBuf。最后释放老的ByteBuf。
下来,我们一起来追踪下源码
buffer.writeInt(1);
首先进入writeInt方法
@Override
public ByteBuf writeInt(int value) {
ensureWritable(4);
_setInt(writerIndex, value);
writerIndex += 4;
return this;
}
其中非常关键的一行 ensureWritable(4);netty就是通过这个方法达到扩容。我们继续往下追踪
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// Adjust to the new capacity.
capacity(newCapacity);
return this;
}
代码中首先判断写的长度是否小于0,紧接着判断。当前缓存对象是否有足够的空间存放当前需要写入的最大长度。否则就计算下次需要生产的空间的大小。也就是
代码中的 caculateNewCapacity()方法。
接着 我们可继续看看它的计算空间算法
private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
首先判断当前传入的大小是否小于64,否则就返回64,如果大于64且小于threadshould 就每次增大2倍。否则就每次添加4m或者当新需要的空间大于最大空间减去4m时,就直接赋值最大的空间
有了新需要的容器大小,就可以准备扩容了。
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
首先对引用和参数的校验。然后创建新的
byte[] newArray = new byte[newCapacity]
容器。接着赋值,更新索引。最后返回新的容器
到这里,缓存就成功的扩容了。