ScheduledThreadPoolExecutor主要用来定期执行任务,或者是在给定的延迟之后运行任务。它的功能与Timer类似,但是比起Timer,ScheduledThreadPoolExecutor功能更强大,使用也更灵活。
ScheduledThreadPoolExecutor与Timer区别:
Timer对应单个后台线程,所有的任务都由同一个线程调度,因此所有的任务都是串行执行的,前一个任务的延迟或者异常都将会影响到之后的任务;
ScheduledThreadPoolExecutor对应多个后台线程,每一个调度的任务都将由线程池中的一个线程去执行,在同一时刻,任务并发执行,并且它们之间不会相互干扰。
ScheduledThreadPoolExecutor源码分析
在开始分析源码具体实现之前,先给一个简单的ScheduledThreadPoolExecutor使用案例:
ScheduledThreadPoolExecutor声明;
调用scheduleAtFixedRate方法和scheduleWithFixedDelay方法提交定时任务task。
类声明
在看构造方法之前先来看看ScheduledThreadPoolExecutor类声明:
从ScheduledThreadPoolExecutor类声明可以看出:
ScheduledThreadPoolExecutor是ThreadPoolExecutor的子类,并且实现了接口ScheduledExecutorService;
ScheduledThreadPoolExecutor是另外一种线程池,它同ThreadPoolExecutor拥有相同的特性,但是又略有不同,具体的不同之处会在后文做详细的介绍。
构造方法
ScheduledThreadPoolExecutor提供3个构造方法以供使用者使用:
从构造方法可以看出,ScheduledThreadPoolExecutor使用DelayQueue来作为线程池的工作队列,由于DelayQueue是无界队列,根据线程池的工作原理,核心参数maximumPoolSize在ScheduledThreadPoolExecutor中是没有什么意义的。
总的来说,ScheduledThreadPoolExecutor为了实现周期性执行任务,对ThreadPoolExecutor做了以下改动:
工作队列使用DelayQueue;
任务提交之后统统都进工作队列;
获取任务的方式改变,执行了任务之后,也增加了额外的处理,具体的改变后文会一一给出详细的分析。
任务提交与调度
ScheduledThreadPoolExecutor中最常用的任务提交的方法有两个:ScheduleAtFixedRate方法和ScheduleWithFixedDelay方法。
具体的执行流程如下:
参数校验,不合法参数抛出异常;
构造task;
调用delayedExecute方法进行后续相关处理。
接下来我们先来分析ScheduledThreadPoolExecutor的调度任务的最小单位ScheduledFutureTask相关实现。
ScheduledFutureTask
-
成员变量
ScheduledFutureTask包含3个成员变量:
成员变量sequenceNumber:任务被添加到ScheduledThreadPoolExecutor中的序号;
time:任务将要被执行的具体时间;
period:任务执行的间隔周期。
ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体的排序算法实现如下:
-
compareTo实现
compareTo实现首先按照time排序,time小的排在前面,time大的排在后面;
如果time相同,按照sequenceNumber排序,sequenceNumber小的排在前面,sequenceNumber大的排在后面,换句话说,如果两个task的执行时间相同,优先执行先提交的task。
-
任务调度之run方法实现
run方法是调度task的核心,task的执行实际上是run方法的执行。
run方法实现
具体执行流程如下:
步骤1:判断当前task是否可以执行,如果不能执行,调用cancel方法取消task执行,否则,跳转到步骤2;
步骤2:判断当前task是否到达执行时间点,如果到达,执行该task,否则跳转到步骤3;
步骤3:重置状态,计算任务下次执行时间,重新把任务添加到工作队列中,让该任务可重复执行。
看完task之后,我们接下看看看构造好task之后的delayedExecute方法相关实现。
** delayedExecute实现**
delayedExecute方法主要完成了这些操作:
将task添加到工作队列;
调用ensurePrestart()方法做预处理,该方法实现在线程池一文中做过详细分析在这里就不再做赘述了。
任务调度小结
到这里为止,ScheduledThreadPoolExecutor的task执行过程可以总结为下图:
- 步骤1:线程1从工作队列DelayQueue中获取已到期的task;
- 步骤2:线程1执行该task;
- 步骤3:线程1修改ScheduledFutureTask的time变量为下次被执行的时间;
- 步骤4:线程1将修改后的task重新放回DelayQueue中。
接下来我们来详细看看各个步骤DelayQueue具体相关实现。
任务获取 - take实现
具体执行步骤如下:
获取Lock;
从优先队列中获取任务:
步骤1:如果PriorityQueue为空,当前线程到Condition中等待,否则执行步骤2;
步骤2:如果PriorityQueue的第一个元素的时间比当前时间小,获取该任务,否则,线程到Condition中等待;
步骤3:获取任务成功后,如果PriorityQueue不为空,唤醒等待在Condition中的所有线程。
- 释放Lock。
ScheduledThreadPoolExecutor的getTask()方法会无限循环获取task,直到线程从PriorityQueue中获取到一个元素后,才会退出无限循环。
任务添加 - add实现
add方法的核心是offer方法调用,我们来看看offer方法的具体实现:
具体执行步骤如下:
获取Lock;
添加任务:
向PriorityQueue添加任务;
如果待添加的任务是PriorityQueue的第一个元素,唤醒在Condition中等待的所有线程。
- 释放Lock。