关于@Async异步执行及TransmittableThreadLocal(TTL)使用时theadLocal数据错乱或者失效

@Async 是spring提供的非常方便的异步执行的注解,非常方便,可以指定线程池执行,但是它不是动态代理实现,也就是和其它动态代理注解(例如@Transactional)放在一起会导致动态代理失效。因为spring在拿到 @Async注解后直接委托给 AnnotationAsyncExecutionInterceptor 来执行@Async目标方法,而不是执行代理方法会走层层动态代理。
然后包装一个callable提交给TaskExecutor 来执行。
我们不会具体讨论@Async和线程池以及threadLocal的具体实现,只跟随我们的使用场景涉及到的源码

使用场景及问题
// 定义个 ttl threadLocal用于存储一些信息
    private static final TransmittableThreadLocal<FlowContext> FLOW_CONTEXT = new TransmittableThreadLocal<>();
//使用spring mvc 提供的HandlerInterceptor 接口的拦截能力做请求前放入threadLocal中一些信息 然后在 afterCompletion 时机清理掉
//如下例子
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        String processInstanceIdStr = request.getHeader(METADATA_FLOW_PROCESS_INSTANCE_ID);
        String processIdStr = request.getHeader(METADATA_FLOW_FIRST_PROCESS_ID);
        try {
            Long processInstanceId = null;
            Long processId = null;
            if (StringUtils.isNotBlank(processInstanceIdStr)) {
                processInstanceId = Long.valueOf(processInstanceIdStr);
            }
            if (StringUtils.isNotBlank(processIdStr)) {
                processId = Long.valueOf(processIdStr);
            }
            doFill(processInstanceId, processId);
        } catch (Exception e) {
            FLOW_CONTEXT.remove();
        }
        return true;
    }


    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        super.afterCompletion(request, response, handler, ex);
        FLOW_CONTEXT.remove();
    }

如上代码非常简单的 通过header透传一些信息。在servlet线程周期管理 theadLocal信息。但是我们在请求进来后有一些异步操作也想要获取threadLocal信息如下

    @Async
    public void test(Long a, TransmittableThreadLocal<FlowContext> local) throws InterruptedException {
        Long processId = MetadataFlowContextHandler.getFirstProcessId();
        if (MetadataFlowContextHandler.getLocal() == local) {
            System.out.println("会进入到这里");
        }
        if (!Objects.equals(a, processId)) {
            System.out.println("有问题,除了第一次都不相等");
        }
    }

在我们本地进行一次测试会发现ThreadLocal信息如预想般获取到了正确的值,但是如果你仔细测试,并发情况,或者你测试几下,然后等一会再测试就会出现错误的情况,那么下面列出了错误的情况和简略原因,然后分析一下源码

影响因素
  1. 内存足够
  2. @Aync 线程池core线程数量都已经创建
  3. @Aync 线程池任务队列没有排满
会出现theadLocal错误的情况
  1. 满足上述条件1,2,3新的请求进入就会错误
  2. 1不满足,满足2,3则可能会在子线程(也就是@Async)获取到null(未验证凭借猜想)
原理就是因为主线程在第一次传递theadLocal对象的引用给子线程后放到当前线程的threadLocalMap中,后续子线程由于线程复用会在get时先通过当前线程对象去theadLocalMap中获取缓存的值,如果获取到直接返回,那么大部分时候会一直返回第一次主线程传递过来的引用。而主线程remove是不会传递的。

为什么要满足上述3个影响因素,如果1不满足,jvm的gc会将theadLocalMap对象清理,因为他是一个弱引用 WeakReference,而TTL主线程传递给子线程时也是存入主线程的theadLocal对象weakHashMap返回,如果内存不足会清理掉后在子线程会调用ThreadLocal#setInitialValue方法委托到子类TransmittableThreadLocal#initialValue其实也是返回一个空的map就会获取失败(未验证),那么2,3也是这个道理,如果服务刚启动线程池可能会new新的thead那么主线程也一定传递正确的

@Async的执行代码

我们直接看ReflectiveMethodInvocation类进入拦截逻辑

    @Override
    @Nullable
    public Object proceed() throws Throwable {
        // We start with an index of -1 and increment early.
        if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
            return invokeJoinpoint();
        }
// 这里会获取到一个AnnotationAsyncExecutionInterceptor,它不属于动态代理,在下面不会执行其它所有动态代理了,至于这个@Async的排序是不是最前面的index,如果是后面的index其实前面的动态代理也是可以执行的,这里不详细研究了
        Object interceptorOrInterceptionAdvice =
                this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
        if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
            // Evaluate dynamic method matcher here: static part will already have
            // been evaluated and found to match.
            InterceptorAndDynamicMethodMatcher dm =
                    (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
            Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
// 这里算是将动态代理的方法作为一个适配器去匹配
            if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
// 如果匹配成功则会执行动态代理,而后又会执行到当前方法体内
                return dm.interceptor.invoke(this);
            }
            else {
                // Dynamic matching failed.
                // Skip this interceptor and invoke the next in the chain.
                return proceed();
            }
        }
        else {
            // It's an interceptor, so we just invoke it: The pointcut will have
            // been evaluated statically before this object was constructed.
// 如果不是动态代理这里直接执行目标方法拦截器,那么不会重复进入当前方法体内了,其它的动态代理会失效
            return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
        }
    }

