测试代码
/**
 * Custom thread pool used by the asynchronous test cases.
 * <p>
 * Settings: 50 core threads, at most 200 threads, 180 seconds keep-alive,
 * a bounded work queue of 3000 tasks, and
 * {@link ThreadPoolExecutor.CallerRunsPolicy} as the rejection handler.
 */
private static final ThreadPoolExecutor executorService = newTestRequestExecutor();

/**
 * Builds the pool. Each worker thread keeps the name assigned by the default
 * thread factory, prefixed with {@code "testRequest - "}.
 *
 * @return the configured executor
 */
private static ThreadPoolExecutor newTestRequestExecutor() {
    // Created once so every worker comes from the same default factory.
    final ThreadFactory delegate = Executors.defaultThreadFactory();
    final ThreadFactory namingFactory = task -> {
        Thread worker = delegate.newThread(task);
        worker.setName("testRequest - " + worker.getName());
        return worker;
    };
    final LinkedBlockingQueue<Runnable> backlog = new LinkedBlockingQueue<>(3000);
    return new ThreadPoolExecutor(
            50,
            200,
            180L,
            TimeUnit.SECONDS,
            backlog,
            namingFactory,
            new ThreadPoolExecutor.CallerRunsPolicy());
}
/**
 * Reads the {@code token} header of the request bound to the current thread.
 *
 * @return the header value, or {@code null} when no request attributes are
 *         bound to the current thread (or the header is absent)
 */
