[Netty源码分析]ByteBuf(四)

Arena Chunk Page SubPage

Arena


Arena.png
Arena数据结构.png

结构:最外层是一个ChunkList,ChunkList通过链表连接,Chunk大小16M,采取这种结构是Netty会实时计算内存分配情况

final class PoolThreadCache {
    //上下不同的是Arena是开辟了一块内存,而Cache是缓存了一块连续内存
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
}

Chunk


Chunk结构.png
abstract class PoolArena<T> implements PoolArenaMetric {
    //表示不同使用率的Chunk集合
    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;

    //PoolChunkList(PoolArena<T> arena, PoolChunkList<T> nextList, int minUsage, int maxUsage, int chunkSize)
    q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
    q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
    q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
        q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
        q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
        qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
}

SubPage

final class PoolSubpage<T> implements PoolSubpageMetric {
    final PoolChunk<T> chunk;
    private final int memoryMapIdx;
    private final int runOffset;
    private final int pageSize;
    private final long[] bitmap;//记录子页分配情况
    PoolSubpage<T> prev;
    PoolSubpage<T> next;
    boolean doNotDestroy;
    int elemSize;//子页划分大小
    private int maxNumElems;
    private int bitmapLength;
    private int nextAvail;
    private int numAvail;
}

Page级别的内存分配(例:allocateNormal())

    // Method must be called inside synchronized(this) { ... }block
    1. 尝试在现有的Chunk上分配
    2. 如果不能则创建一个chunk进行内存分配,newChunk(...)
    3. 初始化PooledByteBuf,initBuf(...)
    //PoolArena
    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
            q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
            q075.allocate(buf, reqCapacity, normCapacity)) {
            return;
        }

        // Add a new chunk.
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        long handle = c.allocate(normCapacity);
        assert handle > 0;
        c.initBuf(buf, handle, reqCapacity);
        qInit.add(c);
    }

    ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
    //PoolChunkList
    //从head节点往下尝试分配,一旦使用率大于maxUsage,从当前节点移除加到下一节点
    boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        if (head == null || normCapacity > maxCapacity) {
            // Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
            // be handled by the PoolChunks that are contained in this PoolChunkList.
            return false;
        }

        for (PoolChunk<T> cur = head;;) {
            long handle = cur.allocate(normCapacity);
            if (handle < 0) {
                cur = cur.next;
                if (cur == null) {
                    return false;
                }
            } else {
                cur.initBuf(buf, handle, reqCapacity);
                if (cur.usage() >= maxUsage) {
                    remove(cur);
                    nextList.add(cur);
                }
                return true;
            }
        }
    }
Chunk分配节点.png
  PoolChunk(PoolArena<T> arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize, int offset) {
      //略
      int memoryMapIndex = 1;
      for (int d = 0; d <= maxOrder; ++ d) { // move down the tree one level at a time
          int depth = 1 << d;
          for (int p = 0; p < depth; ++ p) {
              // in each level traverse left to right and set value to the depth of subtree
              memoryMap[memoryMapIndex] = (byte) d;
              depthMap[memoryMapIndex] = (byte) d;
              memoryMapIndex ++;
          }
      }
  }
  ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
  long allocate(int normCapacity) {
      if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
          return allocateRun(normCapacity);
      } else {
          return allocateSubpage(normCapacity);
      }
  }
  ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
  private long allocateRun(int normCapacity) {
      int d = maxOrder - (log2(normCapacity) - pageShifts);//算出分配在第几层
      int id = allocateNode(d);
      if (id < 0) {
          return id;
      }
      freeBytes -= runLength(id);
      return id;
  }
  ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
  private int allocateNode(int d) {
      int id = 1;
      int initial = - (1 << d); // has last d bits = 0 and rest all = 1
      byte val = value(id);
      if (val > d) { // unusable
          return -1;
      }
      while (val < d || (id & initial) == 0) { // id & initial == 1 << d for all ids at depth d, for < d it is 0
          id <<= 1;
          val = value(id);
          if (val > d) {
              id ^= 1;
              val = value(id);
          }
      }
      byte value = value(id);
      assert value == d && (id & initial) == 1 << d : String.format("val = %d, id initial = %d, d = %d",
                  value, id & initial, d);
      setValue(id, unusable); // mark as unusable
      updateParentsAlloc(id);//标志父节点使用情况
      return id;
   }

  ↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
  void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
      int memoryMapIdx = memoryMapIdx(handle);
      int bitmapIdx = bitmapIdx(handle);
      if (bitmapIdx == 0) {
          byte val = value(memoryMapIdx);
          assert val == unusable : String.valueOf(val);
          buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
                   arena.parent.threadCache());//调用ByteBuf初始化
      } else {
          initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
      }
  }

SubPage级别的内存分配(例:allocateTiny())
涉及数据结构:


