PoolChunk负责8KB-16MB的内存分配,那如果小于8KB呢,比如申请64B,此时用PoolChunk分配就显得浪费了,netty用PoolSubpage来分配小于8KB的内存,如下:
补充PoolChunk和PoolSubpage关联图
补充Area和PoolSubpage关联图
netty使用bitmap来表示PoolSubpage分配结果,bitmap是long数组,
类属性
final class PoolSubpage<T> implements PoolSubpageMetric {
// 对应的chunk应用
final PoolChunk<T> chunk;
//对应的chunk叶子节点下标
private final int memoryMapIdx;
//当前page在chunk中memory的偏移量
private final int runOffset;
private final int pageSize;
//long数组表示分配情况
private final long[] bitmap;
// arena双向链表的前驱节点
PoolSubpage<T> prev;
// arena双向链表的后置节点
PoolSubpage<T> next;
boolean doNotDestroy;
//page切分后的每一段大小
int elemSize;
//切分后段的数量
private int maxNumElems;
private int bitmapLength;
//下一个可用的位置
private int nextAvail;
// 可用的段数量
private int numAvail;
内存申请方法
private long allocateSubpage(int normCapacity) {
//从arena中获取PoolSubpage链表头部(参见后文arena分析)
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
// d=11,即poolchunk叶节点层次
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
synchronized (head) {
//从poolchunk中获取一个叶子节点,返回对应的下标
int id = allocateNode(d);
if (id < 0) {
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
//PoolChunk维护了一个2048大小的poolSubpage数组,对应2048个叶子节点,这里是获取叶子节点对应的subpage下标。
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
//如果叶子节点对应的subpage为空,则初始化
if (subpage == null) {
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}
PoolSubpage初始化方法如下:
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk; //对应的chunk引用
this.memoryMapIdx = memoryMapIdx; //对应的chunk叶子节点下标
this.runOffset = runOffset;
this.pageSize = pageSize; //叶子节点大小,默认8K
//用long数组表示subpage分配情况,最小单位为16B,long可表示64位,所以只需要8KB/16B/64=8个long字段即可
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(head, elemSize);
}
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
//elemSize为初次申请的大小,如1024B
this.elemSize = elemSize;
if (elemSize != 0) {
//按申请的大小进行分段,如8KB/1024B=8
maxNumElems = numAvail = pageSize / elemSize;
//此时第一次分配,下一个可用的位置即0
nextAvail = 0;
// 计算需要几个long来表示,如maxNumElems=8,右移6位(除64)=0,表示最多需要1个long
bitmapLength = maxNumElems >>> 6;
if ((maxNumElems & 63) != 0) {
//表示不是64的倍数
bitmapLength ++;
}
//初始化bitmap
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
//将poolsubpage加入Arena双向链表
addToPool(head);
}
netty为了提供效率,大量使用了位运算,比如上述需要根据段的数量计算bitmap数组的long数量时,就先使用了无符号右移6(即除64),然后使用与63求&(即判断是不是64的整数),等价于求maxNumElems/64向上取整。
继续看分配方法allocate
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
//没有可分配的段,或需要销毁
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
//获取下一个可用段的下标
final int bitmapIdx = getNextAvail();
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;
//将对应的bitmap中的long对应的位数设为0,表示已使用
bitmap[q] |= 1L << r;
if (-- numAvail == 0) {
removeFromPool();
}
return toHandle(bitmapIdx);
}
//转为64位分配信息
private long toHandle(int bitmapIdx) {
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
位计算有点多,不要慌。难点在于设置要使用的位数为1,即位图中的set操作。而这里的位图是用long来表示,首先要找到是哪个long(可以理解为long数组索引),即bitmapIdx >>> 6,接着要找位于long的哪里,bitmapIdx & 63表示对64求余,即找出了对应位r(可以理解为long的位偏移),然后1左移r位后再对long值求或即是目标结果。
接着看getNextAvail方法:
private int getNextAvail() {
int nextAvail = this.nextAvail;
//如果nextAvail大于0表示有可用的,直接返回
if (nextAvail >= 0) {
this.nextAvail = -1;
return nextAvail;
}
return findNextAvail();
}
private int findNextAvail() {
final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
for (int i = 0; i < bitmapLength; i ++) {
long bits = bitmap[i];
//取非不为0,表示还有可用的
if (~bits != 0) {
return findNextAvail0(i, bits);
}
}
return -1;
}
private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;
final int baseVal = i << 6;
//从低位开始遍历,对应的值表示这位已分配
for (int j = 0; j < 64; j ++) {
if ((bits & 1) == 0) {
int val = baseVal | j;
if (val < maxNumElems) {
return val;
} else {
break;
}
}
bits >>>= 1;
}
return -1;
}
内存释放方法
boolean free(PoolSubpage<T> head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) != 0;
//逆向操作,清除对应位
bitmap[q] ^= 1L << r;
// 该位置可用于下次分配
setNextAvail(bitmapIdx);
if (numAvail ++ == 0) {
//已分配过,可加入arena双向链表
addToPool(head);
return true;
}
if (numAvail != maxNumElems) {
return true;
} else {
// Subpage not in use (numAvail == maxNumElems)
if (prev == next) {
// Do not remove if this subpage is the only one left in the pool.
return true;
}
// Remove this subpage from the pool if there are other subpages left in the pool.
doNotDestroy = false;
removeFromPool();
return false;
}
}
PoolSubpage分析就到这里了。