概述
概述
从上面的图示, 我们可以得到几个结论
- Page能继续再切小, 而PoolSubpage代表Page的subpage版本.
- PoolSubpage在初始化的时候就决定了elemsize,一个PoolSubpage里面只能包含相同elemsize的内存空间。
- 那么不同elemsize的PoolSubpage又由arena的tiny和smallsubpagepool来分组。那么elemsize相同的subpagepool当然由多个PoolSubpage,也就是多个page组成。
tinySubpagePools
用来分配小于 512 字节的页内空间,且大小必须是16的整数倍。tinySubpagePools
中的元素只存放对应空间大小的链表首指针。比如 tinySubpagePools[0] 为只分配大小为 16(16 * 1) 字节空间的链表首指针,tinySubpagePools[1] 为只分配大小为 32(16 * 2)字节的链表首指针,以此类推,tinySubpagePools[31] 为只分配大小为 496(16 * 31)字节的链表首指针,共 32 个元素。smallSubpagePools
分配 512 到 4096字节的页内空间,且大小依次翻倍。和tinySubpagePools
中的链表内容类似,但只有 4 个元素,各链表能分配空间大小分别为 512、1024、2048、4096。- 想象下两个pool都初始化有head, 而且每个head都代表一个elemsize, 那么我可以将所有相同size的subpage归拢到某个head下, 当用完后我可以释放掉. 当下次我要请求同样大小的内存区域时, 直接定位符合条件的head, 看里面是否有可用的就好.
初始化
final PoolChunk<T> chunk;
private final int memoryMapIdx;
private final int runOffset;
private final int pageSize;
private final long[] bitmap;
PoolSubpage<T> prev;
PoolSubpage<T> next;
boolean doNotDestroy;
int elemSize;
private int maxNumElems;
private int bitmapLength;
private int nextAvail;
private int numAvail;
// 当申请的空间小于pagesize的时候,调用到这里
private long allocateSubpage(int normCapacity) {
// 这里找到对应空间的head后
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
// 拿到最底层的层数,也就是page层
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
// 拿到一个可用的节点
int id = allocateNode(d);
if (id < 0) {
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
// 先减掉一个page,代表这个page被占用
freeBytes -= pageSize;
// 根据page的index去换算PoolSubpage的index, 其实就是减去2048的偏移量
int subpageIdx = subpageIdx(id);
// 拿到该page对应的PoolSubpage
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
// 如果为空,那么初始化一个PoolSubpage
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
// 否则init
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
// bitmap记录使用情况, 如subpage大小为16B, 一个page总共可以分配8k/16B=512个, 而一个Long型
// 可以记录64个状态, 所以512/64=8, 那么就是pageSize / 16 / 64的效果
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(head, elemSize);
}
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
this.elemSize = elemSize;
if (elemSize != 0) {
// 该page最大能切分多少个subpage
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
// 如上分析,这里是指bitmap的长度
bitmapLength = maxNumElems >>> 6;
// maxNumElems的结果必然会出现非64倍数的情况,那么不能整除64的情况下,在bitmap中额外补一个就好
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}
// 初始化bitmap
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
addToPool(head);
}
private void addToPool(PoolSubpage<T> head) {
assert prev == null && next == null;
prev = head;
next = head.next;
next.prev = this;
head.next = this;
}
分配
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
// 拿到下一个PoolSubPage数组可用的的位置
final int bitmapIdx = getNextAvail();
// q相当于该位置对应的bitmap的index
int q = bitmapIdx >>> 6;
// 而r是q位置上实际可用的那个位置
int r = bitmapIdx & 63;
// 确认该位置没被占用
assert (bitmap[q] >>> r & 1) == 0;
// 设置该位置为1, 代表被占用
bitmap[q] |= 1L << r;
// 如果可用的数为0, 那么说明整个page或PoolSubPage已经满了,将其从Arena的pool中删除掉
if (-- numAvail == 0) {
removeFromPool();
}
return toHandle(bitmapIdx);
}
toHandle
private long toHandle(int bitmapIdx) {
// 相当于将page的index和PoolSubpage的index组合在一起返回
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
getNextAvail
private int getNextAvail() {
int nextAvail = this.nextAvail;
if (nextAvail >= 0) {
this.nextAvail = -1;
return nextAvail;
}
return findNextAvail();
}
private int findNextAvail() {
final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
// 遍历bitmap,每一个元素代表了64个位置
for (int i = 0; i < bitmapLength; i ++) {
long bits = bitmap[i];
// 如果取反不等于0,说明还有空余的位置
// 如果取反等于0, 说明该元素64位全部为1, 没有空余位置
if (~bits != 0) {
return findNextAvail0(i, bits);
}
}
return -1;
}
private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;
// 先根据bitmap中的位置来计算对应PoolSubPage数组的偏移量,也就是bitindex*64
final int baseVal = i << 6;
// 遍历bitmap中每一个元素的64位
for (int j = 0; j < 64; j ++) {
// 如果成立,表面该位置没有人占用
if ((bits & 1) == 0) {
// 再根据该位置的bit位来计算具体的空位在PoolSubPage的位置
int val = baseVal | j;
if (val < maxNumElems) {
return val;
} else {
break;
}
}
bits >>>= 1;
}
return -1;
}
关联
当前面的内存空间分配好后,我们需要跟具体的ByteBuf关联起来,供应用层使用. PoolChunk会来托管这些.
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
// 根据handle解析出来,如果不包含bitmapindex,说明不是poolsubpage类型的
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
} else {
// 我们最终关注到这里
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
}
}
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
// 拿到Page的index
int memoryMapIdx = memoryMapIdx(handle);
//拿到PoolSubpage的index
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
buf.init(
this, handle,
// runOffset(memoryMapIdx)表示在chunk的相对偏移量
// (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize表示在page中的相对偏移量
// 合计便是定位到该段分配的内存在chunk中的相对位置
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
arena.parent.threadCache());
}
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
this.chunk = chunk;
this.handle = handle;
// 想必你能猜到,其实创建chunk是由自己的内存地址的
memory = chunk.memory;
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
tmpNioBuf = null;
this.cache = cache;
}
private void initMemoryAddress() {
// 既然整个chunk的地址已知,且我们又计算得到分配内存的相对偏移量,计算实际内存地址不是问题
memoryAddress = PlatformDependent.directBufferAddress(memory) + offset;
}
释放
boolean free(PoolSubpage<T> head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}
// 将bitmap里面的位置设为0
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
bitmap[q] ^= 1L << r;
// 将该位置设置下个可用的位置
setNextAvail(bitmapIdx);
// 如果之前还没有位置, 经过上面的处理, 想必已经腾出了位置
// 那么加入到head后面, 等待对外提供subpage
if (numAvail ++ == 0) {
addToPool(head);
return true;
}
if (numAvail != maxNumElems) {
return true;
} else {
// 如果整个page都被释放
// Subpage not in use (numAvail == maxNumElems)
if (prev == next) {
// Do not remove if this subpage is the only one left in the pool.
return true;
}
// Remove this subpage from the pool if there are other subpages left in the pool.
doNotDestroy = false;
removeFromPool();
return false;
}
}