Netty源码_内存管理(jemalloc3)

Netty 是一个高性能的网络应用程序框架,主要就是进行数据的交互,所以必须有一个高效的内存分配器。
内存分配器的功能就两个:

  • 用户申请内存时,分配给它内存块。
  • 用户主动释放内存时,回收这个内存块。

一般我们的做法是:

  • 先申请一个较大的内存块。
  • 当用户申请内存时,从这个内存块中,分割符合申请内存大小的内存块给用户。
  • 用户主动释放内存时,再将这个内存块回收。

但是这么做有个问题,因为用户申请内存的大小各不相同,分配的内存块大小就不一样,回收以后就是各种尺寸的内存碎片。

  • 例如,我们有一个20大小的总内存块,分配给用户两个大小为5 内存块,和一个内存为 4 内存块,两个内存为2 内存块;
  • 之后都回收了,就有两个为 5,一个为4,三个为2内存碎片
  • 这个时候在申请内存为 6 的内存块时,发现没有办法分配了。

为了解决这个问题,能够高效地进行内存分配,就要使用内存分配算法了。

  • Netty 4.1.45版本之前使用的是 jemalloc3 算法来进行内存分配的;
  • 而在4.1.45版本之后使用的是 jemalloc4 算法来进行内存分配的。
  • 本篇文章我们先介绍 jemalloc3 算法实现。

一. 划分内存规格

产生内存碎片最主要的原因就是因为用户申请的内存大小不一样。

那么如果用户申请的内存大小都一样,那么不就没有内存碎片了么。

想法虽然是好的,但是明显是不可能的,因为程序运行过程中,需要的内存本来就是不同的。
那么我们就换一个思路,虽然不能要求申请的内存大小都一样,但是可以提前划分好不同规格的内存,然后根据请的内存大小不同,分配不同规格的内存快。

jemalloc3_内存规格.png

如上图所示,jemalloc3 一共将内存分为四种类型:

内存规格 描述
Tiny 微小规格内存块,容量从16B496B 一共31 个内存规格,每个规格容量相差16B
Small 小规格内存块,容量从512B4KB 一共4 个内存规格,每个规格容量相差一倍
Normal 正常规格内存块,容量从8KB16MB 一共11 个内存规格,每个规格容量相差一倍
Huge 巨大内存块,不会放在内存管理中,直接内存中申请

因此就可以根据用户申请的内存大小,直接对应规格的内存块。

  • 例如申请 40B, 那么就分配 48B 规格的内存块,虽然有 8B 的字节被浪费了,但是避免了内存碎片的产生。
  • 你会发现从Small 开始,每个规格内存块相差都是一倍,这就可以导致 50% 的内存浪费;例如我们申请 513B 大小,那么只能分配1KB 规格的内存块。这个是 jemalloc3 算法的缺陷,只能使用 jemalloc4 算法进行改进,以后我们会说到。

二. 内存规格算法实现

内存规格的划分作用和意义我们已经了解了,那么怎么实现它呢?
Netty 中使用 PoolChunk 来进行内存分配:

  • PoolChunk 先申请一大块内存memory(可以是字节数组,也可以是DirectByteBuffer),大小就是chunkSize(16MB)。
  • 我们知道 Normal 规格最小内存块是 pageSize(8KB) 容量,那么就要能记录最小 Normal 规格内存块使用情况。
  • TinySmall 规格内存块小于 pageSize 大小,可以使用一个最小 Normal 规格内存块来分配多个 TinySmall 规格内存块。
内存规格算法实现.png