下面是AnnotationAsyncExecutionInterceptor#invoke方法

    @Override
    @Nullable
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
        Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
        final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);

        AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
// 这里必须是 异步的
        if (executor == null) {
            throw new IllegalStateException(
                    "No executor specified and no default executor set on AsyncExecutionInterceptor either");
        }

        Callable<Object> task = () -> {
            try {
                Object result = invocation.proceed();
                if (result instanceof Future) {
                    return ((Future<?>) result).get();
                }
            }
            catch (ExecutionException ex) {
                handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
            }
            catch (Throwable ex) {
                handleError(ex, userDeclaredMethod, invocation.getArguments());
            }
            return null;
        };
// 提交到线程池执行
        return doSubmit(task, executor, invocation.getMethod().getReturnType());
    }

下面是 doSubmit方法

    @Nullable
    protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// 这里根据返回值进行了封装,如果是CompletableFuture 则将这个callable 封装为CompletableFuture 返回给客户端自由操作
        if (CompletableFuture.class.isAssignableFrom(returnType)) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return task.call();
                }
                catch (Throwable ex) {
                    throw new CompletionException(ex);
                }
            }, executor);
        }
// 这是 spring提供的一个可以添加监听的Future,也就是将返回值设置为ListenableFuture的子类便可以添加一些监听例如异步方法成功后,或者抛出异常的后进行一些信息收集和逻辑判断日志打印之类
// 例如 StompBrokerRelayMessageHandler#forward 方法,这是spring-messaging中的stomp协议的一个future监听实现

        else if (ListenableFuture.class.isAssignableFrom(returnType)) {
            return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
        }
// 这是一个基础的 Future封装返回
        else if (Future.class.isAssignableFrom(returnType)) {
            return executor.submit(task);
        }
        else {
// 其它返回值或者没有返回值的
            executor.submit(task);
            return null;
        }
    }

再下面进入了线程池的逻辑,为什么要知道线程池的逻辑,因为影响了ttl传递threadLocal的逻辑,因为在子线程是new的情况下会将当前主线程的threadLocal的引用传递给异步的子线程,如果是复用时则什么也不会做!那为什么你在测试代码时已经复用的线程还是好用呢,因为子线程通过弱引用的threadLocalMap保存了第一次在new Thread时的主线程threadLocal信息,你换个信息的值再试试!
/*
*分三步进行:

  • 1。如果运行的线程少于corePoolSize,则尝试
    *以给定的命令作为第一个启动一个新线程
    *任务。对addWorker的调用自动地检查runState和
  • workerCount,这样可以防止添加错误警报
    *当它不应该返回false线程。
  • 2。如果一个任务可以成功排队,那么我们仍然需要
    *再次检查我们是否应该添加一个线程
    *(因为已经存在的线程在上次检查后已经死亡)
    *池在进入该方法后关闭。所以我们
    *重新检查状态,必要时回滚排队
    *停止,或启动一个新的线程,如果没有。
  • 3。如果不能对任务进行排队,则尝试添加一个新的
    *线程。如果失败,我们就知道系统关闭或饱和了
    *等拒绝该任务。
    */
    上面复制于源码的注释,具体大家可以百度其它的文章来学习,或者直接看源码ThreadPoolExecutor#execute(Runnable command)的代码
ttl在子线程为new Thread时传递逻辑

直接看Thread#init代码

  private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
      // 省略部分代码...
        this.group = g;
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
// 这里判断主线程是否含有inheritableThreadLocals && 当前子线程是否可以传递线程私有变量
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 子线程创建 threadLocal 并传入父线程的 threadLocalMap
            this.inheritableThreadLocals =
              ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;

        /* Set thread ID */
        tid = nextThreadID();
    }

下面会调用ThreadLocal#ThreadLocalMap(ThreadLocalMap parentMap) -> ThreadLocal#childValue(T parentValue)

// 这是ThreadLocal 的代码,会直接报错不支持,只有InheritableThreadLocal及其子类支持,而TTL继承了InheritableThreadLocal类
T childValue(T parentValue) {
        throw new UnsupportedOperationException();
    }
// 而InheritableThreadLocal的实现如下直接返回主线程的值,虽然传递了但是客户端不容易拿到
  protected T childValue(T parentValue) {
        return parentValue;
    }
// TransmittableThreadLocal 的实现是返回一个以TransmittableThreadLocal对象为key的weakHashMap,作为InheritableThreadLocal的增强保持了弱引用的语义及传入主线程值的引用,并且可以在子线程通过TTL的get时从这个weakHashMap直接获取到从主线程传递过来的引用
    private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
        protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
            return new WeakHashMap();
        }
