从ThreadLocal 到TransmittableThreadLocal

如何处理线程上下文之间的参数透传


1,ThreadLocal 的使用场景

1.1 介绍:

ThreadLocal是Java中的一个类,可以实现在多线程环境下安全地存储和访问线程局部变量。它可以看作是一个容器,用于存储特定线程所需的数据。

在多线程编程中,每个线程都有自己的执行上下文,包括自己的栈、本地变量和程序计数器等。线程局部变量就是指在每个线程的执行上下文中的本地变量。ThreadLocal可以帮助我们在多线程环境下,为每个线程维护一个独立的局部变量副本,从而避免多线程竞争和数据混乱的问题。

1.2 原理:

  1. 每个线程维护一个ThreadLocalMap

    ThreadLocalMap的键为ThreadLocal实例,即每个线程局部变量对应的ThreadLocal对象。这是因为ThreadLocal是线程局部变量的抽象类,每个ThreadLocal实例对应着一个独立的线程局部变量。
    在ThreadLocalMap中,ThreadLocal作为键,其内部实现是一个自定义的Entry数组,Entry中存储了键值对的信息,即ThreadLocal实例作为键,对应的线程局部变量副本作为值。当线程调用get()、set()和remove()等方法时,ThreadLocalMap会根据当前线程对象找到对应的Entry,并返回或者修改其中的值。
    值得注意的是,ThreadLocal实例是弱引用,这意味着如果ThreadLocal实例没有被任何线程引用时,可能会被垃圾回收机制回收。如果ThreadLocal实例被回收后,对应的键值对也会被清除,这就是ThreadLocal内存泄漏的主要原因之一。为了避免内存泄漏,我们通常会在不需要使用ThreadLocal时显式地调用remove()方法来清除键值。

1.3 使用场景

ThreadLocal的使用场景主要涉及到需要在多线程环境下实现数据隔离的情况

  1. 线程内部的变量共享问题:当在多个线程之间需要共享一个变量时,但又要保证线程安全,此时可以使用ThreadLocal实现局部变量的隔离,每个线程都有自己的副本,互不干扰。
  2. 用户信息:在Web应用中,需要存储用户信息或者会话信息,在多线程环境下,如果不使用ThreadLocal会产生线程安全问题,此时应该使用ThreadLocal来解决问题。
  3. 线程活动的状态:有些情况下,我们需要记录或者保存线程的状态,这时使用ThreadLocal来存储线程状态是比较合适的选择。

1.4 总结

总之,如果需要在多线程环境下实现数据隔离,或者需要跨线程存储变量信息,就可以考虑使用ThreadLocal来解决问题。但需要注意,使用ThreadLocal也可能存在内存泄漏的问题,使用时需要仔细设计和管理。

2,InheritableThreadLocal 跨线程使用ThreadLocal

2.1 介绍

多数情况下不会出现跨线程的使用,如果是需要线程上下文的情景,可能会跨线程

  • InheritableThreadLocal 的介绍
InheritableThreadLocal是ThreadLocal的一个变种,它允许在父子线程之间继承值。与ThreadLocal不同的是,当一个新线程由另一个线程(即父线程)创建时,它会从父线程中的InheritableThreadLocal复制值。 
InheritableThreadLocal除了实现了ThreadLocal的功能外,还增加了子线程从父线程那里继承ThreadLocal值的功能,这种特性使其更适合在多线程应用程序中传递上下文信息。

2.2 为什么InheritableThreadLocal 父子线程可以实现 ThreadLocal 的传递

这是由线程的创建决定的