如图所示:

  • PoolChunk 使用一个满二叉树(用数组实现)来记录内存块的分配使用情况。

    • 因为chunkSize == 16MB,且 pageSize == 8KB,那么树的深度depth 一共 12 层(从011)。
    • 根据不同深度,就可以获得不同大小的内存块,例如最底层即11层所有节点对应的内存块大小就是8KB
  • 使用数组来实现这个满二叉树。

    • 这里有两个数组 memoryMapdepthMap,大小都是4096。做了特殊处理,下标0 这个位置没有任何意义,从下标 1 开始。
    • depthMap 的值表示当前下标对应在二叉树中的层数。例如下标为1的值是 0,表示第 0 层;下标为 6 的值是 2,表示第 2 层;下标为 2048 的值是 11,表示第 11 层。
    • memoryMap 的值表示当前这个节点能分配的内存块大小。刚开始时和depthMap 的值是一样的,但是当它的子节点被分配了,那么值就会变。例如刚开始时,下标为 4 的值是 2,表示能分配 4MB 内存块大小;如果它的一个子节点被分配了,那么它的值就会变成 3,表示只能分配 2MB 内存块大小。
  • 使用 bitmap 数据记录TinySmall规格内存使用情况

    • 最底层的内存块可以在分成 TinySmall规格小内存块。
    • 一旦在最底层的内存块分配了一个 TinySmall规格小内存块,那么这个最底层的内存块就表示被使用了,而且这个内存块只能分配刚分配那个大小的规格的小内存块,直到它被回收(即由它分配的小内存快都被释放),进行重新分配,那么可以分配其他大小的规格的小内存块。即由第一次分配的规格大小来决定。
    • 通过bitmap 位图数组来记录,已经在最底层的内存块上分配了那些小内存块。因为最小内存块大小是16B,而最底层的内存块大小是8KB,因此最多可以分512块;一个 long 类型有64 位二进制数,所以最多需要8long 类型就可以记录。
    • 通过 bitmapIdx 的值,可以得到在bitmap 位图数组中的那一个long 类型的那一位。通过 bitmapIdx >>> 6 (即除以64) 得到bitmap 位图数组的下标;通过 bitmapIdx & 63(即整除64 的余数)得到占据long 类型那一位。
  • 通过 handle 来记录偏移量和内存块大小

    • 32 位用来记录 bitmapIdx,从前面介绍 bitmapIdx的值很小的,最大值就是 64 * 8。最高位肯定是0,次高位(0x4000000000000000L)其实是用来记录是不是TinySmall类型规格。
    • 32 位用来记录 memoryMapIdx
    • 如果是 Normal规格,高32 位的值肯定是0;通过memoryMapIdxdepthMap数组获取对应层数,这样就能得到内存块大小了;根据 memoryMapIdx 可以计算在当前这一层的偏移值。例如 memoryMapIdx = 2050,那么是第11 层,大小就是8KB;偏移值就是 2050 - 2048 = 2,那么偏移量就是 16KB;因此我们就在偏移量16KB处分割一块8KB大小的内存块给用户使用。
    • 如果是TinySmall规格,那么肯定是在最底层,先通过memoryMapIdx 计算偏移值,得到偏移量,然后得到这个最底层内存块分割成小内存的大小,再根据bitmapIdx值得到在这个最底层内存块上的偏移量,最后就能得到最终偏移量和分割内存块大小了。

三. 源码实现

3.1 PoolSubpage

3.1.1 初始化

    PoolSubpage(PoolSubpage<T> head, PoolChunk<T> chunk, int memoryMapIdx, int runOffset, int pageSize, int elemSize) {
        this.chunk = chunk;
        this.memoryMapIdx = memoryMapIdx;
        this.runOffset = runOffset;
        this.pageSize = pageSize;
        // 因为 long 类型是8个字节,64位二进制数;
        // 而 Tiny 类型最小容量都是 16 个字节。
        // 所以 bitmap 位图数组最大长度就是  pageSize / 16/ 64
        bitmap = new long[pageSize >>> 10]; // pageSize / 16 / 64
        init(head, elemSize);
    }

    void init(PoolSubpage<T> head, int elemSize) {
        doNotDestroy = true;
        // 当前这 PoolSubpage 只会分配 elemSize 大小容量的内存
        this.elemSize = elemSize;
        if (elemSize != 0) {
            // PoolSubpage 一共可以分配多少块这个容量的内存
            maxNumElems = numAvail = pageSize / elemSize;
            nextAvail = 0;
            // 无符号右移6位,也就是除以64,因为一个 long 有64个二进制位
            bitmapLength = maxNumElems >>> 6;
            // 如果 maxNumElems 不能整除 64,那么就要将 bitmapLength 加一
            if ((maxNumElems & 63) != 0) {
                bitmapLength ++;
            }

            for (int i = 0; i < bitmapLength; i ++) {
                bitmap[i] = 0;
            }
        }
        // 添加到 PoolArena 中对应尺寸容量的PoolSubpage链表中
        addToPool(head);
    }
  • 刚开始创建的时候,主要是创建 bitmap 位图数组,数组长度就是 pageSize >>> 10,即除以64位二进制数,和最小Tiny类型规格都是16 个字节。
  • init(...) 初始化方法,刚创建的时候或者PoolSubpage被回收重新使用的时候调用。
  • 确定当前PoolSubpage分配内存块大小elemSize
  • 计算最多分配多少这个大小的内存块。
  • 计算真实 bitmap位图数组长度bitmapLength
  • 将这个PoolSubpage添加到PoolArena中对应尺寸容量的PoolSubpage链表中,这样就不需要需要查找,加快内存块分配速度。

