怎样才能充分压榨线程


title: 怎样才能充分压榨线程
date: 2022/07/18 13:17


一、线程的五种状态

下图为操作系统层面线程的五种状态

img
  • 【初始状态】仅是在语言层面创建了线程对象,还未与操作系统线程关联
  • 【可运行状态】(就绪状态)指该线程已经被创建(与操作系统线程关联),可以由 CPU 调度执行
  • 【运行状态】指获取了 CPU 时间片运行中的状态
    • 当 CPU 时间片用完,会从【运行状态】转换至【可运行状态】,会导致线程的上下文切换
  • 【阻塞状态】
    1. 如果调用了阻塞 API,如 BIO 读写文件,这时该线程实际不会用到 CPU,会导致线程上下文切换,进入 【阻塞状态】
    2. 等 BIO 操作完毕,会由操作系统唤醒阻塞的线程,转换至【可运行状态】
    3. 与【可运行状态】的区别是,对【阻塞状态】的线程来说只要它们一直不唤醒,调度器就一直不会考虑 调度它们
  • 【终止状态】表示线程已经执行完毕,生命周期已经结束,不会再转换为其它状态

二、阻塞状态

当线程进入阻塞状态时,线程还是会占用着系统的资源(内存之类的),但是却无法继续接收任务

想一下这样的场景,一个只有 10 个线程的线程池,里面的线程全部都在执行一个阻塞操作,而这个阻塞操作需要一天才能处理完,这样将会有一天无法接收新的任务,相当于系统卡死了一天

而更好的做法是,你在那边处理着,等到了预期的时间(1 天)我再管你要结果,这样线程就可以解放出来去干别的了。

常见的阻塞 API:

阻塞 API 解决方案
BIO 操作 IO 多路复用
Thread.sleep(xxx) ScheduledExecutorService
Object.wait() CompletableFuture 可以解决部分场景

注:其实并不是所有阻塞都需要优化(比如 BQ),我们需要优化的是不良阻塞(例如 sleep)

三、案例

需求:检测状态,当状态就绪的时候获取结果并返回

一般我们都会按照下面这种方式来写,但是这种写法会一直占用着线程,降低线程的利用率,当线程池中的线程被耗尽,那么就无法再接收新的请求。

public static void main(String[] args) {
  log.info("结果为:「{}」", func());
}

@SneakyThrows
public static void func() {
  while (true) {
    // check status
    boolean status = checkStatus();
    if (status) {
      log.info("获取结果~");
      return "啦啦啦";
    }

    // 忙等待,此时该线程不能做别的
    log.warn("忙等待 10s");
    TimeUnit.SECONDS.sleep(10L);
  }
}

public static boolean checkStatus() {
  return new Random().nextBoolean();
}

13:27:23.566 [main] WARN com.dist.xdata.dqPlow.controller.DemoController - 忙等待 10s
13:27:33.572 [main] WARN com.dist.xdata.dqPlow.controller.DemoController - 忙等待 10s
13:27:43.575 [main] INFO com.dist.xdata.dqPlow.controller.DemoController - 获取结果,入库~
13:27:43.576 [main] INFO com.dist.xdata.dqPlow.controller.DemoController - 结果为:「success」

3.1 通过 ScheduledExecutorService & CompletableFuture 优化

  1. 由于有后续的操作,所以通过 CompletableFuture 作为一个 promise,然后注册一些回调
  2. 通过 ScheduledExecutorService 创建定时任务,当状态达标的时候触发 CompletableFuture
public static void main(String[] args) {
  func2().thenAccept(x -> log.info("结果2为:「{}」", x));
}

private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);

public static CompletableFuture<String> func2() {
  CompletableFuture<Void> future = new CompletableFuture<>();
  // 提交任务
  submitTask(future);
  // 设置获取到结果后的操作;使用 Async 是为了切换线程,否则会占用定时任务线程
  return future.thenApplyAsync(x -> {
    log.info("获取结果2~");
    return "啦啦啦2";
  });
}

private static void submitTask(CompletableFuture<Void> future) {
  executorService.schedule(() -> {
    // check status
    boolean status = checkStatus();
    if (status) {
      log.info("状态合适了~");
      future.complete(null);
    } else {
      log.warn("等待 10s 后再次执行");
      submitTask(future);
    }
  }, 1L, TimeUnit.SECONDS);
}