//创建线程的构造方法,其中父子线程的传递在init 实现
public Thread() {
        init(null, null, "Thread-" + nextThreadNum(), 0);
    }
  1. init 方法的实现

    private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
       ...省略其他代码,只保留核心
       //inheritThreadLocals 是有线程维护的InheritableThreadLocalcal 重写的getMap 就是获取此变量,此变量也是一个threadLocalMap
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
        //
            this.inheritableThreadLocals =
    //再此实现父ttl 的值复制        ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
         
    }
    
    //创建map
    static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
            return new ThreadLocalMap(parentMap);
        }
        //复值ttl 核心代码
        private ThreadLocalMap(ThreadLocalMap parentMap) {
                Entry[] parentTable = parentMap.table;
                int len = parentTable.length;
                setThreshold(len);
                table = new Entry[len];
    
                for (int j = 0; j < len; j++) {
                    Entry e = parentTable[j];
                    if (e != null) {
                        @SuppressWarnings("unchecked")
                        ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                        if (key != null) {
                        //childValue 就是 InheritableThreadLocalcal
                        重写的方法,返回父线程的值
                            Object value = key.childValue(e.value);
                            Entry c = new Entry(key, value);
                            int h = key.threadLocalHashCode & (len - 1);
                            while (table[h] != null)
                                h = nextIndex(h, len);
                            table[h] = c;
                            size++;
                        }
                    }
                }
            }
    

2.3 代码演示

2.3.1 普通ThreadLocal 的使用,用父子线程的值传递

@Test
public void test1() {

    ThreadLocal<String> local = new ThreadLocal<>();
    local.set("init");
    log.info("线程:{}获取threadLocal:{} 值", Thread.currentThread().getName(), local.get());
    //子线程获取
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            log.info("子线程:{}获取threadLocal:{} 值", Thread.currentThread().getName(), local.get());
        }
    });
    thread.start();
}
//
18:04:48.220 [main] INFO com.h3c.mp.thread.ThreadTest3 - 线程:main获取threadLocal:init 值
18:04:48.235 [Thread-0] INFO com.h3c.mp.thread.ThreadTest3 - 子线程:Thread-0获取threadLocal:null

2.3.2 InheritableThreadLocalcal 的演示与使用

 @Test
    public void test2() {
        InheritableThreadLocal<String> local = new InheritableThreadLocal<>();
        local.set("init");
        log.info("线程:{}获取threadLocal:{} 值", Thread.currentThread().getName(), local.get());
        //子线程获取
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                log.info("子线程:{}获取threadLocal:{} 值", Thread.currentThread().getName(), local.get());

            }
        });
        thread.start();
    }
18:08:20.291 [main] INFO com.h3c.mp.thread.ThreadTest3 - 线程:main获取threadLocal:init 值
18:08:20.345 [Thread-0] INFO com.h3c.mp.thread.ThreadTest3 - 子线程:Thread-0获取threadLocal:init 值

2.4 总结

因此 InheritableThreadLocal 可以用来做父子线程的值参数传递

但是 由源码可以看出,真正进行值传递时,是在线程创建的时候触发执行的,所以在使用线程池的时候,当发生线程复用时,可能造成值获取错乱

3 , Ttl 技术的使用

3.1 介绍

JDKInheritableThreadLocal类可以完成父线程到子线程的值传递。但对于使用线程池等会池化复用线程的执行组件的情况,线程由线程池创建好,并且线程是池化起来反复使用的;**这时父子线程关系的ThreadLocal值传递已经没有意义,应用需要的实际上是把 任务提交给线程池时的ThreadLocal值传递到 任务执行时****。

3.2 原理

3.2.1 简单模拟

  public final class DelegatingContextRunnable implements Runnable {

    private final Runnable delegate;

    private final Optional<String> delegateContext;

    public DelegatingContextRunnable(Runnable delegate,
                                       Optional<String> context) {
        assert delegate != null;
        assert context != null;

        this.delegate = delegate;
        this.delegateContext = context;
    }

    public DelegatingContextRunnable(Runnable delegate) {
        // 修饰原有的任务,并保存当前线程的值
        this(delegate, ContextHolder.get());
    }

    public void run() {
        Optional<String> originalContext = ContextHolder.get();

        try {
            ContextHolder.set(delegateContext);
            delegate.run();
        } finally {
            ContextHolder.set(originalContext);
        }
    }
}

public final void execute(Runnable task) {
  // 递交给真正的执行线程池前,对任务进行修饰
  executor.execute(wrap(task));
}

protected final Runnable wrap(Runnable task) {
  return new DelegatingContextRunnable(task);
}