3.1.2 分配内存块

    /**
     * 返回子页面内存分配的位图索引
     * 使用 long 类型每个二进制位数`0`或 `1` 来记录这块内存有没有被分配过,
     * 因为 long 是8个字节,64位二进制数,所以可以表示 64 个内存块分配情况。
     */
    long allocate() {
        if (elemSize == 0) {
            return toHandle(0);
        }

        if (numAvail == 0 || !doNotDestroy) {
            return -1;
        }

        // 得到下一个可用的位图 bitmap 索引
        final int bitmapIdx = getNextAvail();
        // 除以 64 得到的整数,即 bitmap[] 数组的下标
        int q = bitmapIdx >>> 6;
        // 与 64 的余数,即占据的 long 类型的位数
        int r = bitmapIdx & 63;
        // 必须是 0, 表示这一块内存没有被分配
        assert (bitmap[q] >>> r & 1) == 0;
        // 将r对应二进制位设置为1,表示这一位代表的内存块已经被分配了
        bitmap[q] |= 1L << r;

        if (-- numAvail == 0) {
            // 如果可分配内存块的数量numAvail为0,
            // 那么就要这个 PoolSubpage 从 PoolArena 中
            // 对应尺寸容量的PoolSubpage链表中移除。
            removeFromPool();
        }

        // 使用 long类型的高32位储存 bitmapIdx 的值,即使用 PoolSubpage 中那一块的内存;
        // 低32位储存 memoryMapIdx 的值,即表示使用那一个 PoolSubpage
        return toHandle(bitmapIdx);
    }

    private long toHandle(int bitmapIdx) {
        // 虽然我们使用高 32 为表示 bitmapIdx,但是当bitmapIdx = 0 时,
        // 就无法确定是否表示 bitmapIdx 的值。
        // 所以这里就 0x4000000000000000L | (long) bitmapIdx << 32,那进行区分。
        // 放心  bitmapIdx << 32 是不可能超过 0x4000000000000000L
        return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
    }

方法流程:

  • 如果没有可用内存块了,就直接返回 -1
  • 通过 getNextAvail() 方法,获取下一个能分配的内存块的位图索引bitmapIdx
  • 根据位图索引bitmapIdxbitmap 位图数组中对应二进制位设置为1,表示已经被分配了。
  • 如果分配之后没有内存块了,就将这个PoolSubpagePoolArena中对应尺寸容量的PoolSubpage链表中删除,因为已经不能分配了。
  • 通过 toHandle(bitmapIdx) 返回 handle 值。

3.1.3 回收内存块

 /**
     * 返回 true,表示这个 PoolSubpage 还在使用,即上面还有其他小内存块被使用;
     * 返回 false,表示这个 PoolSubpage 上面分配的小内存块都释放了,可以回收整个 PoolSubpage。
     */
    boolean free(PoolSubpage<T> head, int bitmapIdx) {
        if (elemSize == 0) {
            return true;
        }
        // 得到位图 bitmap 中的下标
        int q = bitmapIdx >>> 6;
        // 得到使用 long 类型中那一位
        int r = bitmapIdx & 63;
        // 必须不能是 0, 表示这个 bitmapIdx 对应内存块肯定是在被使用
        assert (bitmap[q] >>> r & 1) != 0;
        // 将r对应二进制位设置为0,表示这一位代表的内存块已经被释放了
        bitmap[q] ^= 1L << r;

        // 将 bitmapIdx 设置为下一个可以使用的内存块索引,
        // 因为刚被释放,这样就不用进行搜索来查找可用内存块索引。
        setNextAvail(bitmapIdx);

        if (numAvail ++ == 0) {
            // 如果可分配内存块的数量numAvail从0开始增加,
            // 那么就要重新添加到 PoolArena 中对应尺寸容量的PoolSubpage链表中
            addToPool(head);
            return true;
        }

        if (numAvail != maxNumElems) {
            return true;
        } else {
            // 子页面未使用(numAvail == maxNumElems)
            if (prev == next) {
                // 如果 prev == next,即 subpage 组成的链表中没有其他 subpage,不能删除它
                return true;
            }

            // 如果 prev != next,即 subpage 组成的链表中还有其他 subpage,那么就删除它
            doNotDestroy = false;
            removeFromPool();
            return false;
        }
    }
  • 根据位图索引bitmapIdxbitmap 位图数组中对应二进制位设置为0,表示已经被释放了。
  • 调用 setNextAvail(bitmapIdx),加快下一次分配内存块的速度,不需要重新查找了。
  • 最后再处理一下 PoolArena中对应尺寸容量的PoolSubpage链表。

