上一节眼研究了PooledByteBufAllocator
分配内存的前两个步骤,通过ThreadLocal
的方式拿到PoolThreadCache
之后,获取对应的Arena
。那么之后就是Arena
具体分配内存的步骤,正是本节研究学习的内容。
- 入口
PooledByteBufAllocator#newDirectBuffer()
方法种有如下代码:
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
- 可以看到分配的过程如下:
- 拿到
PooledByteBuf
对象 - 从
cache
中分配内存,并重置相关属性
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
//拿到PooledByteBuf对象,仅仅是一个对象
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
//从cache种分配内存,并初始化buf种内存地址相关的属性
allocate(cache, buf, reqCapacity);
return buf;
}
- 先看第一步
newByteBuf(maxCapacity);
拿到PooledByteBuf
对象
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) {
//获取一个PooledByteBuf
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
//从带有回收特性的对象池RECYCLER获取一个PooledUnsafeDirectByteBuf
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
//buf可能是从回收站拿出来的,要进行复用
buf.reuse(maxCapacity);
return buf;
}
Recycler
是一个基于线程本地堆栈的对象池。Recycler
维护了一个ThreadLocal
成员变量,用于返回一个stack
给回收处理器DefaultHandle
,该处理器通过维护这个堆栈来维护PooledUnsafeDirectByteBuf
缓存。
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
//Recycler负责用回收处理器handler维护PooledUnsafeDirectByteBuf
//handler底层持有一个stack作为对象池,维护对象池,handle同时负责对象回收
//存储handler为成员变量,使用完该ByteBuf可以调用回收方法回收
return new PooledUnsafeDirectByteBuf(handle, 0);
}
};
//维护了一个`ThreadLocal`,`initialValue`方法返回一个堆栈。
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
}
@Override
protected void onRemoval(Stack<T> value) {
// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
- 再跟踪
Recycler#get()
方法
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
//获取对应的堆栈,相当一个回收站
Stack<T> stack = threadLocal.get();
//从栈顶拿出一个来DefaultHandle(回收处理器)
//DefaultHandle持有一个value,其实是PooledUnsafeDirectByteBuf
DefaultHandle<T> handle = stack.pop();
//没有回收处理器,说明没有闲置的ByteBuf
if (handle == null) {
//新增一个处理器
handle = stack.newHandle();
//回调,还记得么?该回调返回一个PooledUnsafeDirectByteBuf
//让处理器持有一个新的PooledUnsafeDirectByteBuf
handle.value = newObject(handle);
}
//如果有,则可直接重复使用
return (T) handle.value;
}
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
//回调initialize
V value = initialize(threadLocalMap);
registerCleaner(threadLocalMap);
return value;
}
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
//回调
v = initialValue();
} catch (Exception e) {
PlatformDependent.throwException(e);
}
threadLocalMap.setIndexedVariable(index, v);
addToVariablesToRemove(threadLocalMap, this);
return v;
}
DefaultHandle<T> newHandle() {
//实例化一个处理器并并且初四话成员变量,该成员变量stack从threalocal中初始化
return new DefaultHandle<T>(this);
}
DefaultHandle
用stack
作为缓存池维护PooledUnsafeDirectByteBuf
,同理PooledDirectByteBuf
也是一样的。只不过实例化的对象的实现不一样而已。
同时,处理器定义了回收的方法是将兑现存回栈内,使用的时候则是从栈顶取出。
static final class DefaultHandle<T> implements Handle<T> {
private int lastRecycledId;
private int recycleId;
boolean hasBeenRecycled;
//对象缓存池
private Stack<?> stack;
private Object value;
DefaultHandle(Stack<?> stack) {
this.stack = stack;
}
/**
* 定义回收方法,回收对象到stack
* @param object
*/
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
//回收:将自己存进栈中缓存起来
stack.push(this);
}
}
- 到这我们刚刚看完第一步,到第二步重置缓存内指针的时候了 ,获取到
PooledUnsafeDirectByteBuf
的时候,有可能是从缓存中取出来的。因此需要复用
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
//从带有回收特性的对象池RECYCLER获取一个PooledUnsafeDirectByteBuf
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
//buf可能是从回收站拿出来的,要进行复用
buf.reuse(maxCapacity);
return buf;
}
final void reuse(int maxCapacity) {
//重置最大容量
maxCapacity(maxCapacity);
//设置引用
setRefCnt(1);
//重置指针
setIndex0(0, 0);
//重置标记值
discardMarks();
}
- 到这才刚刚完成分配内存的第一步(拿到
PooledByteBuf
对象),以上都是仅仅是获取并且用回收站和回收处理器管理这些对象,这些对象仍然只是一个对象,还没有分配实际的内存。
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
//拿到PooledByteBuf对象,仅仅是一个对象
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
//从cache种分配内存,并初始化buf种内存地址相关的属性
allocate(cache, buf, reqCapacity);
return buf;
}
- 跟踪
PoolArena#allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity)
其整体分配内存的逻辑是根据不同规格大小的内存需要来的,显示tiny
和small
规格的,再是normal
规格的。分配也是先尝试从缓存中进行内存分配,如果分配失败再从内存堆中进行内存分配。 当然,分配出来的内存回和第一步拿到的PooledByteBuf
进行绑定起来。
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
//不同的规格大小进行内存分配
/**
* 分配整体逻辑(先判断tiny和small规格的,再判断normal规格的)
* 1. 尝试从缓存上进行内存分配,成功则返回
* 2. 失败则再从内存堆中进行分配内存
*/
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
//尝试tiny和small规格的缓存内存分配
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
//tiny和small规格的缓存内存分配尝试失败
//从内存堆中分配内存
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
//normal规格
//如果分配处出来的内存大于一个值(chunkSize),则执行allocateHuge
if (normCapacity <= chunkSize) {
//从缓存上进行内存分配
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
//缓存没有再从内存堆中分配内存
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}