title: 怎样才能充分压榨线程
date: 2022/07/18 13:17
一、线程的五种状态
下图为操作系统层面上线程的五种状态:
- 【初始状态】仅是在语言层面创建了线程对象,还未与操作系统线程关联
- 【可运行状态】(就绪状态)指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
- 【运行状态】指获取了 CPU 时间片运行中的状态
- 当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
- 【阻塞状态】
- 如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入 【阻塞状态】
- 等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
- 与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑 调度它们
- 【终止状态】表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态
二、阻塞状态
当线程进入阻塞状态时,线程还是会占用着系统的资源(内存之类的),但是却无法继续接收任务。
想一下这样的场景,一个只有 10 个线程的线程池,里面的线程全部都在执行一个阻塞操作,而这个阻塞操作需要一天才能处理完,这样将会有一天无法接收新的任务,相当于系统卡死了一天。
而更好的做法是,你在那边处理着,等到了预期的时间(1 天)我再管你要结果,这样线程就可以解放出来去干别的了。
常见的阻塞 API:
阻塞 API | 解决方案 |
---|---|
BIO 操作 | IO 多路复用 |
Thread.sleep(xxx) | ScheduledExecutorService |
Object.wait() | CompletableFuture 可以解决部分场景 |
注:其实并不是所有阻塞都需要优化(比如 BQ),我们需要优化的是不良阻塞(例如 sleep)
三、案例
需求:检测状态,当状态就绪的时候获取结果并返回
一般我们都会按照下面这种方式来写,但是这种写法会一直占用着线程,降低线程的利用率,当线程池中的线程被耗尽,那么就无法再接收新的请求。
public static void main(String[] args) {
log.info("结果为:「{}」", func());
}
@SneakyThrows
public static void func() {
while (true) {
// check status
boolean status = checkStatus();
if (status) {
log.info("获取结果~");
return "啦啦啦";
}
// 忙等待,此时该线程不能做别的
log.warn("忙等待 10s");
TimeUnit.SECONDS.sleep(10L);
}
}
public static boolean checkStatus() {
return new Random().nextBoolean();
}
13:27:23.566 [main] WARN com.dist.xdata.dqPlow.controller.DemoController - 忙等待 10s
13:27:33.572 [main] WARN com.dist.xdata.dqPlow.controller.DemoController - 忙等待 10s
13:27:43.575 [main] INFO com.dist.xdata.dqPlow.controller.DemoController - 获取结果,入库~
13:27:43.576 [main] INFO com.dist.xdata.dqPlow.controller.DemoController - 结果为:「success」
3.1 通过 ScheduledExecutorService & CompletableFuture 优化
- 由于有后续的操作,所以通过 CompletableFuture 作为一个 promise,然后注册一些回调
- 通过 ScheduledExecutorService 创建定时任务,当状态达标的时候触发 CompletableFuture
public static void main(String[] args) {
func2().thenAccept(x -> log.info("结果2为:「{}」", x));
}
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
public static CompletableFuture<String> func2() {
CompletableFuture<Void> future = new CompletableFuture<>();
// 提交任务
submitTask(future);
// 设置获取到结果后的操作;使用 Async 是为了切换线程,否则会占用定时任务线程
return future.thenApplyAsync(x -> {
log.info("获取结果2~");
return "啦啦啦2";
});
}
private static void submitTask(CompletableFuture<Void> future) {
executorService.schedule(() -> {
// check status
boolean status = checkStatus();
if (status) {
log.info("状态合适了~");
future.complete(null);
} else {
log.warn("等待 10s 后再次执行");
submitTask(future);
}
}, 1L, TimeUnit.SECONDS);
}
public static boolean checkStatus() {
return new Random().nextBoolean();
}
// schedule thread
15:26:11.684 [pool-1-thread-1] WARN com.dist.xdata.dqPlow.controller.DemoController - 等待 10s 后再次执行
15:26:12.689 [pool-1-thread-1] WARN com.dist.xdata.dqPlow.controller.DemoController - 等待 10s 后再次执行
15:26:13.693 [pool-1-thread-1] INFO com.dist.xdata.dqPlow.controller.DemoController - 状态合适了~
// forkjoinpool thread
15:26:13.694 [ForkJoinPool.commonPool-worker-1] INFO com.dist.xdata.dqPlow.controller.DemoController - 获取结果2~
15:26:13.695 [ForkJoinPool.commonPool-worker-1] INFO com.dist.xdata.dqPlow.controller.DemoController - 结果2为:「啦啦啦2」
3.2 通过 SpringMVC 异步处理优化
通过上面方式写的代码我们返回的是 CompletableFuture 对象,那么当我们写 mvc 的 Controller 的时候是不是需要这样写:
@RequestMapping("/v1/res")
public String getRes() {
return func2().get();
}
很明显这样太蠢了,这样写实际上还是阻塞了调用线程(此处是 Tomcat 的线程 nio-8080-exec-1),所以有没有什么办法可以释放这个线程,让他继续接收别的请求呢?
@RequestMapping("/v1/res")
public void getRes(HttpServletResponse response) {
func2().thenAccept(x -> response.getWriter().print(x));
}
// 注:这种方案是错误的,只作为引导思路
// 因为当代码执行完毕时,Tomcat 就会将连接关闭根本就获取不到 Response
// 在 Tomcat 中如果想要实现异步响应的效果,需要通过 AsyncContext 来实现,下面的 mvc 的方案就是基于 AsyncContext。
// 参见:https://mp.weixin.qq.com/s/eu6cpP7rpjZvf4-nl3K7uw
然而我们平时返回的数据并不是简单的字符串,我们还需要用到 SpringMVC 提供的一系列功能(异常捕获、返回值封装、序列化等)。故而 SpringMVC 帮我们适配了异步处理(基于 Servlet 3.0 异步处理),使用起来很简单,直接返回 CompletableFuture 对象即可:
@RequestMapping("/v1/res")
public CompletableFuture<String> getRes(HttpServletResponse response) {
return func2();
}
四、SpringMVC 异步处理原理
4.1 Servlet3.0 异步处理
Servlet 3.0 开始提供了 AsyncContext 用来支持异步处理请求,他的出现是为了解决慢请求导致线程池中线程全部被占满的问题。
Web 容器处理请求的方式一般是为每个 request 分配一个 thread。而 Thread 的创建是有代价的,而且线程的数量是有上限的。
在高负载情况下,线程池中的线程都被占着了,那么后续的 request 就只能等待。
传统的解决方案只能通过调大线程池的大小,但是遇到高负载的情况下慢请求还是会将所有的线程全部占用。
造成这个问题的原因是:
- 接收请求的线程和工作线程是同一个
- 代码的执行耗时较高,高负载情况下会将线程池中的所有线程占用
因此我们很容易想到,我们可以将执行较久的方法交给另一个线程来执行,
不建议将所有方法设置为异步,这样瓶颈就变成了工作线程池的大小,并没有提升任何优化,而且会有一次线程切换的消耗,从而造成负优化。
我们只应该将执行很慢的方法设置为异步。
4.1.1 官方示例
@WebServlet(urlPatterns={"/asyncservlet"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
/* ... Same variables and init method as in SyncServlet ... */
@Override
public void doGet(HttpServletRequest request,
HttpServletResponse response) {
response.setContentType("text/html;charset=UTF-8");
final AsyncContext acontext = request.startAsync();
acontext.start(new Runnable() {
public void run() {
String param = acontext.getRequest().getParameter("param");
String result = resource.process(param);
HttpServletResponse response = acontext.getResponse();
/* ... print to the response ... */
acontext.complete();
}
});
}
}
在这个官方例子里,每个HTTP thread都会开启另一个Worker thread来处理请求,然后把HTTP thread就归还给Web容器。但是看AsyncContext.start()
方法的javadoc:
Causes the container to dispatch a thread, possibly from a managed thread pool, to run the specified Runnable.
使容器调度线程(可能来自托管线程池)以运行指定的 Runnable。
实际上这里并没有规定Worker thread到底从哪里来,也许是HTTP thread pool之外的另一个thread pool?还是说就是HTTP thread pool?
The Limited Usefulness of AsyncContext.start()文章里写道:不同的Web容器对此有不同的实现,不过Tomcat实际上是利用HTTP thread pool来处理AsyncContext.start()
的。
这也就是说,我们原本是想释放HTTP thread的,但实际上并没有,因为有HTTP thread依然被用作 Worker thread,只不过这个thread和接收请求的HTTP thread不是同一个而已。
4.1.2 手动切换线程
@WebServlet(value = "/async-servlet-2", asyncSupported = true)
public class AsyncServlet2 extends HttpServlet {
private final ExecutorService executorService = Executors.newFixedThreadPool(400);
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
AsyncContext asyncContext = req.startAsync();
executorService.submit(
() -> {
// do something slow
TimeUnit.DAYS.sleep(1L);
// 写入数据
asyncContext.getResponse().getWriter().println("result");
// 通知请求已结束
asyncContext.complete();
}
);
}
}
注:通过
asyncContext.complete()
这是一种方案,SpringMVC 采用的是asyncContext.dispatch()
实现,可以参考:https://blog.csdn.net/weixin_44742132/article/details/117137408
4.1.3 提高性能 no! 提升吞吐量
AsyncContext 的目的并不是为了提高性能,也并不直接提供性能提升,它提供了把HTTP thread和Worker thread 解藕的机制,从而提高Web容器的响应能力(吞吐量)。
注:就像 reactor 一样,他们提升的都是吞吐量,而 Servlet3.0 异步处理是为了让接收 http 请求的线程不被慢方法阻塞住,从而接收更多的请求。
❌错误场景
web 容器接收请求的线程池线程数量为 200
worker 线程池线程数量为 300
所有的请求都发送到 worker 线程池
这样确实能提升吞吐量,因为干活的人多了,但是转念一想为何不直接将接收请求的线程池线程数量调大呢?
✅正确场景
web 容器接收请求的线程池线程数量为 200
worker 线程池线程数量任意
只将执行很慢方法发送到 worker 线程池
这样一来,无论有多少请求到慢速操作,它都不会将HTTP thread 占满导致其他请求无法处理。
4.2 SpringMVC 异步处理
大类 | 支持的返回值 | 描述 | 业务代码执行线程 | 描述 |
---|---|---|---|---|
Callable | Callable | task1 | spring 线程池 | |
Callable | WebAsyncTask | task1 | spring 线程池 | |
Callable | StreamingResponseBodyReturnValueHandler | 支持异步往输出流写数据(不占用 http 线程) | task1 | spring 线程池 |
DeferredResult | DeferredResult | nio-8080-exec-2 | 另一个 Tomcat 线程 | |
DeferredResult | ListenableFuture | 自定义线程 | ||
DeferredResult | CompletionStage | fork-join-pool-1 | CompletionFuture内部的默认线程池 | |
DeferredResult | ResponseBodyEmitter | 自定义线程 | ||
DeferredResult | SseEmitter | 自定义线程 |
测试代码在文末
由上表可知,其实 SpringMVC 支持处理的异步返回值主要是两类:Callable 和 DeferredResult。
4.2.1 Callable 类
先简单介绍下流程:
- ==== Tomcat 线程 1 ====
-
CallableMethodReturnValueHandler
拿到了我们返回的 Callable 对象,调用WebAsyncManager#startCallableProcessing
方法 - 调用
request.startAsync()
方法开启 Tomcat 的异步处理 - 然后将 Callable 提交到线程池(Spring 内部定义的)中执行
- ==== 此时 Tomcat 线程的工作完成被回收,接下来都是 “task1” 线程的工作 ====
- 执行
Callable#call()
方法,等待运行结束返回结果 - 将结果存入当前请求对应的 WebAsyncManager 中
- Spring MVC 会调用 asyncContext.dispatch()将请求重新交给Servlet容器完成
- ==== 此时 “task1” 线程的工作已完成,切换到 Tomcat 线程 2 ====
- DispatcherServlet 会再次接收到该请求
- 当调用到
RequestMappingHandlerAdapter#invokeHandlerMethod
方法时,会将先前计算的结果从 WebAsyncManager 中取出并返回。
我们看到在整个过程中有三个线程切换,接下来我们以返回 Callable 为例来分析一下源码。
1)返回值处理器 CallableMethodReturnValueHandler
public class CallableMethodReturnValueHandler implements HandlerMethodReturnValueHandler {
@Override
public boolean supportsReturnType(MethodParameter returnType) {
return Callable.class.isAssignableFrom(returnType.getParameterType());
}
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
Callable<?> callable = (Callable<?>) returnValue;
// 2. 从当前请求中获取对应的 WebAsyncManager;调用 startCallableProcessing 方法开启异步处理
WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
}
}
public static WebAsyncManager getAsyncManager(WebRequest webRequest) {
int scope = RequestAttributes.SCOPE_REQUEST;
WebAsyncManager asyncManager = null;
// 尝试从 request 域中获取 WebAsyncManager,一般都是能获取到的,因为在处理请求的时候 RequestMappingHandlerAdapter#invokeHandlerMethod 方法已经向 request 域中设置了当前请求对应的 WebAsyncManager
Object asyncManagerAttr = webRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, scope);
if (asyncManagerAttr instanceof WebAsyncManager) {
asyncManager = (WebAsyncManager) asyncManagerAttr;
}
// 如果 request 域中不存在,则自己创建一个塞进去
if (asyncManager == null) {
asyncManager = new WebAsyncManager();
webRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager, scope);
}
return asyncManager;
}
2)WebAsyncManager#startCallableProcessing()
public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
throws Exception {
// 上面的代码省略... 主要就是添加一些拦截器和一些回调
// 3. 调用了 AsyncContext#startAsync() 方法开启了异步处理
startAsyncProcessing(processingContext);
try {
// 4. 这个线程池也是 RequestMappingHandlerAdapter#invokeHandlerMethod 方法中 set 的
Future<?> future = this.taskExecutor.submit(() -> {
Object result = null;
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
// 6. 使用 spring 内部的线程池执行提交的 Callable
result = callable.call();
}
catch (Throwable ex) {
result = ex;
}
finally {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
}
// 这个方法往下看
setConcurrentResultAndDispatch(result);
});
interceptorChain.setTaskFuture(future);
}
catch (RejectedExecutionException ex) {
Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
setConcurrentResultAndDispatch(result);
throw ex;
}
}
private void setConcurrentResultAndDispatch(Object result) {
synchronized (WebAsyncManager.this) {
// 7. 设置计算的结果
if (this.concurrentResult != RESULT_NONE) {
return;
}
this.concurrentResult = result;
}
if (this.asyncWebRequest.isAsyncComplete()) {
if (logger.isDebugEnabled()) {
logger.debug("Async result set but request already complete: " + formatRequestUri());
}
return;
}
// 8. 调用 asyncContext#dispatch() 方法将请求重新交给Servlet容器完成
this.asyncWebRequest.dispatch();
}
3)DispatcherServlet 再次接收到该请求
4)调用到 RequestMappingHandlerAdapter#invokeHandlerMethod
方法处理请求
@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {
ServletWebRequest webRequest = new ServletWebRequest(request, response);
try {
WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);
ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
if (this.argumentResolvers != null) {
invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
}
if (this.returnValueHandlers != null) {
invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
}
invocableMethod.setDataBinderFactory(binderFactory);
invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
modelFactory.initModel(webRequest, mavContainer, invocableMethod);
mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);
AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
asyncWebRequest.setTimeout(this.asyncRequestTimeout);
// 从 request 域中取出 WebAsyncManager
WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
asyncManager.setTaskExecutor(this.taskExecutor);
asyncManager.setAsyncWebRequest(asyncWebRequest);
asyncManager.registerCallableInterceptors(this.callableInterceptors);
asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);
// 11. 判断 WebAsyncManager 中是否已经存在结果了,如果存在表示已经调用过 asyncContext#dispatch() 方法
if (asyncManager.hasConcurrentResult()) {
Object result = asyncManager.getConcurrentResult();
mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
asyncManager.clearConcurrentResult();
LogFormatUtils.traceDebug(logger, traceOn -> {
String formatted = LogFormatUtils.formatValue(result, !traceOn);
return "Resume with async result [" + formatted + "]";
});
// 此时的 invocableMethod 的返回值类型还标注的是 Callable 类型,需要将其调整成返回 result 的方法(() -> result),以复用之前的逻辑
invocableMethod = invocableMethod.wrapConcurrentResult(result);
}
// 执行方法
invocableMethod.invokeAndHandle(webRequest, mavContainer);
if (asyncManager.isConcurrentHandlingStarted()) {
return null;
}
return getModelAndView(mavContainer, modelFactory, webRequest);
}
finally {
webRequest.requestCompleted();
}
}
4.2.2 DeferredResult 类
大体上和 Callable 类是相同的,区别在于:
- ListenableFuture、CompletionStage、ResponseBodyEmitter、SseEmitter 是用自定义的线程池执行的 Callable(可以参见附录中的测试代码)
- DeferredResult 是通过另一个 Tomcat 线程执行的
1)返回值处理器 CallableMethodReturnValueHandler
public class DeferredResultMethodReturnValueHandler implements HandlerMethodReturnValueHandler {
@Override
public boolean supportsReturnType(MethodParameter returnType) {
Class<?> type = returnType.getParameterType();
return (DeferredResult.class.isAssignableFrom(type) ||
ListenableFuture.class.isAssignableFrom(type) ||
CompletionStage.class.isAssignableFrom(type));
}
@Override
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
DeferredResult<?> result;
if (returnValue instanceof DeferredResult) {
result = (DeferredResult<?>) returnValue;
}
else if (returnValue instanceof ListenableFuture) {
// 将 ListenableFuture 适配成 DeferredResult,我们看下适配的代码
result = adaptListenableFuture((ListenableFuture<?>) returnValue);
}
else if (returnValue instanceof CompletionStage) {
// 这个与 ListenableFuture 类似,不在重复分析
result = adaptCompletionStage((CompletionStage<?>) returnValue);
}
else {
// Should not happen...
throw new IllegalStateException("Unexpected return value type: " + returnValue);
}
// 从当前请求中获取对应的 WebAsyncManager;调用 startDeferredResultProcessing 方法开启异步处理
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
}
private DeferredResult<Object> adaptListenableFuture(ListenableFuture<?> future) {
// 注意。这个地方使用的是 DeferredResult 的空参构造,我们进去看一下
DeferredResult<Object> result = new DeferredResult<>();
future.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onSuccess(@Nullable Object value) {
// 当成功返回结果时设置值
result.setResult(value);
}
@Override
public void onFailure(Throwable ex) {
result.setErrorResult(ex);
}
});
return result;
}
}
public class DeferredResult<T> {
private static final Object RESULT_NONE = new Object();
@Nullable
private final Long timeoutValue;
private final Supplier<?> timeoutResult;
public DeferredResult() {
// 记住这个地方将 timeoutResult 设置成返回 RESULT_NONE,后面要考
this(null, () -> RESULT_NONE);
}
public DeferredResult(@Nullable Long timeoutValue, Supplier<?> timeoutResult) {
this.timeoutValue = timeoutValue;
this.timeoutResult = timeoutResult;
}
2)WebAsyncManager#startDeferredResultProcessing()
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
// 省略部分代码。。。
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
this.asyncWebRequest.addTimeoutHandler(() -> {
try {
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
});
// 调用了 AsyncContext#startAsync() 方法开启了异步处理
startAsyncProcessing(processingContext);
try {
interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
// 重点看这里,这个地方是注册了一个返回值处理器,在返回值处理器中调用的 asyncContext#dispatch() 方法
// 因为像 ListenableFuture 这样的实际上已经在线程池中执行了,而且支持注册回调,直接在回调里执行就好了
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
// 这个同上
setConcurrentResultAndDispatch(result);
});
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
}
后续的操作就和 Callable 类是一样的了。。。
3)问题:当返回 DeferredResult 时,其并没有提交到线程池中呀!那怎么办呢?
此处其实我也不太懂,可能是利用了 Tomcat 的超时机制,他会调用到我们在这里注册的超时处理器
public void startDeferredResultProcessing(
final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {
// 省略部分代码。。。
final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
this.asyncWebRequest.addTimeoutHandler(() -> {
try {
// 可能是利用了 socket 会触发一个 timeout 事件,从而调用到此处
interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
}
catch (Throwable ex) {
setConcurrentResultAndDispatch(ex);
}
});
接着他会调用到 DeferredResult 中的一个匿名内部类的 handleTimeout() 方法:
public class DeferredResult<T> {
private static final Object RESULT_NONE = new Object();
@Nullable
private final Long timeoutValue;
private final Supplier<?> timeoutResult;
public DeferredResult() {
// 记住这个地方将 timeoutResult 设置成返回 RESULT_NONE,后面要考
this(null, () -> RESULT_NONE);
}
public DeferredResult(@Nullable Long timeoutValue, Supplier<?> timeoutResult) {
this.timeoutValue = timeoutValue;
this.timeoutResult = timeoutResult;
}
final DeferredResultProcessingInterceptor getInterceptor() {
return new DeferredResultProcessingInterceptor() {
@Override
public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> deferredResult) {
boolean continueProcessing = true;
try {
if (timeoutCallback != null) {
timeoutCallback.run();
}
}
finally {
// 对于返回的是 DeferredResult,此处会调用传入的 Callable
// 而对于返回的是 ListenableFuture 此处调用会返回 RESULT_NONE
Object value = timeoutResult.get();
if (value != RESULT_NONE) {
continueProcessing = false;
try {
/*
此处调用的是此部分代码,asyncContext#dispatch()
deferredResult.setResultHandler(result -> {
result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
setConcurrentResultAndDispatch(result);
});
*/
// 注意:由于当前调用 asyncContext#dispatch() 方法已经是 Tomcat 线程了,所以他并不会进行线程切换,也就是说:当返回的是 DeferredResult 对象时,他只有两次线程切换 tomcat thread1 -> tomcat thread2
setResultInternal(value);
}
catch (Throwable ex) {
logger.debug("Failed to handle timeout result", ex);
}
}
}
return continueProcessing;
}
}
参考文章
Servlet3.0新特性:异步处理,太好用了!!!(这篇虽然我没参考,但是我觉得他写的挺好)
附录
4.2 测试代码
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncTask;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;
@Slf4j
@RestController
@RequestMapping("/rest/async")
public class AsyncController {
// callable 族
@GetMapping("/v1/callable")
public Callable<String> get() {
log.info("controller....");
return () -> {
log.warn("service....");
return "xxx";
};
}
@GetMapping("/v1/WebAsyncTask")
public WebAsyncTask<String> getWebAsyncTask() {
log.info("controller....");
return new WebAsyncTask<>(() -> {
log.warn("service....");
return "xxx";
});
}
@GetMapping("/v1/StreamingResponseBody")
public StreamingResponseBody getStreamingResponseBody() {
log.info("controller....");
return outputStream -> {
log.warn("service....");
outputStream.write("xxx".getBytes(StandardCharsets.UTF_8));
outputStream.close();
};
}
// DeferredResult 族
@GetMapping("/v1/DeferredResult")
public DeferredResult<String> getDeferredResult() {
log.info("controller....");
return new DeferredResult<>(10L, () -> {
log.warn("service....");
return "xxx";
});
}
@GetMapping("/v1/ListenableFuture")
public ListenableFuture<String> getListenableFuture() {
log.info("controller....");
ListenableFutureTask<String> task = new ListenableFutureTask<>(() -> {
log.warn("service....");
return "xxx";
});
Executors.newCachedThreadPool().submit(task);
return task;
}
@GetMapping("/v1/CompletionStage")
public CompletionStage<String> getCompletionStage() {
log.info("controller....");
return CompletableFuture.supplyAsync(() -> {
log.warn("service....");
return "xxx";
});
}
@GetMapping("/v1/ResponseBodyEmitter")
public ResponseBodyEmitter getResponseBodyEmitter() {
log.info("controller....");
ResponseBodyEmitter emitter = new ResponseBodyEmitter();
Executors.newCachedThreadPool().submit(() -> {
try {
for (int i = 0; i < 5; i++) {
emitter.send("Count: " + (i + 1));
emitter.send("\n");
emitter.send("hello");
emitter.send("\n\n");
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
@GetMapping("/v1/SseEmitter")
public SseEmitter getSseEmitter() {
SseEmitter emitter = new SseEmitter();
Executors.newCachedThreadPool().submit(() -> {
try {
for (int i = 0; i < 5; i++) {
emitter.send("Count: " + (i + 1));
emitter.send("\n");
emitter.send("hello");
emitter.send("\n\n");
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
}
});
return emitter;
}
}