3.2 PoolChunk

3.2.1 分配内存块

3.2.1.1 allocate 方法

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        final long handle;
        if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
            //  >= pageSize,即 Normal 规格类型内存块,通过 allocateRun 方法分配
            handle =  allocateRun(normCapacity);
        } else {
            // 分配 Tiny 和 Small 规格类型内存块
            handle = allocateSubpage(normCapacity);
        }

        if (handle < 0) {
            // 小于 0, 说明当前PoolChunk都被分配完了
            return false;
        }
        ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
        // 使用 handle 来初始化 池化缓存区PooledByteBuf
        initBuf(buf, nioBuffer, handle, reqCapacity);
        return true;
    }
  • 通过 allocateRun(...) 方法,分配 Normal 规格类型内存块;通过 allocateSubpage(normCapacity)方法,分配TinySmall 规格类型内存块。
  • 通过 initBuf(...) 方法,使用申请的内存块handle 来初始化池化缓存区PooledByteBuf

3.2.1.2 allocateRun 方法

    private long allocateRun(int normCapacity) {
        /**
         * 默认情况下,maxOrder=11,pageShifts=13
         * normCapacity肯定是大于或者等于pageSize,即 log2(normCapacity) >= pageShifts
         *
         * d 表示在第几层可以分配这个尺寸容量normCapacity 的内存块。
         * 最底层,即11层,最多只能分配 pageSize尺寸容量的内存块。
         */
        int d = maxOrder - (log2(normCapacity) - pageShifts);
        /**
         * 得到尺寸容量normCapacity 内存块的索引id
         */
        int id = allocateNode(d);
        if (id < 0) {
            // 小于 0, 说明当前PoolChunk都被分配完了
            return id;
        }
        freeBytes -= runLength(id);
        return id;
    }
  • 通过 allocateNode(d) 方法获取 memoryMapIdx 的值。
  • 减少当前 PoolChunk 可用内存字节数freeBytes

3.2.1.3 allocateNode 方法

     private int allocateNode(int d) {
        // d 代表层数,其实也代表需要的内存容量,(1 << (maxOrder - d)) * pageSize
        // 当 d 和 maxOrder 相等,即需要内存容量就是 pageSize
        int id = 1;
        /**
         * 例如 d = 11
         * 那么 1 << d   就是 100000000000
         *  - (1 << d)  就是 1111111111111111111111111111111111111111111111111111100000000000
         *  所以 initial的作用就是用来快速判断某个值是不是小于 (1 << d)
         */
        int initial = - (1 << d); // has last d bits = 0 and rest all = 1
        byte val = value(id);
        if (val > d) { // unusable
            return -1;
        }
        /**
         * val < d 表示当前节点id对应的内存容量还大于 d 对应的内存容量,继续寻找。
         * (id & initial) == 0 只有当 id < (1 << d) 时成立,保证 id 是 d层的。
         */
        while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
            id <<= 1;
            val = value(id);
            if (val > d) {
                /**
                 * val > d,表示从 id 节点对应的内存容量已经不足要求内存块的大小了,
                 * 但是它能走到这一个判断,说明 id 节点的父节点对应的内存容量是可以满足内存块的大小的;
                 * 那一定是因为 id 节点兄弟节点对应内存容量能满足内存块的大小。
                 *
                 * id ^= 1 就是得到 id 节点的兄弟节点
                 */
                id ^= 1;
                val = value(id);
            }
        }
        byte value = value(id);
        assert value == d && (id & initial) == 1 << d : String.format("val = %d, id & initial = %d, d = %d",
                value, id & initial, d);
        // 将当前 memoryMap 的下标id 的值设置为 unusable,
        // 表示它已经被使用了, unusable = maxOrder + 1
        setValue(id, unusable); // mark as unusable
        // 更新这个 id 之上所有父节点的值,
        // 因为id 节点被使用了,那么它之上所有父节点代表的内存容量都收到影响。
        updateParentsAlloc(id);
        return id;
    }

    private void updateParentsAlloc(int id) {
        // 通过循环更新所有父节点的值。
        while (id > 1) {
            int parentId = id >>> 1;
            byte val1 = value(id);
            byte val2 = value(id ^ 1);
            // 寻找父节点对应子节点中较小的值
            byte val = val1 < val2 ? val1 : val2;
            setValue(parentId, val);
            id = parentId;
        }
    }
  • 现在满二叉树中,在d 对应的那层中寻找还没有被分配的节点(从左到右寻找),返回这个节点的下标值(即memoryMapIdx)。
  • 通过 setValue(id, unusable) 方法,将这个节点值设置成unusable,表示这个节点已经被分配了。
  • 通过 updateParentsAlloc(id) 方法更新父节点可分配的内存大小。
  • 因为 d 层有个节点被分配了,那么这个节点的父节点以及父节点的父节点等,它们的可分配的内存大小就和它们未分配的那个子节点大小一样了。