public static boolean checkStatus() {
  return new Random().nextBoolean();
}

// schedule thread
15:26:11.684 [pool-1-thread-1] WARN com.dist.xdata.dqPlow.controller.DemoController - 等待 10s 后再次执行
15:26:12.689 [pool-1-thread-1] WARN com.dist.xdata.dqPlow.controller.DemoController - 等待 10s 后再次执行
15:26:13.693 [pool-1-thread-1] INFO com.dist.xdata.dqPlow.controller.DemoController - 状态合适了~
// forkjoinpool thread
15:26:13.694 [ForkJoinPool.commonPool-worker-1] INFO com.dist.xdata.dqPlow.controller.DemoController - 获取结果2~
15:26:13.695 [ForkJoinPool.commonPool-worker-1] INFO com.dist.xdata.dqPlow.controller.DemoController - 结果2为:「啦啦啦2」

3.2 通过 SpringMVC 异步处理优化

通过上面方式写的代码我们返回的是 CompletableFuture 对象,那么当我们写 mvc 的 Controller 的时候是不是需要这样写:

@RequestMapping("/v1/res")
public String getRes() {
    return func2().get();
}

很明显这样太蠢了,这样写实际上还是阻塞了调用线程(此处是 Tomcat 的线程 nio-8080-exec-1),所以有没有什么办法可以释放这个线程,让他继续接收别的请求呢?

@RequestMapping("/v1/res")
public void getRes(HttpServletResponse response) {
    func2().thenAccept(x -> response.getWriter().print(x));
}

// 注:这种方案是错误的,只作为引导思路
// 因为当代码执行完毕时,Tomcat 就会将连接关闭根本就获取不到 Response
// 在 Tomcat 中如果想要实现异步响应的效果,需要通过 AsyncContext 来实现,下面的 mvc 的方案就是基于 AsyncContext。
// 参见:https://mp.weixin.qq.com/s/eu6cpP7rpjZvf4-nl3K7uw

然而我们平时返回的数据并不是简单的字符串,我们还需要用到 SpringMVC 提供的一系列功能(异常捕获、返回值封装、序列化等)。故而 SpringMVC 帮我们适配了异步处理(基于 Servlet 3.0 异步处理),使用起来很简单,直接返回 CompletableFuture 对象即可:

@RequestMapping("/v1/res")
public CompletableFuture<String> getRes(HttpServletResponse response) {
    return func2();
}

四、SpringMVC 异步处理原理

4.1 Servlet3.0 异步处理

Servlet 3.0 开始提供了 AsyncContext 用来支持异步处理请求,他的出现是为了解决慢请求导致线程池中线程全部被占满的问题。

Web 容器处理请求的方式一般是为每个 request 分配一个 thread。而 Thread 的创建是有代价的,而且线程的数量是有上限的。

在高负载情况下,线程池中的线程都被占着了,那么后续的 request 就只能等待

传统的解决方案只能通过调大线程池的大小,但是遇到高负载的情况下慢请求还是会将所有的线程全部占用。

造成这个问题的原因是:

  1. 接收请求的线程和工作线程是同一个
  2. 代码的执行耗时较高,高负载情况下会将线程池中的所有线程占用

因此我们很容易想到,我们可以将执行较久的方法交给另一个线程来执行

不建议将所有方法设置为异步,这样瓶颈就变成了工作线程池的大小,并没有提升任何优化,而且会有一次线程切换的消耗,从而造成负优化。

我们只应该将执行很慢的方法设置为异步。

4.1.1 官方示例

@WebServlet(urlPatterns={"/asyncservlet"}, asyncSupported=true)
public class AsyncServlet extends HttpServlet {
   /* ... Same variables and init method as in SyncServlet ... */

   @Override
   public void doGet(HttpServletRequest request, 
                     HttpServletResponse response) {
      response.setContentType("text/html;charset=UTF-8");
      final AsyncContext acontext = request.startAsync();
      acontext.start(new Runnable() {
             public void run() {
                String param = acontext.getRequest().getParameter("param");
                String result = resource.process(param);
                HttpServletResponse response = acontext.getResponse();
                /* ... print to the response ... */
                acontext.complete();
             }
      });
   }
}