总结: 实际上通过三部,

  1. 获取提交任务时的线程ttl
  2. 重放提交任務時的threadLocal
  3. 恢复执行任务是线程的threadLocal

3.3 Tt l 技术的实现

3.3.1 github 地址

https://github.com/alibaba/transmittable-thread-local

3.3.2 maven 依赖

         <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>transmittable-thread-local</artifactId>
            <version>2.14.2</version>
         </dependency>

3.3.3 核心类 TransmittableThreadLocal

使用类TransmittableThreadLocal来保存值,并跨线程池传递。

TransmittableThreadLocal继承InheritableThreadLocal,使用方式也类似。相比InheritableThreadLocal,添加了protectedtransmitteeValue()方法,用于定制 任务提交给线程池时ThreadLocal值传递到 任务执行时 的传递方式,缺省是简单的赋值传递。

3.3.4 简单使用

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

// =====================================================

// 在子线程中可以读取,值是"value-set-in-parent"
String value = context.get();

3.3.5 线程池中传值

  • 修饰Runnable

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

Runnable task = new RunnableTask();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);

// =====================================================

// Task中可以读取,值是"value-set-in-parent"
String value = context.get();
  • 修饰线程池

    省去每次RunnableCallable传入线程池时的修饰,这个逻辑可以在线程池中完成。

    通过工具类TtlExecutors完成,有下面的方法:

    • getTtlExecutor:修饰接口Executor
    • getTtlExecutorService:修饰接口ExecutorService
    • getTtlScheduledExecutorService:修饰接口ScheduledExecutorService
 ExecutorService executorService = ...
// 额外的处理,生成修饰了的对象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// 在父线程中设置
context.set("value-set-in-parent");

Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);

// =====================================================

// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();

3.4 TtlRunnable 的实现

3.4.1 TtlRunnable 的 核心源码

//省略非核心代码
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
   //存放上下文
    private final AtomicReference<Object> capturedRef;
    private final Runnable runnable;
    private final boolean releaseTtlValueReferenceAfterRun;

    private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
       this.capturedRef = new AtomicReference<Object>( capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
     }

    /**
     * wrap method {@link Runnable#run()}.
     */
    @Override
    public void run() {

        //@2去出任务提交时的线程上下文
        final Object captured = capturedRef.get();
        //
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
        //@3  重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在与子线程中的上下文环境变量
        final Object backup = replay(captured);
        try {

            runnable.run();
        } finally {
            //@4恢复线程池中当前执行任务的线程的上下文环境 ,会直接继承父线程中的上下文环境,但会将原先存在该线程的线程上下文环境进行备份,在任务执行完后通过执行restore方法进行恢复
            restore(backup);

        }
    }
    }
  • captured == null:这个条件检查了保存的上下文引用是否为null。如果为null,意味着没有保存的上下文,可能是因为没有正确捕获上下文或者上下文已经被释放。
  • releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null):这个条件判断了是否需要在任务执行后释放保存的上下文引用,并尝试将保存的上下文引用设为null。如果releaseTtlValueReferenceAfterRuntrue且无法将保存的上下文引用设为null,意味着上下文引用在任务执行后已经被释放或被其他线程修改。

如果上述任何一个条件判断为true,则会抛出IllegalStateException异常,表示保存的上下文引用在任务执行后已经被释放,出现了异常情况。

这个检查的目的是确保在任务执行前保存的上下文引用在执行期间是有效的,以避免在无效的上下文环境中执行任务导致的问题。

  • ==capture()方法的话,这里的操作其实就是将父线程的TTL变量集合生成相应的快照记录,并随着任务创建包装的时候,保存到生成的 AtomicReference<Object> capturedRef;,由此实现了异步线程在线程池下的变量传递==。

  • ==replay(captured) 重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在与子线程中的上下文环境变量==

  • ==restore(backup) 随后恢复线程的任务至初始的备份状态==;

  • ==capture() 和剩余的replay 和restore ,大多数不在一个线程,==

3.5 TransmittableThreadLocal 的实现与核心原理

3.5.1 核心holder

