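1. Overview
Two utility classes show up very frequently in Netty:
FastThreadLocal
As the name suggests, it plays the same role as ThreadLocal but is faster. ThreadLocal gives each thread its own copy of a variable, so threads do not contend for it, which keeps state consistent while improving performance. FastThreadLocal reimplements the functionality of the JDK's ThreadLocal with faster access.
Recycler
A lightweight object pool. It hands out objects quickly and avoids creating them over and over, which reduces GC pressure on the JVM. Netty uses Recycler to obtain ByteBuf objects, because ByteBufs are created very frequently and take up a fair amount of memory.
2. Using FastThreadLocal
Demo: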
import io.netty.util.concurrent.FastThreadLocal;

public class FastThreadLocalTest {
private static FastThreadLocal<Object> threadLocal = new FastThreadLocal<Object>() {
@Override
protected Object initialValue() throws Exception {
return new Object();
}
};
public static void main(String[] args) {
new Thread(() -> {
Object obj = threadLocal.get();
// ... do with obj
System.out.println(obj);
}).start();
new Thread(() -> {
Object obj = threadLocal.get();
// ... do with obj
System.out.println(obj);
}).start();
}
}
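FastThreadLocal confines a variable to a thread. Each thread gets its own copy from a FastThreadLocal, so changes one thread makes to it do not affect other threads. In the demo, each thread receives its own Object from initialValue().
3. How FastThreadLocal works
We look at three aspects:
- Creating a FastThreadLocal
- The get() method
- The set() method
**Creating a FastThreadLocal**
Every FastThreadLocal has a unique index, and this index is central to how it works. The constructor obtains it when the FastThreadLocal is created: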
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
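InternalThreadLocalMap.nextVariableIndex() hands out indexes from a static AtomicInteger, so every FastThreadLocal in the JVM gets a distinct one. If the counter ever overflows into negative values, it is rolled back and an IllegalStateException is thrown: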
static final AtomicInteger nextIndex = new AtomicInteger();
public static int nextVariableIndex() {
int index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local indexed variables");
}
return index;
}
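**FastThreadLocal's get() method**
- Obtain the InternalThreadLocalMap
Here is FastThreadLocal's get(). It first obtains the current thread's InternalThreadLocalMap, then reads the slot at index, and initializes the value if the slot is still empty: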
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
V value = initialize(threadLocalMap);
registerCleaner(threadLocalMap);
return value;
}
InternalThreadLocalMap.get() checks the current thread. A FastThreadLocalThread takes fastGet(), and any other thread takes slowGet():
public static InternalThreadLocalMap get() {
Thread thread = Thread.currentThread();
if (thread instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) thread);
} else {
return slowGet();
}
}
fastGet() simply returns the InternalThreadLocalMap that the FastThreadLocalThread holds as a field, creating it on first use:
private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {
InternalThreadLocalMap threadLocalMap = thread.threadLocalMap(); // read the threadLocalMap field stored in the thread itself
if (threadLocalMap == null) {
thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());
}
return threadLocalMap;
}
slowGet() falls back to a JDK ThreadLocal and fetches the map from it, creating the map on first use:
private static InternalThreadLocalMap slowGet() {
ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = UnpaddedInternalThreadLocalMap.slowThreadLocalMap;
InternalThreadLocalMap ret = slowThreadLocalMap.get();
if (ret == null) {
ret = new InternalThreadLocalMap();
slowThreadLocalMap.set(ret);
}
return ret;
}
UnpaddedInternalThreadLocalMap.slowThreadLocalMap is an ordinary JDK ThreadLocal:
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<InternalThreadLocalMap>();
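You can imitate the fast/slow split with the JDK alone. In the sketch below, which is not Netty's code, a hypothetical MapCarrierThread plays the role of FastThreadLocalThread by keeping the map in a field. Every other thread falls back to a java.lang.ThreadLocal, as slowGet() does. The names MapCarrierThread and MapLookup, and the use of a HashMap as the per-thread map, are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for FastThreadLocalThread: the per-thread map is a plain field.
class MapCarrierThread extends Thread {
    Map<String, Object> map; // created lazily by MapLookup.fastGet()

    MapCarrierThread(Runnable task) {
        super(task);
    }
}

// Hypothetical stand-in for InternalThreadLocalMap.get(): fast path vs slow path.
final class MapLookup {
    // Fallback for ordinary threads, playing the role of slowThreadLocalMap.
    private static final ThreadLocal<Map<String, Object>> SLOW = new ThreadLocal<>();

    private MapLookup() {}

    static Map<String, Object> get() {
        Thread thread = Thread.currentThread();
        if (thread instanceof MapCarrierThread) {
            return fastGet((MapCarrierThread) thread);
        }
        return slowGet();
    }

    // Fast path: a field read on the current thread object, no hash lookup.
    private static Map<String, Object> fastGet(MapCarrierThread thread) {
        if (thread.map == null) {
            thread.map = new HashMap<>();
        }
        return thread.map;
    }

    // Slow path: go through the JDK ThreadLocal, creating the map on first use.
    private static Map<String, Object> slowGet() {
        Map<String, Object> map = SLOW.get();
        if (map == null) {
            map = new HashMap<>();
            SLOW.set(map);
        }
        return map;
    }
}
```

The fast path costs one instanceof check and a field read. The slow path pays for the JDK ThreadLocal's own hash lookup on every call.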
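- Read the object directly by index
The index is the FastThreadLocal's own index. threadLocalMap.indexedVariable(index) reads that slot of the map's internal array, and if the slot does not hold InternalThreadLocalMap.UNSET, get() returns the value right away.
- Initialize
If the slot still holds UNSET, get() calls initialize(threadLocalMap), which creates the value via initialValue() and stores it in the slot.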
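Putting the three steps together, here is a JDK-only sketch of the index-based design. It is a simplified analogue, not Netty's implementation. IndexedThreadLocal is a hypothetical name, and each thread's Object[] lives in a plain java.lang.ThreadLocal (the slow path). UNSET marks empty slots, and the array grows to the next power of two when an index does not fit.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical, simplified analogue of FastThreadLocal + InternalThreadLocalMap.
class IndexedThreadLocal<V> {
    // Global index counter shared by all variables, like nextVariableIndex().
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();
    // Sentinel for "slot never set", like InternalThreadLocalMap.UNSET.
    static final Object UNSET = new Object();
    // One Object[] per thread, held in a JDK ThreadLocal (the slow path).
    private static final ThreadLocal<Object[]> SLOTS =
            ThreadLocal.withInitial(() -> filled(new Object[8], 0));

    private final int index;
    private final Supplier<V> initial;

    IndexedThreadLocal(Supplier<V> initial) {
        int i = NEXT_INDEX.getAndIncrement();
        if (i < 0) { // counter overflowed: roll back and fail
            NEXT_INDEX.decrementAndGet();
            throw new IllegalStateException("too many thread-local indexed variables");
        }
        this.index = i;
        this.initial = initial;
    }

    int index() {
        return index;
    }

    @SuppressWarnings("unchecked")
    V get() {
        Object[] slots = SLOTS.get();
        Object v = index < slots.length ? slots[index] : UNSET;
        if (v != UNSET) {
            return (V) v; // O(1): a plain array read by index
        }
        V value = initial.get(); // like initialize(): compute, store, return
        set(value);
        return value;
    }

    void set(V value) {
        Object[] slots = SLOTS.get();
        if (index >= slots.length) { // grow to the next power of two above index
            Object[] grown = Arrays.copyOf(slots, Integer.highestOneBit(index) << 1);
            filled(grown, slots.length);
            SLOTS.set(grown);
            slots = grown;
        }
        slots[index] = value;
    }

    // Marks arr[from..] as UNSET and returns arr.
    private static Object[] filled(Object[] arr, int from) {
        Arrays.fill(arr, from, arr.length, UNSET);
        return arr;
    }
}
```

Because the index is fixed when the variable is created, each lookup is a single array read instead of a hash probe. The trade-off is that the array is sized by the largest index ever handed out, even if a thread only uses a few variables.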
4. Recycler: a lightweight object pool
Here is a simple example of using Recycler:
import io.netty.util.Recycler;

public class RecycleTest {
private static final Recycler<User> RECYCLER = new Recycler<RecycleTest.User>() {
@Override
protected RecycleTest.User newObject(Handle<RecycleTest.User> handle) {
return new RecycleTest.User(handle); // called when the pool has no free object; handle is what later recycles it
}
};
private static class User {
private final Recycler.Handle<User> handle;
public User(Recycler.Handle<User> handle) {
this.handle = handle;
}
public void recycle() {
handle.recycle(this);
}
}
public static void main(String[] args) {
User user = RECYCLER.get();
user.recycle();
User user1 = RECYCLER.get();
System.out.println(user == user1);
}
}
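The benefit of a lightweight object pool is that objects are not created and collected over and over, which lowers the frequency of minor GCs.
5. How a Recycler is created
Every Recycler holds a FastThreadLocal whose per-thread value is a Stack. Its initialValue() creates a Stack bound to the current thread: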
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
}
@Override
protected void onRemoval(Stack<T> value) {
// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
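To show the idea behind a per-thread Stack without the rest of Recycler, here is a heavily simplified JDK-only pool. It is a sketch under stated assumptions. MiniRecycler, its Handle and the PooledUser class are hypothetical, and an ArrayDeque stands in for Stack. Only same-thread recycling is modeled: a recycle from another thread is rejected instead of being queued.

```java
import java.util.ArrayDeque;
import java.util.function.Function;

// Hypothetical, heavily simplified analogue of Recycler: same-thread recycling only.
class MiniRecycler<T> {
    // A handle ties a pooled object to the stack of the thread that created it.
    static final class Handle<T> {
        private final ArrayDeque<Handle<T>> stack;
        private final Thread owner;
        T value;

        Handle(ArrayDeque<Handle<T>> stack, Thread owner) {
            this.stack = stack;
            this.owner = owner;
        }

        void recycle() {
            if (Thread.currentThread() != owner) {
                // Cross-thread recycling (WeakOrderQueue) is not modeled in this sketch.
                throw new UnsupportedOperationException("cross-thread recycle not supported here");
            }
            stack.push(this); // back onto the owner's stack
        }
    }

    private final Function<Handle<T>, T> factory;
    // One stack per thread, like Recycler's FastThreadLocal<Stack<T>>.
    private final ThreadLocal<ArrayDeque<Handle<T>>> stacks = ThreadLocal.withInitial(ArrayDeque::new);

    MiniRecycler(Function<Handle<T>, T> factory) {
        this.factory = factory;
    }

    T get() {
        ArrayDeque<Handle<T>> stack = stacks.get();
        Handle<T> handle = stack.poll(); // reuse the most recently recycled object, if any
        if (handle == null) {            // empty stack: create one, like newObject(handle)
            handle = new Handle<>(stack, Thread.currentThread());
            handle.value = factory.apply(handle);
        }
        return handle.value;
    }
}

// Example pooled type that keeps its handle, like User in the demo above.
class PooledUser {
    private final MiniRecycler.Handle<PooledUser> handle;

    PooledUser(MiniRecycler.Handle<PooledUser> handle) {
        this.handle = handle;
    }

    void recycle() {
        handle.recycle();
    }
}
```

Unlike Netty's DefaultHandle, this handle does not detect an object that is recycled twice. The stack also has no capacity limit.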
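8. Recycling objects into the Recycler
- Recycling on the same thread
- Recycling on a different thread (an object created by the current thread is recycled by another thread)
The entry point is user.recycle(), which calls handle.recycle():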
private static class User {
private final Recycler.Handle<User> handle;
public User(Recycler.Handle<User> handle) {
this.handle = handle;
}
public void recycle() {
handle.recycle(this);
}
}
DefaultHandle.recycle() checks the handle and ends up calling stack.push():
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
stack.push(this);
}
The push() method:
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) { // is the current thread the owner of this Stack?
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item); // owner thread: push right away
} else {
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
pushLater(item, currentThread); // another thread: hand the object over via a WeakOrderQueue
}
}
pushNow():
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) { // both ids are 0 while the handle is out of the pool
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID; // taken once from an atomic counter, so it is unique and fixed
int size = this.size;
if (size >= maxCapacity || dropHandle(item)) { // reached maxCapacity (32k) or rejected by dropHandle(): discard it
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
if (size == elements.length) { // array full: grow it to twice the size, capped at maxCapacity
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;
this.size = size + 1;
}
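The capacity policy in pushNow() can be isolated into a small sketch. The stack doubles its backing array, never past maxCapacity, and drops the item once it is full. BoundedStack is a hypothetical class, and the dropHandle() sampling is left out here.

```java
import java.util.Arrays;

// Hypothetical sketch of the growth/drop policy in pushNow().
class BoundedStack<T> {
    private Object[] elements;
    private int size;
    private final int maxCapacity;

    BoundedStack(int initialCapacity, int maxCapacity) {
        this.elements = new Object[Math.max(1, initialCapacity)];
        this.maxCapacity = maxCapacity;
    }

    /** Returns false if the item was dropped because the stack is at maxCapacity. */
    boolean push(T item) {
        if (size >= maxCapacity) {
            return false; // like "Hit the maximum capacity ... drop"
        }
        if (size == elements.length) { // full: grow to min(2 * size, maxCapacity)
            elements = Arrays.copyOf(elements, Math.min(size << 1, maxCapacity));
        }
        elements[size++] = item;
        return true;
    }

    @SuppressWarnings("unchecked")
    T pop() {
        if (size == 0) {
            return null;
        }
        T item = (T) elements[--size];
        elements[size] = null; // clear the slot so the array does not pin the object
        return item;
    }

    int size() {
        return size;
    }

    int capacity() {
        return elements.length;
    }
}
```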
dropHandle():
boolean dropHandle(DefaultHandle<?> handle) {
if (!handle.hasBeenRecycled) { // the handle has never been recycled before
if ((++handleRecycleCount & ratioMask) != 0) { // handleRecycleCount: handles seen so far; ratioMask defaults to 7, so 7 of every 8 are dropped and only 1/8 kept
// Drop the object.
return true;
}
handle.hasBeenRecycled = true;
}
return false;
}
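dropHandle() is a sampling rule. Only one in (ratioMask + 1) never-recycled handles is admitted, and a handle admitted once is always accepted afterwards. The sketch below reproduces that rule on its own. RecycleRatio is a hypothetical name. Starting handleRecycleCount at -1, so that the very first handle is kept, is an assumption, because the starting value is not shown above.

```java
// Hypothetical sketch of dropHandle(): keep one in (ratioMask + 1) fresh handles.
class RecycleRatio {
    private final int ratioMask;         // must be 2^k - 1, e.g. 7 => keep 1 in 8
    private int handleRecycleCount = -1; // assumption: -1 so the first handle is kept

    RecycleRatio(int ratioMask) {
        this.ratioMask = ratioMask;
    }

    static final class Handle {
        boolean hasBeenRecycled;
    }

    boolean dropHandle(Handle handle) {
        if (!handle.hasBeenRecycled) {
            if ((++handleRecycleCount & ratioMask) != 0) {
                return true;               // drop this never-seen handle
            }
            handle.hasBeenRecycled = true; // admitted: exempt from sampling from now on
        }
        return false;
    }
}
```

With ratioMask = 7, 100 of 800 fresh handles are kept. Objects that are recycled repeatedly stay in the pool, while one-off objects are mostly left to the GC.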
Now DefaultHandle:
static final class DefaultHandle<T> implements Handle<T> {
private int lastRecycledId;
private int recycleId;
boolean hasBeenRecycled;
private Stack<?> stack; // the Stack this handle belongs to
private Object value; // the pooled object; handle and value correspond one-to-one
DefaultHandle(Stack<?> stack) {
this.stack = stack;
}
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
stack.push(this);
}
}
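The recycleId/lastRecycledId pair acts as a double-recycle guard. pushNow() sets both ids and pop() resets them to 0, so pushing the same handle twice without a pop in between fails. The sketch below isolates that protocol for the owning thread only. IdCheckedStack is hypothetical, and OWN_THREAD_ID = 1 is an arbitrary non-zero value chosen for the example.

```java
import java.util.ArrayDeque;

// Hypothetical sketch of the id bookkeeping in pushNow() and pop().
class IdCheckedStack {
    static final int OWN_THREAD_ID = 1; // assumption: any non-zero id

    static final class Handle {
        int recycleId;
        int lastRecycledId;
    }

    private final ArrayDeque<Handle> elements = new ArrayDeque<>();

    void push(Handle item) {
        if ((item.recycleId | item.lastRecycledId) != 0) {
            throw new IllegalStateException("recycled already"); // still in the pool
        }
        item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
        elements.push(item);
    }

    Handle pop() {
        Handle ret = elements.poll();
        if (ret == null) {
            return null;
        }
        if (ret.lastRecycledId != ret.recycleId) {
            throw new IllegalStateException("recycled multiple times");
        }
        ret.recycleId = 0; // handed out again: the next push is legal
        ret.lastRecycledId = 0;
        return ret;
    }
}
```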
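9. Cross-thread recycling in Recycler
When an object created in one thread is recycled in another thread, we call it cross-thread recycling.
Cross-thread recycling involves three steps:
- Get the WeakOrderQueue (which holds objects created by another thread)
- Create the WeakOrderQueue (binding the recycling thread's queue to the owner thread's Stack)
- Append the object to the WeakOrderQueue (completing the recycling of an object created in another thread)
Getting the WeakOrderQueue
pushLater() is the cross-thread recycling path: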
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
pushLater(item, currentThread);
The code:
private void pushLater(DefaultHandle<?> item, Thread thread) {
// we don't want to have a ref to the queue as the value in our weak map
// so we null it out; to ensure there are no races with restoring it later
// we impose a memory ordering here (no-op on x86)
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) { // thread 2 has never recycled an object belonging to thread 1's Stack
if (delayedRecycled.size() >= maxDelayedQueues) { // limit reached: thread 2 cannot create a queue for another Stack
// Add a dummy queue so we know we should drop the object
delayedRecycled.put(this, WeakOrderQueue.DUMMY); // mark this Stack so later objects are dropped
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object: doing nothing here simply discards the object being recycled
return;
}
queue.add(item);
}
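First, DELAYED_RECYCLED. It is a FastThreadLocal whose value is a Map<Stack<?>, WeakOrderQueue>, so every thread has its own map. From the recycling thread's point of view, each key is another thread's Stack, and the value is the WeakOrderQueue this thread uses to hand objects back to that Stack: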
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED =
new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
@Override
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
return new WeakHashMap<Stack<?>, WeakOrderQueue>();
}
};
Suppose the object was created in thread 1 and thread 2 is executing pushLater(). In WeakOrderQueue queue = delayedRecycled.get(this);, this refers to thread 1's Stack, so thread 2 looks up the WeakOrderQueue it uses for thread 1's Stack.
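You can sketch the lookup in pushLater() with JDK collections. In this hypothetical DelayedRecycle class, each recycling thread keeps a thread-local WeakHashMap from a foreign OwnerStack to the queue it uses for that stack. A ConcurrentLinkedQueue stands in for WeakOrderQueue. Once maxDelayedQueues is reached, a shared DUMMY queue marks stacks whose objects must be dropped. The names and the queue type are assumptions, not Netty's API.

```java
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of pushLater(): per-(owner stack, recycling thread) queues.
class DelayedRecycle {
    static final class OwnerStack {
        // Queues that other threads created for this stack (Netty links WeakOrderQueues).
        final ConcurrentLinkedQueue<ConcurrentLinkedQueue<Object>> foreignQueues =
                new ConcurrentLinkedQueue<>();
    }

    // Marker meaning "this thread may not queue objects for that stack".
    static final ConcurrentLinkedQueue<Object> DUMMY = new ConcurrentLinkedQueue<>();

    // Like DELAYED_RECYCLED: each recycling thread has its own stack -> queue map.
    private static final ThreadLocal<Map<OwnerStack, ConcurrentLinkedQueue<Object>>> DELAYED =
            ThreadLocal.withInitial(WeakHashMap::new);

    private final int maxDelayedQueues;

    DelayedRecycle(int maxDelayedQueues) {
        this.maxDelayedQueues = maxDelayedQueues;
    }

    /** Returns true if the object was queued, false if it was dropped. */
    boolean pushLater(OwnerStack stack, Object item) {
        Map<OwnerStack, ConcurrentLinkedQueue<Object>> delayed = DELAYED.get();
        ConcurrentLinkedQueue<Object> queue = delayed.get(stack);
        if (queue == null) { // this thread has never recycled for that stack
            if (delayed.size() >= maxDelayedQueues) {
                delayed.put(stack, DUMMY); // remember to drop from now on
                return false;
            }
            queue = new ConcurrentLinkedQueue<>();
            stack.foreignQueues.add(queue); // make it visible to the owner thread
            delayed.put(stack, queue);
        } else if (queue == DUMMY) {
            return false;
        }
        queue.add(item);
        return true;
    }
}
```

A WeakHashMap lets the entry disappear once the owner's stack is no longer referenced, so the recycling thread does not keep a dead stack alive.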
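Creating the WeakOrderQueue
First, the structure of a WeakOrderQueue:
A WeakOrderQueue maintains a linked list of Links. Each Link holds an array of handles (16 elements by default) and a readIndex that records how far into that array the owner has already read. These handles are the same DefaultHandle objects the Stack stores.
Next, how a newly created WeakOrderQueue is bound to the Stack it recycles into: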
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) { // create
// drop object
return;
}
delayedRecycled.put(this, queue); // bind
In WeakOrderQueue.allocate(this, thread), this is thread 1's Stack and thread is thread 2. allocate() creates a queue only if there is space left for it:
/**
* Allocate a new {@link WeakOrderQueue} or return {@code null} if not possible.
*/
static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
// We allocated a Link so reserve the space
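// stack.availableSharedCapacity: how many objects other threads may still park for this Stack; LINK_CAPACITY: the size of one Link, i.e. the amount reserved for this recycling thread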
return Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
? newQueue(stack, thread) : null;
}
Now Head.reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY). The two arguments are initialized as follows:
// availableSharedCapacity: how many objects other threads may park for this Stack (16K by default)
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
// LINK_CAPACITY: 16 by default
LINK_CAPACITY = safeFindNextPositivePowerOfTwo(
max(SystemPropertyUtil.getInt("io.netty.recycler.linkCapacity", 16), 16));
Head.reserveSpace():
// Example: if availableSharedCapacity is 50 and space is 10, thread 2 reserves room for 10 of thread 1's objects, and 40 remain for other threads to reserve
static boolean reserveSpace(AtomicInteger availableSharedCapacity, int space) {
assert space >= 0;
for (;;) {
int available = availableSharedCapacity.get();
if (available < space) {
return false;
}
if (availableSharedCapacity.compareAndSet(available, available - space)) {
return true;
}
}
}
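reserveSpace() is a classic compare-and-set retry loop. The sketch below wraps it in a hypothetical SharedCapacity class. It also includes the matching give-back step (reclaimSpace() in transfer()), so you can exercise the budget from several threads.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the shared-capacity budget behind Head.reserveSpace().
final class SharedCapacity {
    private final AtomicInteger available;

    SharedCapacity(int initial) {
        available = new AtomicInteger(initial);
    }

    boolean reserveSpace(int space) {
        if (space < 0) {
            throw new IllegalArgumentException("space must be >= 0");
        }
        for (;;) {
            int current = available.get();
            if (current < space) {
                return false; // not enough budget left
            }
            if (available.compareAndSet(current, current - space)) {
                return true; // our update won
            }
            // Another thread changed the value in between: re-read and retry.
        }
    }

    // Counterpart of reclaimSpace(): give budget back once a Link has been drained.
    void reclaimSpace(int space) {
        available.addAndGet(space);
    }

    int available() {
        return available.get();
    }
}
```

For instance, starting from 50 and reserving 16 three times leaves 2, after which a fourth reservation of 16 fails. Under contention, the CAS loop guarantees that the total reserved never exceeds the initial budget.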
newQueue() creates the WeakOrderQueue:
static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
final WeakOrderQueue queue = new WeakOrderQueue(stack, thread); // create thread 2's WeakOrderQueue
// Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
// may be accessed while its still constructed.
stack.setHead(queue); // insert it at the head of thread 1's Stack's list of queues
return queue;
}
private WeakOrderQueue(Stack<?> stack, Thread thread) {
tail = new Link();
// Its important that we not store the Stack itself in the WeakOrderQueue as the Stack also is used in
// the WeakHashMap as key. So just store the enclosed AtomicInteger which should allow to have the
// Stack itself GCed.
head = new Head(stack.availableSharedCapacity);
head.link = tail;
owner = new WeakReference<Thread>(thread);
}
Appending the object to the WeakOrderQueue
queue.add(item) in pushLater():
void add(DefaultHandle<?> handle) {
handle.lastRecycledId = id; // id of this WeakOrderQueue, recording which queue recycled the handle
Link tail = this.tail;
int writeIndex;
if ((writeIndex = tail.get()) == LINK_CAPACITY) { // the tail Link is full
if (!head.reserveSpace(LINK_CAPACITY)) { // reserve room for another Link; if that is not allowed, drop the object
// Drop it.
return;
}
// We allocate a Link so reserve the space
this.tail = tail = tail.next = new Link(); // allowed: append a new Link and make it the tail
writeIndex = tail.get(); // read the write index
}
tail.elements[writeIndex] = handle;
handle.stack = null; // the handle no longer belongs to the Stack
// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
// this also means we guarantee visibility of an element in the queue if we see the index updated
tail.lazySet(writeIndex + 1); // advance the write index
}
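Link uses the AtomicInteger value itself as its write index. The producer stores the handle into the slot first and only then publishes the new index with lazySet(), an ordered store. The consumer reads the index with get() and only touches slots below it. The hypothetical MiniLink below isolates that single-producer pattern. CAPACITY = 16 mirrors the LINK_CAPACITY default mentioned earlier, and drainTo() is a simplified stand-in for the consumer side of transfer().

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a Link: the AtomicInteger value is the write index.
final class MiniLink extends AtomicInteger {
    static final int CAPACITY = 16; // assumption: mirrors LINK_CAPACITY's default
    final Object[] elements = new Object[CAPACITY];
    int readIndex; // touched only by the consumer thread

    /** Producer side (one thread only); returns false when this link is full. */
    boolean add(Object item) {
        int writeIndex = get();
        if (writeIndex == CAPACITY) {
            return false;
        }
        elements[writeIndex] = item;
        lazySet(writeIndex + 1); // ordered store: the slot write above is published first
        return true;
    }

    /** Consumer side; moves everything published so far into sink. */
    int drainTo(List<Object> sink) {
        int end = get(); // volatile read, pairs with lazySet
        int moved = 0;
        for (int i = readIndex; i < end; i++) {
            sink.add(elements[i]);
            elements[i] = null;
            moved++;
        }
        readIndex = end;
        return moved;
    }
}
```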
The Link class:
static final class Link extends AtomicInteger {
private final DefaultHandle<?>[] elements = new DefaultHandle[LINK_CAPACITY];
private int readIndex;
Link next;
}
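9. Getting objects recycled by other threads
When the Stack's size > 0, pop() takes an object from the current thread's Stack. When size == 0, it first tries to pull objects in from the WeakOrderQueues that other threads have attached to this Stack.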
DefaultHandle<T> pop() {
int size = this.size;
if (size == 0) {
if (!scavenge()) { // try to pull objects in from the WeakOrderQueues
return null;
}
size = this.size;
}
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
scavenge() tries to move objects from the WeakOrderQueues into the current Stack. It returns true if it moved anything and false otherwise:
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) { // something was transferred: done
return true;
}
// reset our scavenge cursor so the next scavenge starts from the head
prev = null;
cursor = head;
return false;
}
boolean scavengeSome() {
WeakOrderQueue prev;
WeakOrderQueue cursor = this.cursor; // the WeakOrderQueue to scavenge next
if (cursor == null) {
prev = null;
cursor = head;
if (cursor == null) { // the WeakOrderQueue list is empty
return false;
}
} else {
prev = this.prev;
}
boolean success = false;
do { // walk the linked list of WeakOrderQueues
if (cursor.transfer(this)) { // move objects from this WeakOrderQueue into the Stack
success = true;
break;
}
WeakOrderQueue next = cursor.next;
if (cursor.owner.get() == null) { // owner weakly references the recycling thread; if that thread is gone, clean up
// If the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect.
// We never unlink the first queue, as we don't want to synchronize on updating the head.
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) { // move the remaining data to the Stack, one Link per call
success = true;
} else {
break;
}
}
}
if (prev != null) {
prev.setNext(next);
}
} else {
prev = cursor;
}
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
cursor.transfer() moves data from a WeakOrderQueue into the Stack:
boolean transfer(Stack<?> dst) {
Link head = this.head.link;
if (head == null) {
return false;
}
if (head.readIndex == LINK_CAPACITY) { // the current Link has been read completely
if (head.next == null) { // and the WeakOrderQueue has no further Link
return false;
}
this.head.link = head = head.next; // move on to the next Link
}
final int srcStart = head.readIndex; // where to start reading
int srcEnd = head.get(); // head is a Link; Link extends AtomicInteger, whose value is the write index (number of elements written)
final int srcSize = srcEnd - srcStart; // number of elements to move
if (srcSize == 0) {
return false;
}
final int dstSize = dst.size; // current Stack size
final int expectedCapacity = dstSize + srcSize; // capacity needed
if (expectedCapacity > dst.elements.length) { // the Stack is backed by an array: grow it (up to maxCapacity)
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
final DefaultHandle[] srcElems = head.elements;
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
for (int i = srcStart; i < srcEnd; i++) { // do the actual transfer
DefaultHandle element = srcElems[i];
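if (element.recycleId == 0) { // first recycle since the handle was last handed out
element.recycleId = element.lastRecycledId; // adopt the id of the queue that recycled it
} else if (element.recycleId != element.lastRecycledId) { // ids differ: the handle was recycled more than once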
throw new IllegalStateException("recycled already");
}
srcElems[i] = null; // clear the slot in the Link
if (dst.dropHandle(element)) { // dropHandle() discards 7/8 of never-recycled handles to throttle recycling
// Drop the object.
continue;
}
element.stack = dst; // the handle belongs to the destination Stack again
dstElems[newDstSize ++] = element;
}
if (srcEnd == LINK_CAPACITY && head.next != null) { // this Link is fully read and another Link follows
// Add capacity back as the Link is GCed.
this.head.reclaimSpace(LINK_CAPACITY); // give one Link's worth of shared capacity back, so other threads can again park objects for this Stack
this.head.link = head.next;
}
head.readIndex = srcEnd;
if (dst.size == newDstSize) { // nothing was actually moved into the Stack
return false;
}
dst.size = newDstSize; // update the Stack size
return true;
} else { // the destination Stack is already full
// The destination stack is full already.
return false;
}
}
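The core of transfer() can be traced with small numbers. This hypothetical TransferSketch keeps only the index arithmetic: a chain of Links with readIndex/writeIndex, and a destination stack that grows by doubling up to maxCapacity. The recycleId checks and dropHandle() sampling are left out, and LINK_CAPACITY = 4 is chosen only to keep the example short.

```java
import java.util.Arrays;

// Hypothetical sketch of WeakOrderQueue.transfer(): index bookkeeping only.
final class TransferSketch {
    static final int LINK_CAPACITY = 4; // small value so the example is easy to trace

    static final class Link {
        final Object[] elements = new Object[LINK_CAPACITY];
        int writeIndex; // Netty keeps this in the Link's AtomicInteger value
        int readIndex;
        Link next;
    }

    static final class DstStack {
        Object[] elements = new Object[2];
        int size;
        final int maxCapacity;

        DstStack(int maxCapacity) {
            this.maxCapacity = maxCapacity;
        }

        // Double toward expected, never beyond maxCapacity; returns the new length.
        int increaseCapacity(int expected) {
            int newCapacity = elements.length;
            do {
                newCapacity <<= 1;
            } while (newCapacity < expected && newCapacity < maxCapacity);
            newCapacity = Math.min(newCapacity, maxCapacity);
            if (newCapacity != elements.length) {
                elements = Arrays.copyOf(elements, newCapacity);
            }
            return newCapacity;
        }
    }

    Link head;

    TransferSketch(Link head) {
        this.head = head;
    }

    /** Moves readable objects from one Link; returns true if anything was moved. */
    boolean transfer(DstStack dst) {
        Link link = head;
        if (link == null) {
            return false;
        }
        if (link.readIndex == LINK_CAPACITY) { // head Link fully consumed
            if (link.next == null) {
                return false;
            }
            head = link = link.next;
        }
        int srcStart = link.readIndex;
        int srcEnd = link.writeIndex;
        if (srcEnd == srcStart) {
            return false; // nothing new in this Link
        }
        int expected = dst.size + (srcEnd - srcStart);
        if (expected > dst.elements.length) {
            int actual = dst.increaseCapacity(expected);
            srcEnd = Math.min(srcStart + actual - dst.size, srcEnd); // only what fits
        }
        if (srcStart == srcEnd) {
            return false; // destination already full
        }
        for (int i = srcStart; i < srcEnd; i++) {
            dst.elements[dst.size++] = link.elements[i];
            link.elements[i] = null;
        }
        if (srcEnd == LINK_CAPACITY && link.next != null) {
            head = link.next; // drained Link: unlink it
        }
        link.readIndex = srcEnd;
        return true;
    }
}
```

With a destination capped at 5, the first call moves a full Link of 4 and advances head. The second call moves just one of the next Link's 2 objects, because only one slot is left, and the third call returns false.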
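10. Performance optimization summary
FastThreadLocal
FastThreadLocal mainly solves the problem of isolating variables per thread.
Every FastThreadLocal has a unique index in the JVM. When a thread reads a FastThreadLocal and that thread is a FastThreadLocalThread, the InternalThreadLocalMap is taken straight from a field of the thread. The map is backed by an array, so the thread's value is fetched with the index as the array subscript, an O(1) operation. The JDK's ThreadLocal instead looks values up in a hash table, where hash collisions make lookups slower than a direct array read. If the current thread is not a FastThreadLocalThread, Netty gets the InternalThreadLocalMap from a JDK ThreadLocal and then reads the value from it.
Recycler, the lightweight object pool
On one hand, it hands out objects quickly. On the other, it avoids creating new objects all the time, which lowers the frequency of young GCs.
The pool stores objects in two places: a Stack owned by the current thread, and the WeakOrderQueues that other threads attach to that Stack. To get an object, Recycler first tries the current thread's Stack, then the WeakOrderQueues linked to it. Only if both come up empty does it create a new object.
Recycling works similarly. If thread 1 recycles an object it created, the object goes straight onto thread 1's Stack. If another thread recycles it, that thread looks for the WeakOrderQueue it uses for thread 1's Stack. If there is none, it creates one and links it to the Stack, then puts the object into that queue.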