3.2.1.4 allocateNode 方法

    private long allocateSubpage(int normCapacity) {
        // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
        // This is need as we may add it back and so alter the linked-list structure.
        // 得到 PoolArena 中对应尺寸容量的PoolSubpage链表头
        PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
        // 因为 Tiny 或者 Small类型容量都小于 pageSize,
        // 所以它们肯定只使用最低层一个 PoolSubpage
        int d = maxOrder;
        synchronized (head) {
            // 寻找没有被分配的 PoolSubpage 索引id
            int id = allocateNode(d);
            if (id < 0) {
                // 小于 0, 说明当前PoolChunk都被分配完了
                return id;
            }

            final PoolSubpage<T>[] subpages = this.subpages;
            final int pageSize = this.pageSize;

            // 当前PoolChunk 可用字节数
            freeBytes -= pageSize;

            // 得到 this.subpages 的下标
            int subpageIdx = subpageIdx(id);
            // 需要容量normCapacity的内存就从这个 subpage 中分配
            // 分配之后,这个 subpage 就只能存放这个尺寸容量normCapacity的内存
            // 这里的 subpage 肯定是没有分配过内存的,
            // 因为通过 allocateNode(d) 找到的肯定是没有分配过内存的,
            PoolSubpage<T> subpage = subpages[subpageIdx];
            if (subpage == null) {
                // 创建 PoolSubpage 实例,构造方法中会调用 subpage.init(head, normCapacity) 方法
                subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
                subpages[subpageIdx] = subpage;
            } else {
                // 这个 PoolSubpage 之前被用过,但是被释放了。
                // 初始化, 将这个 PoolSubpage 能分配的容量尺寸就是 normCapacity
                // 再将这个 PoolSubpage 添加到 PoolArena 对应容量尺寸 PoolSubpage<T> 数组中
                // 这样下次再请求这个尺寸的内存时,直接从 PoolSubpage<T> 数组中找到这个 PoolSubpage,
                // 进行内存分配
                subpage.init(head, normCapacity);
            }
            // 在PoolSubpage上进行内存分配
            return subpage.allocate();
        }
    }
  • 通过 allocateNode(d) 方法在最底层寻找未分配的内存块。
  • 减少当前 PoolChunk 可用内存字节数 freeBytes
  • 初始化一个 PoolSubpage, 通过它的subpage.allocate() 方法进行TinySmall 规格类型内存块分配。

3.2.1.5 initBuf 方法

    void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
        // 因为 handle 高32位表示 bitmapIdx, 低32位表示 memoryMapIdx
        int memoryMapIdx = memoryMapIdx(handle);
        int bitmapIdx = bitmapIdx(handle);
        // 因为 0x4000000000000000L | (long) bitmapIdx << 32,
        // 所以 bitmapIdx == 0时,一定是 Normal 类型
        if (bitmapIdx == 0) {
            byte val = value(memoryMapIdx);
            // 肯定是被使用状态
            assert val == unusable : String.valueOf(val);
            // runOffset(memoryMapIdx) 表示在当前这个 PoolChunk 的字节偏移量
            // runLength(memoryMapIdx) 这个内存块容量
            buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
                    reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());
        } else {

            initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity);
        }
    }

    private int runOffset(int id) {
        // depth(id) 得到id对应层数
        // id ^ 1 << depth(id) 就是这个id节点在这一层的偏移值
        // 例如 id = 2049,depth(id) 就是 11,1 << depth(id) 就是 2048
        // 2049 ^ 2048 = 1
        int shift = id ^ 1 << depth(id);
        // 偏移量shift乘以 id对应内存块容量runLength(id),
        // 就得到最后的字节偏移量
        return shift * runLength(id);
    }

    private int runLength(int id) {
        // represents the size in #bytes supported by node 'id' in the tree
        // 节点id对应的内存块大小,单位是字节,一个字节就是8位bits
        // log2ChunkSize 是 chunkSize 的log2 的对数,
        // 而 chunkSize = pageSize * maxOrder, depth(id) 就是求id节点对应的层数,最低层就是maxOrder,
        // 所以id节点在最底层,那么depth(id)就是maxOrder,那么结果值就是 pageSize。
        return 1 << log2ChunkSize - depth(id);
    }