// Note about the holder:
    // 1. holder self is a InheritableThreadLocal(a *ThreadLocal*).
    // 2. The type of value in the holder is WeakHashMap<TransmittableThreadLocal<Object>, ?>.
    //    2.1 but the WeakHashMap is used as a *Set*:
    //        the value of WeakHashMap is *always* null, and never used.
    //    2.2 WeakHashMap support *null* value.
     private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
            new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                    //创建一个初始为null的set集合
                    return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
                }

                @Override
                protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                     return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
                }
            };

==这段代码创建了一个匿名内部类,并通过重写initialValue()childValue()方法来指定InheritableThreadLocal的初始值和子线程值的生成方式==

  • initialValue()方法:这个方法在获取线程本地变量的初始值时被调用。在这里,它返回一个WeakHashMap<TransmittableThreadLocal<Object>, ?>对象,作为初始值存储在当前线程中。
  • childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue)方法:这个方法在父线程中设置了线程本地变量后,在子线程中获取该变量时被调用。它接收父线程中的值作为参数,并返回一个新的WeakHashMap<TransmittableThreadLocal<Object>, ?>对象,将父线程的值复制到子线程中。

这个holder变量的作用是在线程之间传递TransmittableThreadLocal对象和其对应的值的映射关系。通过使用InheritableThreadLocal,它可以确保在父线程中设置的TransmittableThreadLocal对象和值在子线程中可以继承和访问。这对于实现线程上下文传递非常有用,可以在不同线程之间共享上下文信息

3.5.2 ==holder 的 作用及原理==

  • TransmittableThreadLocal set 方法
@Override
public final void set(T value) {
    if (!disableIgnoreNullValueSemantics && null == value) {
        // may set null to remove value
        remove();
    } else {
        super.set(value);
        addThisToHolder();
    }
}
  • TransmittableThreadLocal 的方法

    public final T get() {
        T value = super.get();
         if (disableIgnoreNullValueSemantics || null != value) {
    
            addThisToHolder();
        }
        return value;
    }
    
  • ==addThisToHolder==

    private void addThisToHolder() {
        if (!holder.get().containsKey(this)) {
            WeakHashMap<TransmittableThreadLocal<Object>, ?> transmittableThreadLocalWeakHashMap = holder.get();
            holder.get().put((TransmittableThreadLocal<Object>) this, null); // WeakHashMap supports null value.
        }
    }
    
  • holder.get():这个语句获取了当前线程中存储的WeakHashMap<TransmittableThreadLocal<Object>, ?>对象,它是holder的值,用于存储TransmittableThreadLocal对象和其对应的值的映射关系。

  • !holder.get().containsKey(this):这个条件判断检查当前的TransmittableThreadLocal对象是否已经存在于holder中。如果不存在,则执行后续操作。

  • transmittableThreadLocalWeakHashMap:这个变量引用了当前线程中存储的WeakHashMap<TransmittableThreadLocal<Object>, ?>对象,方便后续操作。

  • holder.get().put((TransmittableThreadLocal<Object>) this, null):这行代码将当前的TransmittableThreadLocal对象作为键,null作为值,添加到holder中。WeakHashMap支持null值。

这个方法的作用是将当前的TransmittableThreadLocal对象添加到holder中,确保线程在使用该对象时可以从holder中获取对应的值。通过containsKey()方法判断对象是否已存在于holder中,避免重复添加。

  • holder存储的是所有线程在运行过程中的TransmittableThreadLocal对象和其对应的值的映射关系。

    在使用TransmittableThreadLocal进行线程间传递时,每个线程都可以通过holder来获取自己线程中存储的TransmittableThreadLocal对象和值的映射关系。这样就可以在不同的线程中获取和设置相应的上下文信息。

    当一个线程设置了TransmittableThreadLocal对象和值后,它会将该对象添加到holder中,以便其他线程可以从holder中获取到相应的对象和值。这样就实现了线程间的上下文传递和共享。

    需要注意的是,holder中使用WeakHashMap来存储映射关系。WeakHashMap的特性是当TransmittableThreadLocal对象没有被其他对象引用时,即没有强引用存在时,该映射关系会被自动清除,从而避免内存泄漏。这样可以确保只有活跃的线程和相关的TransmittableThreadLocal对象被保留在holder中。

