@Async 是spring提供的非常方便的异步执行的注解,非常方便,可以指定线程池执行,但是它不是动态代理实现,也就是和其它动态代理注解(例如@Transactional)放在一起会导致动态代理失效。因为spring在拿到 @Async注解后直接委托给 AnnotationAsyncExecutionInterceptor 来执行@Async目标方法,而不是执行代理方法会走层层动态代理。
然后包装一个callable提交给TaskExecutor 来执行。
我们不会具体讨论@Async和线程池以及threadLocal的具体实现,只跟随我们的使用场景涉及到的源码
使用场景及问题
// 定义个 ttl threadLocal用于存储一些信息
private static final TransmittableThreadLocal<FlowContext> FLOW_CONTEXT = new TransmittableThreadLocal<>();
//使用spring mvc 提供的HandlerInterceptor 接口的拦截能力做请求前放入threadLocal中一些信息 然后在 afterCompletion 时机清理掉
//如下例子
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
String processInstanceIdStr = request.getHeader(METADATA_FLOW_PROCESS_INSTANCE_ID);
String processIdStr = request.getHeader(METADATA_FLOW_FIRST_PROCESS_ID);
try {
Long processInstanceId = null;
Long processId = null;
if (StringUtils.isNotBlank(processInstanceIdStr)) {
processInstanceId = Long.valueOf(processInstanceIdStr);
}
if (StringUtils.isNotBlank(processIdStr)) {
processId = Long.valueOf(processIdStr);
}
doFill(processInstanceId, processId);
} catch (Exception e) {
FLOW_CONTEXT.remove();
}
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
super.afterCompletion(request, response, handler, ex);
FLOW_CONTEXT.remove();
}
如上代码非常简单的 通过header透传一些信息。在servlet线程周期管理 theadLocal信息。但是我们在请求进来后有一些异步操作也想要获取threadLocal信息如下
@Async
public void test(Long a, TransmittableThreadLocal<FlowContext> local) throws InterruptedException {
Long processId = MetadataFlowContextHandler.getFirstProcessId();
if (MetadataFlowContextHandler.getLocal() == local) {
System.out.println("会进入到这里");
}
if (!Objects.equals(a, processId)) {
System.out.println("有问题,除了第一次都不相等");
}
}
在我们本地进行一次测试会发现ThreadLocal信息如预想般获取到了正确的值,但是如果你仔细测试,并发情况,或者你测试几下,然后等一会再测试就会出现错误的情况,那么下面列出了错误的情况和简略原因,然后分析一下源码
影响因素
- 内存足够
- @Aync 线程池core线程数量都已经创建
- @Aync 线程池任务队列没有排满
会出现theadLocal错误的情况
- 满足上述条件1,2,3新的请求进入就会错误
- 1不满足,满足2,3则可能会在子线程(也就是@Async)获取到null(未验证凭借猜想)
原理就是因为主线程在第一次传递theadLocal对象的引用给子线程后放到当前线程的threadLocalMap中,后续子线程由于线程复用会在get时先通过当前线程对象去theadLocalMap中获取缓存的值,如果获取到直接返回,那么大部分时候会一直返回第一次主线程传递过来的引用。而主线程remove是不会传递的。
为什么要满足上述3个影响因素,如果1不满足,jvm的gc会将theadLocalMap对象清理,因为他是一个弱引用 WeakReference,而TTL主线程传递给子线程时也是存入主线程的theadLocal对象weakHashMap返回,如果内存不足会清理掉后在子线程会调用ThreadLocal#setInitialValue方法委托到子类TransmittableThreadLocal#initialValue其实也是返回一个空的map就会获取失败(未验证),那么2,3也是这个道理,如果服务刚启动线程池可能会new新的thead那么主线程也一定传递正确的
@Async的执行代码
我们直接看ReflectiveMethodInvocation类进入拦截逻辑
@Override
@Nullable
public Object proceed() throws Throwable {
// We start with an index of -1 and increment early.
if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
return invokeJoinpoint();
}
// 这里会获取到一个AnnotationAsyncExecutionInterceptor,它不属于动态代理,在下面不会执行其它所有动态代理了,至于这个@Async的排序是不是最前面的index,如果是后面的index其实前面的动态代理也是可以执行的,这里不详细研究了
Object interceptorOrInterceptionAdvice =
this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
// Evaluate dynamic method matcher here: static part will already have
// been evaluated and found to match.
InterceptorAndDynamicMethodMatcher dm =
(InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
// 这里算是将动态代理的方法作为一个适配器去匹配
if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
// 如果匹配成功则会执行动态代理,而后又会执行到当前方法体内
return dm.interceptor.invoke(this);
}
else {
// Dynamic matching failed.
// Skip this interceptor and invoke the next in the chain.
return proceed();
}
}
else {
// It's an interceptor, so we just invoke it: The pointcut will have
// been evaluated statically before this object was constructed.
// 如果不是动态代理这里直接执行目标方法拦截器,那么不会重复进入当前方法体内了,其它的动态代理会失效
return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
}
}
下面是AnnotationAsyncExecutionInterceptor#invoke方法
@Override
@Nullable
public Object invoke(final MethodInvocation invocation) throws Throwable {
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
// 这里必须是 异步的
if (executor == null) {
throw new IllegalStateException(
"No executor specified and no default executor set on AsyncExecutionInterceptor either");
}
Callable<Object> task = () -> {
try {
Object result = invocation.proceed();
if (result instanceof Future) {
return ((Future<?>) result).get();
}
}
catch (ExecutionException ex) {
handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
}
catch (Throwable ex) {
handleError(ex, userDeclaredMethod, invocation.getArguments());
}
return null;
};
// 提交到线程池执行
return doSubmit(task, executor, invocation.getMethod().getReturnType());
}
下面是 doSubmit方法
@Nullable
protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
// 这里根据返回值进行了封装,如果是CompletableFuture 则将这个callable 封装为CompletableFuture 返回给客户端自由操作
if (CompletableFuture.class.isAssignableFrom(returnType)) {
return CompletableFuture.supplyAsync(() -> {
try {
return task.call();
}
catch (Throwable ex) {
throw new CompletionException(ex);
}
}, executor);
}
// 这是 spring提供的一个可以添加监听的Future,也就是将返回值设置为ListenableFuture的子类便可以添加一些监听例如异步方法成功后,或者抛出异常的后进行一些信息收集和逻辑判断日志打印之类
// 例如 StompBrokerRelayMessageHandler#forward 方法,这是spring-messaging中的stomp协议的一个future监听实现
else if (ListenableFuture.class.isAssignableFrom(returnType)) {
return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
}
// 这是一个基础的 Future封装返回
else if (Future.class.isAssignableFrom(returnType)) {
return executor.submit(task);
}
else {
// 其它返回值或者没有返回值的
executor.submit(task);
return null;
}
}
再下面进入了线程池的逻辑,为什么要知道线程池的逻辑,因为影响了ttl传递threadLocal的逻辑,因为在子线程是new的情况下会将当前主线程的threadLocal的引用传递给异步的子线程,如果是复用时则什么也不会做!那为什么你在测试代码时已经复用的线程还是好用呢,因为子线程通过弱引用的threadLocalMap保存了第一次在new Thread时的主线程threadLocal信息,你换个信息的值再试试!
/*
*分三步进行:
*
- 1。如果运行的线程少于corePoolSize,则尝试
*以给定的命令作为第一个启动一个新线程
*任务。对addWorker的调用自动地检查runState和 - workerCount,这样可以防止添加错误警报
*当它不应该返回false线程。
* - 2。如果一个任务可以成功排队,那么我们仍然需要
*再次检查我们是否应该添加一个线程
*(因为已经存在的线程在上次检查后已经死亡)
*池在进入该方法后关闭。所以我们
*重新检查状态,必要时回滚排队
*停止,或启动一个新的线程,如果没有。
* - 3。如果不能对任务进行排队,则尝试添加一个新的
*线程。如果失败,我们就知道系统关闭或饱和了
*等拒绝该任务。
*/
上面复制于源码的注释,具体大家可以百度其它的文章来学习,或者直接看源码ThreadPoolExecutor#execute(Runnable command)的代码
ttl在子线程为new Thread时传递逻辑
直接看Thread#init代码
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
// 省略部分代码...
this.group = g;
this.daemon = parent.isDaemon();
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
// 这里判断主线程是否含有inheritableThreadLocals && 当前子线程是否可以传递线程私有变量
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
// 子线程创建 threadLocal 并传入父线程的 threadLocalMap
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
/* Stash the specified stack size in case the VM cares */
this.stackSize = stackSize;
/* Set thread ID */
tid = nextThreadID();
}
下面会调用ThreadLocal#ThreadLocalMap(ThreadLocalMap parentMap) -> ThreadLocal#childValue(T parentValue)
// 这是ThreadLocal 的代码,会直接报错不支持,只有InheritableThreadLocal及其子类支持,而TTL继承了InheritableThreadLocal类
T childValue(T parentValue) {
throw new UnsupportedOperationException();
}
// 而InheritableThreadLocal的实现如下直接返回主线程的值,虽然传递了但是客户端不容易拿到
protected T childValue(T parentValue) {
return parentValue;
}
// TransmittableThreadLocal 的实现是返回一个以TransmittableThreadLocal对象为key的weakHashMap,作为InheritableThreadLocal的增强保持了弱引用的语义及传入主线程值的引用,并且可以在子线程通过TTL的get时从这个weakHashMap直接获取到从主线程传递过来的引用
private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder = new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
return new WeakHashMap();
}
//这里看出TTL 不单单是将主线程的 threadLocal的值引用传递,并且将主线程的TransmittableThreadLocal 对象作为key传入到子线程的ThreadLocalMap中
protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
return new WeakHashMap(parentValue);
}
};
下面时TTL 的get方法
@Override
public final T get() {
// 调用父类 ThreadLocal的get
T value = super.get();
// 这里很重要,
if (null != value) addValue();
return value;
}
// threadLocal 的get方法
public T get() {
Thread t = Thread.currentThread();
// 获取当前线程的 ThreadLocalMap
ThreadLocalMap map = getMap(t);
if (map != null) {
// 这里的this其实是当前TransmittableThreadLocal 对象
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 如果没有获取到值会初始化一下新的theadLocal中的对象,先会调用子类中的initialValue然后如果ThreadLocalMap没有被回收直接返回init的值并返回,如果被回收了会create新的map,实际TTL也只会new一个空的map返回
return setInitialValue();
}
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
// 通过threadLocal对象的hashCode从ThreadLocalMap获取到缓存的对象
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
我们尽量不讨论 TTL之外的代码,上述代码是一个标准的从ThreadLocal中get对象的流程,但是TTL的get有一个addValue的操作
// 这里的逻辑是在TTL 传递childValue时的 map重新灌入的逻辑,目前还不知道为什么这样做,后续文章会仔细探讨
private void addValue() {
if (!holder.get().containsKey(this)) {
holder.get().put(this, null); // WeakHashMap supports null value.
}
}
如上述这些代码,虽然使用TTL在new Thread时将主线程的引用灌入了子线程中,并处理业务对象本身还放入了一个weakHashMap以threadLocal对象为key,但是在get时候并没有什么不同啊,我们通过测试发现了问题后debug这里也看不出什么猫腻,但是发现就是子线程一直获取第一次传递过来的对象引用,实际实现的逻辑也没有用到TTL重写的childValue方法中构造的map,而是直接使用了InheritableThreadLocal实现的业务对象的直接引用。然后看TTL的代码中内部类Transmitter有大量的复制,重放,还原的逻辑如下
@Nonnull
public static Object replay(@Nonnull Object captured) {
// 这个方法一看就是在复制替换数据,实际就是在各种线程池的工作线程执行前的重放(替换threadLocal变量)具体逻辑后续文章探讨,那么看来这个动作在debug中一直没有执行,所以没有产生TTL线程私有变量的正确传递,我们看看是谁在调用它
@SuppressWarnings("unchecked")
Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();
for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
iterator.hasNext(); ) {
Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
TransmittableThreadLocal<?> threadLocal = next.getKey();
// backup
backup.put(threadLocal, threadLocal.get());
// clear the TTL values that is not in captured
// avoid the extra TTL values after replay when run task
if (!capturedMap.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// set values to captured TTL
setTtlValuesTo(capturedMap);
// call beforeExecute callback
// 执行时机 为 目标方法执行前
doExecuteCallback(true);
return backup;
}
看看是谁在调用这个replay方法
我们进入一个Runnable的地方TtlRunnable类,看的出来是一个装饰器做了增强,然后对目标Runnable执行前执行后对threadLocal进行了重放,还原工作 如下 TtlRunnable
@Override
public void run() {
Object captured = capturedRef.get();
if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
throw new IllegalStateException("TTL value reference is released after run!");
}
Object backup = replay(captured);
try {
runnable.run();
} finally {
restore(backup);
}
}
那么继续,TtlRunnable又是谁搞的呢
原来是ExecutorServiceTtlWrapper 类,另一个先忽略一看就是和定时相关的。那么这是一个ExecutorService的装饰器,也是做了增强,目的是可以使用TtlRunnable这个增强再往下看
TtlExecutors 这个类有一堆静态方法,都是返回传入目标对象返回其装饰器的方法,那就是我们在构造ExecutorService线程池时可以直接使用这个类的返回装饰器应该就可以了
@Bean(name = "taskExecutor")
public ExecutorService threadPoolTaskExecutor() {
// 返回装饰器
return TtlExecutors.getTtlExecutorService(
new ThreadPoolExecutor(50, 300, 300, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy()));
}
经过测试没有问题,嗐,原来是自己不知道TTL还需要结合线程池的装饰器来实现threadLocal的正确传递!菜是原罪啊T.T
网上很多关于TTL的实现原理的讲解,我们后续也会通过这次经验来详细了解一下TTL的实现机制和设计思想