1、基本概念
ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,其功能主要和ThreadPoolExecutor类似,也是基于线程池的任务调度。但ScheduledThreadPoolExecutor的不同点在于其可以基于时延及周期时间进行任务调度,而其实现这些功能是基于一些特殊的机制的。
1.1、ScheduledExecutorService接口
ScheduledExecutorService接口定义了一些基于时延及时间间隔的任务提交接口,用户可以基于这些接口进行任务提交。
public interface ScheduledExecutorService extends ExecutorService {
//在给定的时延后调度任务command
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
//在给定的时延后调度任务Callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
//在给定的时延initialDelay后调度任务command,并且每隔period调度一次
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
//在给定的时延initialDelay后调度任务command,并且每隔period调度一次
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
1.2、ScheduledFutureTask任务
ScheduledFutureTask为ScheduledExecutorService中的任务封装,其继承于FutureTask类,并实现了RunnableScheduledFuture接口。
其主要实现有对延时时间及周期间隔的保存;实现了compareTo接口,可以基于时间对任务进行排序;重新实现run方法,可以对延时或周期任务进行调度执行并进行状态恢复等。其是ScheduledExecutorService可以时间基于时间的任务调度的关键。
1.3、DelayedWorkQueue基于时间排序的阻塞队列
DelayedWorkQueue是ScheduledExecutorService中的阻塞队列,而且是唯一的阻塞队列实现。其中保存的任务是基于时间戳及序列号的排序任务。越早要执行的任务,在队列中排的越靠前,每次从队列取得的任务至少是达到其要执行时间的任务。DelayedWorkQueue是任务基于时间调度的保障。
2、ScheduledFutureTask源码详解
ScheduledFutureTask的继承结构如下:
ScheduledFutureTask源码如下:
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//任务添加的序列号
private final long sequenceNumber;
//ns格式的时间戳,是此任务要时间的时间
private long time;
//周期任务的时间间隔;
//大于0:表示固定周期执行的任务,即每隔固定时间执行一次;
//等于0:表示非周期任务,只执行一次就完结任务;
//小于0:表示固定时延的任务,即任务执行完成后在固定的时延后才再次执行;
private final long period;
RunnableScheduledFuture<V> outerTask = this;
//此任务在阻塞队列中的排序位置;
int heapIndex;
//
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
//获取任务的时延,即任务距离下个执行时间的间隔
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
//任务比较,基于下个执行时间及序列号进行排序
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
//基于时间进行排序
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
//时间相等时基于添加是的序列号排序
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
//基于时延排序
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
//是否为周期任务,固定时间间隔的任务或固定时延的任务
public boolean isPeriodic() {
return period != 0;
}
//设置任务的下个执行时间点
private void setNextRunTime() {
long p = period;
//基于时间间隔的任务
if (p > 0)
time += p;
//基于固定时延的任务
else
time = triggerTime(-p);
}
//取消任务
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
//执行任务
public void run() {
boolean periodic = isPeriodic();
//若为周期任务,在线程池状态为SHUTDOWN时,默认不会取消当前任务的执行;
//若不为周期任务,在线程池状态为SHUTDOWN时,默认会取消当前任务的执行;
if (!canRunInCurrentRunState(periodic))
cancel(false);
//非周期任务,则直接调度运行
else if (!periodic)
ScheduledFutureTask.super.run();
//周期任务, 任务执行完成后会对任务的执行时间进行处理,并将任务重新入队
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
3、DelayedWorkQueue源码详解
DelayedWorkQueue类继承关系如下:
3.1、主要属性
//初始队列容量
private static final int INITIAL_CAPACITY = 16;
//RunnableScheduledFuture的数组任务
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
//锁
private final ReentrantLock lock = new ReentrantLock();
//任务的数量
private int size = 0;
//等待队首任务的线程,基于Leader-Follower模式,减少线程不必要的等待;
//当有任务入队后,只会唤醒leader线程,其他线程继续等待
private Thread leader = null;
//信号量,用于线程任务调度的通知
private final Condition available = lock.newCondition();
3.2、主要方法解析
任务入队offer():
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
//若队列空间不够,即数组queue的长度不够,则调用grow()对数组进行扩容
if (i >= queue.length)
grow();
size = i + 1;
//若队列为空,则直接入队
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
//队列不为空,则调用siftUp将任务入队 ,并根据任务执行时间,按任务时间排序
} else {
siftUp(i, e);
}
//若入队后队首即为当前任务,则唤醒等待队列中的线程获取任务执行
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
private void siftDown(int k, RunnableScheduledFuture<?> key) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
int right = child + 1;
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
if (key.compareTo(c) <= 0)
break;
queue[k] = c;
setIndex(c, k);
k = child;
}
queue[k] = key;
setIndex(key, k);
}
任务出队take()及poll():
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
//若队首为空,则阻塞等待,直到有任务入队
if (first == null)
available.await();
else {
//队首任务距离当前时间的执行时间差,<=0表示要立即执行队首任务
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
//从队列中移除任务并返回
return finishPoll(first);
first = null; // don't retain ref while waiting
//若有leader线程在等待任务,则当前线程阻塞等待
if (leader != null)
available.await();
else {
//当前无leader线程,当前线程阻塞等待delay后再尝试获取任务
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
//若leader为空,但队列还有任务,则唤醒下个等待的线程获取任务执行
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
//队首为空,等待超时时间到达,则直接返回null;
//否则继续等待nanos时间;
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
//队首任务到达执行时间,则返回任务
if (delay <= 0)
return finishPoll(first);
//队首任务未到达执行时间,但超时等待时间到达,则返回null
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
//未到达等待的超时时间,并且有leader线程,则继续阻塞等待
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
//无leader线程,但队首任务未达到执行时间,则等待delay时间
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
4、ScheduledThreadPoolExecutor源码解析
ScheduledThreadPoolExecutor类继承结构如下:
4.1、任务提交
任务提交主要提供:schedule、 scheduleAtFixedRate、scheduleWithFixedDelay这三个方法,其他方法都是基于这三个方法的调用。
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//将command包装成ScheduledFutureTask
delayedExecute(t);
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
4.2、任务重入队列
当周期任务执行完成后,重置任务的状态等数据,并将任务重新入队。
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果线程池已经关闭,则拒绝执行任务
if (isShutdown())
reject(task);
else {
//将任务入队
super.getQueue().add(task);
//若线程池已关闭,并且不允许线程在关闭状态执行,则移除任务并取消任务
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//确保有线程可以执行任务
ensurePrestart();
}
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
//当前线程池状态允许执行任务,则将任务添加到队列中
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
//重新检查线程池状态,若不允许允许任务,则移除任务并取消任务
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
//确保有线程可以执行任务
ensurePrestart();
}
}
4.3、关闭线程池
ScheduledThreadPoolExecutor重写了线程池关闭的钩子函数onShutdown(),用于对队列数组中的任务进行取消并清除;
void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
//shutdown是否仍然执行延时任务。
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
// shutdown是否仍然执行周期任务。
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
// 如果两者皆不可则对队列中所有RunnableScheduledFuture调用cancel取消并清空队列。
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
/*
* 不需要执行的任务删除并取消。
* 已经取消的任务也需要从队列中删除。
* 所以这里就判断下是否需要执行或者任务是否已经取消。
*/
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}
// 因为任务被从队列中清理掉,所以这里需要调用tryTerminate尝试跃迁executor的状态
tryTerminate();
}
4.4、关闭线程池状态下的任务执行控制
//true:线程池在关闭状态下可以继续执行周期任务;
//false:线程池在关闭状态下不能执行周期任务
//默认值为:false
private volatile boolean continueExistingPeriodicTasksAfterShutdown;
//true:线程池在关闭状态下可以继续执行非周期任务(固定延迟任务);
//false:线程池在关闭状态下不可继续执行非周期任务(固定延迟任务);
//默认值:true
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;
//true:ScheduledFutureTask.cancel需要将任务从队列中移除;否则不移除
//默认值:false
private volatile boolean removeOnCancel = false;