在这个官方例子里,每个HTTP thread都会开启另一个Worker thread来处理请求,然后把HTTP thread就归还给Web容器。但是看AsyncContext.start()方法的javadoc:

Causes the container to dispatch a thread, possibly from a managed thread pool, to run the specified Runnable.

使容器调度线程(可能来自托管线程池)以运行指定的 Runnable。

实际上这里并没有规定Worker thread到底从哪里来,也许是HTTP thread pool之外的另一个thread pool?还是说就是HTTP thread pool?

The Limited Usefulness of AsyncContext.start()文章里写道:不同的Web容器对此有不同的实现,不过Tomcat实际上是利用HTTP thread pool来处理AsyncContext.start()

这也就是说,我们原本是想释放HTTP thread的,但实际上并没有,因为有HTTP thread依然被用作 Worker thread,只不过这个thread和接收请求的HTTP thread不是同一个而已

4.1.2 手动切换线程

@WebServlet(value = "/async-servlet-2", asyncSupported = true)
public class AsyncServlet2 extends HttpServlet {

    private final ExecutorService executorService = Executors.newFixedThreadPool(400);

    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) {
        AsyncContext asyncContext = req.startAsync();
        executorService.submit(
                () -> {
                    // do something slow
                    TimeUnit.DAYS.sleep(1L);
                    // 写入数据
                    asyncContext.getResponse().getWriter().println("result");
                    // 通知请求已结束
                    asyncContext.complete();
                }
        );
    }
}

注:通过 asyncContext.complete()这是一种方案,SpringMVC 采用的是 asyncContext.dispatch()实现,可以参考:https://blog.csdn.net/weixin_44742132/article/details/117137408

4.1.3 提高性能 no! 提升吞吐量

AsyncContext 的目的并不是为了提高性能,也并不直接提供性能提升,它提供了把HTTP thread和Worker thread 解藕的机制,从而提高Web容器的响应能力(吞吐量)。

注:就像 reactor 一样,他们提升的都是吞吐量,而 Servlet3.0 异步处理是为了让接收 http 请求的线程不被慢方法阻塞住,从而接收更多的请求。

❌错误场景

web 容器接收请求的线程池线程数量为 200

worker 线程池线程数量为 300

所有的请求都发送到 worker 线程池

这样确实能提升吞吐量,因为干活的人多了,但是转念一想为何不直接将接收请求的线程池线程数量调大呢?

✅正确场景

web 容器接收请求的线程池线程数量为 200

worker 线程池线程数量任意

只将执行很慢方法发送到 worker 线程池

这样一来,无论有多少请求到慢速操作,它都不会将HTTP thread 占满导致其他请求无法处理。

4.2 SpringMVC 异步处理

大类 支持的返回值 描述 业务代码执行线程 描述
Callable Callable task1 spring 线程池
Callable WebAsyncTask task1 spring 线程池
Callable StreamingResponseBodyReturnValueHandler 支持异步往输出流写数据(不占用 http 线程) task1 spring 线程池
DeferredResult DeferredResult nio-8080-exec-2 另一个 Tomcat 线程
DeferredResult ListenableFuture 自定义线程
DeferredResult CompletionStage fork-join-pool-1 CompletionFuture内部的默认线程池
DeferredResult ResponseBodyEmitter 自定义线程
DeferredResult SseEmitter 自定义线程

测试代码在文末

由上表可知,其实 SpringMVC 支持处理的异步返回值主要是两类:Callable 和 DeferredResult。

4.2.1 Callable 类