3.6 ExecutorTtlWrapper 的实现

3.6.1 核心源码

class ExecutorTtlWrapper implements Executor, TtlWrapper<Executor>, TtlEnhanced {
    private final Executor executor;
    protected final boolean idempotent;

    ExecutorTtlWrapper(@NonNull Executor executor, boolean idempotent) {
        this.executor = executor;
        this.idempotent = idempotent;
    }

    @Override
    public void execute(@NonNull Runnable command) {
        executor.execute(TtlRunnable.get(command, false, idempotent));
    }

    @Override
    @NonNull
    public Executor unwrap() {
        return executor;
    }

}

3.6.2 使用

public final class TtlExecutors {

public static Executor getTtlExecutor(@Nullable Executor executor) {
    if (TtlAgent.isTtlAgentLoaded() || null == executor || executor instanceof TtlEnhanced) {
        return executor;
    }
    return new ExecutorTtlWrapper(executor, true);
}
   //...省略 
}

//实际还是修饰runnable

public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
    if (null == runnable) return null;

    if (runnable instanceof TtlEnhanced) {
        // avoid redundant decoration, and ensure idempotency
        if (idempotent) return (TtlRunnable) runnable;
        else throw new IllegalStateException("Already TtlRunnable!");
    }
    return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}

3.7 CRR 模式的介绍

  • TtlRunnable 的 run

    @Override
    public void run() {
    
        //@2去出任务提交时的线程上下文
        final Object captured = capturedRef.get();
        //
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
        //@3  重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在与子线程中的上下文环境变量
        final Object backup = replay(captured);
        try {
    
            runnable.run();
        } finally {
            //@4恢复线程池中当前执行任务的线程的上下文环境, ,会直接继承父线程中的上下文环境,但会将原先存在该线程的线程上下文环境进行备份,在任务执行完后通过执行restore方法进行恢复
            restore(backup);
    
        }
    }
    

上下文的传递流程或说生命周期可以规范化成:捕捉、回放和恢复这3个操作

  • 框架/中间件集成TTL传递,通过TransmittableThreadLocal.Transmitter 抓取当前线程的所有TTL值并在其他线程进行回放;在回放线程执行完业务操作后,恢复为回放线程原来的TTL值。

    TransmittableThreadLocal.Transmitter提供了所有TTL值的抓取、回放和恢复方法(即CRR操作):

    1. capture方法:抓取线程(线程A)的所有TTL值。

    2. replay方法:在另一个线程(线程B)中,回放在capture方法中抓取的TTL值,并返回 回放前TTL值的备份

    3. restore方法:恢复线程B执行replay方法之前的TTL值(即备份)

3.7.1线程池的拒绝策略

  • 线程池的拒绝策略

    1. AbortPolicy(默认策略):当线程池无法接受新任务时,会直接抛出RejectedExecutionException异常,拒绝执行新的任务。
    2. CallerRunsPolicy:当线程池无法接受新任务时,会使用提交任务的线程来执行该任务。也就是说,如果线程池饱和了,新任务会由提交任务的线程直接执行,这样可以避免任务丢失,但是会影响提交任务的线程的性能。
    3. DiscardPolicy:当线程池无法接受新任务时,会直接丢弃这个任务,不会抛出任何异常。如果对任务的执行结果没有特殊要求,且不关心任务是否被执行,可以选择此策略。
    4. DiscardOldestPolicy:当线程池无法接受新任务时,会丢弃线程池中最早被提交的任务,然后尝试再次提交新任务。

    除了上述四种常见的拒绝策略外,Java中的线程池还提供了一种可自定义的拒绝策略:

    1. 自定义拒绝策略:可以实现RejectedExecutionHandler接口,并自定义拒绝策略的具体行为。可以根据实际需求,自定义拒绝策略来处理被拒绝的任务。例如,可以将被拒绝的任务重新放入队列等待执行,或者记录日志等。

    在使用线程池时,可以根据实际需求选择合适的拒绝策略,以便对被拒绝的任务进行适当的处理。

    3.7.2 线程池的执行流程

    [图片上传失败...(image-68a131-1688372604519)]

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
context.set("value-set-in-parent");