初始化 Normal 规格的 PooledByteBuf, 通过 runOffset(memoryMapIdx) 方法计算偏移量,通过 runLength(memoryMapIdx) 方法计算内存块大小。

3.2.1.6 initBuf 方法

    private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
                                    long handle, int bitmapIdx, int reqCapacity) {
        assert bitmapIdx != 0;

        int memoryMapIdx = memoryMapIdx(handle);

        // 通过 memoryMapIdx 找到 PoolSubpage
        PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
        assert subpage.doNotDestroy;
        // 容量必须小于或等于 PoolSubpage 对应的块容量elemSize
        assert reqCapacity <= subpage.elemSize;

        // runOffset(memoryMapIdx) 表示这个 PoolSubpage 在当前这个 PoolChunk 的字节偏移量;
        // (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize
        // 就是表示这个 Tiny或者Small类型内存块在 中PoolSubpage 偏移量。
        buf.init(
            this, nioBuffer, handle,
            runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
                reqCapacity, subpage.elemSize, arena.parent.threadCache());
    }

初始化 TinySmall 规格类型的 PooledByteBuf, 内存块大小通过 PoolSubpageelemSize 获取,偏移量要增加 (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize 的值。

3.2.2 回收内存块

    void free(long handle, ByteBuffer nioBuffer) {
        int memoryMapIdx = memoryMapIdx(handle);
        int bitmapIdx = bitmapIdx(handle);

        if (bitmapIdx != 0) { // free a subpage
            // bitmapIdx != 0 说明它是一个Tiny 或者 Small类型
            // 通过 memoryMapIdx 找到对应的PoolSubpage
            PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
            assert subpage != null && subpage.doNotDestroy;

            // 获取 PoolArena 中对应尺寸容量的PoolSubpage链表头
            PoolSubpage<T> head = arena.findSubpagePoolHead(subpage.elemSize);
            synchronized (head) {

                // 释放PoolSubpage中 bitmapIdx 对应那一个内存块
                if (subpage.free(head, bitmapIdx & 0x3FFFFFFF)) {
                    // 如果 free(...) 方法返回true,说明这个PoolSubpage还在被使用,
                    // 不能被回收,那么直接返回
                    return;
                }
            }
        }
        /**
         * 运行到这里,
         * 要么它是一个 Normal 类型内存块,那就释放这个内存块。
         * 要么它是一个 Tiny 或者 Small类型,但是它对应 PoolSubpage 那一块内存块都被释放了,这里就释放它
         */
        freeBytes += runLength(memoryMapIdx);
        // 将  memoryMapIdx 对应节点设置回原来值,又可以进行内存块分配了
        setValue(memoryMapIdx, depth(memoryMapIdx));
        // 因为子节点内存块释放,更新父节点的可分配容量
        updateParentsFree(memoryMapIdx);

        if (nioBuffer != null && cachedNioBuffers != null &&
                cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
            cachedNioBuffers.offer(nioBuffer);
        }
    }
  • 如果是TinySmall 规格类型的内存块,那么就要使用 PoolSubpagefree(...) 方法释放内存块。如果返回 true,表示这个 PoolSubpage 还在使用,直接返回;如果返回 false,表示这个 PoolSubpage 不在使用,可以被回收了,就要回收对应的整个内存块。
  • 增加当前 PoolChunk 可用内存字节数 freeBytes
  • 通过 setValue(...) 方法,将 memoryMapIdx 对应节点值改成初始层数值,表示这个节点有可以分配了。
  • 通过 updateParentsFree(memoryMapIdx) 方法,更新父节点的节点值,因为子节点内存块被释放,那么父节点可分配内存大小变了。
    private void updateParentsFree(int id) {
        int logChild = depth(id) + 1;
        while (id > 1) {
            // 父节点
            int parentId = id >>> 1;
            // 左子节点
            byte val1 = value(id);
            // 右子节点
            byte val2 = value(id ^ 1);
            // 子节点的标准值
            logChild -= 1; // in first iteration equals log, subsequently reduce 1 from logChild as we traverse up

            if (val1 == logChild && val2 == logChild) {
                // 左子节点和右子节点都是标准值,说明两个子节点都是空闲的
                // 那么父节点的容量就是两倍
                setValue(parentId, (byte) (logChild - 1));
            } else {
                // 取左子节点和右子节点中较大的容量,也是val比较小的值。
                byte val = val1 < val2 ? val1 : val2;
                setValue(parentId, val);
            }

            id = parentId;
        }
    }
  • 如果左子节点和右子节点都是标准值,说明两个子节点都是空闲的,么父节点的容量就是两倍。
  • 如果不是,那么就取左子节点和右子节点中较小值,即可分配内存大小更大。

3.3 PoolArena

3.3.1 allocate 方法

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        allocate(cache, buf, reqCapacity);
        return buf;
    }

