SpringBoot2.x整合线程池(ThreadPoolTaskExecutor)

JAVA && Spring && SpringBoot2.x — 学习目录

我们在JDK中,可以使用ThreadPoolExecutor提供线程池服务,相关理论,可以在多线程——线程池ThreadPoolExecutor了解。但是SpringBoot提供了@Async [鹅神可]注解,帮助我们更方便的将业务逻辑提交到线程池中异步处理。

1. SpringBoot对线程池的自动装载

源代码:org.springframework.boot.autoconfigure.task.TaskExecutionAutoConfiguration

    @Bean
    @ConditionalOnMissingBean
    public TaskExecutorBuilder taskExecutorBuilder() {
        TaskExecutionProperties.Pool pool = this.properties.getPool();
        TaskExecutorBuilder builder = new TaskExecutorBuilder();
        builder = builder.queueCapacity(pool.getQueueCapacity());
        builder = builder.corePoolSize(pool.getCoreSize());
        builder = builder.maxPoolSize(pool.getMaxSize());
        builder = builder.allowCoreThreadTimeOut(pool.isAllowCoreThreadTimeout());
        builder = builder.keepAlive(pool.getKeepAlive());
        builder = builder.threadNamePrefix(this.properties.getThreadNamePrefix());
        builder = builder.customizers(this.taskExecutorCustomizers);
        builder = builder.taskDecorator(this.taskDecorator.getIfUnique());
        return builder;
    }

    @Lazy
    @Bean(name = { APPLICATION_TASK_EXECUTOR_BEAN_NAME,
            AsyncAnnotationBeanPostProcessor.DEFAULT_TASK_EXECUTOR_BEAN_NAME })
    @ConditionalOnMissingBean(Executor.class)
    public ThreadPoolTaskExecutor applicationTaskExecutor(TaskExecutorBuilder builder) {
        return builder.build();
    }

我们可以在配置文件中配置连接池的相关参数。

2. 自定义线程池

2.1 根据业务配置不同的线程池

我们不推荐一个项目配置一个线程池,这样若是某些业务出现异常时,会影响到整个项目的健壮性。故我们可以根据业务,为不同的业务配置不同参数的数据库连接池。

@Configuration
@EnableAsync
@Slf4j
public class ExecutorConfig {
  
    @Bean
    public Executor asyncServiceExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new VisiableThreadPoolTaskExecutor();
        //核心线程数
        threadPoolTaskExecutor.setCorePoolSize(5);
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
        //最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(5);
        //配置队列大小
        threadPoolTaskExecutor.setQueueCapacity(50);
        //配置线程池前缀
        threadPoolTaskExecutor.setThreadNamePrefix("async-service-");
        //拒绝策略
//        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        threadPoolTaskExecutor.setRejectedExecutionHandler(new PrintingPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    @Bean
    public Executor customServiceExecutor(){
        ThreadPoolTaskExecutor threadPoolTaskExecutor=new ThreadPoolTaskExecutor();
        //线程核心数目
        threadPoolTaskExecutor.setCorePoolSize(10);
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
        //最大线程数
        threadPoolTaskExecutor.setMaxPoolSize(10);
        //配置队列大小
        threadPoolTaskExecutor.setQueueCapacity(50);
        //配置线程池前缀
        threadPoolTaskExecutor.setThreadNamePrefix("custom-service-");
        //配置拒绝策略
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        //数据初始化
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}

若是想在使用连接池的时候,打印出连接池的各项参数,应当如何设置:

@Slf4j
public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor {


    //打印队列的详细信息
    private void showThreadPoolInfo(String prefix){
        ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor();

        if(null==threadPoolExecutor){
            return;
        }

        log.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]",
                this.getThreadNamePrefix(),
                prefix,
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size());
    }


    @Override
    public void execute(Runnable task) {
        showThreadPoolInfo("1. do execute");
        super.execute(task);
    }

    @Override
    public void execute(Runnable task, long startTimeout) {
        showThreadPoolInfo("2. do execute");
        super.execute(task, startTimeout);
    }

    @Override
    public Future<?> submit(Runnable task) {
        showThreadPoolInfo("1. do submit");
        return super.submit(task);
    }

    @Override
    public <T> Future<T> submit(Callable<T> task) {
        showThreadPoolInfo("2. do submit");
        return super.submit(task);
    }

    @Override
    public ListenableFuture<?> submitListenable(Runnable task) {
        showThreadPoolInfo("1. do submitListenable");
        return super.submitListenable(task);
    }

    @Override
    public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
        showThreadPoolInfo("2. do submitListenable");
        return super.submitListenable(task);
    }

}

