上一节查看了ByteBufAllocator
,并了解了其抽象实现,和一些根据不同的内存类型进行内存分配的思路。
本节研究UnpooledByteBufAllocator
,包括heap
和direct
的内存分配,以及Unsafe
和非unsafe
的区别。
关于heap
内存的分配
- 入口
// Creates the heap-backed ByteBuf for this allocator with the given initial and maximum capacity.
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
// Pick the implementation by Unsafe availability: the Unsafe-backed heap buffer when
// PlatformDependent.hasUnsafe() is true, otherwise the plain heap buffer.
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
- 查看
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
发现分配Unpooled
、Unsafe
、Heap
内存,其实是分配了一个byte数组
,并保存在UnpooledHeapByteBuf#array
成员变量中。该内存的初始容量和最大可扩展容量可以指定。
// Constructor: forwards the allocator and both capacities unchanged to the superclass constructor.
InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
// Constructor: adds no logic of its own; forwards all three arguments to the superclass constructor.
UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
// Constructor: validates the arguments, allocates the backing byte[] and resets both indices.
// Throws IllegalArgumentException when initialCapacity is greater than maxCapacity.
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// Only maxCapacity is handed to the superclass.
super(maxCapacity);
// Null-check on the allocator (delegated to checkNotNull).
checkNotNull(alloc, "alloc");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// Allocate an array of initialCapacity bytes and store it as the backing array.
setArray(allocateArray(initialCapacity));
// Start with readerIndex and writerIndex both at 0.
setIndex(0, 0);
}
// Returns the backing storage for the buffer: a newly created byte[] of the requested length.
protected byte[] allocateArray(int initialCapacity) {
// A plain new byte array of initialCapacity elements.
return new byte[initialCapacity];
}
// Installs the given array as the buffer's backing array.
private void setArray(byte[] initialArray) {
// Store the array in the array field.
array = initialArray;
// tmpNioBuf is cleared whenever the array is replaced
// (presumably a cached NIO view of the previous array -- confirm against the full class).
tmpNioBuf = null;
}
// Sets readerIndex and writerIndex together and returns this buffer for chaining.
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
// Bounds validation against capacity() runs only when checkBounds is enabled.
if (checkBounds) {
checkIndexBounds(readerIndex, writerIndex, capacity());
}
// Perform the actual assignment.
setIndex0(readerIndex, writerIndex);
return this;
}
// Assigns both indices directly; this method itself performs no validation.
final void setIndex0(int readerIndex, int writerIndex) {
// Store the read and write positions.
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
}
- 查看
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
内存分配,其实例化过程和InstrumentedUnpooledUnsafeHeapByteBuf
一样。在这里看不出safe
和unsafe
的区别,结合之前的代码,可以从获取数据时调用的getByte
方法进入查看.
// Constructor: forwards the allocator and both capacities unchanged to the superclass constructor.
InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
// Constructor: validates the arguments, allocates the backing byte[] and resets both indices.
// Throws IllegalArgumentException when initialCapacity is greater than maxCapacity.
public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// Only maxCapacity is handed to the superclass.
super(maxCapacity);
// Null-check on the allocator (delegated to checkNotNull).
checkNotNull(alloc, "alloc");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// Allocate an array of initialCapacity bytes and store it as the backing array.
setArray(allocateArray(initialCapacity));
// Start with readerIndex and writerIndex both at 0.
setIndex(0, 0);
}
- 查看
UnpooledHeapByteBuf#getByte()
方法,堆内存类型的ByteBuf
获取数据的时候,直接通过下标获取byte数组
中的byte
// Returns the byte at the given index of this heap buffer.
@Override
public byte getByte(int index) {
// ensureAccessible() runs before the read
// (presumably rejects access to a released buffer -- confirm against its definition).
ensureAccessible();
return _getByte(index);
}
// Reads the byte at index from the backing array through HeapByteBufUtil.
@Override
protected byte _getByte(int index) {
// array is the byte[] that was created when the buffer was constructed.
return HeapByteBufUtil.getByte(array, index);
}
// Returns memory[index] using ordinary Java array indexing.
static byte getByte(byte[] memory, int index) {
// Direct element access on the array; no Unsafe involved.
return memory[index];
}
- 查看
UnpooledUnsafeHeapByteBuf#getByte()
方法,获取byte字节的时候,调用的是jdk的UNSAFE
对象。
// Returns the byte at the given index of this Unsafe-backed heap buffer.
@Override
public byte getByte(int index) {
// The index is validated via checkIndex before the read is delegated.
checkIndex(index);
return _getByte(index);
}
// Reads the byte at index from the backing array through UnsafeByteBufUtil
// (rather than HeapByteBufUtil, as the non-Unsafe heap buffer does).
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(array, index);
}
// Pass-through: forwards the array read to PlatformDependent.getByte.
static byte getByte(byte[] array, int index) {
return PlatformDependent.getByte(array, index);
}
// Pass-through: forwards the array read to PlatformDependent0.getByte.
public static byte getByte(byte[] data, int index) {
return PlatformDependent0.getByte(data, index);
}
// Reads one byte of the array through the UNSAFE object instead of array indexing.
static byte getByte(byte[] data, int index) {
// The read offset inside the array object is BYTE_ARRAY_BASE_OFFSET plus the element index.
return UNSAFE.getByte(data, BYTE_ARRAY_BASE_OFFSET + index);
}
关于direct
内存的分配
- 入口
UnpooledByteBufAllocator#newDirectBuffer()
// Creates the direct ByteBuf for this allocator with the given initial and maximum capacity.
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
final ByteBuf buf;
// With Unsafe available, the noCleaner flag selects between the NoCleaner variant and the
// regular Unsafe direct buffer; without Unsafe, the plain direct buffer is used.
if (PlatformDependent.hasUnsafe()) {
buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
// The buffer is returned as-is when disableLeakDetector is set; otherwise it is passed
// through toLeakAwareBuffer first.
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}
- 跟踪
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
可以发现,Unpooled
、Direct
类型的内存分配实际上是维护了一个jdk底层的DirectByteBuffer
。分配内存的时候就创建它,并将它保存到buffer
成员变量。
// Constructor: forwards the allocator and both capacities unchanged to the superclass constructor.
InstrumentedUnpooledDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
// Constructor: validates the arguments, then allocates a direct ByteBuffer and stores it.
// Throws NullPointerException when alloc is null and IllegalArgumentException when
// initialCapacity is greater than maxCapacity.
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// Only maxCapacity is handed to the superclass.
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
// Argument validation: both capacities go through checkPositiveOrZero.
checkPositiveOrZero(initialCapacity, "initialCapacity");
checkPositiveOrZero(maxCapacity, "maxCapacity");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// Allocate a direct buffer of initialCapacity bytes and keep it in the buffer field.
setByteBuffer(allocateDirect(initialCapacity));
}
// Replaces the current backing ByteBuffer with the given one and updates the derived state.
private void setByteBuffer(ByteBuffer buffer) {
ByteBuffer oldBuffer = this.buffer;
// Dispose of the previous buffer, if there was one.
if (oldBuffer != null) {
// When doNotFree is set the old buffer is left alone and the flag is cleared;
// otherwise the old buffer is released through freeDirect.
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
// Store the new buffer.
this.buffer = buffer;
// tmpNioBuf is cleared whenever the buffer is replaced.
tmpNioBuf = null;
// Capacity is taken from the new buffer's remaining() value.
capacity = buffer.remaining();
}
// Allocates the backing direct buffer of the requested size.
protected ByteBuffer allocateDirect(int initialCapacity) {
// Delegates to the JDK's ByteBuffer.allocateDirect.
return ByteBuffer.allocateDirect(initialCapacity);
}
// Static factory: returns a newly constructed DirectByteBuffer of the given capacity.
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}
跟踪UnpooledDirectByteBuf#_getByte()
,就比较简单了,直接使用jdk的api获取。
// Reads the byte at index from the stored ByteBuffer.
@Override
protected byte _getByte(int index) {
// Uses the buffer field's own get(index) method; no address arithmetic here.
return buffer.get(index);
}
- 跟踪
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
可以发现Unpooled
、Unsafe
、Direct
的内存分配,和非Unsafe
的区别在于它计算了一个内存首地址并且保存起来,在计算内存首地址的时候是通过UNSAFE
对象去获取的。保存内存首地址的好处是:获取数据时可以直接通过 内存首地址 + 下标 计算出目标地址并读取。
// Constructor: forwards the allocator and both capacities unchanged to the superclass constructor.
InstrumentedUnpooledUnsafeDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}
// Constructor: validates the arguments, then allocates a direct ByteBuffer and installs it.
// Throws NullPointerException when alloc is null and IllegalArgumentException when
// initialCapacity is greater than maxCapacity.
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
// Only maxCapacity is handed to the superclass.
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
// Both capacities go through checkPositiveOrZero.
checkPositiveOrZero(initialCapacity, "initialCapacity");
checkPositiveOrZero(maxCapacity, "maxCapacity");
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
// Allocate the direct buffer and install it; tryFree is false, so setByteBuffer skips
// the old-buffer release branch on this call.
setByteBuffer(allocateDirect(initialCapacity), false);
}
// Installs the given ByteBuffer as the backing buffer and records its memory address.
// tryFree controls whether the previously installed buffer is considered for release.
final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
if (tryFree) {
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
// When doNotFree is set the old buffer is left alone and the flag is cleared;
// otherwise the old buffer is released through freeDirect.
if (doNotFree) {
doNotFree = false;
} else {
freeDirect(oldBuffer);
}
}
}
this.buffer = buffer;
// Look up the buffer's memory address once and keep it in memoryAddress.
memoryAddress = PlatformDependent.directBufferAddress(buffer);
// tmpNioBuf is cleared whenever the buffer is replaced.
tmpNioBuf = null;
// Capacity is taken from the new buffer's remaining() value.
capacity = buffer.remaining();
}
// Pass-through: forwards the address lookup to PlatformDependent0.directBufferAddress.
public static long directBufferAddress(ByteBuffer buffer) {
return PlatformDependent0.directBufferAddress(buffer);
}
// Returns the long stored in the buffer object at ADDRESS_FIELD_OFFSET
// (presumably the offset of the buffer's address field -- confirm where the constant is defined).
static long directBufferAddress(ByteBuffer buffer) {
return getLong(buffer, ADDRESS_FIELD_OFFSET);
}
// Reads a long from the given object at the given field offset.
private static long getLong(Object object, long fieldOffset) {
// The read goes through the UNSAFE object; in the call chain shown here the value is
// used as the buffer's memory address.
return UNSAFE.getLong(object, fieldOffset);
}
跟踪UnpooledUnsafeDirectByteBuf#_getByte()
,可以知道UNSAFE
的直接内存内容获取方式是通过内存首地址 + 偏移量获取的。
@Override
protected byte _getByte(int index) {
//通过计算地址获取
return UnsafeByteBufUtil.getByte(addr(index));
}
// Converts a buffer index into an address.
long addr(int index) {
// Address = stored memoryAddress (the base) + index (the offset).
return memoryAddress + index;
}
// Reads one byte at the given address through the UNSAFE object.
static byte getByte(long address) {
// In the call chain shown here, address is memoryAddress + index.
return UNSAFE.getByte(address);
}