开篇
ScheduledThreadPoolExecutor的文章主要需要讲清楚一个核心概念(划重点),如何通过数据结构实现定时调度功能,顺带了解下如何实现循环调度功能。
ScheduledThreadPoolExecutor的周期性在于执行完任务后重新计算下一次周期并提交到DelayedWorkQueue当中,判断是否到执行时间通过DelayedWorkQueue(堆排序)实现。
类图
ScheduledThreadPoolExecutor的类依赖图当中,我们可以得出以下几个结论:
- 继承自ThreadPoolExecutor,所以具备ThreadPoolExecutor的特性。
- 实现ScheduledExecutoService,提供一系列Schedule特性相关的接口。
类源码
- ScheduledThreadPoolExecutor的构造函数内部通过ThreadPoolExecutor的构造函数去实现核心结构的构造的。
- ScheduledThreadPoolExecutor通过DelayedWorkQueue去保存任务,这样保证了按照时间的倒序访问,直接可以通过判断第一个节点的时间来判定是否需要执行调度任务。
- ScheduledThreadPoolExecutor的keepAliveTime设置为0,那么如果有空闲线程就立即回收。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
任务提交
- ScheduledThreadPoolExecutor的任务提交方式有两种:submit和shedule。
- 内部创建ScheduledFutureTask对象对运行的task进行简单封装。
- 内部执行delayedExecute()方法提交任务。
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
public Future<?> submit(Runnable task) {
return schedule(task, 0, NANOSECONDS);
}
public <T> Future<T> submit(Runnable task, T result) {
return schedule(Executors.callable(task, result), 0, NANOSECONDS);
}
public <T> Future<T> submit(Callable<T> task) {
return schedule(task, 0, NANOSECONDS);
}
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
}
- delayedExecute()方法内部判断线程池是否已经关闭,关闭则直接拒绝任务。
- delayedExecute()方法直接通过DelayedWorkQueue的add方法添加任务。
- DelayedWorkQueue其实是类似DelayedQueue的基于最小堆的数据结构。
- DelayedWorkQueue按照提交任务的过期时间进行排序,最快过期数据在堆顶。
- delayedExecute()方法内部会判断当前核心线程池的大小,如果过少就增加核心线程。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 启动工作线程来执行定时调度任务
ensurePrestart();
}
}
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
// 启动核心工作线程
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
任务单元
ScheduledFutureTask的核心变量主要包括:
- time:延迟周期也就是执行任务时间点
- period:周期执行的时间配置
- outerTask:执行的任务
ScheduledFutureTask的执行逻辑run()方法负责执行调度任务:
- 如果是非周期性任务,那么就执行一次。
- 如果是周期性任务,那么先执行任务后重新计算下一次周期重新投递到队列中。
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
private final long sequenceNumber;
// 延迟周期也就是执行任务时间点
private long time;
// 周期执行的时间配置
private final long period;
// 执行任务
RunnableScheduledFuture<V> outerTask = this;
int heapIndex;
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 计算是否到期,用下一次执行时间减去当前时间来判断
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
public boolean isPeriodic() {
return period != 0;
}
// 设置下一个运行时间,如果周期性的就在当前时间+周期性时间
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不是周期性任务,那么就执行一次
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
// 如果是周期性任务,那么就重新计算下一次任务周期并重新添加任务队列当中
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
}
// 重新提交任务到DelayedWorkQueue
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
计算下一次执行任务的时间,triggerTime()就是完成这个任务的。
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
private long overflowFree(long delay) {
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
long headDelay = head.getDelay(NANOSECONDS);
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
任务调度
任务调度通过ThreadPoolExecutor的worker的run()方法进入while()循环调用实现任务的消费。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
参考文章
java源码-ThreadPoolExecutor(1)
java源码-ThreadPoolExecutor(2)
java源码-ThreadPoolExecutor(3)
java源码-DelayQueue
深入理解scheduledthreadpoolexecutor
彩蛋
倚楼听风雨,淡看江湖路;莫问前尘有悔,唯愿今朝无憾。