Netty
是一个高性能的网络应用程序框架,主要就是进行数据的交互,所以必须有一个高效的内存分配器。
内存分配器的功能就两个:
- 用户申请内存时,分配给它内存块。
- 用户主动释放内存时,回收这个内存块。
一般我们的做法是:
- 先申请一个较大的内存块。
- 当用户申请内存时,从这个内存块中,分割符合申请内存大小的内存块给用户。
- 用户主动释放内存时,再将这个内存块回收。
但是这么做有个问题,因为用户申请内存的大小各不相同,分配的内存块大小就不一样,回收以后就是各种尺寸的内存碎片。
- 例如,我们有一个
20
大小的总内存块,分配给用户两个大小为5
内存块,和一个内存为4
内存块,两个内存为2
内存块;- 之后都回收了,就有两个为
5
,一个为4
,三个为2
的内存碎片。- 这个时候在申请内存为
6
的内存块时,发现没有办法分配了。
为了解决这个问题,能够高效地进行内存分配,就要使用内存分配算法了。
- 在
Netty
4.1.45
版本之前使用的是jemalloc3
算法来进行内存分配的;- 而在
4.1.45
版本之后使用的是jemalloc4
算法来进行内存分配的。- 本篇文章我们先介绍
jemalloc3
算法实现。
一. 划分内存规格
产生内存碎片最主要的原因就是因为用户申请的内存大小不一样。
那么如果用户申请的内存大小都一样,那么不就没有内存碎片了么。
想法虽然是好的,但是明显是不可能的,因为程序运行过程中,需要的内存本来就是不同的。
那么我们就换一个思路,虽然不能要求申请的内存大小都一样,但是可以提前划分好不同规格的内存,然后根据请的内存大小不同,分配不同规格的内存快。
如上图所示,jemalloc3
一共将内存分为四种类型:
内存规格 | 描述 |
---|---|
Tiny |
微小规格内存块,容量从16B 到496B 一共31 个内存规格,每个规格容量相差16B
|
Small |
小规格内存块,容量从512B 到4KB 一共4 个内存规格,每个规格容量相差一倍 |
Normal |
正常规格内存块,容量从8KB 到16MB 一共11 个内存规格,每个规格容量相差一倍 |
Huge |
巨大内存块,不会放在内存管理中,直接内存中申请 |
因此就可以根据用户申请的内存大小,直接对应规格的内存块。
- 例如申请
40B
, 那么就分配48B
规格的内存块,虽然有8B
的字节被浪费了,但是避免了内存碎片的产生。- 你会发现从
Small
开始,每个规格内存块相差都是一倍,这就可以导致50%
的内存浪费;例如我们申请513B
大小,那么只能分配1KB
规格的内存块。这个是jemalloc3
算法的缺陷,只能使用jemalloc4
算法进行改进,以后我们会说到。
二. 内存规格算法实现
内存规格的划分作用和意义我们已经了解了,那么怎么实现它呢?
在Netty
中使用 PoolChunk
来进行内存分配:
-
PoolChunk
先申请一大块内存memory
(可以是字节数组,也可以是DirectByteBuffer
),大小就是chunkSize
(16MB
)。 - 我们知道
Normal
规格最小内存块是pageSize
(8KB
) 容量,那么就要能记录最小Normal
规格内存块使用情况。 -
Tiny
和Small
规格内存块小于pageSize
大小,可以使用一个最小Normal
规格内存块来分配多个Tiny
和Small
规格内存块。
如图所示:
-
PoolChunk
使用一个满二叉树(用数组实现)来记录内存块的分配使用情况。- 因为
chunkSize == 16MB
,且pageSize == 8KB
,那么树的深度depth
一共12
层(从0
到11
)。 - 根据不同深度,就可以获得不同大小的内存块,例如最底层即
11
层所有节点对应的内存块大小就是8KB
。
- 因为
-
使用数组来实现这个满二叉树。
- 这里有两个数组
memoryMap
和depthMap
,大小都是4096
。做了特殊处理,下标0
这个位置没有任何意义,从下标1
开始。 -
depthMap
的值表示当前下标对应在二叉树中的层数。例如下标为1
的值是0
,表示第0
层;下标为6
的值是2
,表示第2
层;下标为2048
的值是11
,表示第11
层。 -
memoryMap
的值表示当前这个节点能分配的内存块大小。刚开始时和depthMap
的值是一样的,但是当它的子节点被分配了,那么值就会变。例如刚开始时,下标为4
的值是2
,表示能分配4MB
内存块大小;如果它的一个子节点被分配了,那么它的值就会变成3
,表示只能分配2MB
内存块大小。
- 这里有两个数组
-
使用
bitmap
数据记录Tiny
和Small
规格内存使用情况- 最底层的内存块可以在分成
Tiny
和Small
规格小内存块。 - 一旦在最底层的内存块分配了一个
Tiny
和Small
规格小内存块,那么这个最底层的内存块就表示被使用了,而且这个内存块只能分配刚分配那个大小的规格的小内存块,直到它被回收(即由它分配的小内存快都被释放),进行重新分配,那么可以分配其他大小的规格的小内存块。即由第一次分配的规格大小来决定。 - 通过
bitmap
位图数组来记录,已经在最底层的内存块上分配了那些小内存块。因为最小内存块大小是16B
,而最底层的内存块大小是8KB
,因此最多可以分512
块;一个long
类型有64
位二进制数,所以最多需要8
个long
类型就可以记录。 - 通过
bitmapIdx
的值,可以得到在bitmap
位图数组中的那一个long
类型的那一位。通过bitmapIdx >>> 6
(即除以64
) 得到bitmap
位图数组的下标;通过bitmapIdx & 63
(即整除64
的余数)得到占据long
类型那一位。
- 最底层的内存块可以在分成
-
通过
handle
来记录偏移量和内存块大小- 高
32
位用来记录bitmapIdx
,从前面介绍bitmapIdx
的值很小的,最大值就是64 * 8
。最高位肯定是0
,次高位(0x4000000000000000L
)其实是用来记录是不是Tiny
和Small
类型规格。 - 低
32
位用来记录memoryMapIdx
。 - 如果是
Normal
规格,高32
位的值肯定是0
;通过memoryMapIdx
从depthMap
数组获取对应层数,这样就能得到内存块大小了;根据memoryMapIdx
可以计算在当前这一层的偏移值。例如memoryMapIdx = 2050
,那么是第11
层,大小就是8KB
;偏移值就是2050 - 2048 = 2
,那么偏移量就是16KB
;因此我们就在偏移量16KB
处分割一块8KB
大小的内存块给用户使用。 - 如果是
Tiny
和Small
规格,那么肯定是在最底层,先通过memoryMapIdx
计算偏移值,得到偏移量,然后得到这个最底层内存块分割成小内存的大小,再根据bitmapIdx
值得到在这个最底层内存块上的偏移量,最后就能得到最终偏移量和分割内存块大小了。
- 高
三. 源码实现
3.1 PoolSubpage
类
3.1.1 初始化
PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
this.chunk = chunk;
this.memoryMapIdx = memoryMapIdx;
this.runOffset = runOffset;
this.pageSize = pageSize;
// 因为 long 类型是8个字节,64位二进制数;
// 而 Tiny 类型最小容量都是 16 个字节。
// 所以 bitmap 位图数组最大长度就是 pageSize / 16/ 64
bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
init(head, elemSize);
}
void init(PoolSubpage<T> head, int elemSize) {
doNotDestroy = true;
// 当前这 PoolSubpage 只会分配 elemSize 大小容量的内存
this.elemSize = elemSize;
if (elemSize != 0) {
// PoolSubpage 一共可以分配多少块这个容量的内存
maxNumElems = numAvail = pageSize / elemSize;
nextAvail = 0;
// 无符号右移6位,也就是除以64,因为一个 long 有64个二进制位
bitmapLength = maxNumElems >>> 6;
// 如果 maxNumElems 不能整除 64,那么就要将 bitmapLength 加一
if ((maxNumElems & 63) != 0) {
bitmapLength ++;
}
for (int i = 0; i < bitmapLength; i ++) {
bitmap[i] = 0;
}
}
// 添加到 PoolArena 中对应尺寸容量的PoolSubpage链表中
addToPool(head);
}
- 刚开始创建的时候,主要是创建
bitmap
位图数组,数组长度就是pageSize >>> 10
,即除以64
位二进制数,和最小Tiny
类型规格都是16
个字节。init(...)
初始化方法,刚创建的时候或者PoolSubpage
被回收重新使用的时候调用。- 确定当前
PoolSubpage
分配内存块大小elemSize
;- 计算最多分配多少这个大小的内存块。
- 计算真实
bitmap
位图数组长度bitmapLength
。- 将这个
PoolSubpage
添加到PoolArena
中对应尺寸容量的PoolSubpage
链表中,这样就不需要需要查找,加快内存块分配速度。
3.1.2 分配内存块
/**
* 返回子页面内存分配的位图索引
* 使用 long 类型每个二进制位数`0`或 `1` 来记录这块内存有没有被分配过,
* 因为 long 是8个字节,64位二进制数,所以可以表示 64 个内存块分配情况。
*/
long allocate() {
if (elemSize == 0) {
return toHandle(0);
}
if (numAvail == 0 || !doNotDestroy) {
return -1;
}
// 得到下一个可用的位图 bitmap 索引
final int bitmapIdx = getNextAvail();
// 除以 64 得到的整数,即 bitmap[] 数组的下标
int q = bitmapIdx >>> 6;
// 与 64 的余数,即占据的 long 类型的位数
int r = bitmapIdx & 63;
// 必须是 0, 表示这一块内存没有被分配
assert (bitmap[q] >>> r & 1) == 0;
// 将r对应二进制位设置为1,表示这一位代表的内存块已经被分配了
bitmap[q] |= 1L << r;
if (-- numAvail == 0) {
// 如果可分配内存块的数量numAvail为0,
// 那么就要这个 PoolSubpage 从 PoolArena 中
// 对应尺寸容量的PoolSubpage链表中移除。
removeFromPool();
}
// 使用 long类型的高32位储存 bitmapIdx 的值,即使用 PoolSubpage 中那一块的内存;
// 低32位储存 memoryMapIdx 的值,即表示使用那一个 PoolSubpage
return toHandle(bitmapIdx);
}
private long toHandle(int bitmapIdx) {
// 虽然我们使用高 32 为表示 bitmapIdx,但是当bitmapIdx = 0 时,
// 就无法确定是否表示 bitmapIdx 的值。
// 所以这里就 0x4000000000000000L | (long) bitmapIdx << 32,那进行区分。
// 放心 bitmapIdx << 32 是不可能超过 0x4000000000000000L
return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
}
方法流程:
- 如果没有可用内存块了,就直接返回
-1
。- 通过
getNextAvail()
方法,获取下一个能分配的内存块的位图索引bitmapIdx
。- 根据位图索引
bitmapIdx
将bitmap
位图数组中对应二进制位设置为1
,表示已经被分配了。- 如果分配之后没有内存块了,就将这个
PoolSubpage
从PoolArena
中对应尺寸容量的PoolSubpage
链表中删除,因为已经不能分配了。- 通过
toHandle(bitmapIdx)
返回handle
值。
3.1.3 回收内存块
/**
* 返回 true,表示这个 PoolSubpage 还在使用,即上面还有其他小内存块被使用;
* 返回 false,表示这个 PoolSubpage 上面分配的小内存块都释放了,可以回收整个 PoolSubpage。
*/
boolean free(PoolSubpage<T> head, int bitmapIdx) {
if (elemSize == 0) {
return true;
}
// 得到位图 bitmap 中的下标
int q = bitmapIdx >>> 6;
// 得到使用 long 类型中那一位
int r = bitmapIdx & 63;
// 必须不能是 0, 表示这个 bitmapIdx 对应内存块肯定是在被使用
assert (bitmap[q] >>> r & 1) != 0;
// 将r对应二进制位设置为0,表示这一位代表的内存块已经被释放了
bitmap[q] ^= 1L << r;
// 将 bitmapIdx 设置为下一个可以使用的内存块索引,
// 因为刚被释放,这样就不用进行搜索来查找可用内存块索引。
setNextAvail(bitmapIdx);
if (numAvail ++ == 0) {
// 如果可分配内存块的数量numAvail从0开始增加,
// 那么就要重新添加到 PoolArena 中对应尺寸容量的PoolSubpage链表中
addToPool(head);
return true;
}
if (numAvail != maxNumElems) {
return true;
} else {
// 子页面未使用(numAvail == maxNumElems)
if (prev == next) {
// 如果 prev == next,即 subpage 组成的链表中没有其他 subpage,不能删除它
return true;
}
// 如果 prev != next,即 subpage 组成的链表中还有其他 subpage,那么就删除它
doNotDestroy = false;
removeFromPool();
return false;
}
}
- 根据位图索引
bitmapIdx
将bitmap
位图数组中对应二进制位设置为0
,表示已经被释放了。- 调用
setNextAvail(bitmapIdx)
,加快下一次分配内存块的速度,不需要重新查找了。- 最后再处理一下
PoolArena
中对应尺寸容量的PoolSubpage
链表。
3.2 PoolChunk
类
3.2.1 分配内存块
3.2.1.1 allocate
方法
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
final long handle;
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
// >= pageSize,即 Normal 规格类型内存块,通过 allocateRun 方法分配
handle = allocateRun(normCapacity);
} else {
// 分配 Tiny 和 Small 规格类型内存块
handle = allocateSubpage(normCapacity);
}
if (handle < 0) {
// 小于 0, 说明当前PoolChunk都被分配完了
return false;
}
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
// 使用 handle 来初始化 池化缓存区PooledByteBuf
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}
- 通过
allocateRun(...)
方法,分配Normal
规格类型内存块;通过allocateSubpage(normCapacity)
方法,分配Tiny
和Small
规格类型内存块。- 通过
initBuf(...)
方法,使用申请的内存块handle
来初始化池化缓存区PooledByteBuf
。
3.2.1.2 allocateRun
方法
private long allocateRun(int normCapacity) {
/**
* 默认情况下,maxOrder=11,pageShifts=13
* normCapacity肯定是大于或者等于pageSize,即 log2(normCapacity) >= pageShifts
*
* d 表示在第几层可以分配这个尺寸容量normCapacity 的内存块。
* 最底层,即11层,最多只能分配 pageSize尺寸容量的内存块。
*/
int d = maxOrder - (log2(normCapacity) - pageShifts);
/**
* 得到尺寸容量normCapacity 内存块的索引id
*/
int id = allocateNode(d);
if (id < 0) {
// 小于 0, 说明当前PoolChunk都被分配完了
return id;
}
freeBytes -= runLength(id);
return id;
}
- 通过
allocateNode(d)
方法获取memoryMapIdx
的值。- 减少当前
PoolChunk
可用内存字节数freeBytes
。
3.2.1.3 allocateNode
方法
private int allocateNode(int d) {
// d 代表层数,其实也代表需要的内存容量,(1 << (maxOrder - d)) * pageSize
// 当 d 和 maxOrder 相等,即需要内存容量就是 pageSize
int id = 1;
/**
* 例如 d = 11
* 那么 1 << d 就是 100000000000
* - (1 << d) 就是 1111111111111111111111111111111111111111111111111111100000000000
* 所以 initial的作用就是用来快速判断某个值是不是小于 (1 << d)
*/
int initial = - (1 << d); // has last d bits = 0 and rest all = 1
byte val = value(id);
if (val > d) { // unusable
return -1;
}
/**
* val < d 表示当前节点id对应的内存容量还大于 d 对应的内存容量,继续寻找。
* (id & initial) == 0 只有当 id < (1 << d) 时成立,保证 id 是 d层的。
*/
while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
id <<= 1;
val = value(id);
if (val > d) {
/**
* val > d,表示从 id 节点对应的内存容量已经不足要求内存块的大小了,
* 但是它能走到这一个判断,说明 id 节点的父节点对应的内存容量是可以满足内存块的大小的;
* 那一定是因为 id 节点兄弟节点对应内存容量能满足内存块的大小。
*
* id ^= 1 就是得到 id 节点的兄弟节点
*/
id ^= 1;
val = value(id);
}
}
byte value = value(id);
assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
value, id & initial, d);
// 将当前 memoryMap 的下标id 的值设置为 unusable,
// 表示它已经被使用了, unusable = maxOrder + 1
setValue(id, unusable); // mark as unusable
// 更新这个 id 之上所有父节点的值,
// 因为id 节点被使用了,那么它之上所有父节点代表的内存容量都收到影响。
updateParentsAlloc(id);
return id;
}
private void updateParentsAlloc(int id) {
// 通过循环更新所有父节点的值。
while (id > 1) {
int parentId = id >>> 1;
byte val1 = value(id);
byte val2 = value(id ^ 1);
// 寻找父节点对应子节点中较小的值
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
id = parentId;
}
}
- 现在满二叉树中,在
d
对应的那层中寻找还没有被分配的节点(从左到右寻找),返回这个节点的下标值(即memoryMapIdx
)。- 通过
setValue(id, unusable)
方法,将这个节点值设置成unusable
,表示这个节点已经被分配了。- 通过
updateParentsAlloc(id)
方法更新父节点可分配的内存大小。- 因为
d
层有个节点被分配了,那么这个节点的父节点以及父节点的父节点等,它们的可分配的内存大小就和它们未分配的那个子节点大小一样了。
3.2.1.4 allocateNode
方法
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
// 得到 PoolArena 中对应尺寸容量的PoolSubpage链表头
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
// 因为 Tiny 或者 Small类型容量都小于 pageSize,
// 所以它们肯定只使用最低层一个 PoolSubpage
int d = maxOrder;
synchronized (head) {
// 寻找没有被分配的 PoolSubpage 索引id
int id = allocateNode(d);
if (id < 0) {
// 小于 0, 说明当前PoolChunk都被分配完了
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
// 当前PoolChunk 可用字节数
freeBytes -= pageSize;
// 得到 this.subpages 的下标
int subpageIdx = subpageIdx(id);
// 需要容量normCapacity的内存就从这个 subpage 中分配
// 分配之后,这个 subpage 就只能存放这个尺寸容量normCapacity的内存
// 这里的 subpage 肯定是没有分配过内存的,
// 因为通过 allocateNode(d) 找到的肯定是没有分配过内存的,
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
// 创建 PoolSubpage 实例,构造方法中会调用 subpage.init(head, normCapacity) 方法
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
// 这个 PoolSubpage 之前被用过,但是被释放了。
// 初始化, 将这个 PoolSubpage 能分配的容量尺寸就是 normCapacity
// 再将这个 PoolSubpage 添加到 PoolArena 对应容量尺寸 PoolSubpage<T> 数组中
// 这样下次再请求这个尺寸的内存时,直接从 PoolSubpage<T> 数组中找到这个 PoolSubpage,
// 进行内存分配
subpage.init(head, normCapacity);
}
// 在PoolSubpage上进行内存分配
return subpage.allocate();
}
}
- 通过
allocateNode(d)
方法在最底层寻找未分配的内存块。- 减少当前
PoolChunk
可用内存字节数freeBytes
。- 初始化一个
PoolSubpage
, 通过它的subpage.allocate()
方法进行Tiny
和Small
规格类型内存块分配。
3.2.1.5 initBuf
方法
void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
// 因为 handle 高32位表示 bitmapIdx, 低32位表示 memoryMapIdx
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
// 因为 0x4000000000000000L | (long) bitmapIdx << 32,
// 所以 bitmapIdx == 0时,一定是 Normal 类型
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
// 肯定是被使用状态
assert val == unusable : String.valueOf(val);
// runOffset(memoryMapIdx) 表示在当前这个 PoolChunk 的字节偏移量
// runLength(memoryMapIdx) 这个内存块容量
buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());
} else {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity);
}
}
private int runOffset(int id) {
// depth(id) 得到id对应层数
// id ^ 1 << depth(id) 就是这个id节点在这一层的偏移值
// 例如 id = 2049,depth(id) 就是 11,1 << depth(id) 就是 2048
// 2049 ^ 2048 = 1
int shift = id ^ 1 << depth(id);
// 偏移量shift乘以 id对应内存块容量runLength(id),
// 就得到最后的字节偏移量
return shift * runLength(id);
}
private int runLength(int id) {
// represents the size in #bytes supported by node 'id' in the tree
// 节点id对应的内存块大小,单位是字节,一个字节就是8位bits
// log2ChunkSize 是 chunkSize 的log2 的对数,
// 而 chunkSize = pageSize * maxOrder, depth(id) 就是求id节点对应的层数,最低层就是maxOrder,
// 所以id节点在最底层,那么depth(id)就是maxOrder,那么结果值就是 pageSize。
return 1 << log2ChunkSize - depth(id);
}
初始化
Normal
规格的PooledByteBuf
, 通过runOffset(memoryMapIdx)
方法计算偏移量,通过runLength(memoryMapIdx)
方法计算内存块大小。
3.2.1.6 initBuf
方法
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle);
// 通过 memoryMapIdx 找到 PoolSubpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
// 容量必须小于或等于 PoolSubpage 对应的块容量elemSize
assert reqCapacity <= subpage.elemSize;
// runOffset(memoryMapIdx) 表示这个 PoolSubpage 在当前这个 PoolChunk 的字节偏移量;
// (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize
// 就是表示这个 Tiny或者Small类型内存块在 中PoolSubpage 偏移量。
buf.init(
this, nioBuffer, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
初始化
Tiny
和Small
规格类型的PooledByteBuf
, 内存块大小通过PoolSubpage
的elemSize
获取,偏移量要增加(bitmapIdx & 0x3FFFFFFF) * subpage.elemSize
的值。
3.2.2 回收内存块
void free(long handle, ByteBuffer nioBuffer) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
if (bitmapIdx != 0) { // free a subpage
// bitmapIdx != 0 说明它是一个Tiny 或者 Small类型
// 通过 memoryMapIdx 找到对应的PoolSubpage
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage != null && subpage.doNotDestroy;
// 获取 PoolArena 中对应尺寸容量的PoolSubpage链表头
PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
synchronized (head) {
// 释放PoolSubpage中 bitmapIdx 对应那一个内存块
if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
// 如果 free(...) 方法返回true,说明这个PoolSubpage还在被使用,
// 不能被回收,那么直接返回
return;
}
}
}
/**
* 运行到这里,
* 要么它是一个 Normal 类型内存块,那就释放这个内存块。
* 要么它是一个 Tiny 或者 Small类型,但是它对应 PoolSubpage 那一块内存块都被释放了,这里就释放它
*/
freeBytes += runLength(memoryMapIdx);
// 将 memoryMapIdx 对应节点设置回原来值,又可以进行内存块分配了
setValue(memoryMapIdx, depth(memoryMapIdx));
// 因为子节点内存块释放,更新父节点的可分配容量
updateParentsFree(memoryMapIdx);
if (nioBuffer != null && cachedNioBuffers != null &&
cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
cachedNioBuffers.offer(nioBuffer);
}
}
- 如果是
Tiny
和Small
规格类型的内存块,那么就要使用PoolSubpage
的free(...)
方法释放内存块。如果返回true
,表示这个PoolSubpage
还在使用,直接返回;如果返回false
,表示这个PoolSubpage
不在使用,可以被回收了,就要回收对应的整个内存块。- 增加当前
PoolChunk
可用内存字节数freeBytes
。- 通过
setValue(...)
方法,将memoryMapIdx
对应节点值改成初始层数值,表示这个节点有可以分配了。- 通过
updateParentsFree(memoryMapIdx)
方法,更新父节点的节点值,因为子节点内存块被释放,那么父节点可分配内存大小变了。
private void updateParentsFree(int id) {
int logChild = depth(id) + 1;
while (id > 1) {
// 父节点
int parentId = id >>> 1;
// 左子节点
byte val1 = value(id);
// 右子节点
byte val2 = value(id ^ 1);
// 子节点的标准值
logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up
if (val1 == logChild && val2 == logChild) {
// 左子节点和右子节点都是标准值,说明两个子节点都是空闲的
// 那么父节点的容量就是两倍
setValue(parentId, (byte) (logChild - 1));
} else {
// 取左子节点和右子节点中较大的容量,也是val比较小的值。
byte val = val1 < val2 ? val1 : val2;
setValue(parentId, val);
}
id = parentId;
}
}
- 如果左子节点和右子节点都是标准值,说明两个子节点都是空闲的,么父节点的容量就是两倍。
- 如果不是,那么就取左子节点和右子节点中较小值,即可分配内存大小更大。
3.3 PoolArena
类
3.3.1 allocate
方法
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);
return buf;
}
先通过
newByteBuf(maxCapacity)
方法,创建对应类型的池化缓存区PooledByteBuf
,然后调用allocate(cache, buf, reqCapacity)
方法进行内存分配。
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
// 如果容量小于 pageSize 值,即是 Tiny 或者 Small类型
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// 先从当前线程缓存中获取,如果能得到,直接返回
return;
}
// 得到Tiny 类型索引,因为 Tiny 类型是每个相隔16,所以索引就是 normCapacity >>> 4
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// 先从当前线程缓存中获取,如果能得到,直接返回
return;
}
// 得到Small 类型索引,每个Small 类型是成倍扩展的,即 512 1024 2048 4096, 小于 pageSize 的大小
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
// 得到符合容量尺寸的头PoolSubpage
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
// 使用 synchronized 防止并发
final PoolSubpage<T> s = head.next;
if (s != head) {
// 有这种尺寸容量的 PoolSubpage, 容量必须是 normCapacity
assert s.doNotDestroy && s.elemSize == normCapacity;
// 从 PoolSubpage 中分配内存
long handle = s.allocate();
assert handle >= 0;
// 将分配的
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
// 运行到这里,表示目前没有这个尺寸的 PoolSubpage,
// 那么从PoolChunk 中分配
allocateNormal(buf, reqCapacity, normCapacity);
}
// 增加计数
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge类型的内存块肯定不会缓存在当前线程中,直接调用 allocateHuge 分配
allocateHuge(buf, reqCapacity);
}
}
这个方法看起来很复杂,但是其实逻辑很简单:
- 通过
normalizeCapacity(reqCapacity)
方法来将用户申请内存大小转成规格化大小,例如18
就变成32
;990
就变成1024
。 - 根据
Tiny
,Small
,Normal
和Huge
不同类型,进行不同内存块分配。 - 对于
Tiny
和Small
规格类型:- 先从线程缓存
PoolThreadCache
中获取,如果获取到,就直接返回,获取不到就继续下面步骤。 - 再通过
tinySubpagePools
或smallSubpagePools
进行快速分配内存块,如果tinySubpagePools
中有对应的PoolSubpage
,那么就直接分配,如果没有,那么继续下面步骤。 - 通过
allocateNormal(buf, reqCapacity, normCapacity)
方法进行内存块的分配。
- 先从线程缓存
- 对于
Normal
规格类型:- 先从线程缓存
PoolThreadCache
中获取,如果获取到,就直接返回,获取不到就继续下面步骤。 - 通过
allocateNormal(buf, reqCapacity, normCapacity)
方法进行内存块的分配。
- 先从线程缓存
- 对于
Huge
规格类型:这种规格是没有线程缓存的,所以直接通过
allocateHuge(buf, reqCapacity)
方法进行内存块的分配。
3.3.2 normalizeCapacity
方法
int normalizeCapacity(int reqCapacity) {
if (reqCapacity < 0) {
throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
}
// 大于 chunkSize,表示是一个 Huge 类型
if (reqCapacity >= chunkSize) {
// 是否需要进行内存对齐
return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
}
if (!isTiny(reqCapacity)) { // >= 512
// Doubled
// 得到与 reqCapacity 最近的 2的幂数,
// 如果 reqCapacity 就是2的幂数,那么就是它自己
int normalizedCapacity = reqCapacity;
// 先减一,防止 reqCapacity 就是 2的幂数,导致结果值是 reqCapacity 的两步
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
//
return normalizedCapacity;
}
// 小于 512 数,Tiny 类型的数,是否需要进行内存对齐
if (directMemoryCacheAlignment > 0) {
return alignCapacity(reqCapacity);
}
// 能够被 16 整除,那么就直接返回
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
// 结果值是 16 的倍数
return (reqCapacity & ~15) + 16;
}
- 对于
Huge
规格类型,只考虑是否需要进行内存对齐,即需要的内存块大小必须是某个数倍数;这个数必须是2
的幂数。例如内存对齐数directMemoryCacheAlignment
是16
,那么内存块大小必须能整除16
,也就是低四位都是0
。Small
和Normal
规格类型,它们相隔都是1
倍,那么只需要寻找最近的2
的幂数就行了。Tiny
规格类型,最小值是16
,因此只需要16
的倍数就可以了。
3.3.3 allocateNormal
方法
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
// 从 PoolChunkList 中分配内存
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
// 如果没有从 PoolChunkList 中分配内存,
// 那么就要新创建 PoolChunk 对象,
// 默认情况下 pageSize=8192 maxOrder=11 pageShifts=13 chunkSize=16777216
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
boolean success = c.allocate(buf, reqCapacity, normCapacity);
assert success;
// 将新创建的 PoolChunk 添加到 qInit 中
qInit.add(c);
}
- 先从
PoolChunkList
中寻找可用PoolChunk
进行内存分配。- 找不到,那么就创建新的
PoolChunk
实例。- 通过
PoolChunk
的allocate(buf, reqCapacity, normCapacity)
方法进行内存分配;这个上面已经介绍。- 最后将这个
PoolChunk
添加到PoolChunkList
中。
3.3.4 allocateHuge
方法
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
// 创建 Huge 类型的PoolChunk,不会放在内存池中 unpooled = true
PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
activeBytesHuge.add(chunk.chunkSize());
buf.initUnpooled(chunk, reqCapacity);
allocationsHuge.increment();
}
Huge
规格的内存块是不会进入内存池的。
3.3.5 释放内存块
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
// 非池中内存块,直接回收
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
// 根据不同内存规格,将回收的内存块优先放入线程缓存中
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
// 线程缓存已经满了,那么就释放内存块
freeChunk(chunk, handle, sizeClass, nioBuffer);
}
}
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer) {
final boolean destroyChunk;
synchronized (this) {
switch (sizeClass) {
case Normal:
++deallocationsNormal;
break;
case Small:
++deallocationsSmall;
break;
case Tiny:
++deallocationsTiny;
break;
default:
throw new Error();
}
// 调用 PoolChunkList 方法进行内存块释放,需要改变 PoolChunkList 中的一些值
destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
}
if (destroyChunk) {
// destroyChunk not need to be called while holding the synchronized lock.
destroyChunk(chunk);
}
}
Huge
规格类型的内存块直接释放。Tiny
,Small
和Normal
规格类型的内存块,优先放入线程缓存中,如果对应的线程缓存已经满了,那么才释放。