前言
为了把ThreadPoolExecutor和ScheduledThreadPoolExecutor整合到Spring ioc容器中去,Spring提供了ThreadPoolTaskExecutor和ThreadPoolTaskScheduler这两个JavaBean风格的类。并且这两个类还做了一些针对性的优化。
ThreadPoolTaskExecutor
ThreadPoolTaskExecutor对等于ThreadPoolExecutor,我们可以使用xml或者@Bean来创建ThreadPoolTaskExecutor。
改进点
- 提供默认配置,原生的ThreadPoolExecutor的除了ThreadFactory和RejectedExecutionHandler其他没有默认配置
- 实现AsyncListenableTaskExecutor接口,支持对FutureTask添加success和fail的回调,任务成功或失败的时候回执行对应回调方法。
- 因为是spring的工具类,所以抛出的RejectedExecutionException也会被转换为spring框架的TaskRejectedException异常(这个无所谓)
- 提供默认ThreadFactory实现,直接通过参数重载配置
配置
xml
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10"/>
<property name="maxPoolSize" value="20" />
<property name="queueCapacity" value="100" />
<property name="threadNamePrefix" value="test" />
</bean>
注解
@Bean
public ThreadPoolTaskExecutor test2(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("test2");
return executor;
}
使用
和使用ThreadPoolExecutor一致
ThreadPoolTaskScheduler
ThreadPoolTaskScheduler对等ScheduledThreadPoolExecutor,同样的可以使用xml和@Bean注解来创建它。但是针对这个类在封装ScheduledThreadPoolExecutor的基础上做的改进比较大。
改进点
除了ThreadPoolTaskExecutor中的改进之外,还有
- 提供默认配置,因为是ScheduledThreadPoolExecutor,所以只有poolSize这一个默认参数
- 支持自定义任务,通过传入Trigger参数
- 对任务出错处理进行优化,如果是重复性的任务,不抛出异常,通过日志记录下来,不影响下次运行,如果是只执行一次的任务,将异常往上抛。
2和3这两个改进点很有意义,后面会讲解下它们的源码实现。
配置
配置和ThreadPoolTaskExecutor类似。
使用
Trigger使用
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setPoolSize(1);
scheduler.initialize();
scheduler.schedule(new Runnable() {
@Override
public void run() {
System.out.println("test");
}
}, new Trigger() {
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
Calendar calendar = Calendar.getInstance();
calendar.add(Calendar.SECOND,10);
return calendar.getTime();
}
});
使用自定义Trigger我们可以在运行时决定下一次任务执行在什么时候触发
针对异常处理测试
ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
scheduler.setThreadNamePrefix("spring");
scheduler.setPoolSize(1);
scheduler.initialize();
final AtomicInteger count = new AtomicInteger();
Runnable runnable = new Runnable() {
@Override
public void run() {
int num = count.getAndAdd(1);
if(num%2==0){
System.out.println(Thread.currentThread().getName()+"success");
}else{
throw new RuntimeException("test");
}
}
};
scheduler.scheduleAtFixedRate(runnable,6000);
final AtomicInteger count = new AtomicInteger();
Runnable runnable = new Runnable() {
@Override
public void run() {
int num = count.getAndAdd(1);
if(num%2==0){
System.out.println(Thread.currentThread().getName()+"success");
}else{
throw new RuntimeException("test");
}
}
};
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(1);
scheduledThreadPoolExecutor.scheduleAtFixedRate(runnable,0,10, TimeUnit.SECONDS);
运行上述两段代码后,可以发现,Spring的ThreadPoolTaskScheduler会打印抛出的异常,并且不会影响第二次运行,而ScheduledThreadPoolExecutor首先不会抛出异常,其次这个任务取消运行了。
如果在ScheduledThreadPoolExecutor,不想因为异常而中断运行,我们需要手动进行try-catch捕获异常,而ThreadPoolTaskScheduler帮我们做了这一步。
源码分析
自定义Trigger
Trigger是spring附加的一种任务调度策略,我们可以设计自己的任务调度策略,比如我们最常用的CronTrigger。
先看下Triiger的接口定义
public interface Trigger {
//返回下次任务执行的时间
Date nextExecutionTime(TriggerContext triggerContext);
}
在nextExecutionTime方法体内,我们可以根据TriggerContext(任务执行上下文)来决定下一次执行的时间,从TriggerContext中我们可以获取以下三个时间。
public interface TriggerContext {
//上次调度的时间
Date lastScheduledExecutionTime();
//上次调度执行开始时间
Date lastActualExecutionTime();
//上次调度结束时间
Date lastCompletionTime();
}
我们可以根据这三个时间,决定下次任务执行的时间。
下面来看下ThreadPoolTaskScheduler是如何使用Trigger的
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
ScheduledExecutorService executor = getScheduledExecutor();
try {
ErrorHandler errorHandler = this.errorHandler;
if (errorHandler == null) {
errorHandler = TaskUtils.getDefaultErrorHandler(true);
}
//将task转换为ReschedulingRunnable,并且调用schedule方法
return new ReschedulingRunnable(task, trigger, executor, errorHandler).schedule();
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
在使用Trigger的时候,我们其实没有直接把task放到线程池中去,而是间接使用了ReschedulingRunnable。
public ScheduledFuture<?> schedule() {
synchronized (this.triggerContextMonitor) {
//通过triggerContext计算下次执行时间
this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
//返回时间为空,结束调度
if (this.scheduledExecutionTime == null) {
return null;
}
//计算任务执行delay
long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
//放到任务线程池执行,通过delay的方式
this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
return this;
}
}
ReschedulingRunnable会对run方法的重载
public void run() {
//执行时间
Date actualExecutionTime = new Date();
super.run();
//执行完成时间
Date completionTime = new Date();
synchronized (this.triggerContextMonitor) {
Assert.state(this.scheduledExecutionTime != null, "No scheduled execution");
//更新triggerContext中的时间
this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);
if (!obtainCurrentFuture().isCancelled()) {
//再次放入线程池中
schedule();
}
}
}
总结下ReschedulingRunnable的原理
ReschedulingRunnable的设计其实是基于ScheduledThreadPoolExecutor的ScheduledFutureTask,在ScheduledFutureTask中,任务执行完成,会根据设置的time,period计算下次执行时间,并且再次放入到ScheduledThreadPoolExecutor中。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
//根据time和period计算下次执行时间
setNextRunTime();
//把task再次放入到线程池中
reExecutePeriodic(outerTask);
}
}
ReschedulingRunnable也类似,它在执行完成后,通过TriggerContext和Trigger计算下次执行时间,并且再次把当前task放入到任务线程池中去,不过是以延迟任务的方式。
CronTrigger
CronTrigger是自定义Trigger的实现之一,使用CronTrigger,我们可以根据cron表达式定时执行我们的任务。
scheduler.schedule(new Runnable() {
@Override
public void run() {
System.out.println("test");
}
},new CronTrigger("5 * * * * *"));
通过上述代码,我们可以在每分钟的第五秒运行我们的任务。
原理的话,知道Trigger的作用后,推断一下就能知道。
在CronTrigger内部会根据cron表达式以及TriggerContext计算下一次任务执行的时间。
public Date nextExecutionTime(TriggerContext triggerContext) {
//获取上次运行结束时间
Date date = triggerContext.lastCompletionTime();
if (date != null) {
Date scheduled = triggerContext.lastScheduledExecutionTime();
if (scheduled != null && date.before(scheduled)) {
date = scheduled;
}
}
else {
date = new Date();
}
//通过cron表达式以及date计算下次运行时间
return this.sequenceGenerator.next(date);
}
sequenceGenerator就是封装cron表达式计算逻辑的类。
错误封装处理
从上面的使用案例中,我们看到ThreadPoolTaskScheduler会对任务抛出的异常进行处理,从而防止任务停止执行。在往线程池放任务的时候,会对任务再次进行装饰。
public ScheduledFuture<?> schedule(Runnable task, Date startTime) {
ScheduledExecutorService executor = getScheduledExecutor();
long initialDelay = startTime.getTime() - System.currentTimeMillis();
try {
//使用errorHandlingTask装饰任务
return executor.schedule(errorHandlingTask(task, false), initialDelay, TimeUnit.MILLISECONDS);
}
catch (RejectedExecutionException ex) {
throw new TaskRejectedException("Executor [" + executor + "] did not accept task: " + task, ex);
}
}
errorHandlingTask中会使用DelegatingErrorHandlingRunnable装饰task
public void run() {
try {
this.delegate.run();
}
catch (UndeclaredThrowableException ex) {
this.errorHandler.handleError(ex.getUndeclaredThrowable());
}
catch (Throwable ex) {
this.errorHandler.handleError(ex);
}
}
默认的errorHandler有两个,会根据任务是否可重复执行选择其中一个。
public static ErrorHandler getDefaultErrorHandler(boolean isRepeatingTask) {
return (isRepeatingTask ? LOG_AND_SUPPRESS_ERROR_HANDLER : LOG_AND_PROPAGATE_ERROR_HANDLER);
}
从这两个handler的名字,我们就能看出来它们是如何处理异常的。
LOG_AND_SUPPRESS_ERROR_HANDLER打印日志记录异常
LOG_AND_PROPAGATE_ERROR_HANDLER打印日志记录异常并且往上抛出异常
对于只执行一次的任务,使用LOG_AND_SUPPRESS_ERROR_HANDLER,反正它也不会再执行第二次。
而重复执行的任务,需要再次执行,不希望异常把它的调度中断掉,所以使用LOG_AND_PROPAGATE_ERROR_HANDLER。
总结
既然使用了spring,线程池也尽量托管在spring ioc容器中去。对于简单的定时任务,使用spring的ScheduledThreadPoolExecutor即可。对于一些需要自定义的定时任务触发逻辑,可以自己实现Trigger。
使用ScheduledThreadPoolExecutor,我们大多数时候还是使用@Scheduled注解,@Scheduled的原理是怎么样,请关注我博客。