// (1) 抓取当前线程的所有TTL值
final Object captured = TransmittableThreadLocal.Transmitter.capture();

// ===========================================================================
// 线程 B(异步线程)
// ===========================================================================

// (2) 在线程 B中回放在capture方法中抓取的TTL值,并返回 回放前TTL值的备份
final Object backup = TransmittableThreadLocal.Transmitter.replay(captured);
try {
    // 你的业务逻辑,这里你可以获取到外面设置的TTL值
    String value = context.get();

    System.out.println("Hello: " + value);
    ...
    String result = "World: " + value;
} finally {
    // (3) 恢复线程 B执行replay方法之前的TTL值(即备份)
    TransmittableThreadLocal.Transmitter.restore(backup);
}

3.7.2为甚麽要replay (回放线程值)

  • 在回放线程B执行replay方法之前的TTL值时,是为了确保线程B在执行业务逻辑时能够使用原始的TTL值。这是因为在回放过程中,TTL值被修改为线程A中的值,而不是线程B的原始值。

3.7.3为什么要restore

  1. 当线程B执行replay方法时,它会从线程A中回放TTL值。这样做是为了使线程B能够访问到在线程A中设置的TTL值,以便在执行业务逻辑时使用。然而,一旦回放完成,线程B可能需要恢复到它自己的原始TTL值,以确保不会影响后续的操作或其他线程。
  2. 通过调用TransmittableThreadLocal.Transmitter的restore方法,可以将回放之前捕获的TTL值恢复回线程B的原始状态。这样,线程B在执行完业务逻辑后,就可以继续使用它自己的TTL值,而不会受到线程A中设置的TTL值的影响。
  3. 回复以恢复线程B执行replay方法之前的TTL值是为了确保线程的隔离性和正确性。它允许每个线程在自己的上下文中执行,并在需要的时候共享TTL值,而不会相互干扰或造成不一致的状态。这对于需要在线程之间传递上下文或状态信息的场景非常重要,以保持正确的业务逻辑执行和数据一致性。

3.7.4 backup 什么时候有值

  • 典型的业务场景下,replay操作的线程,与来源的capture线程,是不同的。
  • capture的线程 在 业务中立的线程池 时,这样的线程 往往 也没有/不需要 有上下文。

这2个前提成立时,backup往往 不会有值。

当上面2点不成立时,如

  • 上面提到的场景,线程池满了 且 线程池使用的是CallerRunsPolicy
    则 提交到线程池的任务 在capture线程直接执行,也就是 直接在业务线程中同步执行;
  • 使用ForkJoinPool(包含并行执行StreamCompletableFuture,底层使用ForkJoinPool)的场景,展开的ForkJoinTask会在调用线程中直接执行。

这时 backup是有值的,如果不做restore backup业务线程里的上下文就丢了,
业务后续的执行就会有Bug

3.8 时序图

[图片上传失败...(image-28f06b-1688372604519)]

3.9 CRR 核心源码

public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
    private final AtomicReference<Object> capturedRef;
    private final Runnable runnable;
    private final boolean releaseTtlValueReferenceAfterRun;

    private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
       //@1 捕捉上下文
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
     }

    /**
     * wrap method {@link Runnable#run()}.
     */
    @Override
    public void run() {

        //@2去出任务提交时的线程上下文
        final Object captured = capturedRef.get();
        //
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
        //@3  重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在于子线程中的上下文环境变量
        final Object backup = replay(captured);
        try {

            runnable.run();
        } finally {
            //@4恢复线程池中当前执行任务的线程的上下文环境, ,会直接继承父线程中的上下文环境,但会将原先存在该线程的线程上下文环境进行备份,在任务执行完后通过执行restore方法进行恢复
            restore(backup);

        }
    }
    }

3.9.1 ==capture() 方法的实现(此过程逻辑实际发生在提交任务的线程)==