//这里看出TTL 不单单是将主线程的 threadLocal的值引用传递,并且将主线程的TransmittableThreadLocal 对象作为key传入到子线程的ThreadLocalMap中
        protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
            return new WeakHashMap(parentValue);
        }
    };
下面时TTL 的get方法
   @Override
    public final T get() {
// 调用父类 ThreadLocal的get
        T value = super.get();
// 这里很重要,
        if (null != value) addValue();
        return value;
    }
//  threadLocal 的get方法
    public T get() {
        Thread t = Thread.currentThread();
// 获取当前线程的 ThreadLocalMap 
        ThreadLocalMap map = getMap(t);
        if (map != null) {
// 这里的this其实是当前TransmittableThreadLocal 对象
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
// 如果没有获取到值会初始化一下新的theadLocal中的对象,先会调用子类中的initialValue然后如果ThreadLocalMap没有被回收直接返回init的值并返回,如果被回收了会create新的map,实际TTL也只会new一个空的map返回
        return setInitialValue();
    }
     private Entry getEntry(ThreadLocal<?> key) {
            int i = key.threadLocalHashCode & (table.length - 1);
// 通过threadLocal对象的hashCode从ThreadLocalMap获取到缓存的对象
            Entry e = table[i];
            if (e != null && e.get() == key)
                return e;
            else
                return getEntryAfterMiss(key, i, e);
        }

我们尽量不讨论 TTL之外的代码,上述代码是一个标准的从ThreadLocal中get对象的流程,但是TTL的get有一个addValue的操作

// 这里的逻辑是在TTL 传递childValue时的 map重新灌入的逻辑,目前还不知道为什么这样做,后续文章会仔细探讨
    private void addValue() {
        if (!holder.get().containsKey(this)) {
            holder.get().put(this, null); // WeakHashMap supports null value.
        }
    }

如上述这些代码,虽然使用TTL在new Thread时将主线程的引用灌入了子线程中,并处理业务对象本身还放入了一个weakHashMap以threadLocal对象为key,但是在get时候并没有什么不同啊,我们通过测试发现了问题后debug这里也看不出什么猫腻,但是发现就是子线程一直获取第一次传递过来的对象引用,实际实现的逻辑也没有用到TTL重写的childValue方法中构造的map,而是直接使用了InheritableThreadLocal实现的业务对象的直接引用。然后看TTL的代码中内部类Transmitter有大量的复制,重放,还原的逻辑如下

       @Nonnull
        public static Object replay(@Nonnull Object captured) {
// 这个方法一看就是在复制替换数据,实际就是在各种线程池的工作线程执行前的重放(替换threadLocal变量)具体逻辑后续文章探讨,那么看来这个动作在debug中一直没有执行,所以没有产生TTL线程私有变量的正确传递,我们看看是谁在调用它
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();
                // backup
                backup.put(threadLocal, threadLocal.get());
                // clear the TTL values that is not in captured
                // avoid the extra TTL values after replay when run task
                if (!capturedMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }
            // set values to captured TTL
            setTtlValuesTo(capturedMap);
            // call beforeExecute callback
            // 执行时机 为 目标方法执行前
            doExecuteCallback(true);
            return backup;
        }

看看是谁在调用这个replay方法


谁在调用replay

我们进入一个Runnable的地方TtlRunnable类,看的出来是一个装饰器做了增强,然后对目标Runnable执行前执行后对threadLocal进行了重放,还原工作 如下 TtlRunnable

    @Override
    public void run() {
        Object captured = capturedRef.get();
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
        Object backup = replay(captured);
        try {
            runnable.run();
        } finally {
            restore(backup);
        }
    }

那么继续,TtlRunnable又是谁搞的呢


image.png

原来是ExecutorServiceTtlWrapper 类,另一个先忽略一看就是和定时相关的。那么这是一个ExecutorService的装饰器,也是做了增强,目的是可以使用TtlRunnable这个增强再往下看


image.png

TtlExecutors 这个类有一堆静态方法,都是返回传入目标对象返回其装饰器的方法,那就是我们在构造ExecutorService线程池时可以直接使用这个类的返回装饰器应该就可以了

    @Bean(name = "taskExecutor")
    public ExecutorService threadPoolTaskExecutor() {
// 返回装饰器
        return TtlExecutors.getTtlExecutorService(
            new ThreadPoolExecutor(50, 300, 300, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()));
    }

经过测试没有问题,嗐,原来是自己不知道TTL还需要结合线程池的装饰器来实现threadLocal的正确传递!菜是原罪啊T.T
网上很多关于TTL的实现原理的讲解,我们后续也会通过这次经验来详细了解一下TTL的实现机制和设计思想

TransmittableThreadLocal线程间传递逻辑 - 简书 (jianshu.com)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容