如何处理线程上下文之间的参数透传
1,ThreadLocal 的使用场景
1.1 介绍:
ThreadLocal是Java中的一个类,可以实现在多线程环境下安全地存储和访问线程局部变量。它可以看作是一个容器,用于存储特定线程所需的数据。
在多线程编程中,每个线程都有自己的执行上下文,包括自己的栈、本地变量和程序计数器等。线程局部变量就是指在每个线程的执行上下文中的本地变量。ThreadLocal可以帮助我们在多线程环境下,为每个线程维护一个独立的局部变量副本,从而避免多线程竞争和数据混乱的问题。
1.2 原理:
-
每个线程维护一个ThreadLocalMap
ThreadLocalMap的键为ThreadLocal实例,即每个线程局部变量对应的ThreadLocal对象。这是因为ThreadLocal是线程局部变量的抽象类,每个ThreadLocal实例对应着一个独立的线程局部变量。
在ThreadLocalMap中,ThreadLocal作为键,其内部实现是一个自定义的Entry数组,Entry中存储了键值对的信息,即ThreadLocal实例作为键,对应的线程局部变量副本作为值。当线程调用get()、set()和remove()等方法时,ThreadLocalMap会根据当前线程对象找到对应的Entry,并返回或者修改其中的值。
值得注意的是,ThreadLocal实例是弱引用,这意味着如果ThreadLocal实例没有被任何线程引用时,可能会被垃圾回收机制回收。如果ThreadLocal实例被回收后,对应的键值对也会被清除,这就是ThreadLocal内存泄漏的主要原因之一。为了避免内存泄漏,我们通常会在不需要使用ThreadLocal时显式地调用remove()方法来清除键值。
1.3 使用场景
ThreadLocal的使用场景主要涉及到需要在多线程环境下实现数据隔离的情况
- 线程内部的变量共享问题:当在多个线程之间需要共享一个变量时,但又要保证线程安全,此时可以使用ThreadLocal实现局部变量的隔离,每个线程都有自己的副本,互不干扰。
- 用户信息:在Web应用中,需要存储用户信息或者会话信息,在多线程环境下,如果不使用ThreadLocal会产生线程安全问题,此时应该使用ThreadLocal来解决问题。
- 线程活动的状态:有些情况下,我们需要记录或者保存线程的状态,这时使用ThreadLocal来存储线程状态是比较合适的选择。
1.4 总结
总之,如果需要在多线程环境下实现数据隔离,或者需要跨线程存储变量信息,就可以考虑使用ThreadLocal来解决问题。但需要注意,使用ThreadLocal也可能存在内存泄漏的问题,使用时需要仔细设计和管理。
2,InheritableThreadLocal 跨线程使用ThreadLocal
2.1 介绍
多数情况下不会出现跨线程的使用,如果是需要线程上下文的情景,可能会跨线程
- InheritableThreadLocal 的介绍
InheritableThreadLocal是ThreadLocal的一个变种,它允许在父子线程之间继承值。与ThreadLocal不同的是,当一个新线程由另一个线程(即父线程)创建时,它会从父线程中的InheritableThreadLocal复制值。
InheritableThreadLocal除了实现了ThreadLocal的功能外,还增加了子线程从父线程那里继承ThreadLocal值的功能,这种特性使其更适合在多线程应用程序中传递上下文信息。
2.2 为什么InheritableThreadLocal 父子线程可以实现 ThreadLocal 的传递
这是由线程的创建决定的
//创建线程的构造方法,其中父子线程的传递在init 实现
public Thread() {
init(null, null, "Thread-" + nextThreadNum(), 0);
}
-
init 方法的实现
private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { ...省略其他代码,只保留核心 //inheritThreadLocals 是有线程维护的InheritableThreadLocalcal 重写的getMap 就是获取此变量,此变量也是一个threadLocalMap if (inheritThreadLocals && parent.inheritableThreadLocals != null) // this.inheritableThreadLocals = //再此实现父ttl 的值复制 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); } //创建map static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) { return new ThreadLocalMap(parentMap); } //复值ttl 核心代码 private ThreadLocalMap(ThreadLocalMap parentMap) { Entry[] parentTable = parentMap.table; int len = parentTable.length; setThreshold(len); table = new Entry[len]; for (int j = 0; j < len; j++) { Entry e = parentTable[j]; if (e != null) { @SuppressWarnings("unchecked") ThreadLocal<Object> key = (ThreadLocal<Object>) e.get(); if (key != null) { //childValue 就是 InheritableThreadLocalcal 重写的方法,返回父线程的值 Object value = key.childValue(e.value); Entry c = new Entry(key, value); int h = key.threadLocalHashCode & (len - 1); while (table[h] != null) h = nextIndex(h, len); table[h] = c; size++; } } } }
2.3 代码演示
2.3.1 普通ThreadLocal 的使用,用父子线程的值传递
@Test
public void test1() {
ThreadLocal<String> local = new ThreadLocal<>();
local.set("init");
log.info("线程:{}获取threadLocal:{} 值", Thread.currentThread().getName(), local.get());
//子线程获取
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
log.info("子线程:{}获取threadLocal:{} 值", Thread.currentThread().getName(), local.get());
}
});
thread.start();
}
//
18:04:48.220 [main] INFO com.h3c.mp.thread.ThreadTest3 - 线程:main获取threadLocal:init 值
18:04:48.235 [Thread-0] INFO com.h3c.mp.thread.ThreadTest3 - 子线程:Thread-0获取threadLocal:null
2.3.2 InheritableThreadLocalcal 的演示与使用
@Test
public void test2() {
InheritableThreadLocal<String> local = new InheritableThreadLocal<>();
local.set("init");
log.info("线程:{}获取threadLocal:{} 值", Thread.currentThread().getName(), local.get());
//子线程获取
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
log.info("子线程:{}获取threadLocal:{} 值", Thread.currentThread().getName(), local.get());
}
});
thread.start();
}
18:08:20.291 [main] INFO com.h3c.mp.thread.ThreadTest3 - 线程:main获取threadLocal:init 值
18:08:20.345 [Thread-0] INFO com.h3c.mp.thread.ThreadTest3 - 子线程:Thread-0获取threadLocal:init 值
2.4 总结
因此 InheritableThreadLocal 可以用来做父子线程的值参数传递
但是 由源码可以看出,真正进行值传递时,是在线程创建的时候触发执行的,所以在使用线程池的时候,当发生线程复用时,可能造成值获取错乱
3 , Ttl 技术的使用
3.1 介绍
JDK
的InheritableThreadLocal
类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;**这时父子线程关系的ThreadLocal
值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal
值传递到 任务执行时****。
3.2 原理
3.2.1 简单模拟
public final class DelegatingContextRunnable implements Runnable {
private final Runnable delegate;
private final Optional<String> delegateContext;
public DelegatingContextRunnable(Runnable delegate,
Optional<String> context) {
assert delegate != null;
assert context != null;
this.delegate = delegate;
this.delegateContext = context;
}
public DelegatingContextRunnable(Runnable delegate) {
// 修饰原有的任务,并保存当前线程的值
this(delegate, ContextHolder.get());
}
public void run() {
Optional<String> originalContext = ContextHolder.get();
try {
ContextHolder.set(delegateContext);
delegate.run();
} finally {
ContextHolder.set(originalContext);
}
}
}
public final void execute(Runnable task) {
// 递交给真正的执行线程池前,对任务进行修饰
executor.execute(wrap(task));
}
protected final Runnable wrap(Runnable task) {
return new DelegatingContextRunnable(task);
}
总结: 实际上通过三部,
- 获取提交任务时的线程ttl
- 重放提交任務時的threadLocal
- 恢复执行任务是线程的threadLocal
3.3 Tt l 技术的实现
3.3.1 github 地址
https://github.com/alibaba/transmittable-thread-local
3.3.2 maven 依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.14.2</version>
</dependency>
3.3.3 核心类 TransmittableThreadLocal
使用类TransmittableThreadLocal
来保存值,并跨线程池传递。
TransmittableThreadLocal
继承InheritableThreadLocal
,使用方式也类似。相比InheritableThreadLocal
,添加了protected
的transmitteeValue()
方法,用于定制 任务提交给线程池时 的ThreadLocal
值传递到 任务执行时 的传递方式,缺省是简单的赋值传递。
3.3.4 简单使用
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
// =====================================================
// 在子线程中可以读取,值是"value-set-in-parent"
String value = context.get();
3.3.5 线程池中传值
-
修饰Runnable
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);
// =====================================================
// Task中可以读取,值是"value-set-in-parent"
String value = context.get();
-
修饰线程池
省去每次
Runnable
和Callable
传入线程池时的修饰,这个逻辑可以在线程池中完成。通过工具类
TtlExecutors
完成,有下面的方法:-
getTtlExecutor
:修饰接口Executor
-
getTtlExecutorService
:修饰接口ExecutorService
-
getTtlScheduledExecutorService
:修饰接口ScheduledExecutorService
-
ExecutorService executorService = ...
// 额外的处理,生成修饰了的对象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);
// =====================================================
// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();
3.4 TtlRunnable 的实现
3.4.1 TtlRunnable 的 核心源码
//省略非核心代码
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
//存放上下文
private final AtomicReference<Object> capturedRef;
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.capturedRef = new AtomicReference<Object>( capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
/**
* wrap method {@link Runnable#run()}.
*/
@Override
public void run() {
//@2去出任务提交时的线程上下文
final Object captured = capturedRef.get();
//
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
//@3 重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在与子线程中的上下文环境变量
final Object backup = replay(captured);
try {
runnable.run();
} finally {
//@4恢复线程池中当前执行任务的线程的上下文环境 ,会直接继承父线程中的上下文环境,但会将原先存在该线程的线程上下文环境进行备份,在任务执行完后通过执行restore方法进行恢复
restore(backup);
}
}
}
-
captured == null
:这个条件检查了保存的上下文引用是否为null
。如果为null
,意味着没有保存的上下文,可能是因为没有正确捕获上下文或者上下文已经被释放。 -
releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)
:这个条件判断了是否需要在任务执行后释放保存的上下文引用,并尝试将保存的上下文引用设为null
。如果releaseTtlValueReferenceAfterRun
为true
且无法将保存的上下文引用设为null
,意味着上下文引用在任务执行后已经被释放或被其他线程修改。
如果上述任何一个条件判断为true
,则会抛出IllegalStateException
异常,表示保存的上下文引用在任务执行后已经被释放,出现了异常情况。
这个检查的目的是确保在任务执行前保存的上下文引用在执行期间是有效的,以避免在无效的上下文环境中执行任务导致的问题。
==capture()方法的话,这里的操作其实就是将父线程的TTL变量集合生成相应的快照记录,并随着任务创建包装的时候,保存到生成的 AtomicReference<Object> capturedRef;,由此实现了异步线程在线程池下的变量传递==。
==replay(captured) 重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在与子线程中的上下文环境变量==
==restore(backup) 随后恢复线程的任务至初始的备份状态==;
==capture() 和剩余的replay 和restore ,大多数不在一个线程,==
3.5 TransmittableThreadLocal 的实现与核心原理
3.5.1 核心holder
// Note about the holder:
// 1. holder self is a InheritableThreadLocal(a *ThreadLocal*).
// 2. The type of value in the holder is WeakHashMap<TransmittableThreadLocal<Object>, ?>.
// 2.1 but the WeakHashMap is used as a *Set*:
// the value of WeakHashMap is *always* null, and never used.
// 2.2 WeakHashMap support *null* value.
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
//创建一个初始为null的set集合
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
==这段代码创建了一个匿名内部类,并通过重写initialValue()
和childValue()
方法来指定InheritableThreadLocal
的初始值和子线程值的生成方式==
-
initialValue()
方法:这个方法在获取线程本地变量的初始值时被调用。在这里,它返回一个WeakHashMap<TransmittableThreadLocal<Object>, ?>
对象,作为初始值存储在当前线程中。 -
childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue)
方法:这个方法在父线程中设置了线程本地变量后,在子线程中获取该变量时被调用。它接收父线程中的值作为参数,并返回一个新的WeakHashMap<TransmittableThreadLocal<Object>, ?>
对象,将父线程的值复制到子线程中。
这个holder
变量的作用是在线程之间传递TransmittableThreadLocal
对象和其对应的值的映射关系。通过使用InheritableThreadLocal
,它可以确保在父线程中设置的TransmittableThreadLocal
对象和值在子线程中可以继承和访问。这对于实现线程上下文传递非常有用,可以在不同线程之间共享上下文信息
3.5.2 ==holder 的 作用及原理==
- TransmittableThreadLocal set 方法
@Override
public final void set(T value) {
if (!disableIgnoreNullValueSemantics && null == value) {
// may set null to remove value
remove();
} else {
super.set(value);
addThisToHolder();
}
}
-
TransmittableThreadLocal 的方法
public final T get() { T value = super.get(); if (disableIgnoreNullValueSemantics || null != value) { addThisToHolder(); } return value; }
-
==addThisToHolder==
private void addThisToHolder() { if (!holder.get().containsKey(this)) { WeakHashMap<TransmittableThreadLocal<Object>, ?> transmittableThreadLocalWeakHashMap = holder.get(); holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value. } }
holder.get()
:这个语句获取了当前线程中存储的WeakHashMap<TransmittableThreadLocal<Object>, ?>
对象,它是holder
的值,用于存储TransmittableThreadLocal
对象和其对应的值的映射关系。!holder.get().containsKey(this)
:这个条件判断检查当前的TransmittableThreadLocal
对象是否已经存在于holder
中。如果不存在,则执行后续操作。transmittableThreadLocalWeakHashMap
:这个变量引用了当前线程中存储的WeakHashMap<TransmittableThreadLocal<Object>, ?>
对象,方便后续操作。holder.get().put((TransmittableThreadLocal<Object>) this, null)
:这行代码将当前的TransmittableThreadLocal
对象作为键,null
作为值,添加到holder
中。WeakHashMap
支持null
值。
这个方法的作用是将当前的TransmittableThreadLocal
对象添加到holder
中,确保线程在使用该对象时可以从holder
中获取对应的值。通过containsKey()
方法判断对象是否已存在于holder
中,避免重复添加。
-
holder
存储的是所有线程在运行过程中的TransmittableThreadLocal
对象和其对应的值的映射关系。在使用
TransmittableThreadLocal
进行线程间传递时,每个线程都可以通过holder
来获取自己线程中存储的TransmittableThreadLocal
对象和值的映射关系。这样就可以在不同的线程中获取和设置相应的上下文信息。当一个线程设置了
TransmittableThreadLocal
对象和值后,它会将该对象添加到holder
中,以便其他线程可以从holder
中获取到相应的对象和值。这样就实现了线程间的上下文传递和共享。需要注意的是,
holder
中使用WeakHashMap
来存储映射关系。WeakHashMap
的特性是当TransmittableThreadLocal
对象没有被其他对象引用时,即没有强引用存在时,该映射关系会被自动清除,从而避免内存泄漏。这样可以确保只有活跃的线程和相关的TransmittableThreadLocal
对象被保留在holder
中。
3.6 ExecutorTtlWrapper 的实现
3.6.1 核心源码
class ExecutorTtlWrapper implements Executor, TtlWrapper<Executor>, TtlEnhanced {
private final Executor executor;
protected final boolean idempotent;
ExecutorTtlWrapper(@NonNull Executor executor, boolean idempotent) {
this.executor = executor;
this.idempotent = idempotent;
}
@Override
public void execute(@NonNull Runnable command) {
executor.execute(TtlRunnable.get(command, false, idempotent));
}
@Override
@NonNull
public Executor unwrap() {
return executor;
}
}
3.6.2 使用
public final class TtlExecutors {
public static Executor getTtlExecutor(@Nullable Executor executor) {
if (TtlAgent.isTtlAgentLoaded() || null == executor || executor instanceof TtlEnhanced) {
return executor;
}
return new ExecutorTtlWrapper(executor, true);
}
//...省略
}
//实际还是修饰runnable
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) return null;
if (runnable instanceof TtlEnhanced) {
// avoid redundant decoration, and ensure idempotency
if (idempotent) return (TtlRunnable) runnable;
else throw new IllegalStateException("Already TtlRunnable!");
}
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
3.7 CRR 模式的介绍
-
TtlRunnable 的 run
@Override public void run() { //@2去出任务提交时的线程上下文 final Object captured = capturedRef.get(); // if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) { throw new IllegalStateException("TTL value reference is released after run!"); } //@3 重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在与子线程中的上下文环境变量 final Object backup = replay(captured); try { runnable.run(); } finally { //@4恢复线程池中当前执行任务的线程的上下文环境, ,会直接继承父线程中的上下文环境,但会将原先存在该线程的线程上下文环境进行备份,在任务执行完后通过执行restore方法进行恢复 restore(backup); } }
上下文的传递流程或说生命周期可以规范化成:捕捉、回放和恢复这3个操作
-
框架/中间件集成
TTL
传递,通过TransmittableThreadLocal.Transmitter
抓取当前线程的所有TTL
值并在其他线程进行回放;在回放线程执行完业务操作后,恢复为回放线程原来的TTL
值。TransmittableThreadLocal.Transmitter
提供了所有TTL
值的抓取、回放和恢复方法(即CRR
操作):capture
方法:抓取线程(线程A)的所有TTL
值。replay
方法:在另一个线程(线程B)中,回放在capture
方法中抓取的TTL
值,并返回 回放前TTL
值的备份restore
方法:恢复线程B执行replay
方法之前的TTL
值(即备份)
3.7.1线程池的拒绝策略
-
线程池的拒绝策略
- AbortPolicy(默认策略):当线程池无法接受新任务时,会直接抛出RejectedExecutionException异常,拒绝执行新的任务。
- CallerRunsPolicy:当线程池无法接受新任务时,会使用提交任务的线程来执行该任务。也就是说,如果线程池饱和了,新任务会由提交任务的线程直接执行,这样可以避免任务丢失,但是会影响提交任务的线程的性能。
- DiscardPolicy:当线程池无法接受新任务时,会直接丢弃这个任务,不会抛出任何异常。如果对任务的执行结果没有特殊要求,且不关心任务是否被执行,可以选择此策略。
- DiscardOldestPolicy:当线程池无法接受新任务时,会丢弃线程池中最早被提交的任务,然后尝试再次提交新任务。
除了上述四种常见的拒绝策略外,Java中的线程池还提供了一种可自定义的拒绝策略:
- 自定义拒绝策略:可以实现RejectedExecutionHandler接口,并自定义拒绝策略的具体行为。可以根据实际需求,自定义拒绝策略来处理被拒绝的任务。例如,可以将被拒绝的任务重新放入队列等待执行,或者记录日志等。
在使用线程池时,可以根据实际需求选择合适的拒绝策略,以便对被拒绝的任务进行适当的处理。
3.7.2 线程池的执行流程
[图片上传失败...(image-68a131-1688372604519)]
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
context.set("value-set-in-parent");
// (1) 抓取当前线程的所有TTL值
final Object captured = TransmittableThreadLocal.Transmitter.capture();
// ===========================================================================
// 线程 B(异步线程)
// ===========================================================================
// (2) 在线程 B中回放在capture方法中抓取的TTL值,并返回 回放前TTL值的备份
final Object backup = TransmittableThreadLocal.Transmitter.replay(captured);
try {
// 你的业务逻辑,这里你可以获取到外面设置的TTL值
String value = context.get();
System.out.println("Hello: " + value);
...
String result = "World: " + value;
} finally {
// (3) 恢复线程 B执行replay方法之前的TTL值(即备份)
TransmittableThreadLocal.Transmitter.restore(backup);
}
3.7.2为甚麽要replay (回放线程值)
- 在回放线程B执行replay方法之前的TTL值时,是为了确保线程B在执行业务逻辑时能够使用原始的TTL值。这是因为在回放过程中,TTL值被修改为线程A中的值,而不是线程B的原始值。
3.7.3为什么要restore
- 当线程B执行replay方法时,它会从线程A中回放TTL值。这样做是为了使线程B能够访问到在线程A中设置的TTL值,以便在执行业务逻辑时使用。然而,一旦回放完成,线程B可能需要恢复到它自己的原始TTL值,以确保不会影响后续的操作或其他线程。
- 通过调用TransmittableThreadLocal.Transmitter的restore方法,可以将回放之前捕获的TTL值恢复回线程B的原始状态。这样,线程B在执行完业务逻辑后,就可以继续使用它自己的TTL值,而不会受到线程A中设置的TTL值的影响。
- 回复以恢复线程B执行replay方法之前的TTL值是为了确保线程的隔离性和正确性。它允许每个线程在自己的上下文中执行,并在需要的时候共享TTL值,而不会相互干扰或造成不一致的状态。这对于需要在线程之间传递上下文或状态信息的场景非常重要,以保持正确的业务逻辑执行和数据一致性。
3.7.4 backup 什么时候有值
- 典型的业务场景下,
replay
操作的线程,与来源的capture
线程,是不同的。 - 当
capture
的线程 在 业务中立的线程池 时,这样的线程 往往 也没有/不需要 有上下文。
这2个前提成立时,backup
往往 不会有值。
当上面2点不成立时,如
- 上面提到的场景,线程池满了 且 线程池使用的是『
CallerRunsPolicy
』,
则 提交到线程池的任务 在capture
线程直接执行,也就是 直接在业务线程中同步执行; - 使用
ForkJoinPool
(包含并行执行Stream
与CompletableFuture
,底层使用ForkJoinPool
)的场景,展开的ForkJoinTask
会在调用线程中直接执行。
这时 backup
是有值的,如果不做restore backup
业务线程里的上下文就丢了,
业务后续的执行就会有Bug
3.8 时序图
[图片上传失败...(image-28f06b-1688372604519)]
3.9 CRR 核心源码
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
private final AtomicReference<Object> capturedRef;
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
//@1 捕捉上下文
this.capturedRef = new AtomicReference<Object>(capture());
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
/**
* wrap method {@link Runnable#run()}.
*/
@Override
public void run() {
//@2去出任务提交时的线程上下文
final Object captured = capturedRef.get();
//
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
//@3 重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在于子线程中的上下文环境变量
final Object backup = replay(captured);
try {
runnable.run();
} finally {
//@4恢复线程池中当前执行任务的线程的上下文环境, ,会直接继承父线程中的上下文环境,但会将原先存在该线程的线程上下文环境进行备份,在任务执行完后通过执行restore方法进行恢复
restore(backup);
}
}
}
3.9.1 ==capture() 方法的实现(此过程逻辑实际发生在提交任务的线程)==
快照对象 (实际上是对父线程的 ttl 的一个快照 )
private static class Snapshot {
final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;
private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, HashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
this.ttl2Value = ttl2Value;
this.threadLocal2Value = threadLocal2Value;
}
}
Snapshot
类包含两个字段:
-
ttl2Value
:一个HashMap
对象,用于存储TransmittableThreadLocal
对象及其对应的值。TransmittableThreadLocal
是一个特殊的线程局部变量,可以在线程间传递值。 -
threadLocal2Value
:一个HashMap
对象,用于存储普通的ThreadLocal
对象及其对应的值。
这个 Snapshot
类主要用于在多线程环境下保存线程局部变量的快照,以便后续在其他线程中回放这些值。它将 TransmittableThreadLocal
和普通的 ThreadLocal
对象及其对应的值存储在两个 HashMap
中,以便在需要时进行检索和恢复。
通常情况下,Snapshot
对象会在线程切换时创建,以捕获当前线程的线程局部变量的值,并用于后续的传递和回放操作。
- holder
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
//get 的时候调用
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
}
@Override
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
//set 的时候调用
return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
}
};
- captureTtlValues() 的实现
private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<TransmittableThreadLocal<Object>, Object>();
WeakHashMap<TransmittableThreadLocal<Object>, ?> transmittableThreadLocalWeakHashMap = holder.get();
//@2
for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
//@3
Object o = threadLocal.copyValue();
ttl2Value.put(threadLocal, o);
}
return ttl2Value;
}
@1holder 实际上就是一个可在父子线程之间传递的threadLocal 存储的是一个Map 类型的值, key 为TransmittableThreadLocal
-
@2 通过
holder.get()
方法获取当前线程的WeakHashMap
对象 ,该WeakHashMap
存储了当前线程的所WeakHashMap<TransmittableThreadLocal<Object>, ?> 有TransmittableThreadLocal
对象及其对应的值 比如当前线程 有2个TransmittableThreadLocal分别为TransmittableThreadLocal<String> local1 和 TransmittableThreadLocal local2
-
@3 实际上是
private T copyValue() { //get() 就是ThreadLocal.get() return copy(get()); } //copy 方法是TransmittableThreadLocal 实现了TtlCopier 接口 用来拓展,threadLocal 值传递的方法,深拷贝,还是浅拷贝 @FunctionalInterface public interface TtlCopier<T> { T copy(T parentValue); } public T copy(T parentValue) { return parentValue; }
-
captureThreadLocalValues() 的实现 实际上是抓取当前线程普通的threadLocal 值,用来增强ThreadLocal 使用的
private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(); public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, @NonNull TtlCopier<T> copier, boolean force) { if (threadLocal instanceof TransmittableThreadLocal) { logger.warning("register a TransmittableThreadLocal instance, this is unnecessary!"); return true; } synchronized (threadLocalHolderUpdateLock) { if (!force && threadLocalHolder.containsKey(threadLocal)) return false; WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(threadLocalHolder); newHolder.put((ThreadLocal<Object>) threadLocal, (TtlCopier<Object>) copier); threadLocalHolder = newHolder; return true; } }
private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap<ThreadLocal<Object>, Object>();
for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
final TtlCopier<Object> copier = entry.getValue();
Object copy = copier.copy(threadLocal.get());
threadLocal2Value.put(threadLocal, copy);
}
return threadLocal2Value;
}
3.9.2 ==replay(captured) 重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在于子线程中的上下文环境变量==
public static Object replay(@NonNull Object captured) {
final Snapshot capturedSnapshot = (Snapshot) captured;
return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
-
replayTtlValues(capturedSnapshot.ttl2Value)
用来重放并备份执行任务线程的ttl2Value
@NonNull private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) { //@1存储执行任务线程的ttl2Value HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap<TransmittableThreadLocal<Object>, Object>(); // for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) { //获取线程可能是继承来的 TransmittableThreadLocal<Object> threadLocal = iterator.next(); // backup backup.put(threadLocal, threadLocal.get()); @2 如果父线程快照中没有包含的threadLocal if (!captured.containsKey(threadLocal)) { iterator.remove(); threadLocal.superRemove(); } } // set TTL values to captured //把线程的上下文复制到当前的线程 setTtlValuesTo(captured); // call beforeExecute callback doExecuteCallback(true); return backup; }
@1用来备份执行任务线程的threadLocal
-
@2 如果
captured
不包含当前的threadLocal
,则将其从holder
中移除,并调用threadLocal.superRemove()
方法进行清理操作的目的是清除在回放线程中不再需要的TransmittableThreadLocal
。当执行回放操作时,
captured
中只包含在抓取阶段捕获的TransmittableThreadLocal
对象及其对应的值。如果在回放阶段发现某个TransmittableThreadLocal
对象在captured
中不存在,说明该对象在回放线程中不再需要使用。因此,为了避免回放线程中出现多余的
TransmittableThreadLocal
对象,需要将其从holder
中移除,并调用threadLocal.superRemove()
方法进行清理操作。这样可以确保在回放线程中只保留需要的TransmittableThreadLocal
,避免产生不必要的内存占用 (举例,执行前该线程 只有一个TransmittableThreadLocal<Integer> ,在执行中 又创建了新的TransmittableThreadLocal<String> ,实际上我们只要提交任务前的变量值)
private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
TransmittableThreadLocal<Object> threadLocal = entry.getKey();
threadLocal.set(entry.getValue());
}
}
当在子线程中调用父线程的 TransmittableThreadLocal
对象的 set
方法时,实际上会在子线程中创建一个新的 TransmittableThreadLocal
对象,并将父线程的值复制到子线程的对象中。
3.9.3 restore(backup) 的實現
public static void restore(@NonNull Object backup) {
final Snapshot backupSnapshot = (Snapshot) backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
// call afterExecute callback
doExecuteCallback(false);
WeakHashMap<TransmittableThreadLocal<Object>, ?> transmittableThreadLocalWeakHashMap = holder.get();
for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
TransmittableThreadLocal<Object> threadLocal = iterator.next();
// clear the TTL values that is not in backup
// avoid the extra TTL values after restore
//删除异步操作中新增的threadLocal 变量
if (!backup.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// restore TTL values
setTtlValuesTo(backup);
}
private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {
for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
final ThreadLocal<Object> threadLocal = entry.getKey();
threadLocal.set(entry.getValue());
}
}