先简单介绍下流程:

  1. ==== Tomcat 线程 1 ====
  2. CallableMethodReturnValueHandler 拿到了我们返回的 Callable 对象,调用 WebAsyncManager#startCallableProcessing 方法
  3. 调用 request.startAsync()方法开启 Tomcat 的异步处理
  4. 然后将 Callable 提交到线程池(Spring 内部定义的)中执行
  5. ==== 此时 Tomcat 线程的工作完成被回收,接下来都是 “task1” 线程的工作 ====
  6. 执行 Callable#call() 方法,等待运行结束返回结果
  7. 将结果存入当前请求对应的 WebAsyncManager 中
  8. Spring MVC 会调用 asyncContext.dispatch()将请求重新交给Servlet容器完成
  9. ==== 此时 “task1” 线程的工作已完成,切换到 Tomcat 线程 2 ====
  10. DispatcherServlet 会再次接收到该请求
  11. 当调用到 RequestMappingHandlerAdapter#invokeHandlerMethod方法时,会将先前计算的结果从 WebAsyncManager 中取出并返回。

我们看到在整个过程中有三个线程切换,接下来我们以返回 Callable 为例来分析一下源码。

1)返回值处理器 CallableMethodReturnValueHandler

public class CallableMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

   @Override
   public boolean supportsReturnType(MethodParameter returnType) {
      return Callable.class.isAssignableFrom(returnType.getParameterType());
   }

   @Override
   public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
         ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

      if (returnValue == null) {
         mavContainer.setRequestHandled(true);
         return;
      }

      Callable<?> callable = (Callable<?>) returnValue;
        // 2. 从当前请求中获取对应的 WebAsyncManager;调用 startCallableProcessing 方法开启异步处理
      WebAsyncUtils.getAsyncManager(webRequest).startCallableProcessing(callable, mavContainer);
   }
}

public static WebAsyncManager getAsyncManager(WebRequest webRequest) {
  int scope = RequestAttributes.SCOPE_REQUEST;
  WebAsyncManager asyncManager = null;
  // 尝试从 request 域中获取 WebAsyncManager,一般都是能获取到的,因为在处理请求的时候 RequestMappingHandlerAdapter#invokeHandlerMethod 方法已经向 request 域中设置了当前请求对应的 WebAsyncManager
  Object asyncManagerAttr = webRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, scope);
  if (asyncManagerAttr instanceof WebAsyncManager) {
    asyncManager = (WebAsyncManager) asyncManagerAttr;
  }
  // 如果 request 域中不存在,则自己创建一个塞进去
  if (asyncManager == null) {
    asyncManager = new WebAsyncManager();
    webRequest.setAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE, asyncManager, scope);
  }
  return asyncManager;
}

2)WebAsyncManager#startCallableProcessing()

public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext)
      throws Exception {

   // 上面的代码省略... 主要就是添加一些拦截器和一些回调
 
   // 3. 调用了 AsyncContext#startAsync() 方法开启了异步处理
   startAsyncProcessing(processingContext);
   try {
      // 4. 这个线程池也是 RequestMappingHandlerAdapter#invokeHandlerMethod 方法中 set 的
      Future<?> future = this.taskExecutor.submit(() -> {
         Object result = null;
         try {
            interceptorChain.applyPreProcess(this.asyncWebRequest, callable);
            // 6. 使用 spring 内部的线程池执行提交的 Callable
            result = callable.call();
         }
         catch (Throwable ex) {
            result = ex;
         }
         finally {
            result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, result);
         }
         // 这个方法往下看
         setConcurrentResultAndDispatch(result);
      });
      interceptorChain.setTaskFuture(future);
   }
   catch (RejectedExecutionException ex) {
      Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
      setConcurrentResultAndDispatch(result);
      throw ex;
   }
}

private void setConcurrentResultAndDispatch(Object result) {
   synchronized (WebAsyncManager.this) {
      // 7. 设置计算的结果
      if (this.concurrentResult != RESULT_NONE) {
         return;
      }
      this.concurrentResult = result;
   }

   if (this.asyncWebRequest.isAsyncComplete()) {
      if (logger.isDebugEnabled()) {
         logger.debug("Async result set but request already complete: " + formatRequestUri());
      }
      return;
   }

   // 8. 调用 asyncContext#dispatch() 方法将请求重新交给Servlet容器完成
   this.asyncWebRequest.dispatch();
}

3)DispatcherServlet 再次接收到该请求

4)调用到 RequestMappingHandlerAdapter#invokeHandlerMethod方法处理请求