2.2 如何使用连接池

在业务方法中使用@Async注解,并且可以选择使用的连接池。来启动一个异步任务。

  1. 若是想获取到任务返回值,可创建Callable任务
//带返回值的任务
    @Async("asyncServiceExecutor")
    public Future<String> doTask1() throws InterruptedException{
        log.info("Task1 started.");
        long start = System.currentTimeMillis();
        Thread.sleep(5000);
        long end = System.currentTimeMillis();

        log.info("Task1 finished, time elapsed: {} ms.", end-start);

        return new AsyncResult<>("Task1 accomplished!");
    }

    @Async("customServiceExecutor")
    public Future<String> doTask2() throws InterruptedException{
        log.info("Task2 started.");
        long start = System.currentTimeMillis();
        Thread.sleep(3000);
        long end = System.currentTimeMillis();

        log.info("Task2 finished, time elapsed: {} ms.", end-start);

        return new AsyncResult<>("Task2 accomplished!");
    }
  1. 若是创建的Runnable的异步任务
    //创建的是Runnable的任务
    @Async("asyncServiceExecutor")
    public void executeAsync() {
        log.info("start executeAsync");
        try{
            Thread.sleep(1000);
        }catch(Exception e){
            e.printStackTrace();
        }
        log.info("end executeAsync");
    }

2.3 如何获取任务的返回值

若是我们使用线程池,来并发的执行任务,首先需要考虑的是,如何等待最后一个任务执行完毕,对任务结果进行汇总处理。

方法一:使用自旋操作,等待任务结果返回。

    @RequestMapping("/helloFuture")
    @ResponseBody
    public String helloFuture() {
        try {
            Future<String> future1 = serviceImpl.doTask1();
            Future<String> future2 = serviceImpl.doTask2();
            //自旋锁,停止等待
            while (true) {
                if (future1.isDone() && future2.isDone()) {
                    log.info("Task1 result:{}", future1.get());
                    log.info("Task2 result:{}", future2.get());
                    break;
                }
                Thread.sleep(1000);
            }
            log.info("All tasks finished.");
            return "S";
        } catch (InterruptedException e) {
            log.error("错误信息1", e);
            return "F";
        } catch (ExecutionException e) {
            log.error("错误信息2", e);
            return "F";
        }
    }

方法二:使用CountDownLatch计数器

