线程池需要引用commons-pool2的jar包
需要在启动类上添加@EnableAsync注解以开启异步支持;使用@Scheduled定时任务时还需要添加@EnableScheduling注解
springboot中的定时任务异步线程池
/**
 * Registers a dedicated thread pool for running scheduled tasks.
 *
 * <p>The pool size and the shutdown grace period are injected from the
 * {@code schedule.pool.size} and {@code schedule.wait.seconds} properties.
 *
 * <p>NOTE(review): the scheduler is instantiated inside this class and is not
 * exposed as a bean, and no shutdown call is visible here — confirm that
 * something closes it when the application context stops, otherwise the
 * wait-on-shutdown settings below may never take effect.
 */
@Configuration
public class ScheduleConfiguration implements SchedulingConfigurer {

    /** Number of threads in the scheduling pool. */
    @Value("${schedule.pool.size}")
    private int poolSize;

    /** Seconds to wait for running tasks when the scheduler shuts down. */
    @Value("${schedule.wait.seconds}")
    private int waitSeconds;

    /**
     * Hands a freshly built and initialized scheduler to the registrar.
     *
     * @param taskRegistrar registrar that receives the scheduler
     */
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.setScheduler(newScheduler());
    }

    /** Creates, configures and initializes the scheduling thread pool. */
    private ThreadPoolTaskScheduler newScheduler() {
        ThreadPoolTaskScheduler pool = new ThreadPoolTaskScheduler();
        pool.setThreadNamePrefix("schedule-");
        // Tune according to the number and frequency of the scheduled tasks.
        pool.setPoolSize(poolSize);
        // On shutdown, let running tasks finish, waiting at most waitSeconds.
        pool.setAwaitTerminationSeconds(waitSeconds);
        pool.setWaitForTasksToCompleteOnShutdown(true);
        // Rejection policy for tasks the pool cannot accept: ABORT throws an exception.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
        pool.initialize();
        return pool;
    }
}
springboot开启异步线程池
/**
 * Declares the thread pool that backs asynchronous method execution.
 *
 * <p>All sizing values are injected from {@code task.executor.*} properties.
 */
@Configuration
public class TaskExecutorConfig {

    /** Core number of threads kept in the pool. */
    @Value("${task.executor.core.pool.size}")
    private int corePoolSize;

    /** Upper bound on the number of threads in the pool. */
    @Value("${task.executor.max.pool.size}")
    private int maxPoolSize;

    /** Capacity of the queue holding tasks that are waiting for a thread. */
    @Value("${task.executor.queue.capacity}")
    private int queueCapacity;

    /** Seconds an idle thread above the core size is kept alive. */
    @Value("${task.executor.keep.alive.seconds}")
    private int keepAliveSeconds;

    /**
     * Builds the executor bean named {@code myAsync}.
     *
     * <p>The executor is returned un-initialized on purpose: because it is
     * registered through {@code @Bean}, the container runs its initialization
     * callback itself. The original code additionally called both
     * {@code initialize()} and {@code afterPropertiesSet()}, which built the
     * underlying pool several times over.
     *
     * @return the configured task executor
     */
    @Bean
    public TaskExecutor myAsync() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
        executor.setKeepAliveSeconds(keepAliveSeconds);
        executor.setThreadNamePrefix("async-");
        // Rejection policy: how to handle a new task once the pool has reached max size.
        // CALLER_RUNS: run the task on the caller's thread instead of a pool thread.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}
之后就可以通过@Async开启异步线程了
以下为向线程池提交任务被拒绝时可选的四种拒绝策略:
AbortPolicy:默认,拒绝并抛出异常
DiscardPolicy:丢掉,不会抛出异常
DiscardOldestPolicy:丢弃队列中最早的任务,然后重新尝试提交当前任务
CallerRunsPolicy:由提交任务的调用者线程自己执行该任务
自定义:也可以自定义
/**
 * Sample component exposing one method annotated for asynchronous execution.
 */
@Component
public class AsyncTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncTask.class);

    /**
     * Logs a message, pauses for one second, and returns a fixed value of 1
     * wrapped in a future.
     *
     * @return a future holding the value {@code 1}
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    @Async
    public Future<Long> getTotalHitsFuture() throws InterruptedException {
        LOGGER.info("get total . ");

        // Stand-in for real work: block the executing thread for one second.
        final long pauseMillis = 1000;
        Thread.sleep(pauseMillis);

        final Long hits = 1L;
        return new AsyncResult<>(hits);
    }
}
/**
 * Scheduled job that issues three calls to {@link AsyncTask} and logs the sum
 * of their results.
 */
@Component
public class MyTask {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyTask.class);

    @Autowired
    private AsyncTask asyncTask;

    /**
     * Runs on the cron schedule {@code 0/30 * * * * *} (seconds 0 and 30 of
     * every minute in Spring's six-field cron format).
     *
     * <p>All three futures are requested before any result is read, so the
     * calls can overlap when asynchronous execution is enabled for
     * {@code AsyncTask}.
     *
     * @throws ExecutionException   if one of the asynchronous calls failed
     * @throws InterruptedException if interrupted while requesting or awaiting a result
     */
    @Scheduled(cron = "0/30 * * * * *")
    public void myFunction() throws ExecutionException, InterruptedException {
        // Use the class logger rather than System.out so the start marker
        // goes through the same logging pipeline as the result line.
        LOGGER.info("task start . ");

        Future<Long> future0 = asyncTask.getTotalHitsFuture();
        Future<Long> future1 = asyncTask.getTotalHitsFuture();
        Future<Long> future2 = asyncTask.getTotalHitsFuture();

        // Each get() blocks until the corresponding call has completed.
        long total = future0.get() + future1.get() + future2.get();
        LOGGER.info("{}===", total);
    }
}