@Nullable
protected ModelAndView invokeHandlerMethod(HttpServletRequest request,
      HttpServletResponse response, HandlerMethod handlerMethod) throws Exception {

   ServletWebRequest webRequest = new ServletWebRequest(request, response);
   try {
      WebDataBinderFactory binderFactory = getDataBinderFactory(handlerMethod);
      ModelFactory modelFactory = getModelFactory(handlerMethod, binderFactory);

      ServletInvocableHandlerMethod invocableMethod = createInvocableHandlerMethod(handlerMethod);
      if (this.argumentResolvers != null) {
         invocableMethod.setHandlerMethodArgumentResolvers(this.argumentResolvers);
      }
      if (this.returnValueHandlers != null) {
         invocableMethod.setHandlerMethodReturnValueHandlers(this.returnValueHandlers);
      }
      invocableMethod.setDataBinderFactory(binderFactory);
      invocableMethod.setParameterNameDiscoverer(this.parameterNameDiscoverer);

      ModelAndViewContainer mavContainer = new ModelAndViewContainer();
      mavContainer.addAllAttributes(RequestContextUtils.getInputFlashMap(request));
      modelFactory.initModel(webRequest, mavContainer, invocableMethod);
      mavContainer.setIgnoreDefaultModelOnRedirect(this.ignoreDefaultModelOnRedirect);

      AsyncWebRequest asyncWebRequest = WebAsyncUtils.createAsyncWebRequest(request, response);
      asyncWebRequest.setTimeout(this.asyncRequestTimeout);

      // 从 request 域中取出 WebAsyncManager
      WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
      asyncManager.setTaskExecutor(this.taskExecutor);
      asyncManager.setAsyncWebRequest(asyncWebRequest);
      asyncManager.registerCallableInterceptors(this.callableInterceptors);
      asyncManager.registerDeferredResultInterceptors(this.deferredResultInterceptors);

      // 11. 判断 WebAsyncManager 中是否已经存在结果了,如果存在表示已经调用过 asyncContext#dispatch() 方法
      if (asyncManager.hasConcurrentResult()) {
         Object result = asyncManager.getConcurrentResult();
         mavContainer = (ModelAndViewContainer) asyncManager.getConcurrentResultContext()[0];
         asyncManager.clearConcurrentResult();
         LogFormatUtils.traceDebug(logger, traceOn -> {
            String formatted = LogFormatUtils.formatValue(result, !traceOn);
            return "Resume with async result [" + formatted + "]";
         });
         // 此时的 invocableMethod 的返回值类型还标注的是 Callable 类型,需要将其调整成返回 result 的方法(() -> result),以复用之前的逻辑
         invocableMethod = invocableMethod.wrapConcurrentResult(result);
      }

      // 执行方法
      invocableMethod.invokeAndHandle(webRequest, mavContainer);
      if (asyncManager.isConcurrentHandlingStarted()) {
         return null;
      }

      return getModelAndView(mavContainer, modelFactory, webRequest);
   }
   finally {
      webRequest.requestCompleted();
   }
}

4.2.2 DeferredResult 类

大体上和 Callable 类是相同的,区别在于:

  1. ListenableFuture、CompletionStage、ResponseBodyEmitter、SseEmitter 是用自定义的线程池执行的 Callable(可以参见附录中的测试代码)
  2. DeferredResult 是通过另一个 Tomcat 线程执行的

1)返回值处理器 CallableMethodReturnValueHandler

public class DeferredResultMethodReturnValueHandler implements HandlerMethodReturnValueHandler {

   @Override
   public boolean supportsReturnType(MethodParameter returnType) {
      Class<?> type = returnType.getParameterType();
      return (DeferredResult.class.isAssignableFrom(type) ||
            ListenableFuture.class.isAssignableFrom(type) ||
            CompletionStage.class.isAssignableFrom(type));
   }

   @Override
   public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
         ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

      if (returnValue == null) {
         mavContainer.setRequestHandled(true);
         return;
      }

      DeferredResult<?> result;