TinySubpagePools.png
//PooledThreadCache
1. 定位到一个SubPage对象
2. 初始化SubPage
3. 初始化PooledByteBuf
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
    //略
    if (isTiny(normCapacity)) { // < 512
        if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
            // was able to allocate out of the cache so move on
            return;
        }
        tableIdx = tinyIdx(normCapacity);
        table = tinySubpagePools;
    }
    //略
    synchronized (this) {
          final PoolSubpage<T> head = table[tableIdx];
                final PoolSubpage<T> s = head.next;
              if (s != head) {
                  assert s.doNotDestroy && s.elemSize == normCapacity;
                  long handle = s.allocate();
                  assert handle >= 0;
                  s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                  return;
          }
      }
      //略
      allocateNormal(buf, reqCapacity, normCapacity);
}
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//PoolArena
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
        q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
        q075.allocate(buf, reqCapacity, normCapacity)) {
          return;
        }
    // Add a new chunk.
    PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
    long handle = c.allocate(normCapacity);
    assert handle > 0;
    c.initBuf(buf, handle, reqCapacity);
    qInit.add(c);
}
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//PoolChunk
long allocate(int normCapacity) {
    if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
        return allocateRun(normCapacity);
    } else {
        return allocateSubpage(normCapacity);//SubPage走这一步
    }
}
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//PoolChunk
private long allocateSubpage(int normCapacity) {
    // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
    // This is need as we may add it back and so alter the linked-list structure.
    PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
    synchronized (head) {
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        int id = allocateNode(d);
        if (id < 0) {
          return id;
        }
        final PoolSubpage<T>[] subpages = this.subpages;
        final int pageSize = this.pageSize;
        freeBytes -= pageSize;
        int subpageIdx = subpageIdx(id);
        PoolSubpage<T> subpage = subpages[subpageIdx];
        if (subpage == null) {
            subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
            subpages[subpageIdx] = subpage;
        } else {
            subpage.init(head, normCapacity);
        }
        return subpage.allocate();
    }
}
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//PoolSubPage
void init(PoolSubpage<T> head, int elemSize) {//elemSize指可以划分多少份
    doNotDestroy = true;
    this.elemSize = elemSize;
    if (elemSize != 0) {
        maxNumElems = numAvail = pageSize / elemSize;
        nextAvail = 0;
        bitmapLength = maxNumElems >>> 6;
        if ((maxNumElems & 63) != 0) {
            bitmapLength ++;
        }
        for (int i = 0; i < bitmapLength; i ++) {
           bitmap[i] = 0;//标识哪个SubPage是被分配的,0未分配,1已分配
        }
    }
    addToPool(head);
}
//PoolSubPage
private void addToPool(PoolSubpage<T> head) {
    assert prev == null && next == null;
    prev = head;
    next = head.next;
    next.prev = this;
    head.next = this;
}

初始化SubPage完成


初始化完成SubPage.png
//PoolChunk
private long allocateSubpage(int normCapacity) {
    //略
    return subpage.allocate();
}
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//PoolSubPage
/**
  * Returns the bitmap index of the subpage allocation.
  * 1. 找到一个未被使用的SubPage
  */
long allocate() {
    if (elemSize == 0) {
        return toHandle(0);
    }
    if (numAvail == 0 || !doNotDestroy) {
        return -1;
    }
    final int bitmapIdx = getNextAvail();
    int q = bitmapIdx >>> 6;
    int r = bitmapIdx & 63;
    assert (bitmap[q] >>> r & 1) == 0;
    bitmap[q] |= 1L << r;
    if (-- numAvail == 0) {
        removeFromPool();
    }
    return toHandle(bitmapIdx);
}
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//PoolSubPage
private long toHandle(int bitmapIdx) {
    return 0x4000000000000000L | (long) bitmapIdx << 32 | memoryMapIdx;
 }
handle的构成.png
//PoolArena
// Method must be called inside synchronized(this) { ... }block
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
    //略
    long handle = c.allocate(normCapacity);
    assert handle > 0;
    c.initBuf(buf, handle, reqCapacity);
    //略
}
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//PoolChunk
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
    int memoryMapIdx = memoryMapIdx(handle);
    int bitmapIdx = bitmapIdx(handle);
    if (bitmapIdx == 0) {
        byte val = value(memoryMapIdx);
        assert val == unusable : String.valueOf(val);
        buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
                 arena.parent.threadCache());
    } else {
        initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
    }
}
↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓
//PoolChunk
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
    assert bitmapIdx != 0;
    int memoryMapIdx = memoryMapIdx(handle);
    PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
    assert subpage.doNotDestroy;
    assert reqCapacity <= subpage.elemSize;
    //memoryMapIdx+第几个subPage与0x3FFFFFFF的结果*SubPage大小为内存偏移量
    buf.init(
        this, handle,
        runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
            reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容