异步调用
"异步调用" 对应 "同步调用",同步调用是指程序按照定义的顺序依次执行,在时间轴上看,这些程序是串行的,只有在上一程序执行完了才会执行下一程序,简单说我们平时写的代码80%以上都是同步调用的,这种程序要么是执行较快没必要拆出来用多线程跑(多线程涉及上下文切换以及线程同步问题),要么就是前后具有强依赖关系,也就是下一步依赖了上一段程序的处理结果。
但是也有很多情况下,两段程序之间实际上没有半毛钱关系,而且两个程序都很耗时,这个时候采用多线程技术或许会比较好一点。
下面通过一个简单的例子说明一下:
现在有个银行贷款服务,贷款前要对用户进行各种合法性检查,代码如下:
package com.ggr.hello.task;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
import static java.lang.Thread.sleep;
@Slf4j
@Component
public class LoanCheckTask {
/**
* 贷款前的用户信息检查
* @return
*/
public String getUserInfo(){
try {
sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "身份合法";
}
/**
* 贷款前用户的信用信息检查
* @return
*/
public String getBankCreditInfo(){
try {
sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "银行信用良好";
}
/**
* 开启该用户的贷款权限
* @return
*/
public Boolean getAllowLoan(String userInfo,String creditInfo){
try {
sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Boolean.TRUE;
}
}
下面是业务代码:
package com.ggr.hello.service;
import com.ggr.hello.task.LoanCheckTask;
import com.ggr.hello.task.MyAnsyTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static java.lang.Thread.sleep;
@Service
@Slf4j
public class MyTestService {
@Autowired
LoanCheckTask loanCheckTask;
/**
* 开通贷款服务
* @return
*/
public boolean openLoanService(){
long s = System.currentTimeMillis();
//获取用户基本信息
String userInfo = loanCheckTask.getUserInfo();//1
//获取用户的信用信息
String bankCreditInfo = loanCheckTask.getBankCreditInfo();//2
//是否可以开通贷款服务
Boolean allowLoan = loanCheckTask.getAllowLoan(userInfo,bankCreditInfo);//3
log.info("接口耗时time={}",System.currentTimeMillis()-s);
return allowLoan;
}
}
通过观察上面的代码我们很容易就知道这个实现是有问题的,因为程序正常执行完至少需要1.85秒,如果再加上网络延迟,数据传输等因素,我们的这个接口可能至少也要个1.9秒吧。1.9秒什么概念,理论上我们超过1秒的接口都是要重新审核的,用户永远不会听你的技术解释为什么这么慢的,每个人的时间都是宝贵的。
重新回到代码本身,我们发现实际上我们的代码实际上是有优化空间的,我们发现openLoanService
里面有两个程序完全是没有依赖关系的,且比较耗时。所以我们第一时间想到了,异步调用。于是我们根据所学的知识修改了一波如下:
/**
* 开通贷款服务
* @return
*/
public boolean openLoanService(){
long s = System.currentTimeMillis();
CompletableFuture<Boolean> cf = CompletableFuture
.supplyAsync(() -> loanCheckTask.getUserInfo())//s1
.thenCombineAsync(CompletableFuture.supplyAsync(()->loanCheckTask.getBankCreditInfo()),//s2
(BiFunction<String, String, Boolean>) (s12, s2) -> loanCheckTask.getAllowLoan(s12,s2));//s3
boolean allowLoan = cf.join();
log.info("接口耗时time={}",System.currentTimeMillis()-s);
return allowLoan;
}
使用异步优化后,我们发现我们的接口返回时间稳定在0.95秒左右。终于不用担心被架构师大佬疯狂虐了。在这个基础上面,我们业务上基本没啥可以优化的,唯一能提高的应该就落在了getUserInfo
和getBankCreditInfo
接口上了。
@Async 的使用
现在有一个问题,一个是CompletableFuture使用的默认线程池大小默认比较小,电脑核心数-1,如果我们的接口是一个热接口,访问比较频繁就会导致请求处理不过来,同时,我们希望这个线程池最好是我们可以方便地根据自己的项目情况自定义一些参数,同时交给Spring容器管理。SpringBoot的@Async提供了对这个方面的支持。
现在我们只要把我们需要执行的方法抽离到和调用方不同的一个类里面(Spring使用代理去做的异步调度,如果被调度的代码与调用方在一个类里面,将会导致异步调用失败,相当于代理异步调的还是自己,变相转变为了同步调用),再方法上加上这个@Async便可。将我们之前的代码改一下:
@Slf4j
@Component
public class LoanCheckTask {
/**
* 贷款前的用户信息检查
* @return
*/
@Async
public CompletableFuture getUserInfo(){
try {
sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
return CompletableFuture.completedFuture( "身份合法");
}
/**
* 贷款前用户的信用信息检查
* @return
*/
@Async
public CompletableFuture getBankCreditInfo(){
try {
sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
return CompletableFuture.completedFuture( "银行信用良好");
}
//另外一个接口不用异步,50毫秒,用多线程,上下文切换耗时都比这个数大了。
}
同时在启动类App.java里面加一个@EnableAsync开启异步调用的注解就可以了。Spring默认会开启一个coreSize为10,队列为200的线程池,具体可自定debug查看,可能版本不同会有不一样。
@SpringBootApplication
@EnableConfigurationProperties({TaskThreadPoolConfig.class} )
@EnableAsync
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class,args);
}
}
@Async 带返回值
网上大部分使用Future去接的,但是我这里使用的是CompletableFuture主要是为了适应Java 函数式编程的语法风格同时方便后面的代码书写。CompletableFuture比Future还是更加强大好用的。
* 开通贷款服务
*
* @return
*/
public boolean openLoanService() {
long s = System.currentTimeMillis();
CompletableFuture userInfoFuture = loanCheckTask.getUserInfo();
CompletableFuture bankCreditInfoFuture = loanCheckTask.getBankCreditInfo();
CompletableFuture<Boolean> future = userInfoFuture.thenCombine(bankCreditInfoFuture,
(BiFunction<String, String, Boolean>) (x, y) -> loanCheckTask.getAllowLoan(x, y));
Boolean join = future.join();
log.info("开通贷款服务time={}",System.currentTimeMillis()-s);
return join;
}
@Async 指定执行的线程池
我们如果想不用默认的线程池,可以自定义一个线程池放到容器中然后通过@Async 注解内部参数指定具体执行的pool的实例。
首先在配置文件application.yml中配置好我们需要的参数
spring:
task:
pool:
corePoolSize: 60
maxPoolSize: 100
keepAliveSecond: 120
queueCapacity: 2
http:
encoding:
force: true
charset: UTF-8
enabled: true
uri-encoding: UTF-8
然后将配置注入到我们新建的配置类中TaskThreadPoolConfig.java
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties(prefix = "spring.task.pool") // 该注解的locations已经被启用,现在只要是在环境中,都会优先加载
public class TaskThreadPoolConfig {
private int corePoolSize;
private int maxPoolSize;
private int keepAliveSeconds;
private int queueCapacity;
}
下一步,初始化我们的线程池
package com.ggr.hello.config;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class TaskExecutePool {
@Autowired
private TaskThreadPoolConfig taskThreadPoolConfig;
@Bean("myTaskAsyncPool")
public Executor myTaskAsyncPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(taskThreadPoolConfig.getCorePoolSize());
executor.setMaxPoolSize(taskThreadPoolConfig.getMaxPoolSize());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.setQueueCapacity(taskThreadPoolConfig.getQueueCapacity());
executor.setKeepAliveSeconds(taskThreadPoolConfig.getKeepAliveSeconds());
executor.setThreadNamePrefix("MyExecutor-");
// rejection-policy:当pool已经达到max size的时候,如何处理新任务
// CALLER_RUNS:不在新线程中执行任务,而是由调用者所在的线程来执行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
最后指定一下我们注解所用到的线程池
@Async("myTaskAsyncPool")
public CompletableFuture<String> getName(){
try {
sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return CompletableFuture.completedFuture("你好hello");
}
好了,接下来我们就可以测试了。。。
当然,还有其他方法实现这个需求:我们点进去 @EnableAsync看源码就会发现官方提供的那种方法如下:
@Configuration
@EnableAsync
@Slf4j
public class AppConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(7);
executor.setMaxPoolSize(42);
executor.setQueueCapacity(11);
executor.setThreadNamePrefix("MyExecutor-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (throwable, method, objects) -> log.info("出错了throwable={},method={},params={}", throwable, method, objects);
}
}
@Async 更多说明
多线程,一般分两种,一种是一个线程一个队列,一种是多个线程一个队列,我们的线程池属于后一种。在使用之前,最好对线程池有一定的了解。