TransmittableThreadLocal—父子线程间线程本地变量
问题:若是在线程中开启子线程,那么子线程也会存在一个ThreadLocalMap对象,但是它不存在父线程ThreadLocalMap对象中的值。
@Slf4jpublicclasstestThreadLocal{@TestpublicvoidtestThreadLocal2()throws InterruptedException{ThreadLocal<String>local=newThreadLocal<>();try{local.set("我是主线程");ExecutorServiceexecutorService=Executors.newFixedThreadPool(1);CountDownLatchc1=newCountDownLatch(1);CountDownLatchc2=newCountDownLatch(1);executorService.execute(()->{System.out.println("线程1"+local.get());c1.countDown();});c1.await();executorService.execute(()->{System.out.println("线程2"+local.get());c2.countDown();});c2.await();executorService.shutdownNow();}finally{//使用完毕,清除线程中ThreadLocalMap中的key。local.remove();}}}
可继承的InheritableThreadLocal
@Slf4jpublicclasstestThreadLocal{@TestpublicvoidtestInheritableThreadLocal()throws InterruptedException{ThreadLocal<String>local=newInheritableThreadLocal<>();local.set("我是主线程");newThread(()->{System.out.println("子线程1"+local.get());}).start();Thread.sleep(2000);}}
最终解决方案TransmittableThreadLocal
阿里巴巴开源解决方案:https://github.com/alibaba/transmittable-thread-local
alibaba提供了一系列的包装类,可以对ExecutorServic及其子类进行装饰,由TtlExecutors类实现。
@TestpublicvoidtestInheritableThreadLocal3()throws InterruptedException{TransmittableThreadLocal<String>local=newTransmittableThreadLocal<>();local.set("我是主线程");//生成额外的代理ExecutorServiceexecutorService=Executors.newFixedThreadPool(1);//核心装饰代码executorService=TtlExecutors.getTtlExecutorService(executorService);CountDownLatchc1=newCountDownLatch(1);CountDownLatchc2=newCountDownLatch(1);executorService.submit(()->{System.out.println("我是线程1:"+local.get());c1.countDown();});c1.await();local.set("修改主线程");System.out.println(local.get());executorService.submit(()->{System.out.println("我是线程2:"+local.get());c2.countDown();});c2.await();}
agent实现
需配置启动参数-javaagent:path/to/transmittable-thread-local-2.x.x.jar
@TestpublicvoidtestTTL()throws InterruptedException{TransmittableThreadLocal<String>local=newTransmittableThreadLocal<>();local.set("我是主线程");//生成额外的代理CountDownLatchc1=newCountDownLatch(1);CountDownLatchc2=newCountDownLatch(1);CompletableFuture.supplyAsync(()->{System.out.println("开启了线程1:"+local.get());c1.countDown();return"线程1的值";});c1.await();local.set("修改主线程");System.out.println(local.get());CompletableFuture.supplyAsync(()->{System.out.println("开启了线程2:"+local.get());c1.countDown();return"线程2的值";});c2.await();System.out.println("我是主线程:"+local.get());}
TransmittableThreadLocal源码分析:
TransmittableThreadLocal设计思路如下:总得来说,就是在将异步任务派发给线程池时,对其做一下上下文传递的处理。
第一步:主线程获取上下文,传递给任务暂存。
1 之后的操作都将是异步执行线程操作的。
第二步:异步执行线程将原有上下文取出,暂时保存。并将主线程传递过来的上下文设置。
第三步:执行异步任务
第四步:将原有上下文设置回去。
privatestaticThreadLocal<String>contextHolder=newThreadLocal<>();publicstatic<T>CompletableFuture<T>invokeToCompletableFuture(Supplier<T>supplier,String errorMessage){// 第一步String context=contextHolder.get();Supplier<T>newSupplier=()->{// 第二步String origin=contextHolder.get();try{contextHolder.set(context);// 第三步returnsupplier.get();}finally{// 第四步contextHolder.set(origin);log.info(origin);}};returnCompletableFuture.supplyAsync(newSupplier).exceptionally(e->{thrownewServerErrorException(errorMessage,e);});}// test codepublicstaticvoidmain(String[]args)throws ExecutionException,InterruptedException{contextHolder.set("main");log.info(contextHolder.get());CompletableFuture<String>context=invokeToCompletableFuture(()->test.contextHolder.get(),"error");log.info(context.get());}
TransmittableThreadLocal核心类分析:
其使用方法本质上与上述提到的 CallableWrapper 和 DelegatingExecutor 是一样的,并且为了方便使用,对外提供了静态工厂方法或工具类。
整体主要是三个部分:任务( TtlCallable )、线程池( ExecutorServiceTtlWrapper )、ThreadLocal( TransmittableThreadLocal )。其实对应上述讲到的 CallableWrapper、DelegatingExecutor、InheritableThreadLocal。
精简版本的transmittalbeThreadLocal
因为TTL是继承自ITL,所以在线程池中的线程被创建的时候,会自动继承,且会一直存在,个人感觉没有必要,下面是TTL的精简版代码,继承自TL,但实现原理一致。
publicclassHTransmittableThreadLocal<T>extendsThreadLocal<T>{/** * 保存需要具备传播性的 HTransmittableThreadLocal */privatestaticfinalThreadLocal<WeakHashMap<HTransmittableThreadLocal<Object>,?>>holder=ThreadLocal.withInitial(WeakHashMap::new);@OverridepublicfinalTget(){Tvalue=super.get();addThisToHolder();returnvalue;}@Overridepublicfinalvoidset(Tvalue){super.set(value);addThisToHolder();}@Overridepublicfinalvoidremove(){removeThisFromHolder();super.remove();}privatevoidsuperRemove(){super.remove();}@SuppressWarnings("unchecked")privatevoidaddThisToHolder(){if(!holder.get().containsKey(this)){holder.get().put((HTransmittableThreadLocal<Object>)this,null);}}privatevoidremoveThisFromHolder(){holder.get().remove(this);}/** * 留给子类实现以实现值传递,默认是引用传递。在 {@link #capture()} 被调用时使用 */protectedTcopyValue(TparentValue){returnparentValue;}privatestaticclassSnapshot{finalHashMap<HTransmittableThreadLocal<Object>,Object>hTtlValue;privateSnapshot(HashMap<HTransmittableThreadLocal<Object>,Object>hTtlValue){this.hTtlValue=hTtlValue;}}/** * 在创建子线程任务时调用,以捕获当前线程需要传播的值 */publicstaticObjectcapture(){HashMap<HTransmittableThreadLocal<Object>,Object>hTtlValue=newHashMap<>();for(HTransmittableThreadLocal<Object>threadLocal:holder.get().keySet()){hTtlValue.put(threadLocal,threadLocal.copyValue(threadLocal.get()));}returnnewSnapshot(hTtlValue);}/** * 在子线程的任务执行前进行调用,将父线程捕获的需要传播的值在子线程进行回放,同时返回子线程中需要传播的值 */publicstaticObjectreplay(Objectcaptured){finalSnapshotcapturedSnapshot=(Snapshot)captured;returnnewSnapshot(replayTtlValues(capturedSnapshot.hTtlValue));}privatestaticHashMap<HTransmittableThreadLocal<Object>,Object>replayTtlValues(HashMap<HTransmittableThreadLocal<Object>,Object>captured){HashMap<HTransmittableThreadLocal<Object>,Object>backup=newHashMap<>();for(finalIterator<HTransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator();iterator.hasNext();){HTransmittableThreadLocal<Object>threadLocal=iterator.next();// 备份子线程的TTLbackup.put(threadLocal,threadLocal.get());// 如果holder中的TTL不在父线程的TTL中,则进行holder的清理if(!captured.containsKey(threadLocal)){iterator.remove();threadLocal.superRemove();}}setTtlValuesTo(captured);returnbackup;}/** * 在子线程的任务执行后进行调用,将子线程需要传播的值进行恢复 */publicstaticvoidrestore(Objectbackup){finalSnapshotbackupSnapshot=(Snapshot)backup;restoreTtlValues(backupSnapshot.hTtlValue);}privatestaticvoidrestoreTtlValues(HashMap<HTransmittableThreadLocal<Object>,Object>backup){for(finalIterator<HTransmittableThreadLocal<Object>>iterator=holder.get().keySet().iterator();iterator.hasNext();){HTransmittableThreadLocal<Object>threadLocal=iterator.next();// 如果holder中的TTL不在之前子线程备份的TTL中,则进行holder的清理if(!backup.containsKey(threadLocal)){iterator.remove();threadLocal.superRemove();}}setTtlValuesTo(backup);}/** * 为TTL赋值 */privatestaticvoidsetTtlValuesTo(HashMap<HTransmittableThreadLocal<Object>,Object>hTtlValues){for(Map.Entry<HTransmittableThreadLocal<Object>,Object>entry:hTtlValues.entrySet()){finalHTransmittableThreadLocal<Object>threadLocal=entry.getKey();threadLocal.set(entry.getValue());}}}publicclassHTtlRunnableimplementsRunnable{privatefinalAtomicReference<Object>capturedRef;privatefinalRunnablerunnable;publicHTtlRunnable(Runnablerunnable){this.capturedRef=newAtomicReference<Object>(capture());this.runnable=runnable;}@Overridepublicvoidrun(){finalObjectcaptured=capturedRef.get();finalObjectbackup=replay(captured);try{runnable.run();}finally{restore(backup);}}}
publicclassHTtlTest{publicstaticfinalHTransmittableThreadLocal<String>hTtl=newHTransmittableThreadLocal<>();publicstaticvoidmain(String[]args)throwsException{ExecutorServicethreadPool=Executors.newSingleThreadExecutor();// 模拟第一次开启子任务hTtl.set("hTtl-value-1");printThreadIdAndContext();threadPool.submit(newHTtlRunnable(HTtlTest::printThreadIdAndContext)).get();printThreadIdAndContext();System.out.println("----------------------");// 模拟第二次开启子任务hTtl.set("hTtl-value-2");printThreadIdAndContext();threadPool.submit(newHTtlRunnable(HTtlTest::printThreadIdAndContext)).get();printThreadIdAndContext();}privatestaticvoidprintThreadIdAndContext(){System.out.printf("【%2d】:%s%n",Thread.currentThread().getId(),hTtl.get());}}