在 第二章 ThreadLocal源码解析 详细分析了jdk本身的ThreadLocal源码。本节来看一下netty的FastThreadLocal的设计与源码解析。相关的源码已经抽取为一个框架:https://github.com/zhaojigang/concurrent-framework。
- 一、使用姿势
- 二、数据结构
- 三、源码分析
- 四、回收机制
- 五、FastThreadLocal优缺点
- 六、小工具:基于ConcurrentHashMap实现的ConcurrentSet
- 七、总结
一、使用姿势
private static final FastThreadLocal<Integer> fastThreadLocal1 = new FastThreadLocal<Integer>(){
@Override
protected Integer initialValue() throws Exception {
return 100;
}
@Override
protected void onRemoved(Integer value) throws Exception {
System.out.println(value + ":我被删除了");
}
};
private static final FastThreadLocal<String> fastThreadLocal2 = new FastThreadLocal<String>(){
@Override
protected String initialValue() throws Exception {
return "haha";
}
@Override
protected void onRemoved(String value) throws Exception {
System.out.println(value + ":我被删除了");
}
};
创建了两个FastThreadLocal实例,都重写了其initialValue()和onRemoved(Integer value)方法。
1.1、普通线程使用FastThreadLocal
@Test
public void testSetAndGetByCommonThread() {
Integer x = fastThreadLocal1.get();
String s = fastThreadLocal2.get();
fastThreadLocal1.set(200);
fastThreadLocal2.set("hehe");
}
1.2、FastThreadLocalThread使用FastThreadLocal
@Test
public void testSetAndGetByFastThreadLocalThread() {
new FastThreadLocalThread(()->{
Integer x = fastThreadLocal1.get();
String s = fastThreadLocal2.get();
fastThreadLocal1.set(200);
fastThreadLocal2.set("hehe");
}).start();
}
1.3、FastThreadLocalThread线程池使用FastThreadLocal
private static final Executor executor = FastThreadExecutors.newCachedFastThreadPool("test");
@Test
public void testSetAndGetByFastThreadLocalThreadExecutor() {
executor.execute(()->{
Integer x = fastThreadLocal1.get();
String s = fastThreadLocal2.get();
fastThreadLocal1.set(200);
fastThreadLocal2.set("hehe");
});
}
FastThreadExecutors是笔者自己写的一个线程全部是FastThreadLocalThread的线程池构造器。
1.4、FastThreadLocalThread线程池与promise结合使用
private static final AtomicInteger index = new AtomicInteger();
private static final Executor promiseTest = FastThreadExecutors.newCachedFastThreadPool("nettyPromiseTest");
@Test
public void testFastThreadPoolWithPromise() {
DefaultPromise<Void> promise = new DefaultPromise<>();
// 1、为promise添加10个监听器FutureListener
for (int i = 0; i < 10; i++) {
promise.addListener(future -> System.out.println("haha:" + index.getAndIncrement()));
}
// 2、使用线程池执行业务逻辑(这里只是设置promise的值,触发promise的监听器执行操作)
promiseTest.execute(() -> promise.setSuccess(null));
}
二、数据结构
对于jdk的ThreadLocal来讲,其底层数据结构就是一个Entry[]数组,key为ThreadLocal,value为对应的值(hash表);通过线性探测法解决hash冲突。对于FastThreadLocal来讲,底层数据结构就是单纯的简单数组Object[]。说明:
- 左图是FastThreadLocalThread(简称ftlt)使用FastThreadLocal(简称ftl)的结构图;
- 右图是普通线程使用FastThreadLocal和ThreadLocal(简称tl)的结构图。
2.1、ftlt使用ftl
- 每一个ftlt内部都有一个InternalThreadLocalMap实例,其底层数据结构就是一个Object[],初始length==32
- 数组的第一个元素index0存储一个
Set<FastThreadLocal<?>>
的set集合,存储所有有效的ftl。每当有一个ftl的value设置到数组中的时候,就会将当前的ftl对象添加到Object[0]的set集合中;每当有一个ftl的value被从数组中删除的时候,就会将当前的ftl对象从Object[0]的set集合中删除 - Object[]的其余元素存储ftl的value,注意存储的不是key-value对象,只是value,而数组下标index是一个ftl的实例属性(二者唯一对应)
2.2、普通线程使用ftl
ftl支持普通线程对其进行使用。InternalThreadLocalMap中有一个tl变量
static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<>();
普通线程在设置或者初始化(包括get时进行的初始化)ftl时,会将slowThreadLocalMap作为key,通过其hashcode进行计算之后,存储到ThreadLocalMap的Entry[]的某一个节点上。值得注意的是,其value就是一个InternalThreadLocalMap对象。之后该线程上所有对于ftl的操作,都将先获取slowThreadLocalMap这个tl的value--InternalThreadLocalMap对象,然后和2.1一样对其进行操作。
三、源码分析
3.1、ftl的创建
======================FastThreadLocal========================
/**
* 每一个FastThreadLocal都有一个唯一标识
*/
private final int index;
/**
* 每一个FastThreadLocal类都会将自己添加到indexedVariables[variablesToRemoveIndex]处的Set<FastThreaLocal<?>>
*/
private static final int VARIABLES_TO_REMOVE_INDEX = InternalThreadLocalMap.nextVariableIndex();
/**
* 创建一个FastThreadLocal
*/
public FastThreadLocal() {
index = InternalThreadLocalMap.nextVariableIndex();
}
====================== InternalThreadLocalMap ========================
/**
* FastThreadLocal的唯一索引生成器
*/
private static final AtomicInteger nextIndex = new AtomicInteger();
/**
* 获取FastThreadLocal的唯一索引
*/
public static Integer nextVariableIndex() {
Integer index = nextIndex.getAndIncrement();
if (index < 0) {
nextIndex.decrementAndGet();
throw new IllegalStateException("too many thread-local variable index");
}
return index;
}
ftl的创建很简单。
- 初始化一个类变量:VARIABLES_TO_REMOVE_INDEX,通常是0。该索引位置用于存储所有有效的ftl对象的set集合;
- 初始化一个index实例变量,该变量就是ftl的唯一标识,类比tl的threadLocalHashCode(实际上二者不同的是,ftl的index可以直接作为数组下标进行操作,而tl需要threadLocalHashCode&(len-1)才可以);该index也是绑定的ftl对象的value在Object[]数组中的索引位置;
- index的生成使用对一个全局不可变变量的cas操作,保证唯一性;最多不能超过Integer.MAX_VALUE
3.2、ftl的获取
/**
* 获取当前线程的InternalThreadLocalMap中的当前ftl的value
*/
public V get() {
// 1、获取InternalThreadLocalMap
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
// 2、从InternalThreadLocalMap获取索引为index的value,如果该索引处的value是有效值,不是占位值,则直接返回
Object value = threadLocalMap.indexedVariable(index);
if (value != InternalThreadLocalMap.UNSET) {
return (V) value;
}
// 3、indexedVariables[index]没有设置有效值,执行初始化操作,获取初始值
V initialValue = initialize(threadLocalMap);
// 4、注册资源清理器:当该ftl所在的线程不强可达(没有强引用指向该线程对象)时,清理其上当前ftl对象的value和set<FastThreadLocal<?>>中当前的ftl对象
registerCleaner(threadLocalMap);
return initialValue;
}
3.2.1 获取InternalThreadLocalMap
/**
* 获取InternalThreadLocalMap实例
*/
public static InternalThreadLocalMap get() {
Thread current = Thread.currentThread();
if (current instanceof FastThreadLocalThread) {
return fastGet((FastThreadLocalThread) current);
}
return slowGet();
}
如果当前线程是ftlt线程,则使用fastGet进行获取;否则使用slowGet进行获取。先来看fastGet:
private static InternalThreadLocalMap fastGet(FastThreadLocalThread current) {
InternalThreadLocalMap threadLocalMap = current.threadLocalMap();
if (threadLocalMap == null) {
threadLocalMap = new InternalThreadLocalMap();
current.setThreadLocalMap(threadLocalMap);
}
return threadLocalMap;
}
对于每一个ftlt,都有一个属性:
/**
* 类比Thread的ThreadLocal.ThreadLocalMap threadLocals属性
*/
private InternalThreadLocalMap threadLocalMap;
如果该threadLocalMap已经实例化过,则直接返回,否则,先创建一个InternalThreadLocalMap实例,然后将该实例设置到ftlt的threadLocalMap属性中。
/**
* InternalThreadLocalMap的底层数据结构
* 其index就是FastThreadLocal的唯一标记index,
* value是相对应的FastThreadLocal所要存储的值
*/
private Object[] indexedVariables;
/**
* 无效的value值(占位符),不使用null做无效值的原因是因为netty认为null也是一个有效值,
* 例如:假设没有重写FastThreadLocal的initialValue()方法,则该方法返回为null,netty会将null作为有效值直接存储起来
*/
public static final Object UNSET = new Object();
/**
* 创建indexedVariables数组,并将每一个元素初始化为UNSET
*/
public InternalThreadLocalMap() {
indexedVariables = new Object[32];
Arrays.fill(indexedVariables, UNSET);
}
再来看一下slowGet:
/**
* 兼容非FastThreadLocalThread
*/
private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap = new ThreadLocal<>();
private static InternalThreadLocalMap slowGet() {
InternalThreadLocalMap threadLocalMap = slowThreadLocalMap.get();
if (threadLocalMap == null) {
threadLocalMap = new InternalThreadLocalMap();
slowThreadLocalMap.set(threadLocalMap);
}
return threadLocalMap;
}
这里就是典型的tl的操作了。之所以成为slowGet的原因是因为:
- fastGet可以直接从当前线程的属性获取;而slowGet需要根据slowThreadLocalMap的索引值与数组长度进行计算之后进行获取,如果没有直接根据索引命中的话,还可能需要进行线性探测的向后循环查找操作,当然还可能有一些清理和整理逻辑。
- fastGet设置InternalThreadLocalMap,直接给当前线程的属性赋值,而slowGet的set操作需要使用线性探测法进行设置,并会至少执行一次log级别的资源回收整理操作。
如上两点也是ftl比tl快的原因。但是可以看出tl在不断的回收无效的Entry使得新的Entry可以插入而不需要额外空间,但是ftl只能不断的增加index,不断向后增加,而index前边被remove掉的位置不能被重用,所以Object[]数组的size会越来越大。
3.2.2 从InternalThreadLocalMap获取值
/**
* 获取指定位置的元素
*/
public Object indexedVariable(int index) {
Object[] lookup = indexedVariables;
return index < lookup.length ? lookup[index] : UNSET;
}
3.2.3 初始化操作
private V initialize(InternalThreadLocalMap threadLocalMap) {
V v = null;
try {
//1、获取初始值
v = initialValue();
} catch (Exception e) {
throw new RuntimeException(e);
}
// 2、设置value到InternalThreadLocalMap中
threadLocalMap.setIndexedVariables(index, v);
// 3、添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中
addToVariablesToRemove(threadLocalMap, this);
return v;
}
/**
* 初始化参数:由子类复写
*/
protected V initialValue() throws Exception {
return null;
}
首先调用由子类复写的initialValue(),如果没有复写,则直接返回null;之后进行值的设置操作;
/**
* 设置值
*/
public boolean setIndexedVariables(int index, Object value) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
Object oldValue = lookup[index];
lookup[index] = value;
return oldValue == UNSET;
} else {
expandIndexedVariables(index, value);
return true;
}
}
如果索引小于indexedVariables.length,直接获取indexedVariables[index];否则,进行扩容设置。
值得注意的是,setIndexedVariables方法仅仅当时新增的时候返回true,假设是修改的话,oldValue就不等于UNSET了,则返回false。
private void expandIndexedVariables(int index, Object value) {
Object[] oldArray = indexedVariables;
int oldCapacity = oldArray.length;
/**
* 计算新数组容量:获取>index的最小的2的n次方的数,例如:1->2 2->4 3->4 4->8 5->8 6->8 7->8 8->16
* Returns a power of two size for the given target capacity.
* <pre>
*
* {@link java.util.HashMap#tableSizeFor(int)}
* static final int tableSizeFor(int cap) {
* int n = cap - 1;
* n |= n >>> 1;
* n |= n >>> 2;
* n |= n >>> 4;
* n |= n >>> 8;
* n |= n >>> 16;
* return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
* }
* </pre>
*/
int newCapacity = index;
newCapacity |= newCapacity >>> 1;
newCapacity |= newCapacity >>> 2;
newCapacity |= newCapacity >>> 4;
newCapacity |= newCapacity >>> 8;
newCapacity |= newCapacity >>> 16;
newCapacity++;
/**
* 创建新数组并拷贝旧数组的元素到新数组
*/
Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
/**
* 初始化扩容出来的部分的元素
*/
Arrays.fill(newArray, oldCapacity, newCapacity, UNSET);
/**
* 设置变量
*/
newArray[index] = value;
/**
* 将新数组设置给成员变量
*/
indexedVariables = newArray;
}
首先获取旧数组及其长度;然后进行新数组容量的计算(计算方式与1.8的HashMap一样:都是获取比给定值大的最小的2的n次方的数);然后创建新数组并拷贝旧数组元素到新数组,最后对扩容多出来的元素初始化为UNSET,然后设置value值,最后将新数组赋值给indexedVariables成员变量。
到此为止设置值的操作就结束了,最后:添加当前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中。
/**
* 将当前的FastThreadLocal添加到indexedVariables[variablesToRemoveIndex]位置上的Set<FastThreadLocal<?>>中
*/
private void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<V> threadLocal) {
// 1、首先从InternalThreadLocalMap获取Set,如果存在,直接往Set里添加值FastThreadLocal;
// 如果不存在,则先创建一个Set,然后将创建的Set添加到InternalThreadLocalMap中,最后将FastThreadLocal添加到这个Set中
Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);
// Set中的FastThreadLocal可能有多个类型,所以此处的泛型使用?,而不是用指定的V
Set<FastThreadLocal<?>> variablesToRemove;
if (v == InternalThreadLocalMap.UNSET) {
variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
threadLocalMap.setIndexedVariables(VARIABLES_TO_REMOVE_INDEX, variablesToRemove);
} else {
variablesToRemove = (Set<FastThreadLocal<?>>) v;
}
variablesToRemove.add(threadLocal);
}
步骤比较简单,先获取set集合,如果set集合不存在,则先创建set集合,之后将当前的ftl添加;否则直接进行添加。
3.2.4 注册资源清理器
/**
* 注册资源清理器:当该ftl所在的线程不强可达时,清理其上当前ftl的value和set<FastThreadLocal<?>>中当前的ftl
*/
private void registerCleaner(InternalThreadLocalMap threadLocalMap) {
Thread current = Thread.currentThread();
// 如果已经开启了自动清理功能 或者 已经对threadLocalMap中当前的FastThreadLocal开启了清理线程
if (FastThreadLocalThread.willCleanupFastThreadLocals(current) || threadLocalMap.isCleanerFlags(index)) {
return;
}
// 设置是否已经开启了对当前的FastThreadLocal清理线程的标志
threadLocalMap.setCleanerFlags(index);
// 将当前线程和清理任务注册到ObjectCleaner上去
ObjectCleaner.register(current, () -> remove(threadLocalMap));
}
这一块儿在回收机制中进行分析。
3.3、ftl的设置
/**
* 设置一个value
*/
public void set(V value) {
// 1、如果value是UNSET,表示删除当前的ThreadLocal对应的value;
// 如果不是UNSET,则可能是修改,也可能是新增;
// 如果是修改,修改value结束后返回,
// 如果是新增,则先新增value,然后新增ThreadLocal到Set中,最后注册Cleaner清除线程
if (value != InternalThreadLocalMap.UNSET) {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
if (setKnownNotUnset(threadLocalMap, value)) {
registerCleaner(threadLocalMap);
}
} else {
// 如果设置的值是UNSET,表示清除该FastThreadLocal的value
remove();
}
}
步骤见注释。
/**
* 返回true:如果是新添加了一个value;
* 返回false:如果是修改了一个value。
*/
private boolean setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
// 新增value
if (threadLocalMap.setIndexedVariables(index, value)) {
addToVariablesToRemove(threadLocalMap, this);
return true;
}
// 修改value
return false;
}
3.4、ftl的删除
ftl有两种删除操作:
- 单个删除当前的ftl对象的值;
- 删除当前线程上的InternalThreadLocalMap中的每一个value以及threadLocalMap本身。
首先来看单个删除:
======================FastThreadLocal======================
/**
* 清除当前的FastThreadLocal
*/
private void remove() {
remove(InternalThreadLocalMap.getIfSet());
}
private void remove(InternalThreadLocalMap threadLocalMap) {
if (threadLocalMap == null) {
return;
}
// 1、从 InternalThreadLocalMap 中删除当前的FastThreadLocal对应的value
Object v = threadLocalMap.removeIndexedVariable(index);
// 2、从 InternalThreadLocalMap 中的Set<FastThreadLocal<?>>中删除当前的FastThreadLocal对象
removeFromVariablesToRemove(threadLocalMap, this);
// 3、如果删除的是有效值,则进行onRemove方法的回调
if (v != InternalThreadLocalMap.UNSET) {
try {
// 4、回调子类复写的onRemoved方法,默认为空实现
onRemoved((V) v);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private void removeFromVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<V> threadLocal) {
Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);
if (v == InternalThreadLocalMap.UNSET || v == null) {
return;
}
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
variablesToRemove.remove(threadLocal);
}
/**
* 当前的threadLocal被删除后的回调:由子类复写
*/
protected void onRemoved(V value) throws Exception {
}
====================== InternalThreadLocalMap ======================
/**
* 获取InternalThreadLocalMap实例,如果为null,则直接返回,不会创建;如果不为null,也直接返回
*/
public static InternalThreadLocalMap getIfSet() {
Thread current = Thread.currentThread();
if (current instanceof FastThreadLocalThread) {
return ((FastThreadLocalThread) current).threadLocalMap();
} else {
return slowThreadLocalMap.get();
}
}
/**
* 删除指定位置的对象
*/
public Object removeIndexedVariable(int index) {
Object[] lookup = indexedVariables;
if (index < lookup.length) {
// 1、获取旧值
Object v = lookup[index];
// 2、设置为UNSET
lookup[index] = UNSET;
// 3、返回旧值
return v;
} else {
return UNSET;
}
}
删除当前线程上的InternalThreadLocalMap中的每一个value以及threadLocalMap本身:
======================FastThreadLocal======================
/**
* 删除当前线程上的InternalThreadLocalMap中的每一个value以及threadLocalMap本身
*/
public static void removeAll() {
// 1、获取当前线程的InternalThreadLocalMap,如果当前的InternalThreadLocalMap为null,则直接返回
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
if (threadLocalMap == null) {
return;
}
try {
// 2、从indexedVariables[VARIABLES_TO_REMOVE_INDEX]获取目前InternalThreadLocalMap存储的有效的FastThreadLocal的值,之后遍历Set,进行remove操作
// 注意:这也是为什么我们会将有效的FastThreadLocal存储在一个Set中的原因(另外,如果没有Set<FastThreadLocal<?>>这个集合的话,我们需要直接去遍历整个indexedVariables数组,可能其中有效的并不多,影响效率)
Object v = threadLocalMap.indexedVariable(VARIABLES_TO_REMOVE_INDEX);
if (v != null && v != InternalThreadLocalMap.UNSET) {
Set<FastThreadLocal<?>> threadLocals = (Set<FastThreadLocal<?>>) v;
/**
* 这里为什么需要将set先转换为数组?
* 因为set的for-remove模式会报并发修改异常,array不会
*/
FastThreadLocal[] threadLocalArray = threadLocals.toArray(new FastThreadLocal[0]);
for (FastThreadLocal<?> threadLocal : threadLocalArray) {
threadLocal.remove();
}
}
} finally {
// 3、删除当前线程的InternalThreadLocalMap
threadLocalMap.remove();
}
}
====================== InternalThreadLocalMap ======================
/**
* 删除当前线程的InternalThreadLocalMap
*/
public void remove() {
Thread current = Thread.currentThread();
if (current instanceof FastThreadLocalThread) {
((FastThreadLocalThread) current).setThreadLocalMap(null);
} else {
slowThreadLocalMap.remove();
}
}
代码很简单,但是这里我们需要注意一点的就是封装性:例如对于InternalThreadLocalMap的remove操作,就应该封装在InternalThreadLocalMap类中(充血);而很多时候我们编写的代码会直接将InternalThreadLocalMap.remove()这一段代码写到用到的地方,这样就打破了封装性。
到此为止,ftl的基本源码除了回收机制之外,其他的就分析完了。
3.5、ftlt对普通Runnable任务的包装处理
public class FastThreadLocalRunnable implements Runnable {
private Runnable runnable;
public FastThreadLocalRunnable(Runnable runnable) {
this.runnable = ObjectUtil.checkNotNull(runnable, "runnable");
}
public static Runnable wrap(Runnable runnable) {
return runnable instanceof FastThreadLocalRunnable ? runnable : new FastThreadLocalRunnable(runnable);
}
@Override
public void run() {
try {
// 运行任务
this.runnable.run();
} finally {
/**
* 线程池中的线程由于会被复用,所以线程池中的每一条线程在执行task结束后,要清理掉其InternalThreadLocalMap和其内的FastThreadLocal信息,
* 否则,当这条线程在下一次被复用的时候,其ThreadLocalMap信息还存储着上一次被使用时的信息;
* 另外,假设这条线程不再被使用,但是这个线程有可能不会被销毁(与线程池的类型和配置相关),那么其上的ThreadLocal将发生资源泄露。
*/
FastThreadLocal.removeAll();
}
}
}
使用该类将一个普通的Runnable对象进行wrap装饰,之后在调用FastThreadLocalRunnable.run()的时候,实际上会调用真实对象(即普通的Runnable对象)的run(),执行完成之后,会进行对当前线程的全量回收操作(删除当前线程上的InternalThreadLocalMap中的每一个value以及threadLocalMap本身),这样就可以有效的在线程池中复用当前线程而不必关心ftl的错乱和泄漏问题。该类只会在ftlt中使用。下面来看一下ftlt:
public class FastThreadLocalThread extends Thread {
/**
* 类比Thread的ThreadLocal.ThreadLocalMap threadLocals属性
*/
private InternalThreadLocalMap threadLocalMap;
/**
* ThreadLocal与线程池问题:
* 线程池中的线程由于会被复用,所以线程池中的每一条线程在执行task结束后,要清理掉其InternalThreadLocalMap和其内的FastThreadLocal信息,
* 否则,当这条线程在下一次被复用的时候,其ThreadLocalMap信息还存储着上一次被使用时的信息;
* 另外,假设这条线程不再被使用,但是这个线程有可能不会被销毁(与线程池的类型和配置相关),那么其InternalThreadLocalMap和其内的FastThreadLocal信息将发生了资源泄露。
*
* 所以,如果一个Runnable任务被FastThreadLocalRunnable包裹,那么其InternalThreadLocalMap和其内的FastThreadLocal信息会被自动清理,此时:cleanupFastThreadLocals==true
* 否则,cleanupFastThreadLocals==false,此时线程需要注册到ObjectCleaner上,当线程不强可达时,由清理线程清理 其InternalThreadLocalMap和其内的FastThreadLocal信息
* 值得注意的是,如果在netty中如果普通线程执行任务(不会被FastThreadLocalRunnable包裹),还是要注意"ThreadLocal与线程池问题",
* netty对于普通线程仅仅是当线程不强可达时才会进行清理操作。
*/
private final boolean cleanupFastThreadLocals;
public FastThreadLocalThread() {
cleanupFastThreadLocals = false;
}
public FastThreadLocalThread(Runnable runnable) {
super(FastThreadLocalRunnable.wrap(runnable));
cleanupFastThreadLocals = true;
}
public FastThreadLocalThread(Runnable runnable, ThreadGroup threadGroup, String threadName) {
super(threadGroup, FastThreadLocalRunnable.wrap(runnable), threadName);
cleanupFastThreadLocals = true;
}
public void setThreadLocalMap(InternalThreadLocalMap threadLocalMap) {
this.threadLocalMap = threadLocalMap;
}
public InternalThreadLocalMap threadLocalMap() {
return this.threadLocalMap;
}
/**
* 是否会自动清理当前线程的"InternalThreadLocalMap和其内的FastThreadLocal信息"
*/
public boolean willCleanupFastThreadLocals() {
return this.cleanupFastThreadLocals;
}
/**
* 只有FastThreadLocalThread会作自动清理操作,其他类型的线程不会,
* 因为如果是普通线程执行一个被FastThreadLocalRunnable进行wrap过的Runnable任务,清除操作是FastThreadLocal.removeAll(),所以只能清除当前线程的ThreadLocalMap中key==slowThreadLocalMap的那个Entry,其他普通的Entry无法进行清除。
*/
public static boolean willCleanupFastThreadLocals(Thread current) {
return current instanceof FastThreadLocalThread
&& ((FastThreadLocalThread) current).willCleanupFastThreadLocals();
}
}
从上述两个类我们可以看出,只要是使用了ftlt,并且其执行的Runnable任务使用FastThreadLocalRunnable进行wrap,就可以达到自动清理的目的。二者缺一不可!如果是一个普通的线程执行一个被FastThreadLocalRunnable进行wrap过的Runnable任务,由于清除操作是FastThreadLocal.removeAll()
,所以只能清除当前线程的ThreadLocalMap中key==slowThreadLocalMap的那个Entry,其他普通的Entry无法进行清除。
3.6、ftlt线程工厂
public class FastThreadLocalThreadFactory implements ThreadFactory {
/**
* 线程池的ID,所有 FastThreadLocalThreadFactory实例共有
*/
private static AtomicInteger poolId = new AtomicInteger();
/**
* 线程池中的线程ID,FastThreadLocalThreadFactory实例私有
*/
private AtomicInteger nextId = new AtomicInteger();
/**
* 线程名称前缀
*/
private String prefix;
/**
* 是否是后台线程
*/
private boolean deamon;
/**
* 线程的优先级
*/
private int priority;
/**
* 线程组
*/
private ThreadGroup threadGroup;
public FastThreadLocalThreadFactory(String poolName) {
this(poolName, Thread.NORM_PRIORITY, false, Thread.currentThread().getThreadGroup());
}
public FastThreadLocalThreadFactory(String poolName, int priority, boolean deamon, ThreadGroup threadGroup) {
if (poolName == null) {
throw new NullPointerException("poolName");
}
if (priority > Thread.MAX_PRIORITY || priority < Thread.MIN_PRIORITY) {
throw new IllegalArgumentException("priority");
}
this.prefix = poolName + "-" + poolId.getAndIncrement() + "-";
this.priority = priority;
this.deamon = deamon;
this.threadGroup = threadGroup;
}
@Override
public Thread newThread(Runnable r) {
// 线程名称 poolName-poolId-nextId
Thread thread = new FastThreadLocalThread(r, threadGroup, prefix + nextId.incrementAndGet());
if (thread.isDaemon() != deamon) {
thread.setDaemon(deamon);
}
if (thread.getPriority() != priority) {
thread.setPriority(priority);
}
return thread;
}
}
从ftlt工厂的newThread(Runnable r)方法中可以看出,每一个普通的r都会被ftlt进行包裹,所以该ftlt创造出来的ftlt都具有自动清理的功能。
3.7、ftlt线程池构造器
为了方便使用ftlt线程池,这里创建了一个工具类,类比java.util.concurrent.Executors。
/**
* 类比 {@link java.util.concurrent.Executors}
*
* 线程池与Promise的结合使用见 promise-framework io.hulk.promise.framework.DefaultPromiseTest
*
* @author zhaojigang
* @date 2018/8/2
*/
public class FastThreadExecutors {
/**
* 创建一个线程数固定(corePoolSize==maximumPoolSize)的线程池
* 核心线程会一直存在,不被回收
* 如果一个核心线程由于异常跪了,会新创建一个线程
* 无界队列LinkedBlockingQueue
*/
public static Executor newFixedFastThreadPool(int nThreads, String poolName) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(), new FastThreadLocalThreadFactory(poolName));
}
/**
* corePoolSize==0
* maximumPoolSize==Integer.MAX_VALUE
* 队列:SynchronousQueue
* 创建一个线程池:当池中的线程都处于忙碌状态时,会立即新建一个线程来处理新来的任务
* 这种池将会在执行许多耗时短的异步任务的时候提高程序的性能
* 60秒钟内没有使用的线程将会被中止,并且从线程池中移除,因此几乎不必担心耗费资源
*/
public static Executor newCachedFastThreadPool(String poolName) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new FastThreadLocalThreadFactory(poolName));
}
/**
* 自定义各种参数
*/
public static Executor newLimitedFastThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName,
RejectedExecutionHandler handler) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
new FastThreadLocalThreadFactory(poolName), handler);
}
}
四、回收机制
在netty中对于ftl提供了三种回收机制:
- 自动:使用ftlt执行一个被FastThreadLocalRunnable wrap的Runnable任务,在任务执行完毕后会自动进行ftl的清理。
- 手动:ftl和InternalThreadLocalMap都提供了remove方法,在合适的时候用户可以(有的时候也是必须,例如普通线程的线程池使用ftl)手动进行调用,进行显示删除。
- 自动:为当前线程的每一个ftl注册一个Cleaner,当线程对象不强可达的时候,该Cleaner线程会将当前线程的当前ftl进行回收。(netty推荐如果可以用其他两种方式,就不要再用这种方式,因为需要另起线程,耗费资源,而且多线程就会造成一些资源竞争)
下面来介绍第三种:在进行set操作时,当确定是新增一个ftl的时候,会执行如下方法:
======================FastThreadLocal======================
/**
* 注册资源清理器:当该ftl所在的线程不强可达时,清理其上当前ftl的value和set<FastThreadLocal<?>>中当前的ftl
*/
private void registerCleaner(InternalThreadLocalMap threadLocalMap) {
Thread current = Thread.currentThread();
// 如果已经开启了自动清理功能 或者 已经对threadLocalMap中当前的FastThreadLocal的开启了清理线程
if (FastThreadLocalThread.willCleanupFastThreadLocals(current) || threadLocalMap.isCleanerFlags(index)) {
return;
}
// 设置是否已经开启了对当前的FastThreadLocal清理线程的标志
threadLocalMap.setCleanerFlags(index);
// 将当前线程和清理任务注册到ObjectCleaner上去
ObjectCleaner.register(current, () -> remove(threadLocalMap));
}
====================== InternalThreadLocalMap ======================
/**
* BitSet简要原理:
* BitSet默认底层数据结构是一个long[]数组,开始时长度为1,即只有long[0],而一个long有64bit。
* 当BitSet.set(1)的时候,表示将long[0]的第二位设置为true,即0000 0000 ... 0010(64bit),则long[0]==2
* 当BitSet.get(1)的时候,第二位为1,则表示true;如果是0,则表示false
* 当BitSet.set(64)的时候,表示设置第65位,此时long[0]已经不够用了,扩容处long[1]来,进行存储
*
* 存储类似 {index:boolean} 键值对,用于防止一个FastThreadLocal多次启动清理线程
* 将index位置的bit设为true,表示该InternalThreadLocalMap中对该FastThreadLocal已经启动了清理线程
*/
private BitSet cleanerFlags;
/**
* 设置当前索引位置index(FastThreadLocal)的bit为1
*/
public void setCleanerFlags(int index) {
if (cleanerFlags == null) {
cleanerFlags = new BitSet();
}
cleanerFlags.set(index);
}
/**
* 获取 当前index的bit值,1表示true,0表示false(默认值)
*/
public boolean isCleanerFlags(int index) {
return cleanerFlags != null && cleanerFlags.get(index);
}
首先获取当前线程;然后判断当前线程是否开启了自定清理功能(也就是三种清理方式的第一种),如果开启了,则直接返回,否则,判断是否已经对threadLocalMap中当前的FastThreadLocal的开启了清理线程,如果是,也直接返回,否则先设置是否已经开启了对当前的FastThreadLocal清理线程的标志(关于BitSet的简要原理见注释),然后将当前线程和清理任务注册到ObjectCleaner上去。
/**
* from netty4.1
*/
public class ObjectCleaner {
/**
* 不强可达的Object的队列
*/
private static final ReferenceQueue<Object> REFERENCE_QUEUE = new ReferenceQueue<>();
/**
* 设置从REFERENCE_QUEUE阻塞获取值的等待时长,在等待期间,不会释放cpu资源,所以会占用一个核。
* 这个时间设置的太短,会进行空循环;设置的太长会占用核,所以netty提供了参数来进行设置:
* io.netty.util.internal.ObjectCleaner.refQueuePollTimeout,默认为10s
*/
private static final int REFERENCE_QUEUE_POLL_TIMEOUT_MS = 10000;
/**
* 存活的 AutomaticCleanerReference 对象
* <p>
* 为什么需要并发SET?
* 如果是同一个线程的set多个ThreadLocal一定不会有问题,因为同一个线程是顺序执行;
* 如果是两个线程同时set各自的FastThreadLocal,就会同时调用ObjectCleaner#register方法,由于LIVE_SET是一个类变量,即是一个共享变量,
* 此时就可能发生并发问题。
*/
private static Set<Object> LIVE_SET = new ConcurrentSet<>();
/**
* cleaner线程是否已经在运行中
* 尽量保证在整个系统的运行中,只有一个CLEAN_TASK在运行(类似于gc线程)
*/
private static final AtomicBoolean CLEANER_RUNNING = new AtomicBoolean();
/**
* 清理线程的名称
*/
private static final String CLEANUP_THREAD_NAME = ObjectCleaner.class.getSimpleName() + "Thread";
/**
* 清理线程
*/
private static final Runnable CLEAN_TASK = () -> {
boolean interrupted = false;
for (; ; ) {
while (!LIVE_SET.isEmpty()) {
AutomaticCleanerReference reference;
try {
// 1、从REFERENCE_QUEUE阻塞获取不强可达的reference
reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);
} catch (InterruptedException e) {
interrupted = true;
continue;
}
if (reference != null) {
try {
// 2、执行reference的清理操作:
// 清理当前的线程的InternalThreadLocalMap中注册时的FastThreadLocal的值 和 其在Set<FastThreadLocal<?>>的值
/**
* 执行清理线程(当referent是一个线程对象时,此时表示这个线程对象已经不强可达了,
*
* 就会对该线程对象中的InternalThreadLocalMap中的Object[]中删除index(FastThreadLocal标识)位置的value元素,
* 然后从Set<FastThreadLocal<?>>中删除当前的FastThreadLocal对象
*
* 注意:注册到ObjectCleaner是每一个FastThreadLocal做的事儿,所以这里的删除也是对每一个FastThreadLocal进行操作,
* 而不会对线程的InternalThreadLocalMap整体或者其内的所有FastThreadLocal做操作。
* 另外,要注意:LIVE_SET存储的key不是当前线程,而是一个AutomaticCleanerReference,该对象在每次register的时候都会进行new,
* 所以同一个线程的InternalThreadLocalMap中不同的ThreadLocal会分别被封装为一个AutomaticCleanerReference
*/
reference.cleanup();
} catch (Exception e) {
}
// 3、将reference从存活列表中删除
LIVE_SET.remove(reference);
}
}
// 4、设置清理线程的运行状态为false
CLEANER_RUNNING.set(false);
// 5、再次检测(优化)
// 假设此处又来了一个任务,LIVE_SET中添加了这个任务;这时候设置CLEANER_RUNNING由false变为true,
// 如果设置成功,则继续进行clean操作,就不需要再创建一个线程来执行CLEANER_TASK任务了;(这也是外层for(;;)循环和此处的if判断的意义所在)
// 如果设置失败,说明,已经有线程开始执行CLEANER_TASK任务了,那么当前线程直接退出就ok了
if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
break;
}
}
if (interrupted) {
// As we caught the InterruptedException above we should mark the Thread as interrupted.
Thread.currentThread().interrupt();
}
};
/**
* Register the given {@link Object} for which the {@link Runnable} will be executed once there are no references
* to the object anymore.
*
* This should only be used if there are no other ways to execute some cleanup once the Object is not reachable
* anymore because it is not a cheap way to handle the cleanup.
*/
public static void register(Thread current, Runnable runnable) {
// 1、创建 AutomaticCleanerReference
AutomaticCleanerReference reference = new AutomaticCleanerReference(current, runnable);
// 2、将当前的 AutomaticCleanerReference 添加到LIVE_SET
LIVE_SET.add(reference);
// 3、cas启动cleaner线程:确保只有一个清理线程在run
if (CLEANER_RUNNING.compareAndSet(false, true)) {
Thread cleanupThread = new FastThreadLocalThread(CLEAN_TASK);
cleanupThread.setName(CLEANUP_THREAD_NAME);
cleanupThread.setPriority(Thread.MIN_PRIORITY);
cleanupThread.setDaemon(true);
cleanupThread.start();
}
}
private static final class AutomaticCleanerReference extends WeakReference<Object> {
private final Runnable cleanupTask;
/**
* 将object包裹为referent,并关联ReferenceQueue为REFERENCE_QUEUE;
* 当referent不强可达时,整个reference对象会进入REFERENCE_QUEUE,之后我们对REFERENCE_QUEUE进行一些操作
*/
public AutomaticCleanerReference(Object referent, Runnable cleanupTask) {
super(referent, REFERENCE_QUEUE);
this.cleanupTask = cleanupTask;
}
public void cleanup() {
cleanupTask.run();
}
@Override
public Thread get() {
return null;
}
@Override
public void clear() {
LIVE_SET.remove(this);
super.clear();
}
}
}
直接贴出全貌,依然是使用WeakReference,类似于ThreadLocal的Entry。
五、FastThreadLocal优缺点
优点:
- ftl使用了单纯的数组操作来替代了tl的hash表操作,所以在高并发的情况下,ftl操作速度更快。
- set操作:ftl直接根据index进行数组set,而tl需要先根据tl的hashcode计算数组下标(而ftl是直接获取),然后再根据线性探测法进行set操作,其间如果发生hash冲突且有无效的Entry时,还要进行Entry的清理和整理操作。最后不管是否冲突,都要进行一次log级别的Entry回收操作,所以慢了。
- get操作:ftl直接根据index进行获取,而tl需要先根据tl的hashcode计算数组下标,然后再根据线性探测法进行get操作,如果不能根据直接索引获取到value的话并且在向后循环遍历的过程中发现了无效的Entry,则会进行无效Entry的清理和整理操作。
- remove操作:ftl直接根据index从数组中删除当前的ftl的value,然后从Set集合中删除当前的ftl,之后还可以进行删除回调操作(功能增强);而tl需要先根据tl的hashcode计算数组下标,然后再根据线性探测法进行remove操作,最后还需要进行无效Entry的整理和清理操作。
- tl由于使用线性探测法,需要在get、set以及remove时做一些资源清理和整理操作,所以代码看上去不如ftl清晰明了。
缺点:
- ftl相较于tl不好的地方就是内存占用大,不会重复利用已经被删除(用UNSET占位)的数组位置,只会一味增大,是典型的“空间换时间”的操作。
六、小工具:基于ConcurrentHashMap实现的ConcurrentSet
/**
* from netty4.1
* 基于ConcurrentHashMap实现的支持并发的Set(实际上就是使用map的set,原理类比:HashSet和HashMap),所以在mina中该类被称为ConcurrentHashSet
*/
public class ConcurrentSet<E> extends AbstractSet<E> implements Serializable {
private static final long serialVersionUID = -1244664838594578508L;
/**
* 基本数据结构
*/
private ConcurrentMap<E, Boolean> map;
/**
* 创建ConcurrentSet实例
* 并初始化内部的ConcurrentHashMap
*/
public ConcurrentSet() {
map = new ConcurrentHashMap<>();
}
@Override
public boolean contains(Object o) {
return map.containsKey(o);
}
@Override
public boolean add(E e) {
return map.putIfAbsent(e, Boolean.TRUE) == null;
}
@Override
public boolean remove(Object o) {
return map.remove(o);
}
@Override
public Iterator<E> iterator() {
return map.keySet().iterator();
}
@Override
public int size() {
return map.size();
}
}
七、总结
7.1、FastThreadLocal的get流程
- 根据线程类型获取InternalThreadLocalMap,如果是ftlt类型,直接获取currentThread.InternalThreadLocalMap;如果是普通类型,获取currentThread.ThreadLocalMap中key为slowThreadLocalMap的Entry,其value就是InternalThreadLocalMap实例。如果获取不到,创建InternalThreadLocalMap实例(底层是一个Object[]),之后设置到currentThread.InternalThreadLocalMap或者currentThread.ThreadLocalMap中key为slowThreadLocalMap的Entry中。
- 根据ftl的index值从InternalThreadLocalMap对象中查找value,如果查到的是有效值(不是UNSET),则直接返回;否则,执行初始化操作。
- 初始化操作:首先调用子类复写的initialValue()方法获取初始值,然后设置到InternalThreadLocalMap中当前的ftl的index值所在的位置处,因为是初始化,所以一定是新增的,所以此处直接将当前的ftl存储到InternalThreadLocalMap的Object[0]处的
Set<FastThreadLocal<?>>
set集合中;最后返回初始值。 - 注册资源清理器:如果已经开启了自动清理功能或者已经注册过cleaner了,则直接返回,否则创建ftl清除线程
() -> remove(threadLocalMap)
;然后将当前线程current和ftl清除线程封装到AutomaticCleanerReference中;然后将该AutomaticCleanerReference对象添加到LIVE_SET并发set集合中;之后使用cas启动终极清理线程CLEAN_TASK,不断轮询LIVE_SET,从不强可达的AutomaticCleanerReference的引用队列中获取AutomaticCleanerReference对象,调用其内的清除线程进行清理操作(三步:从InternalThreadLocalMap中删除当前的ftl对应的value;从InternalThreadLocalMap中的Set<FastThreadLocal<?>>
中删除当前的ftl对象;回调子类复写的onRemoved(V value)方法)。
7.2、FastThreadLocal的set流程
- 如果设置的值是UNSET,则进行删除操作;
- 如果设置的值不是UNSET,首先获取InternalThreadLocalMap;
- 然后设置value到InternalThreadLocalMap中当前ftl的index值处,可能是新增,也可能是修改;如果是新增,将当前的ftl存储到InternalThreadLocalMap的Object[0]处的
Set<FastThreadLocal<?>>
set集合中; - 注册资源清理器:同5.1
7.3、FastThreadLocal的remove流程
单个删除:
- 获取当前线程的InternalThreadLocalMap对象,如果为null,直接返回,否则,
- 从InternalThreadLocalMap对象中删除当前的ftl的value
- 从InternalThreadLocalMap对象的Object[0]处的
Set<FastThreadLocal<?>>
set集合中删除当前的ftl对象 - 回调onRemoved(V v)方法
全部删除:
- 获取当前线程的InternalThreadLocalMap对象,如果为null,直接返回,否则,
- 从InternalThreadLocalMap对象的Object[0]处获取
Set<FastThreadLocal<?>>
set集合 - 遍历set集合中的每一个ftl,执行上述的“单个删除”操作
- 最后将当前线程的InternalThreadLocalMap对象置null 或者 将当前线程的ThreadLocalMap中的key为slowThreadLocalMap的tl移除掉。