先通过 newByteBuf(maxCapacity) 方法,创建对应类型的池化缓存区PooledByteBuf,然后调用allocate(cache, buf, reqCapacity) 方法进行内存分配。

  private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            // 如果容量小于 pageSize 值,即是 Tiny 或者 Small类型
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // 先从当前线程缓存中获取,如果能得到,直接返回
                    return;
                }
                // 得到Tiny 类型索引,因为 Tiny 类型是每个相隔16,所以索引就是 normCapacity >>> 4
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // 先从当前线程缓存中获取,如果能得到,直接返回
                    return;
                }
                // 得到Small 类型索引,每个Small 类型是成倍扩展的,即 512 1024 2048 4096, 小于 pageSize 的大小
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            // 得到符合容量尺寸的头PoolSubpage
            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                // 使用 synchronized 防止并发
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    // 有这种尺寸容量的 PoolSubpage, 容量必须是 normCapacity
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    // 从 PoolSubpage 中分配内存
                    long handle = s.allocate();
                    assert handle >= 0;
                    // 将分配的
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                // 运行到这里,表示目前没有这个尺寸的 PoolSubpage,
                // 那么从PoolChunk 中分配
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            // 增加计数
            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge类型的内存块肯定不会缓存在当前线程中,直接调用 allocateHuge 分配
            allocateHuge(buf, reqCapacity);
        }
    }

这个方法看起来很复杂,但是其实逻辑很简单:

  • 通过 normalizeCapacity(reqCapacity) 方法来将用户申请内存大小转成规格化大小,例如 18 就变成 32; 990 就变成 1024
  • 根据 Tiny,Small,NormalHuge 不同类型,进行不同内存块分配。
  • 对于 TinySmall 规格类型:
    • 先从线程缓存PoolThreadCache 中获取,如果获取到,就直接返回,获取不到就继续下面步骤。
    • 再通过tinySubpagePoolssmallSubpagePools 进行快速分配内存块,如果 tinySubpagePools 中有对应的 PoolSubpage,那么就直接分配,如果没有,那么继续下面步骤。
    • 通过 allocateNormal(buf, reqCapacity, normCapacity) 方法进行内存块的分配。
  • 对于Normal 规格类型:
    • 先从线程缓存PoolThreadCache 中获取,如果获取到,就直接返回,获取不到就继续下面步骤。
    • 通过 allocateNormal(buf, reqCapacity, normCapacity) 方法进行内存块的分配。
  • 对于Huge 规格类型:

    这种规格是没有线程缓存的,所以直接通过 allocateHuge(buf, reqCapacity) 方法进行内存块的分配。