private String getToken() {
    final ServletRequestAttributes bound =
            (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
    // No request context on this thread means there is nothing to read.
    return bound == null ? null : bound.getRequest().getHeader("token");
}
/**
 * Demonstrates how the request {@code token} header is (or is not) visible from
 * worker threads, and two ways of propagating it.
 * <ol>
 *   <li>Test 1: read the token on the calling thread.</li>
 *   <li>Test 2: read it inside a task submitted to {@code executorService}.</li>
 *   <li>Test 3: read it inside a parallel stream started from the calling thread.</li>
 *   <li>Test 4: read it inside a parallel stream started from a pooled task.</li>
 *   <li>Approach 1: propagate by explicitly binding the captured
 *       {@link RequestAttributes} in every thread that needs them.</li>
 *   <li>Approach 2: read the token from {@code TokenInheritableThreadLocal}.</li>
 * </ol>
 * Each asynchronous case is awaited with {@code Future.get()} before the next
 * one starts; checked exceptions from {@code get()} are rethrown via
 * {@code @SneakyThrows}.
 */
@SneakyThrows
public void test() {
    String token = getToken();
    log.info("测试1:主线程 mainThreadToken = {}", token);
    log.info("----------------------------------------");

    // Test 2: single task on the custom pool.
    Future<String> future1 = CompletableFuture.supplyAsync(() -> {
        // Read the token from the worker thread.
        String childThreadToken = getToken();
        log.info("测试2:子线程 childThreadToken = {}", childThreadToken);
        return childThreadToken;
    }, executorService);
    future1.get();
    log.info("----------------------------------------");

    // Test 3: parallel stream started directly from the calling thread.
    Arrays.asList(1, 2, 3, 4).parallelStream().forEach(t -> {
        // Read the token from whichever thread runs this element.
        String childThreadToken = getToken();
        log.info("测试3:子线程 childThreadToken = {}", childThreadToken);
    });
    log.info("----------------------------------------");

    // Test 4: parallel stream nested inside a pooled task.
    Future<String> future2 = CompletableFuture.supplyAsync(() -> {
        Arrays.asList(1, 2, 3, 4).parallelStream().forEach(t -> {
            // Read the token from whichever thread runs this element.
            String childThreadToken = getToken();
            log.info("测试4:子线程 childThreadToken = {}", childThreadToken);
        });
        return null;
    }, executorService);
    future2.get();
    log.info("----------------------------------------");

    // Approach 1: capture the request attributes here and bind them manually
    // in the pooled task and in every parallel-stream element.
    RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
    Future<String> future3 = CompletableFuture.supplyAsync(() -> {
        try {
            RequestContextHolder.setRequestAttributes(attributes);
            Arrays.asList(1, 2, 3, 4).parallelStream().forEach(t -> {
                // Remember what this thread had bound so it can be restored;
                // otherwise the attributes would stay attached to shared
                // parallel-stream worker threads after this task finishes.
                RequestAttributes previous = RequestContextHolder.getRequestAttributes();
                RequestContextHolder.setRequestAttributes(attributes);
                try {
                    String childThreadToken = getToken();
                    log.info("方式1:子线程 childThreadToken = {}", childThreadToken);
                } finally {
                    // Passing null resets the thread-bound context.
                    RequestContextHolder.setRequestAttributes(previous);
                }
            });
            return null;
        } finally {
            // Unbind from the pooled thread that ran the outer task.
            RequestContextHolder.resetRequestAttributes();
        }
    }, executorService);
    future3.get();
    log.info("----------------------------------------");

    // Approach 2: read the token from the inheritable thread-local holder.
    // Nothing is bound to RequestContextHolder in this case, so there is
    // nothing to reset afterwards.
    Future<String> future4 = CompletableFuture.supplyAsync(() -> {
        Arrays.asList(1, 2, 3, 4).parallelStream().forEach(t -> {
            String childThreadToken = TokenInheritableThreadLocal.getToken();
            log.info("方式2:子线程 childThreadToken = {}", childThreadToken);
        });
        return null;
    }, executorService);
    future4.get();
    log.info("----------------------------------------");
}
测试结果
--------------------------------------------------------------------------------
测试1:主线程 http-nio-8083-exec-3,mainThreadToken = 123456
--------------------------------------------------------------------------------
测试2:子线程 testRequest - pool-1-thread-1,childThreadToken = null
--------------------------------------------------------------------------------
测试3:子线程 http-nio-8083-exec-3,childThreadToken = 123456
测试3:子线程 ForkJoinPool.commonPool-worker-2,childThreadToken = null
测试3:子线程 ForkJoinPool.commonPool-worker-9,childThreadToken = null
测试3:子线程 ForkJoinPool.commonPool-worker-2,childThreadToken = null
--------------------------------------------------------------------------------
测试4:子线程 testRequest - pool-1-thread-2,childThreadToken = null
测试4:子线程 ForkJoinPool.commonPool-worker-4,childThreadToken = null
测试4:子线程 ForkJoinPool.commonPool-worker-11,childThreadToken = null
测试4:子线程 ForkJoinPool.commonPool-worker-2,childThreadToken = null
--------------------------------------------------------------------------------
方式1:子线程 ForkJoinPool.commonPool-worker-2,childThreadToken = 123456
方式1:子线程 ForkJoinPool.commonPool-worker-4,childThreadToken = 123456
方式1:子线程 ForkJoinPool.commonPool-worker-11,childThreadToken = 123456
方式1:子线程 testRequest - pool-1-thread-3,childThreadToken = 123456
--------------------------------------------------------------------------------
方式2:子线程 testRequest - pool-1-thread-4,childThreadToken = 123456
方式2:子线程 ForkJoinPool.commonPool-worker-2,childThreadToken = 123456
方式2:子线程 ForkJoinPool.commonPool-worker-11,childThreadToken = 123456
方式2:子线程 ForkJoinPool.commonPool-worker-4,childThreadToken = 123456
--------------------------------------------------------------------------------
分析
- parallelStream() 也会导致请求头丢失:只有恰好由调用线程(主线程 http-nio-8083-exec-3)执行的那部分任务能拿到 token,被分配到 ForkJoinPool.commonPool 工作线程上的任务拿不到
- 通过 RequestContextHolder.setRequestAttributes(attributes) 可以透传 header,但比较麻烦:每个子线程(包括 parallelStream 的每个任务)里都要手动设置,用完后还要重置,否则会残留在被复用的线程上
- InheritableThreadLocal 可以绑定父线程的变量,子线程也可以获取,使用方便;但要注意:值只在子线程被创建时从父线程复制一次,线程池中被复用的线程不会随新请求更新,可能读到旧请求的 token
最佳解决方案
- TokenInheritableThreadLocal
/**
 * Static holder for the current request token, backed by an
 * {@link InheritableThreadLocal}.
 * <p>
 * A thread created while a token is set receives a copy of that token at
 * creation time. The copy is not refreshed afterwards, so a thread that is
 * reused (for example a pooled worker) keeps whatever value it received when
 * it was created until that thread itself calls {@link #setToken(String)} or
 * {@link #remove()}.
 */
public final class TokenInheritableThreadLocal {

    private TokenInheritableThreadLocal() {
    }

    /**
     * Backing storage. The initial value is {@code null}, which is the
     * thread-local default, so no {@code initialValue()} override is needed.
     */
    public static final InheritableThreadLocal<String> THREAD_LOCAL = new InheritableThreadLocal<>();

    /**
     * @return the token visible to the current thread, or {@code null} if none is set
     */
    public static String getToken() {
        return THREAD_LOCAL.get();
    }

    /**
     * Sets the token for the current thread.
     *
     * @param token the token to store; may be {@code null}
     */
    public static void setToken(String token) {
        THREAD_LOCAL.set(token);
    }

    /**
     * Clears the token for the current thread.
     */
    public static void remove() {
        THREAD_LOCAL.remove();
    }
}
/**
 * Interceptor that copies the {@code token} request header into
 * {@code TokenInheritableThreadLocal} before the handler runs and clears it
 * once the request has completed.
 */
public class TokenInterceptor implements HandlerInterceptor {

    /**
     * Stores the {@code token} header (possibly {@code null}) for the current
     * thread, then delegates to the interface's default implementation.
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
        final String headerToken = request.getHeader("token");
        TokenInheritableThreadLocal.THREAD_LOCAL.set(headerToken);
        return HandlerInterceptor.super.preHandle(request, response, handler);
    }

    /**
     * Releases the thread-local value once the call has finished, to avoid
     * leaking it, then delegates to the interface's default implementation.
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        TokenInheritableThreadLocal.remove();
        HandlerInterceptor.super.afterCompletion(request, response, handler, ex);
    }
}
/**
 * Web MVC configuration that registers {@code TokenInterceptor}.
 */
@Configuration
public class CommonWebAppConfigurer implements WebMvcConfigurer {

    /**
     * Adds a new {@code TokenInterceptor} to the registry; no path patterns
     * are configured here.
     *
     * @param registry the interceptor registry to add to
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        final TokenInterceptor tokenInterceptor = new TokenInterceptor();
        registry.addInterceptor(tokenInterceptor);
    }
}