快照对象 (实际上是对父线程的 ttl 的一个快照 )

  private static class Snapshot {
            final HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value;
            final HashMap<ThreadLocal<Object>, Object> threadLocal2Value;
            private Snapshot(HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value, HashMap<ThreadLocal<Object>, Object> threadLocal2Value) {
                this.ttl2Value = ttl2Value;
                this.threadLocal2Value = threadLocal2Value;
            }
        }

Snapshot 类包含两个字段:

  1. ttl2Value:一个 HashMap 对象,用于存储 TransmittableThreadLocal 对象及其对应的值。TransmittableThreadLocal 是一个特殊的线程局部变量,可以在线程间传递值。
  2. threadLocal2Value:一个 HashMap 对象,用于存储普通的 ThreadLocal 对象及其对应的值。

这个 Snapshot 类主要用于在多线程环境下保存线程局部变量的快照,以便后续在其他线程中回放这些值。它将 TransmittableThreadLocal 和普通的 ThreadLocal 对象及其对应的值存储在两个 HashMap 中,以便在需要时进行检索和恢复。

通常情况下,Snapshot 对象会在线程切换时创建,以捕获当前线程的线程局部变量的值,并用于后续的传递和回放操作。

  • holder
    private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder =
        new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
                //get 的时候调用
                return new WeakHashMap<TransmittableThreadLocal<Object>, Object>();
            }

            @Override
            protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
                //set 的时候调用
                return new WeakHashMap<TransmittableThreadLocal<Object>, Object>(parentValue);
            }
        };
  • captureTtlValues() 的实现
private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
    HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap<TransmittableThreadLocal<Object>, Object>();
    WeakHashMap<TransmittableThreadLocal<Object>, ?> transmittableThreadLocalWeakHashMap = holder.get();
    //@2
    for (TransmittableThreadLocal<Object> threadLocal : holder.get().keySet()) {
    //@3
        Object o = threadLocal.copyValue();
        ttl2Value.put(threadLocal, o);
    }
    return ttl2Value;
}
  1. @1holder 实际上就是一个可在父子线程之间传递的threadLocal 存储的是一个Map 类型的值, key 为TransmittableThreadLocal

  2. @2 通过 holder.get() 方法获取当前线程的 WeakHashMap 对象 ,该 WeakHashMap 存储了当前线程的所WeakHashMap<TransmittableThreadLocal<Object>, ?> 有 TransmittableThreadLocal 对象及其对应的值 比如当前线程 有2个TransmittableThreadLocal

    分别为TransmittableThreadLocal<String> local1 和 TransmittableThreadLocal local2

  3. @3 实际上是

       private T copyValue() {
           //get() 就是ThreadLocal.get()
            return copy(get());
        }
    //copy 方法是TransmittableThreadLocal 实现了TtlCopier 接口 用来拓展,threadLocal 值传递的方法,深拷贝,还是浅拷贝
    @FunctionalInterface
    public interface TtlCopier<T> {
        
        T copy(T parentValue);
    }
    
       public T copy(T parentValue) {
            return parentValue;
        }
    
  • captureThreadLocalValues() 的实现 实际上是抓取当前线程普通的threadLocal 值,用来增强ThreadLocal 使用的

    private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> 
    threadLocalHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>();
    
      public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, 
                                                    @NonNull TtlCopier<T> copier, boolean force) {
                if (threadLocal instanceof TransmittableThreadLocal) {
                    logger.warning("register a TransmittableThreadLocal instance, this is unnecessary!");
                    return true;
                }
                synchronized (threadLocalHolderUpdateLock) {
                    if (!force && threadLocalHolder.containsKey(threadLocal)) return false;
    
                    WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>>(threadLocalHolder);
                    newHolder.put((ThreadLocal<Object>) threadLocal, (TtlCopier<Object>) copier);
                    threadLocalHolder = newHolder;
                    return true;
                }
            }
    