      if (returnValue instanceof DeferredResult) {
         result = (DeferredResult<?>) returnValue;
      }
      else if (returnValue instanceof ListenableFuture) {
         // 将 ListenableFuture 适配成 DeferredResult,我们看下适配的代码
         result = adaptListenableFuture((ListenableFuture<?>) returnValue);
      }
      else if (returnValue instanceof CompletionStage) {
         // 这个与 ListenableFuture 类似,不在重复分析
         result = adaptCompletionStage((CompletionStage<?>) returnValue);
      }
      else {
         // Should not happen...
         throw new IllegalStateException("Unexpected return value type: " + returnValue);
      }

      // 从当前请求中获取对应的 WebAsyncManager;调用 startDeferredResultProcessing 方法开启异步处理
      WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
   }

   private DeferredResult<Object> adaptListenableFuture(ListenableFuture<?> future) {
      // 注意。这个地方使用的是 DeferredResult 的空参构造,我们进去看一下
      DeferredResult<Object> result = new DeferredResult<>();
      future.addCallback(new ListenableFutureCallback<Object>() {
         @Override
         public void onSuccess(@Nullable Object value) {
            // 当成功返回结果时设置值
            result.setResult(value);
         }
         @Override
         public void onFailure(Throwable ex) {
            result.setErrorResult(ex);
         }
      });
      return result;
   }
}

public class DeferredResult<T> {

    private static final Object RESULT_NONE = new Object();

    @Nullable
    private final Long timeoutValue;

    private final Supplier<?> timeoutResult;

    public DeferredResult() {
    // 记住这个地方将 timeoutResult 设置成返回 RESULT_NONE,后面要考
        this(null, () -> RESULT_NONE);
    }
  
    public DeferredResult(@Nullable Long timeoutValue, Supplier<?> timeoutResult) {
        this.timeoutValue = timeoutValue;
        this.timeoutResult = timeoutResult;
    }

2)WebAsyncManager#startDeferredResultProcessing()

public void startDeferredResultProcessing(
      final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {

     // 省略部分代码。。。

   final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);

   this.asyncWebRequest.addTimeoutHandler(() -> {
      try {
         interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
      }
      catch (Throwable ex) {
         setConcurrentResultAndDispatch(ex);
      }
   });

   // 调用了 AsyncContext#startAsync() 方法开启了异步处理
   startAsyncProcessing(processingContext);

   try {
      interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
      // 重点看这里,这个地方是注册了一个返回值处理器,在返回值处理器中调用的 asyncContext#dispatch() 方法
      // 因为像 ListenableFuture 这样的实际上已经在线程池中执行了,而且支持注册回调,直接在回调里执行就好了
      deferredResult.setResultHandler(result -> {
         result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
         // 这个同上
         setConcurrentResultAndDispatch(result);
      });
   }
   catch (Throwable ex) {
      setConcurrentResultAndDispatch(ex);
   }
}

后续的操作就和 Callable 类是一样的了。。。

3)问题:当返回 DeferredResult 时,其并没有提交到线程池中呀!那怎么办呢?

此处其实我也不太懂,可能是利用了 Tomcat 的超时机制,他会调用到我们在这里注册的超时处理器

