JAVA && Spring && SpringBoot2.x — 学习目录
我们在JDK中,可以使用ThreadPoolExecutor提供线程池服务,相关理论,可以在多线程——线程池ThreadPoolExecutor了解。但是SpringBoot提供了@Async
[鹅神可]
注解,帮助我们更方便的将业务逻辑提交到线程池中异步处理。
1. SpringBoot对线程池的自动装载
源代码:org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration
@Bean
@ConditionalOnMissingBean
public TaskExecutorBuilder taskExecutorBuilder() {
TaskExecutionProperties.Pool pool = this.properties.getPool();
TaskExecutorBuilder builder = new TaskExecutorBuilder();
builder = builder.queueCapacity(pool.getQueueCapacity());
builder = builder.corePoolSize(pool.getCoreSize());
builder = builder.maxPoolSize(pool.getMaxSize());
builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
builder = builder.keepAlive(pool.getKeepAlive());
builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
builder = builder.customizers(this.taskExecutorCustomizers);
builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
return builder;
}
@Lazy
@Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
@ConditionalOnMissingBean(Executor.class)
public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
return builder.build();
}
我们可以在配置文件中配置连接池的相关参数。
2. 自定义线程池
2.1 根据业务配置不同的线程池
我们不推荐一个项目配置一个线程池,这样若是某些业务出现异常时,会影响到整个项目的健壮性。故我们可以根据业务,为不同的业务配置不同参数的数据库连接池。
@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
@Bean
public Executor asyncServiceExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new VisiableThreadPoolTaskExecutor();
//核心线程数
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
//最大线程数
threadPoolTaskExecutor.setMaxPoolSize(5);
//配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
//配置线程池前缀
threadPoolTaskExecutor.setThreadNamePrefix("async-service-");
//拒绝策略
// threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
threadPoolTaskExecutor.setRejectedExecutionHandler(new PrintingPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
@Bean
public Executor customServiceExecutor(){
ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
//线程核心数目
threadPoolTaskExecutor.setCorePoolSize(10);
threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
//最大线程数
threadPoolTaskExecutor.setMaxPoolSize(10);
//配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
//配置线程池前缀
threadPoolTaskExecutor.setThreadNamePrefix("custom-service-");
//配置拒绝策略
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//数据初始化
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
}
若是想在使用连接池的时候,打印出连接池的各项参数,应当如何设置:
@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {
//打印队列的详细信息
private void showThreadPoolInfo(String prefix){
ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();
if(null==threadPoolExecutor){
return;
}
log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
this.getThreadNamePrefix(),
prefix,
threadPoolExecutor.getTaskCount(),
threadPoolExecutor.getCompletedTaskCount(),
threadPoolExecutor.getActiveCount(),
threadPoolExecutor.getQueue().size());
}
@Override
public void execute(Runnable task) {
showThreadPoolInfo("1. do execute");
super.execute(task);
}
@Override
public void execute(Runnable task, long startTimeout) {
showThreadPoolInfo("2. do execute");
super.execute(task, startTimeout);
}
@Override
public Future<?> submit(Runnable task) {
showThreadPoolInfo("1. do submit");
return super.submit(task);
}
@Override
public <T> Future<T> submit(Callable<T> task) {
showThreadPoolInfo("2. do submit");
return super.submit(task);
}
@Override
public ListenableFuture<?> submitListenable(Runnable task) {
showThreadPoolInfo("1. do submitListenable");
return super.submitListenable(task);
}
@Override
public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
showThreadPoolInfo("2. do submitListenable");
return super.submitListenable(task);
}
}
2.2 如何使用连接池
在业务方法中使用@Async注解,并且可以选择使用的连接池。来启动一个异步任务。
- 若是想获取到任务返回值,可创建Callable任务
//带返回值的任务
@Async("asyncServiceExecutor")
public Future<String> doTask1() throws InterruptedException{
log.info("Task1 started.");
long start = System.currentTimeMillis();
Thread.sleep(5000);
long end = System.currentTimeMillis();
log.info("Task1 finished, time elapsed: {} ms.", end-start);
return new AsyncResult<>("Task1 accomplished!");
}
@Async("customServiceExecutor")
public Future<String> doTask2() throws InterruptedException{
log.info("Task2 started.");
long start = System.currentTimeMillis();
Thread.sleep(3000);
long end = System.currentTimeMillis();
log.info("Task2 finished, time elapsed: {} ms.", end-start);
return new AsyncResult<>("Task2 accomplished!");
}
- 若是创建的Runnable的异步任务
//创建的是Runnable的任务
@Async("asyncServiceExecutor")
public void executeAsync() {
log.info("start executeAsync");
try{
Thread.sleep(1000);
}catch(Exception e){
e.printStackTrace();
}
log.info("end executeAsync");
}
2.3 如何获取任务的返回值
若是我们使用线程池,来并发的执行任务,首先需要考虑的是,如何等待最后一个任务执行完毕,对任务结果进行汇总处理。
方法一:使用自旋操作,等待任务结果返回。
@RequestMapping("/helloFuture")
@ResponseBody
public String helloFuture() {
try {
Future<String> future1 = serviceImpl.doTask1();
Future<String> future2 = serviceImpl.doTask2();
//自旋锁,停止等待
while (true) {
if (future1.isDone() && future2.isDone()) {
log.info("Task1 result:{}", future1.get());
log.info("Task2 result:{}", future2.get());
break;
}
Thread.sleep(1000);
}
log.info("All tasks finished.");
return "S";
} catch (InterruptedException e) {
log.error("错误信息1", e);
return "F";
} catch (ExecutionException e) {
log.error("错误信息2", e);
return "F";
}
}
方法二:使用CountDownLatch计数器
相关理论可以参考:多线程——CountDownLatch详解
@RequestMapping("/helloFuture2")
@ResponseBody
public String helloFuture2() {
try {
CountDownLatch latch=new CountDownLatch(2);
Future<String> future1 = serviceImpl.doTask1(latch);
Future<String> future2 = serviceImpl.doTask2(latch);
//等待两个线程执行完毕
latch.await();
log.info("All tasks finished!");
String result1 = future1.get();
String result2 = future2.get();
log.info(result1+"--"+result2);
return "S";
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "F";
}
每个任务执行完毕,只需要调用latch.countDown();使得计数器-1。
//带返回值的任务
@Async("asyncServiceExecutor")
public Future<String> doTask1(CountDownLatch latch) throws InterruptedException{
log.info("Task1 started.");
long start = System.currentTimeMillis();
Thread.sleep(5000);
long end = System.currentTimeMillis();
log.info("Task1 finished, time elapsed: {} ms.", end-start);
latch.countDown();
return new AsyncResult<>("Task1 accomplished!");
}
@Async("customServiceExecutor")
public Future<String> doTask2(CountDownLatch latch) throws InterruptedException{
log.info("Task2 started.");
long start = System.currentTimeMillis();
Thread.sleep(3000);
long end = System.currentTimeMillis();
log.info("Task2 finished, time elapsed: {} ms.", end-start);
latch.countDown();
return new AsyncResult<>("Task2 accomplished!");
}
方式三:使用Future的get方法的阻塞特性
@RequestMapping("/helloFuture2")
@ResponseBody
public String helloFuture2() {
try {
List<Future<String>> tasks = new ArrayList<>();
List<String> results = new ArrayList<>();
tasks.add(serviceImpl.doTask1());
tasks.add(serviceImpl.doTask2());
//各个任务执行完毕
for (Future<String> task : tasks) {
//每个任务都会再在此阻塞。
results.add(task.get());
}
log.info("All tasks finished!");
log.info("执行结果:{}", JSON.toJSONString(results));
return "S";
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "F";
}
2.4 Runnable异常处理
该配置可与线程池配置在一起,若异步线程抛出异常,会由该类打印。
@Configuration
public class ExecutorConfig implements AsyncConfigurer {
//配置异常处理机制
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex,method,params)->{
log.error("异步线程执行失败。方法:[{}],异常信息[{}] : ", method, ex.getMessage(),ex);
};
}
}
效果图:
2019-12-25 19:14:09.851 ERROR [] --- [async-service-1] c.g.Config.threadPool.ExecutorConfig : 异步线程执行失败。方法:[public void com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(java.lang.String,java.lang.String)],异常信息[/ by zero] :
java.lang.ArithmeticException: / by zero
at com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(AccountServiceImpl.java:308)
at com.galax.bussiness.account.impl.AccountServiceImpl$$FastClassBySpringCGLIB$$4e0db2a2.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
彩蛋——Future<T>使用lambda表达式
public void sendMail(Map<String, Object> model, String title, String templateName, String toMail, String[] ccMail, long timeout) throws Exception {
Future<String> submit;
submit = emailServiceExecutor.submit(() ->{
try {
return "s";
} catch (Exception e) {
return "F";
}
});
}
彩蛋——若自定义实现线程池,如何获取到各个任务的结果
若是我们自己实现线程池,可以使用
java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection<? extends java.util.concurrent.Callable<T>>)
方法进行定时任务的批量处理,返回值其实是List<Future<T>>
,我们可以循环遍历该List,最终拿到各个任务的执行结果。
@Test
public void test() throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(9));
//需要执行的任务
List<Account> students= new ArrayList<>(5);
//将任务转换为Callable对象
List<Callable<Integer>> callables = new ArrayList<>();
//保存返回结果
List<Integer> results=new ArrayList<>();
//开启线程,lambda表达式
for (Student student : students) {
callables.add(()->{
//插入操作,并发执行
log.info(JSON.toJSONString(student ));
//表示异步操作
int save = serviceImpl.getStu(student);
//返回值
return save ;
});
}
//获取到所有任务的处理结果
List<Future<Integer>> futures = executor.invokeAll(callables);
//遍历每个任务的执行结果,每次future.get()只有在任务执行完毕后,才会继续循环操作,否则会阻塞,等待线程执行结束
for (Future<Integer> future:futures){
try {
results.add(future.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown(); //关闭线程池
log.info("数据执行完毕!{}",JSON.toJSONString(results));
}