private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
    final HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap<ThreadLocal<Object>, Object>();
    for (Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry : threadLocalHolder.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        final TtlCopier<Object> copier = entry.getValue();
        Object copy = copier.copy(threadLocal.get());

        threadLocal2Value.put(threadLocal, copy);
    }
    return threadLocal2Value;
}

3.9.2 ==replay(captured) 重放父线程的本地环境变量,即使用从父线程中捕获过来的上下文环境,在子线程中重新执行一遍,并返回原先存在于子线程中的上下文环境变量==

public static Object replay(@NonNull Object captured) {
    final Snapshot capturedSnapshot = (Snapshot) captured;
    return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
  • replayTtlValues(capturedSnapshot.ttl2Value)

    用来重放并备份执行任务线程的ttl2Value

    @NonNull
    private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
    //@1存储执行任务线程的ttl2Value 
        HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap<TransmittableThreadLocal<Object>, Object>();
         // 
        for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
        //获取线程可能是继承来的
            TransmittableThreadLocal<Object> threadLocal = iterator.next();
    
            // backup
            backup.put(threadLocal, threadLocal.get());
     
            @2 如果父线程快照中没有包含的threadLocal
            if (!captured.containsKey(threadLocal)) {
                iterator.remove();
                threadLocal.superRemove();
            }
        }
    
        // set TTL values to captured
        //把线程的上下文复制到当前的线程
        setTtlValuesTo(captured);
    
        // call beforeExecute callback
        doExecuteCallback(true);
    
        return backup;
    }
    
  1. @1用来备份执行任务线程的threadLocal

  2. @2 如果 captured 不包含当前的 threadLocal,则将其从 holder 中移除,并调用 threadLocal.superRemove() 方法进行清理操作的目的是清除在回放线程中不再需要的 TransmittableThreadLocal

    当执行回放操作时,captured 中只包含在抓取阶段捕获的 TransmittableThreadLocal 对象及其对应的值。如果在回放阶段发现某个 TransmittableThreadLocal 对象在 captured 中不存在,说明该对象在回放线程中不再需要使用。

    因此,为了避免回放线程中出现多余的 TransmittableThreadLocal 对象,需要将其从 holder 中移除,并调用 threadLocal.superRemove() 方法进行清理操作。这样可以确保在回放线程中只保留需要的 TransmittableThreadLocal,避免产生不必要的内存占用 (举例,执行前该线程 只有一个TransmittableThreadLocal<Integer> ,在执行中 又创建了新的TransmittableThreadLocal<String> ,实际上我们只要提交任务前的变量值)

private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
    for (Map.Entry<TransmittableThreadLocal<Object>, Object> entry : ttlValues.entrySet()) {
        TransmittableThreadLocal<Object> threadLocal = entry.getKey();
        threadLocal.set(entry.getValue());
    }
}

当在子线程中调用父线程的 TransmittableThreadLocal 对象的 set 方法时,实际上会在子线程中创建一个新的 TransmittableThreadLocal 对象,并将父线程的值复制到子线程的对象中。

3.9.3 restore(backup) 的實現

public static void restore(@NonNull Object backup) {
    final Snapshot backupSnapshot = (Snapshot) backup;
    restoreTtlValues(backupSnapshot.ttl2Value);
    restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
    // call afterExecute callback
    doExecuteCallback(false);
    WeakHashMap<TransmittableThreadLocal<Object>, ?> transmittableThreadLocalWeakHashMap = holder.get();
    for (final Iterator<TransmittableThreadLocal<Object>> iterator = holder.get().keySet().iterator(); iterator.hasNext(); ) {
        TransmittableThreadLocal<Object> threadLocal = iterator.next();

        // clear the TTL values that is not in backup
        // avoid the extra TTL values after restore
        //删除异步操作中新增的threadLocal 变量
        if (!backup.containsKey(threadLocal)) {
            iterator.remove();
            threadLocal.superRemove();
        }
    }

    // restore TTL values
    setTtlValuesTo(backup);
}


private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {
    for (Map.Entry<ThreadLocal<Object>, Object> entry : backup.entrySet()) {
        final ThreadLocal<Object> threadLocal = entry.getKey();
        threadLocal.set(entry.getValue());
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容