public void startDeferredResultProcessing(
      final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {

     // 省略部分代码。。。

   final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
    
   this.asyncWebRequest.addTimeoutHandler(() -> {
      try {
         // 可能是利用了 socket 会触发一个 timeout 事件,从而调用到此处
         interceptorChain.triggerAfterTimeout(this.asyncWebRequest, deferredResult);
      }
      catch (Throwable ex) {
         setConcurrentResultAndDispatch(ex);
      }
   });

接着他会调用到 DeferredResult 中的一个匿名内部类的 handleTimeout() 方法:

public class DeferredResult<T> {

    private static final Object RESULT_NONE = new Object();

    @Nullable
    private final Long timeoutValue;

    private final Supplier<?> timeoutResult;

    public DeferredResult() {
    // 记住这个地方将 timeoutResult 设置成返回 RESULT_NONE,后面要考
        this(null, () -> RESULT_NONE);
    }
  
    public DeferredResult(@Nullable Long timeoutValue, Supplier<?> timeoutResult) {
        this.timeoutValue = timeoutValue;
        this.timeoutResult = timeoutResult;
    }

  final DeferredResultProcessingInterceptor getInterceptor() {
     return new DeferredResultProcessingInterceptor() {
        @Override
        public <S> boolean handleTimeout(NativeWebRequest request, DeferredResult<S> deferredResult) {
           boolean continueProcessing = true;
           try {
              if (timeoutCallback != null) {
                 timeoutCallback.run();
              }
           }
           finally {
                // 对于返回的是 DeferredResult,此处会调用传入的 Callable
              // 而对于返回的是 ListenableFuture 此处调用会返回 RESULT_NONE
              Object value = timeoutResult.get();
              if (value != RESULT_NONE) {
                 continueProcessing = false;
                 try {
                    /*
                    此处调用的是此部分代码,asyncContext#dispatch()
                    deferredResult.setResultHandler(result -> {
                      result = interceptorChain.applyPostProcess(this.asyncWebRequest, deferredResult, result);
                      setConcurrentResultAndDispatch(result);
                    });
                    */
                    // 注意:由于当前调用 asyncContext#dispatch() 方法已经是 Tomcat 线程了,所以他并不会进行线程切换,也就是说:当返回的是 DeferredResult 对象时,他只有两次线程切换 tomcat thread1 -> tomcat thread2
                    setResultInternal(value);
                 }
                 catch (Throwable ex) {
                    logger.debug("Failed to handle timeout result", ex);
                 }
              }
           }
           return continueProcessing;
        }
  }

参考文章

Servlet 3.0 异步处理详解

Spring MVC异步处理简介

Servlet3.0新特性:异步处理,太好用了!!!(这篇虽然我没参考,但是我觉得他写的挺好)

【springmvc】对异步处理的支持

附录

4.2 测试代码

import lombok.extern.slf4j.Slf4j;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureTask;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncTask;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import org.springframework.web.servlet.mvc.method.annotation.StreamingResponseBody;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executors;

@Slf4j
@RestController
@RequestMapping("/rest/async")
public class AsyncController {

    // callable 族

    @GetMapping("/v1/callable")
    public Callable<String> get() {
        log.info("controller....");
        return () -> {
            log.warn("service....");
            return "xxx";
        };
    }

    @GetMapping("/v1/WebAsyncTask")
    public WebAsyncTask<String> getWebAsyncTask() {
        log.info("controller....");
        return new WebAsyncTask<>(() -> {
            log.warn("service....");
            return "xxx";
        });
    }

    @GetMapping("/v1/StreamingResponseBody")
    public StreamingResponseBody getStreamingResponseBody() {
        log.info("controller....");
        return outputStream -> {
            log.warn("service....");
            outputStream.write("xxx".getBytes(StandardCharsets.UTF_8));
            outputStream.close();
        };
    }

    // DeferredResult 族

    @GetMapping("/v1/DeferredResult")
    public DeferredResult<String> getDeferredResult() {
        log.info("controller....");
        return new DeferredResult<>(10L, () -> {
            log.warn("service....");
            return "xxx";
        });
    }

    @GetMapping("/v1/ListenableFuture")
    public ListenableFuture<String> getListenableFuture() {
        log.info("controller....");
        ListenableFutureTask<String> task = new ListenableFutureTask<>(() -> {
            log.warn("service....");
            return "xxx";
        });
        Executors.newCachedThreadPool().submit(task);
        return task;
    }

    @GetMapping("/v1/CompletionStage")
    public CompletionStage<String> getCompletionStage() {
        log.info("controller....");
        return CompletableFuture.supplyAsync(() -> {
            log.warn("service....");
            return "xxx";
        });
    }

    @GetMapping("/v1/ResponseBodyEmitter")
    public ResponseBodyEmitter getResponseBodyEmitter() {
        log.info("controller....");
        ResponseBodyEmitter emitter = new ResponseBodyEmitter();
        Executors.newCachedThreadPool().submit(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    emitter.send("Count: " + (i + 1));
                    emitter.send("\n");
                    emitter.send("hello");
                    emitter.send("\n\n");
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }

    @GetMapping("/v1/SseEmitter")
    public SseEmitter getSseEmitter() {
        SseEmitter emitter = new SseEmitter();
        Executors.newCachedThreadPool().submit(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    emitter.send("Count: " + (i + 1));
                    emitter.send("\n");
                    emitter.send("hello");
                    emitter.send("\n\n");
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;
    }

}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容