3.3.2 normalizeCapacity 方法

    int normalizeCapacity(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("capacity: " + reqCapacity + " (expected: 0+)");
        }

        // 大于 chunkSize,表示是一个 Huge 类型
        if (reqCapacity >= chunkSize) {
            // 是否需要进行内存对齐
            return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
        }

        if (!isTiny(reqCapacity)) { // >= 512
            // Doubled

            // 得到与 reqCapacity 最近的 2的幂数,
            // 如果 reqCapacity 就是2的幂数,那么就是它自己
            int normalizedCapacity = reqCapacity;
            // 先减一,防止 reqCapacity 就是 2的幂数,导致结果值是 reqCapacity 的两步
            normalizedCapacity --;
            normalizedCapacity |= normalizedCapacity >>>  1;
            normalizedCapacity |= normalizedCapacity >>>  2;
            normalizedCapacity |= normalizedCapacity >>>  4;
            normalizedCapacity |= normalizedCapacity >>>  8;
            normalizedCapacity |= normalizedCapacity >>> 16;
            normalizedCapacity ++;

            if (normalizedCapacity < 0) {
                normalizedCapacity >>>= 1;
            }
            assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;

            //
            return normalizedCapacity;
        }

        // 小于 512 数,Tiny 类型的数,是否需要进行内存对齐
        if (directMemoryCacheAlignment > 0) {
            return alignCapacity(reqCapacity);
        }

        // 能够被 16 整除,那么就直接返回
        if ((reqCapacity & 15) == 0) {
            return reqCapacity;
        }

        // 结果值是 16 的倍数
        return (reqCapacity & ~15) + 16;
    }
  • 对于 Huge 规格类型,只考虑是否需要进行内存对齐,即需要的内存块大小必须是某个数倍数;这个数必须是 2 的幂数。例如内存对齐数directMemoryCacheAlignment16,那么内存块大小必须能整除 16,也就是低四位都是 0
  • SmallNormal 规格类型,它们相隔都是1倍,那么只需要寻找最近的 2 的幂数就行了。
  • Tiny 规格类型,最小值是16,因此只需要16的倍数就可以了。

3.3.3 allocateNormal 方法

    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        // 从 PoolChunkList 中分配内存
        if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
            q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
            q075.allocate(buf, reqCapacity, normCapacity)) {
            return;
        }

        // 如果没有从 PoolChunkList 中分配内存,
        // 那么就要新创建 PoolChunk 对象,
        // 默认情况下 pageSize=8192 maxOrder=11 pageShifts=13 chunkSize=16777216
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        boolean success = c.allocate(buf, reqCapacity, normCapacity);
        assert success;
        // 将新创建的 PoolChunk 添加到 qInit 中
        qInit.add(c);
    }
  • 先从 PoolChunkList 中寻找可用 PoolChunk 进行内存分配。
  • 找不到,那么就创建新的 PoolChunk 实例。
  • 通过 PoolChunkallocate(buf, reqCapacity, normCapacity) 方法进行内存分配;这个上面已经介绍。
  • 最后将这个 PoolChunk 添加到 PoolChunkList 中。

3.3.4 allocateHuge 方法

    private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
        // 创建 Huge 类型的PoolChunk,不会放在内存池中 unpooled = true
        PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
        activeBytesHuge.add(chunk.chunkSize());
        buf.initUnpooled(chunk, reqCapacity);
        allocationsHuge.increment();
    }

Huge 规格的内存块是不会进入内存池的。

3.3.5 释放内存块

    void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
        if (chunk.unpooled) {
            // 非池中内存块,直接回收
            int size = chunk.chunkSize();
            destroyChunk(chunk);
            activeBytesHuge.add(-size);
            deallocationsHuge.increment();
        } else {
            SizeClass sizeClass = sizeClass(normCapacity);
            // 根据不同内存规格,将回收的内存块优先放入线程缓存中
            if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
                // cached so not free it.
                return;
            }

            // 线程缓存已经满了,那么就释放内存块
            freeChunk(chunk, handle, sizeClass, nioBuffer);
        }
    }

 void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer) {
        final boolean destroyChunk;
        synchronized (this) {
            switch (sizeClass) {
            case Normal:
                ++deallocationsNormal;
                break;
            case Small:
                ++deallocationsSmall;
                break;
            case Tiny:
                ++deallocationsTiny;
                break;
            default:
                throw new Error();
            }
            // 调用 PoolChunkList 方法进行内存块释放,需要改变 PoolChunkList 中的一些值
            destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
        }
        if (destroyChunk) {
            // destroyChunk not need to be called while holding the synchronized lock.
            destroyChunk(chunk);
        }
    }
  • Huge 规格类型的内存块直接释放。
  • Tiny,SmallNormal 规格类型的内存块,优先放入线程缓存中,如果对应的线程缓存已经满了,那么才释放。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,875评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,569评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,475评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,459评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,537评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,563评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,580评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,326评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,773评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,086评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,252评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,921评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,566评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,190评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,435评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,129评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,125评论 2 352

推荐阅读更多精彩内容