相关理论可以参考:多线程——CountDownLatch详解

    @RequestMapping("/helloFuture2")
    @ResponseBody
    public String helloFuture2() {
        try {
            CountDownLatch latch=new CountDownLatch(2);
            Future<String> future1 = serviceImpl.doTask1(latch);
            Future<String> future2 = serviceImpl.doTask2(latch);
            //等待两个线程执行完毕
            latch.await();
            log.info("All tasks finished!");
            String result1 = future1.get();
            String result2 = future2.get();
            log.info(result1+"--"+result2);
            return "S";
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "F";
    }

每个任务执行完毕,只需要调用latch.countDown();使得计数器-1。

  //带返回值的任务
    @Async("asyncServiceExecutor")
    public Future<String> doTask1(CountDownLatch latch) throws InterruptedException{
        log.info("Task1 started.");
        long start = System.currentTimeMillis();
        Thread.sleep(5000);
        long end = System.currentTimeMillis();

        log.info("Task1 finished, time elapsed: {} ms.", end-start);
        latch.countDown();
        return new AsyncResult<>("Task1 accomplished!");
    }

    @Async("customServiceExecutor")
    public Future<String> doTask2(CountDownLatch latch) throws InterruptedException{
        log.info("Task2 started.");
        long start = System.currentTimeMillis();
        Thread.sleep(3000);
        long end = System.currentTimeMillis();

        log.info("Task2 finished, time elapsed: {} ms.", end-start);
        latch.countDown();
        return new AsyncResult<>("Task2 accomplished!");
    }

方式三:使用Future的get方法的阻塞特性

    @RequestMapping("/helloFuture2")
    @ResponseBody
    public String helloFuture2() {
        try {
            List<Future<String>> tasks = new ArrayList<>();
            List<String> results = new ArrayList<>();
            tasks.add(serviceImpl.doTask1());
            tasks.add(serviceImpl.doTask2());
            //各个任务执行完毕
            for (Future<String> task : tasks) {
                //每个任务都会再在此阻塞。
                results.add(task.get());
            }
            log.info("All tasks finished!");
            log.info("执行结果:{}", JSON.toJSONString(results));
            return "S";
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "F";
    }

2.4 Runnable异常处理

该配置可与线程池配置在一起,若异步线程抛出异常,会由该类打印。

@Configuration
public class ExecutorConfig implements AsyncConfigurer {
    //配置异常处理机制
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex,method,params)->{
            log.error("异步线程执行失败。方法:[{}],异常信息[{}] : ", method, ex.getMessage(),ex);
        };
    }
}

效果图:

2019-12-25 19:14:09.851 ERROR [] --- [async-service-1] c.g.Config.threadPool.ExecutorConfig     : 异步线程执行失败。方法:[public void com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(java.lang.String,java.lang.String)],异常信息[/ by zero] : 

java.lang.ArithmeticException: / by zero
    at com.galax.bussiness.account.impl.AccountServiceImpl.getAccInfoByTime(AccountServiceImpl.java:308)
    at com.galax.bussiness.account.impl.AccountServiceImpl$$FastClassBySpringCGLIB$$4e0db2a2.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:93)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

彩蛋——Future<T>使用lambda表达式

    public void sendMail(Map<String, Object> model, String title, String templateName, String toMail, String[] ccMail, long timeout) throws Exception {
        Future<String> submit;
        submit = emailServiceExecutor.submit(() ->{
            try {
                return "s";
            } catch (Exception e) {
                return "F";
            }
        });

    }

彩蛋——若自定义实现线程池,如何获取到各个任务的结果

若是我们自己实现线程池,可以使用java.util.concurrent.AbstractExecutorService#invokeAll(java.util.Collection<? extends java.util.concurrent.Callable<T>>)方法进行定时任务的批量处理,返回值其实是List<Future<T>>,我们可以循环遍历该List,最终拿到各个任务的执行结果。

  @Test
    public void test() throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 5, 60,
                TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(9));
        //需要执行的任务
        List<Account> students= new ArrayList<>(5);
        //将任务转换为Callable对象
        List<Callable<Integer>> callables = new ArrayList<>();
        //保存返回结果
        List<Integer> results=new ArrayList<>();
        //开启线程,lambda表达式
        for (Student student : students) {
            callables.add(()->{
                //插入操作,并发执行
                log.info(JSON.toJSONString(student ));
                //表示异步操作
                int save = serviceImpl.getStu(student);
               //返回值
                return save ;
            });
        }
        //获取到所有任务的处理结果
        List<Future<Integer>> futures = executor.invokeAll(callables);
        //遍历每个任务的执行结果,每次future.get()只有在任务执行完毕后,才会继续循环操作,否则会阻塞,等待线程执行结束
        for (Future<Integer> future:futures){
            try {
                results.add(future.get());
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        executor.shutdown();  //关闭线程池
        log.info("数据执行完毕!{}",JSON.toJSONString(results));
    }

推荐阅读

Callable+ThreadPoolExecutor实现多线程并发并获得返回值

springboot线程池的使用和扩展

多线程——线程池ThreadPoolExecutor

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,937评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,503评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,712评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,668评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,677评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,601评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,975评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,637评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,881评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,621评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,710评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,387评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,971评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,947评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,189评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,805评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,449评论 2 342