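Memory Allocation Overview
This part covers Netty's memory allocation, the lowest layer of the framework, which is responsible for reading data from the underlying transport into a ByteBuf.
Three questions
- What kinds of memory are there?
- How is contention between threads reduced during allocation?
- How is memory of different sizes allocated?
Main topics:
- Abstractions for memory and the memory manager
- Allocation strategies for different size classes and memory types
- How memory is reclaimed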
ByteBuf Structure and Key APIs
- ByteBuf structure
ByteBuf memory layout
* +-------------------+------------------+------------------+
* | discardable bytes |  readable bytes  |  writable bytes  |
* |                   |     (CONTENT)    |                  |
* +-------------------+------------------+------------------+
* |                   |                  |                  |
* 0      <=      readerIndex   <=   writerIndex    <=    capacity
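- read, write, and set methods
A read method advances readerIndex.
A write method advances writerIndex.
A set method moves neither index.
- mark and reset methods
A mark method saves the current index.
A reset method restores the index to the saved position.
- readableBytes, writableBytes, and maxWritableBytes
The names are self-explanatory.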
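The index rules above can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: `IndexedBufferSketch` and its methods are hypothetical names that imitate the behaviour described here, not Netty's ByteBuf implementation.

```java
/** JDK-only model of the two-index layout; not Netty's implementation. */
final class IndexedBufferSketch {
    private final byte[] data;
    private int readerIndex;
    private int writerIndex;
    private int markedReaderIndex;

    IndexedBufferSketch(int capacity) {
        data = new byte[capacity];
    }

    /** A write stores at writerIndex and advances it. */
    IndexedBufferSketch writeByte(int value) {
        if (writerIndex == data.length) {
            throw new IndexOutOfBoundsException("no writable bytes");
        }
        data[writerIndex++] = (byte) value;
        return this;
    }

    /** A read loads from readerIndex and advances it. */
    byte readByte() {
        if (readerIndex == writerIndex) {
            throw new IndexOutOfBoundsException("no readable bytes");
        }
        return data[readerIndex++];
    }

    /** A set overwrites an absolute position and moves neither index. */
    IndexedBufferSketch setByte(int index, int value) {
        data[index] = (byte) value;
        return this;
    }

    /** mark saves the current readerIndex; reset restores it. */
    void markReaderIndex() { markedReaderIndex = readerIndex; }
    void resetReaderIndex() { readerIndex = markedReaderIndex; }

    int readableBytes() { return writerIndex - readerIndex; }
    int writableBytes() { return data.length - writerIndex; }
    int readerIndex() { return readerIndex; }
    int writerIndex() { return writerIndex; }
}
```

Writing three bytes into a buffer of capacity 8 leaves 3 readable and 5 writable bytes; a later setByte changes the content but leaves both indices where they were.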
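ByteBuf Categories
ByteBuf class diagram
- Pooled and Unpooled
Pooled buffers are carved out of memory that was allocated in advance; Unpooled buffers call the system API directly for every allocation.
- Unsafe and non-Unsafe
Unsafe: uses the JDK's Unsafe to obtain the memory address behind the ByteBuf.
Non-Unsafe: does not depend on the JDK's Unsafe.
- Heap and Direct
Heap: allocated directly on the Java heap. The memory is subject to GC and does not need to be released manually; the backing store is a byte[] array.
Direct: allocated off-heap. The memory is not subject to GC and must be released manually; it is obtained by calling the JDK's ByteBuffer.allocateDirect(initialCapacity).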
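The Heap/Direct distinction can be observed with the JDK's own ByteBuffer, without any Netty class. A minimal sketch; the class name `HeapVsDirectSketch` is made up for illustration.

```java
import java.nio.ByteBuffer;

final class HeapVsDirectSketch {
    /** Heap buffer: backed by a byte[] on the Java heap and reclaimed by GC. */
    static ByteBuffer heap(int capacity) {
        return ByteBuffer.allocate(capacity);
    }

    /** Direct buffer: off-heap memory, obtained with the call named in the text. */
    static ByteBuffer direct(int capacity) {
        return ByteBuffer.allocateDirect(capacity);
    }
}
```

A heap buffer reports hasArray() as true and isDirect() as false, while a direct buffer reports isDirect() as true.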
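The Memory Allocator: ByteBufAllocator
What ByteBufAllocator provides
ByteBufAllocator is the top-level abstraction. It provides overloaded buffer methods; overloaded ioBuffer methods, which prefer a direct buffer; heapBuffer, which allocates on the heap; directBuffer, which allocates direct memory; and compositeBuffer, which can combine several ByteBufs into one.
AbstractByteBufAllocator
It implements most of ByteBufAllocator and leaves two abstract methods, newHeapBuffer and newDirectBuffer, for subclasses to fill in. This is where heap and direct memory are told apart.
The two main subclasses of ByteBufAllocator
ByteBufAllocator categories:
The two main subclasses are PooledByteBufAllocator and UnpooledByteBufAllocator, so Pooled versus Unpooled is decided by which subclass is used.
How are Unsafe and non-Unsafe told apart? Netty decides automatically: if an Unsafe object is available on the platform, Netty allocates through Unsafe.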
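UnpooledByteBufAllocator analysis
- heap allocation logic
- direct allocation logic
The Unsafe variants reach the data through a memory address plus an offset, while the non-Unsafe variants go through an array plus an index, or through the JDK's ByteBuffer API. Operating on memory through Unsafe is generally faster than the non-Unsafe way.
PooledByteBufAllocator overview
It has two allocation paths, newHeapBuffer and newDirectBuffer. The two work in roughly the same way, so we analyze newDirectBuffer.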
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
- Obtain the thread-local cache, PoolThreadCache
A thread-local cache is used because newDirectBuffer may be called from multiple threads.
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final boolean useCacheForAllThreads;
PoolThreadLocalCache(boolean useCacheForAllThreads) {
this.useCacheForAllThreads = useCacheForAllThreads;
}
@Override
protected synchronized PoolThreadCache initialValue() {
// Pick the least-used heapArena and directArena, then create the PoolThreadCache
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
@Override
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
}
FastThreadLocal is essentially a faster ThreadLocal, and the code above shows that every thread gets its own PoolThreadCache.
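The binding of threads to Arenas in initialValue and leastUsedArena can be imitated with the JDK's plain ThreadLocal. The following is a sketch, not Netty code: `ArenaBindingSketch`, its nested `Arena`, and `bindFromThreads` are hypothetical names, and the sketch never decrements the counter when a thread ends.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ArenaBindingSketch {
    /** Stand-in for PoolArena: only the counter used for balancing is modelled. */
    static final class Arena {
        final int id;
        final AtomicInteger numThreadCaches = new AtomicInteger();
        Arena(int id) { this.id = id; }
    }

    private final Arena[] arenas;

    /** One Arena per thread, chosen lazily on that thread's first get(). */
    private final ThreadLocal<Arena> bound = new ThreadLocal<Arena>() {
        @Override
        protected Arena initialValue() {
            synchronized (ArenaBindingSketch.this) {
                Arena arena = leastUsed();
                arena.numThreadCaches.incrementAndGet();
                return arena;
            }
        }
    };

    ArenaBindingSketch(int arenaCount) {
        arenas = new Arena[arenaCount];
        for (int i = 0; i < arenaCount; i++) {
            arenas[i] = new Arena(i);
        }
    }

    /** Same idea as leastUsedArena: the Arena with the fewest bound threads wins. */
    private Arena leastUsed() {
        Arena min = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            if (arenas[i].numThreadCaches.get() < min.numThreadCaches.get()) {
                min = arenas[i];
            }
        }
        return min;
    }

    Arena arenaForCurrentThread() {
        return bound.get();
    }

    /** Runs threadCount threads one after another and records the Arena id each one got. */
    static int[] bindFromThreads(int arenaCount, int threadCount) {
        ArenaBindingSketch sketch = new ArenaBindingSketch(arenaCount);
        int[] ids = new int[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final int slot = i;
            Thread t = new Thread(() -> ids[slot] = sketch.arenaForCurrentThread().id);
            t.start();
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return ids;
    }
}
```

With two Arenas and four threads started in sequence, the threads are spread evenly over the two Arenas, which is the balancing effect the least-used rule is after.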
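- Allocate memory on the Arena held by the thread-local cache
The thread-local cache holds two kinds of memory: heap memory and off-heap (direct) memory. The analysis below follows the off-heap path.
heapArena and directArena are passed in when the PoolThreadCache is created; see the initialValue code above.
The heapArenas and directArenas arrays themselves are created when the allocator, PooledByteBufAllocator, is constructed. Here is the constructor.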
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
checkPositiveOrZero(nHeapArena, "nHeapArena");
checkPositiveOrZero(nDirectArena, "nDirectArena");
checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
}
if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: power of two)");
}
int pageShifts = validateAndCalculatePageShifts(pageSize);
if (nHeapArena > 0) {
// Initialize the heap arenas
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
if (nDirectArena > 0) {
// Initialize the direct arenas
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}
heapArenas is initialized with heapArenas = newArenaArray(nHeapArena), and directArenas with directArenas = newArenaArray(nDirectArena).
Where do the constructor's nHeapArena and nDirectArena come from? Following the code upward:
public PooledByteBufAllocator(boolean preferDirect) {
this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}
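Next, where DEFAULT_NUM_DIRECT_ARENA comes from. On a machine with enough memory, defaultMinNumArena is smaller than runtime.maxMemory() / defaultChunkSize / 2 / 3, so DEFAULT_NUM_DIRECT_ARENA defaults to twice the number of CPU cores; the same holds for DEFAULT_NUM_HEAP_ARENA. Why create twice as many Arenas as CPU cores? Because the number of NIO threads created earlier also defaults to twice the number of CPU cores, so each NIO thread can be bound to an Arena of its own. Allocation and deallocation are still synchronized on the Arena, as the comment below states, but with one Arena per thread that lock is rarely contended.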
/*
* We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
* number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
* allocation and de-allocation needs to be synchronized on the PoolArena.
*
* See https://github.com/netty/netty/issues/3888.
*/
// Twice the number of CPU cores
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(
defaultMinNumArena,
runtime.maxMemory() / defaultChunkSize / 2 / 3)));
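To see how the two bounds interact, the default can be recomputed outside Netty. This sketch repeats only the arithmetic shown above; `DefaultArenaCountSketch` and `defaultNumArenas` are hypothetical names, and the override through the io.netty.allocator.numHeapArenas system property is deliberately left out.

```java
final class DefaultArenaCountSketch {
    /**
     * min(2 * processors, maxMemory / chunkSize / 2 / 3), never below 0.
     * The system-property override from the original code is not modelled.
     */
    static int defaultNumArenas(int processors, long maxMemory, int pageSize, int maxOrder) {
        int defaultMinNumArena = processors * 2;
        int defaultChunkSize = pageSize << maxOrder;
        long memoryBound = maxMemory / defaultChunkSize / 2 / 3;
        return Math.max(0, (int) Math.min(defaultMinNumArena, memoryBound));
    }
}
```

With 4 cores, a 1 GiB maximum heap, 8K pages, and maxOrder 11, the memory bound is 10 and the result is 8, twice the core count. With only 256 MiB and 8 cores the memory bound (2) wins, and with 64 MiB the result drops to 0.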
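Here is a schematic of the PooledByteBufAllocator's structure, assuming four NIO threads:
The diagram has four NIO threads. From the code analysis above we know there are four heapArenas and four directArenas; their logic is essentially the same, so the diagram refers to both simply as Arena.
How does PooledByteBufAllocator allocate a ByteBuf? It first obtains the Arena through PoolThreadCache. PoolThreadCache stores one of the allocator's Arenas in a member field and is itself held in a thread-local, so when an NIO thread calls get() it receives the Arena bound to it; this is how a thread and an Arena are tied together. Besides allocating on the Arena, PooledByteBufAllocator can also allocate from the list of cached ByteBuf memory that PoolThreadCache maintains.
For example, suppose a thread allocates 1024 bytes, finishes using them, and then needs 1024 bytes again. The second request does not have to go to the Arena; it is served from the cache list kept in PoolThreadCache.
PooledByteBufAllocator keeps the sizes of three kinds of ByteBuf cache: tinyCacheSize, smallCacheSize, and normalCacheSize. These three values are used when PoolThreadCache is initialized.
The PoolThreadCache constructor: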
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
// Create the cache objects
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}
The method that creates the cache objects:
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
// Create one cache per slot
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
Each element of the cache array is created as follows:
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
@Override
protected void initBuf(
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
}
}
MemoryRegionCache(int size, SizeClass sizeClass) {
// size: the capacity of the queue, rounded up to a power of two
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
// queue: holds at most this.size cached entries for one block size
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
7. How directArena allocates direct memory
- Take a PooledByteBuf from the object pool for reuse
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
Now look at directArena.allocate:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
// Obtain a PooledByteBuf object
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
// Assign memory to the PooledByteBuf, trying the cache first
allocate(cache, buf, reqCapacity);
return buf;
}
Next is newByteBuf. The DirectArena implementation creates the ByteBuf for off-heap memory:
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) { // the Unsafe variant is used by default
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
Now newInstance:
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
// Take a ByteBuf from the recyclable object pool; if the pool has none, a new one is created
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity); // prepare for reuse: set the capacity and reference count, reset readerIndex and writerIndex, clear the marks
return buf; // a clean ByteBuf object
}
And RECYCLER:
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
return new PooledUnsafeDirectByteBuf(handle, 0); // created when the RECYCLER has nothing to hand out; the handle takes care of recycling the ByteBuf object
}
};
The first step of allocation has produced a ByteBuf object. The next step assigns memory to it, starting with PoolThreadCache.
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
// The calls above try the cache first; if that fails, the memory is actually allocated below
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) { // special case: a request larger than chunkSize goes to allocateHuge in the else branch
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
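allocate consists of two major steps: first try to allocate from the cache, then fall back to allocating from the Arena's memory pool.
- allocate from the cache
- allocate from the Arena's memory pool
8. Size classes
Size boundaries: 0, 512B, 8K, 16M
tiny: 0 up to, but not including, 512B
small: 512B up to, but not including, 8K
normal: 8K to 16M
huge: larger than 16M
Why is 16M a boundary? 16M is the size of one chunk. All memory is requested from the operating system in units of a chunk, and every ByteBuf allocation is then carried out inside a chunk. For example, to allocate 1M, Netty first requests a 16M chunk from the operating system, then takes a contiguous 1M range from it and assigns that range to the ByteBuf.
Why is 8K a boundary? Netty treats 8K as one page. A 16M chunk obtained from the system is fairly large, so Netty subdivides it into pages: one chunk is split into 2048 pages, and a 16K allocation simply takes 2 pages.
Allocations between 0 and 8K are served by what Netty calls a subpage. Allocating a whole page for a 10B request would be very wasteful, which is where subpages come in.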
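The boundaries and the chunk/page arithmetic above can be written down as a small classifier. A sketch only: `SizeClassSketch`, `classify`, and `pagesNeeded` are hypothetical names, and the constants assume the default 8K page and a maxOrder of 11, which is what yields a 16M chunk.

```java
final class SizeClassSketch {
    enum SizeClass { TINY, SMALL, NORMAL, HUGE }

    static final int PAGE_SIZE = 8192;                          // 8K page
    static final int MAX_ORDER = 11;                            // assumed default
    static final int CHUNK_SIZE = PAGE_SIZE << MAX_ORDER;       // 16M chunk
    static final int PAGES_PER_CHUNK = CHUNK_SIZE / PAGE_SIZE;  // 2048 pages

    /** Maps a normalized capacity to the size class described in the text. */
    static SizeClass classify(int normCapacity) {
        if (normCapacity < 512) {
            return SizeClass.TINY;
        }
        if (normCapacity < PAGE_SIZE) {
            return SizeClass.SMALL;
        }
        if (normCapacity <= CHUNK_SIZE) {
            return SizeClass.NORMAL;
        }
        return SizeClass.HUGE;
    }

    /** Number of whole pages a page-level allocation occupies. */
    static int pagesNeeded(int normCapacity) {
        return (normCapacity + PAGE_SIZE - 1) / PAGE_SIZE;
    }
}
```

For instance, 16 bytes is tiny, 512 bytes is already small, 8K is normal, and anything above 16M is huge; a 16K request occupies 2 pages.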
9. Allocation flow on a cache hit
Look at the allocate method:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize; pageSize is 8K
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
- First, the requested size is normalized:
int normalizeCapacity(int reqCapacity) {
checkPositiveOrZero(reqCapacity, "reqCapacity");
if (reqCapacity >= chunkSize) { // 16M or more: returned as is, apart from alignment
return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
}
if (!isTiny(reqCapacity)) { // >= 512
// Doubled
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
return normalizedCapacity;
}
if (directMemoryCacheAlignment > 0) {
return alignCapacity(reqCapacity);
}
// Quantum-spaced
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
return (reqCapacity & ~15) + 16;
}
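A stand-alone version of the normalization makes the two rounding rules easy to try out. This is a sketch that assumes directMemoryCacheAlignment is 0 and a 16M chunk; `NormalizeCapacitySketch` and `normalize` are names chosen here, not Netty's.

```java
final class NormalizeCapacitySketch {
    static final int CHUNK_SIZE = 8192 << 11; // 16M, assuming the default page size and order

    /** Mirrors the rounding above for the case directMemoryCacheAlignment == 0. */
    static int normalize(int reqCapacity) {
        if (reqCapacity < 0) {
            throw new IllegalArgumentException("reqCapacity: " + reqCapacity);
        }
        if (reqCapacity >= CHUNK_SIZE) {
            return reqCapacity; // huge requests are not rounded
        }
        if (reqCapacity >= 512) {
            // Round up to the next power of two by smearing the highest set bit downward.
            int n = reqCapacity - 1;
            n |= n >>> 1;
            n |= n >>> 2;
            n |= n >>> 4;
            n |= n >>> 8;
            n |= n >>> 16;
            return n + 1;
        }
        // Tiny requests are rounded up to the next multiple of 16.
        if ((reqCapacity & 15) == 0) {
            return reqCapacity;
        }
        return (reqCapacity & ~15) + 16;
    }
}
```

So a 100-byte request becomes 112 bytes (the next multiple of 16), while a 600-byte request becomes 1024 bytes (the next power of two).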
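The rough steps of allocating from the cache:
- Find the MemoryRegionCache for the requested size.
- Poll an entry from its queue and use it to initialize the ByteBuf.
An entry holds a chunk, which represents a contiguous block of memory. The chunk hands a contiguous range to the ByteBuf, which can then read and write that range.
- Return the polled entry to the object pool for reuse.
To reuse objects as much as possible, Netty manages entries through a Recycler. This reduces GC pressure and avoids creating and destroying the same objects over and over.
We use cache.allocateTiny(this, buf, reqCapacity, normCapacity) as the example to walk through these three steps.
- Find the MemoryRegionCache for the requested size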
/**
* Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity); // compute the array index, then fetch the MemoryRegionCache at that index
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
// Dividing the capacity by 16 gives the index
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}
- Poll an entry from the queue and use it to initialize the ByteBuf
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
// Initialize the ByteBuf with the memory described by the entry
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
entry.recycle();
// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
@Override
protected void initBuf( // initialization for a subpage
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
}
}
void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity);
}
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle);
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
buf.init(
this, nioBuffer, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
}
private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
this.chunk = chunk; // the memory block requested from the system
memory = chunk.memory;
tmpNioBuf = nioBuffer;
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle; // identifies the memory range inside the chunk
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
}
- Return the polled entry to the object pool for reuse
Look at entry.recycle():
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
ByteBuffer nioBuffer;
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() { // reset the fields, then hand the entry back to the pool
chunk = null;
nioBuffer = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
stack.push(this); // push onto the stack
}
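The three steps of a cache hit (find the queue for the size, poll an entry and initialize the buffer, recycle the entry) fit into a compact JDK-only model. Everything here is hypothetical and simplified: `CacheHitSketch`, `Entry`, and `Buf` are stand-ins, a chunk is reduced to an integer id, and only the tiny class (normalized capacities below 512) is covered.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class CacheHitSketch {
    /** Stand-in for a cache entry: which chunk, and which range in it. */
    static final class Entry {
        int chunkId = -1;
        long handle = -1;
    }

    /** Stand-in for a pooled ByteBuf that is about to be initialized. */
    static final class Buf {
        int chunkId = -1;
        long handle = -1;
    }

    private final List<ArrayDeque<Entry>> tinyCaches = new ArrayList<>();
    private final ArrayDeque<Entry> entryPool = new ArrayDeque<>(); // recycled Entry objects
    private final int queueCapacity;

    CacheHitSketch(int queueCapacity) {
        this.queueCapacity = queueCapacity;
        for (int i = 0; i < (512 >>> 4); i++) {   // 32 queues, one per tiny size
            tinyCaches.add(new ArrayDeque<>());
        }
    }

    /** Caches a freed range; returns false when the queue for this size is full. */
    boolean add(int normCapacity, int chunkId, long handle) {
        ArrayDeque<Entry> queue = tinyCaches.get(normCapacity >>> 4);
        if (queue.size() >= queueCapacity) {
            return false;
        }
        Entry e = entryPool.isEmpty() ? new Entry() : entryPool.pop();
        e.chunkId = chunkId;
        e.handle = handle;
        queue.offer(e);
        return true;
    }

    /** Returns true on a cache hit, after which buf points at the cached range. */
    boolean allocate(Buf buf, int normCapacity) {
        ArrayDeque<Entry> queue = tinyCaches.get(normCapacity >>> 4); // step 1: find the queue
        Entry e = queue.poll();                                       // step 2: poll an entry
        if (e == null) {
            return false;
        }
        buf.chunkId = e.chunkId;
        buf.handle = e.handle;
        e.chunkId = -1;                                               // step 3: reset and recycle
        e.handle = -1;
        entryPool.push(e);
        return true;
    }

    int pooledEntries() {
        return entryPool.size();
    }
}
```

A request misses until a range of exactly the same normalized size has been added; after the hit, the Entry object itself sits in the pool, ready to describe the next freed range.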
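10. Allocation logic on a cache hit
- Cache-related data structures in Netty
The cache-related data structure in Netty is MemoryRegionCache. It has three parts: queue, sizeClass, and size.
Each element of queue is an entry, and each entry holds a chunk and a handle. Netty allocates memory in units of a chunk, and a handle uniquely identifies a contiguous range inside it, so a chunk and a handle together determine the size and position of a block of memory. All the entries together form the cache's queue; once the right queue has been found, an entry can be taken from it.
sizeClass is Netty's size class. Huge allocations are always allocated directly, so MemoryRegionCache has no huge class.
size relates to the small blocks the cache holds. In the code below the size field itself is the capacity of the queue, while the byte size of the blocks a given MemoryRegionCache serves follows from its position in the cache array.
Within one MemoryRegionCache, every block has the same fixed size. If a MemoryRegionCache caches a 1K block, then everything in its queue is a 1K block. As for the available sizes: in the tiny class they are multiples of 16B below 512B; the sizes of the other classes can be read directly from the diagram.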
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
// ...
}
The MemoryRegionCache instances are maintained in PoolThreadCache.
final class PoolThreadCache {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
// ...
}
Creating the MemoryRegionCache arrays:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
// Create the tiny[32] array
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}
Now the creation of tiny[32]:
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
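tinyCacheSize is maintained in the allocator and defaults to 512. PoolArena.numTinySubpagePools is 512 >>> 4, that is, 512 shifted right by 4 bits, which equals 512 divided by 16, or 32.
The array is created with length 32: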
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
// cacheSize is 512 here
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
Following the code further:
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
@Override
protected void initBuf(
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
}
}
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); // still 512 here
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
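At this point we can see that tinySubPageDirectCaches (an array of MemoryRegionCache) has 32 slots, each a SubPageMemoryRegionCache. Each slot is a queue for one block size (16B, 32B, ..., 496B), and each queue holds 512 entries by default.
11. arena, chunk, page, subpage
- Arena structure
The outermost level is the chunkList data structure. The chunkLists are connected to one another as a doubly linked list, and each node inside a chunkList is a chunk; a chunk, 16M, is the smallest unit in which memory is requested from the operating system. Why are the chunkLists linked in both directions? Netty keeps track of how much of each chunk is allocated and groups chunks into different chunkLists by usage, so a chunk has to be able to move to the next or the previous list as its usage changes. When memory is allocated, Netty picks a suitable chunkList in a fixed order and then allocates from one of its chunks.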
abstract class PoolArena<T> implements PoolArenaMetric {
static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
enum SizeClass {
Tiny,
Small,
Normal
}
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
private final int maxOrder;
final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final int directMemoryCacheAlignmentMask;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
// ...
}
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
subpageOverflowMask = ~(pageSize - 1);
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
numSmallSubpagePools = pageShifts - 9;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
// Initialize the chunkLists
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
metrics.add(qInit);
metrics.add(q000);
metrics.add(q025);
metrics.add(q050);
metrics.add(q075);
metrics.add(q100);
chunkListMetrics = Collections.unmodifiableList(metrics);
}
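The maxUsage arguments in the constructor above determine where a chunk ends up when it is added. The sketch below rests on an assumption that is not shown in this excerpt: that adding a chunk to a list forwards it to the next list for as long as its usage is at least that list's maxUsage. `ChunkListSketch` and `listOnAdd` are hypothetical names; only the thresholds are taken from the constructor.

```java
final class ChunkListSketch {
    // Order of the lists as linked by the constructor: qInit -> q000 -> ... -> q100.
    private static final String[] NAMES = {"qInit", "q000", "q025", "q050", "q075", "q100"};
    // maxUsage of each list, copied from the PoolChunkList constructor calls.
    private static final int[] MAX_USAGE = {25, 50, 75, 100, 100, Integer.MAX_VALUE};

    /**
     * Name of the list that keeps a chunk with the given usage (in percent, 0 to 100)
     * when the chunk is added through qInit. Assumes the forwarding rule stated above.
     */
    static String listOnAdd(int usagePercent) {
        int i = 0;
        while (usagePercent >= MAX_USAGE[i]) {
            i++; // too full for this list, hand it to the next one
        }
        return NAMES[i];
    }
}
```

Under that assumption a chunk that is 10% used stays in qInit, one at 30% settles in q000, one at 60% in q025, and a completely full chunk travels all the way to q100.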
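- Chunk structure
A chunk splits its memory into 8K pages, and a page can in turn be split into equally sized subpages, for instance four subpages of 2K each.
12. Page-level allocation: allocateNormal()
Here is the fragment of PoolArena that leads to allocateNormal: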
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // allocate from the cache
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity); // allocate from the arena
++allocationsNormal;
}
}
allocateNormal on the arena:
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
// Add a new chunk and allocate from it
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
boolean success = c.allocate(buf, reqCapacity, normCapacity);
assert success;
qInit.add(c);
}
- Try to allocate from the existing chunkLists
- Create a new chunk and allocate from it
- Initialize the ByteBuf
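For a page-level request, the number of pages follows directly from pageShifts. The sketch below also computes a tree depth, under the assumption that a chunk tracks its pages in a complete binary tree of depth maxOrder whose leaves are single pages; the excerpt only hints at this through calls such as allocateNode(d). `PageRunSketch`, `pages`, and `depth` are hypothetical names, and the input is expected to be a normalized capacity, that is, a power of two between one page and one chunk.

```java
final class PageRunSketch {
    static final int PAGE_SHIFTS = 13; // log2(8192)
    static final int MAX_ORDER = 11;   // assumed default, gives 2048 leaves

    /** Number of 8K pages covered by a normalized page-level capacity. */
    static int pages(int normCapacity) {
        return normCapacity >>> PAGE_SHIFTS;
    }

    /**
     * Depth of the tree level whose nodes each cover normCapacity bytes:
     * leaves (depth MAX_ORDER) cover one page, the root (depth 0) covers the whole chunk.
     */
    static int depth(int normCapacity) {
        int log2 = 31 - Integer.numberOfLeadingZeros(normCapacity);
        return MAX_ORDER - (log2 - PAGE_SHIFTS);
    }
}
```

An 8K request maps to a leaf at depth 11, a 16K request to a node one level up that covers 2 pages, and a 16M request to the root.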
13. Subpage-level allocation: allocateTiny()
- Locate a Subpage object
- Initialize the subpage
- Initialize the PooledByteBuf
A small program to step through in the debugger:
public class TinyAllocate {
public static void main(String[] args) {
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
allocator.directBuffer(16);
}
}
Here is the relevant fragment of allocate:
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) { // initially the head node links to no subpage, so this branch is skipped
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
Now the structure of tinySubpagePools. By default it has the same layout as the tiny array of MemoryRegionCache.
tiny[32] 0 -> 16B -> 32B -> 48B -> ... -> 496B
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
synchronized (head) {
int id = allocateNode(d);
if (id < 0) {
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}
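Once a page has been turned into a subpage, it is cut into pageSize / elemSize equal elements, and something has to remember which of them are taken. A bitmap is one natural way to do that; the sketch below shows the idea only and makes no claim about how PoolSubpage is actually written. `SubpageBitmapSketch` and its methods are hypothetical names.

```java
final class SubpageBitmapSketch {
    private final int maxNumElems;
    private final long[] bitmap; // one bit per element, 1 = in use
    private int numAvail;

    SubpageBitmapSketch(int pageSize, int elemSize) {
        maxNumElems = pageSize / elemSize;
        numAvail = maxNumElems;
        bitmap = new long[Math.max(1, (maxNumElems + 63) >>> 6)];
    }

    /** Returns the index of the element handed out, or -1 if the page is full. */
    int allocate() {
        if (numAvail == 0) {
            return -1;
        }
        for (int i = 0; i < maxNumElems; i++) {
            long bit = 1L << (i & 63);
            if ((bitmap[i >>> 6] & bit) == 0) {
                bitmap[i >>> 6] |= bit;
                numAvail--;
                return i;
            }
        }
        return -1;
    }

    /** Marks an element as free again; freeing a free element is a no-op. */
    void free(int index) {
        long bit = 1L << (index & 63);
        if ((bitmap[index >>> 6] & bit) != 0) {
            bitmap[index >>> 6] &= ~bit;
            numAvail++;
        }
    }

    int maxNumElems() { return maxNumElems; }
    int numAvail() { return numAvail; }
}
```

An 8K page with 16B elements yields 512 slots, so the allocator.directBuffer(16) call from the debugging program would occupy one slot out of 512 in such a page; with 4K elements the page holds only two.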
14. Releasing a ByteBuf
- Add the contiguous memory range to the cache
- Mark the contiguous memory range as unused
- Return the ByteBuf to the object pool
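The release steps can be put together in a small model. It assumes that the second step only happens when the cache cannot take the range, which is an interpretation of the list above rather than something shown in this excerpt. `ReleaseSketch` and `Buf` are hypothetical stand-ins; a memory range is reduced to its handle.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

final class ReleaseSketch {
    /** Stand-in for a pooled ByteBuf: only the handle of its memory range is modelled. */
    static final class Buf {
        long handle = -1;
    }

    private final int cacheCapacity;
    private final ArrayDeque<Long> cache = new ArrayDeque<>();  // cached, reusable ranges
    private final Set<Long> inUse = new HashSet<>();            // ranges the chunk counts as used
    private final ArrayDeque<Buf> bufPool = new ArrayDeque<>(); // recycled Buf objects

    ReleaseSketch(int cacheCapacity) {
        this.cacheCapacity = cacheCapacity;
    }

    /** Hands out a Buf for the given range, reusing a pooled object when possible. */
    Buf allocate(long handle) {
        Buf buf = bufPool.isEmpty() ? new Buf() : bufPool.pop();
        buf.handle = handle;
        inUse.add(handle);
        return buf;
    }

    /** Returns true if the range went into the cache, false if it was marked unused. */
    boolean release(Buf buf) {
        boolean cached = cache.size() < cacheCapacity;
        if (cached) {
            cache.offer(buf.handle);   // step 1: keep the range for the next request
        } else {
            inUse.remove(buf.handle);  // step 2: give the range back to the chunk
        }
        buf.handle = -1;
        bufPool.push(buf);             // step 3: the Buf object goes back to the pool
        return cached;
    }

    boolean isMarkedUsed(long handle) { return inUse.contains(handle); }
    int pooledBufs() { return bufPool.size(); }
}
```

With a cache capacity of 1, the first released range is cached and still counts as used from the chunk's point of view, while the second one is marked unused; both Buf objects end up in the pool.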