scheduledthreadpool是JDK自带的一个定时调度任务的实现,通过它可以实现定时的循环调度,最近在看线程池的源码,顺便也把它看了一下,发现里面的实现真的是很精彩,干货很多。
首先,scheduledthreadpool是继承自ThreadPoolExecutor,也就是说它具有线程池的一些特性,它也正是利用线程池实现了任务的调度。
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
可以发现ScheduledThreadPoolExecutor只是用了核心线程池,同时它的任务队列是采用了DelayedWorkQueue去实现。对于ThreadPoolExecutor不熟悉的,可以翻翻我之前的笔记,有对线程池很详细的解释。
接下来看任务提交的代码,以定时循环调度为例:
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
command表示要提交的线程。
initialDelay表示初始化时的延迟。
period表示调度周期。
unit为时间单位。
首先将command等参数包装成ScheduledFutureTask类,这个类是任务调度的基本单位。
triggerTime方法主要是用来计算下次被调度的时间:
/**
* Returns the trigger time of a delayed action.
*/
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
我们一会儿在回头看ScheduledFutureTask,先看任务的提交delayedExecute(t):
/**
* Main execution method for delayed or periodic tasks. If pool
* is shut down, rejects the task. Otherwise adds task to queue
* and starts a thread, if necessary, to run it. (We cannot
* prestart the thread to run the task because the task (probably)
* shouldn't be run yet,) If the pool is shut down while the task
* is being added, cancel and remove it if required by state and
* run-after-shutdown parameters.
*
* @param task the task
*/
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
这里主要做了一个任务入队的操作, super.getQueue().add(task);这个Queue就是我们刚才看到的DelayedWorkQueue,接下来ensurePrestart()是判断当前核心线程池的大小,如果过少,那么增加核心线程。保证任务及时运行。
程序很短,但是我们并没有看到任务是怎么跑起来,怎么被调度的。那这里其实有个前提的知识就是,当任务添加到Queue之中后,那么线程池中的线程会不断的去Queue中获取任务,那么我们看一下Queue的offer方法和take方法,是怎么放入、怎么取出的:
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();//扩容
size = i + 1;
if (i == 0) {//如果是第一个元素
queue[0] = e;
setIndex(e, 0);
} else {//如果不是,那么进行上滤操作,保证堆有序
siftUp(i, e);
}
//如果当前新增加的元素在堆顶,那么可能是最新要执行的
if (queue[0] == e) {
leader = null;
available.signal();//发一个信号,通知take的时候的线程,赶紧检测新加入的task
}
} finally {
lock.unlock();
}
return true;
}
这是DelayedWorkQueue的offer方法,DelayedWorkQueue里面是使用数组去维护任务队列的,那么数组是怎么保证任务有序呢?
其实仔细看代码,我们能发现,这里的实现是用一个二叉堆去对数组元素进行排序。确切的说是小顶堆。
首先判断容量,如果容量不够就扩容,接着判断是不是第一个元素,如果是,那么直接放在index为0的位置,不是的话进行上滤操作。接下来判断添加的元素是不是在堆顶,如果是那么需要进行优先调度,那么进行signal。
其实这里又引申出一个问题,那么就是是靠什么排序的?这个时候我们看一下任务的实体ScheduledFutureTask,它复写了compareTo方法:
//比较方法
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
//根据time去比较,time是在创建任务的时候计算出来的,指下一次运行的时间
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
里面用time去判断大小,time便是下一次调度的时间点,那么显然越小的离现在越近,越要放在前面。
看了offer方法我们再看看take方法,这个方法是用来获取任务的:
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞
//队列的存储采用数组,优先级排序采用二叉堆实现。
try {
for (;;) {
//最大的一个是最先应该被执行的
RunnableScheduledFuture first = queue[0];
if (first == null)
available.await();
else {
//获取第一个任务,是距离当前最近的任务,可能会有一点延迟
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)//要立即执行
return finishPoll(first);
else if (leader != null)
available.await();//拿不到leader的线程全部await
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);//如果此时线程唤醒了,那么其他线程将不能进入同步块
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();//唤醒所有的await线程
lock.unlock();
}
}
毫无疑问,take中直接获取queue[0],它是距离目前最近的要被执行的任务,先检测一下还有多长时间,任务会被执行,如果小于0,那么立刻弹出,并且做一个下滤操作,重新找出堆顶元素。如果不小于0,那么证明时间还没到,那么available.awaitNanos(delay);等到delay时间后自动唤醒,或者因为添加了一个更加紧急的任务即offer中的signal被调用了,那么唤醒,重新循环获取最优先执行的任务,如果delay小于0,那么直接弹出任务。
至此任务调度的逻辑分析完了,但是还有周期执行是怎么实现的呢?其实是在ScheduledFutureTask的run中实现的:
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();//判断是不是定时周期调度的
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {//设置当前状态是可重复执行的
setNextRunTime();//计算下一次执行时期
reExecutePeriodic(outerTask);//加入队列
}
}
判断是不是周期调度的任务,如果是等待执行完毕之后,重新设置下一次执行时间,并且将此任务重新offer到queue中,这样就实现了周期调度。
问题:
其实这里是有一个问题的,就是如果当核心线程池比较少,但是执行的任务又有很多阻塞性的任务,那么就会导致在任务到期改执行的时候,而没有线程去执行任务。这样就会导致任务调度时间不准,同时后面的任务也可能被影响,所以在设置的时候可以将自己的核心线程池调大一点,避免这种问题。
整体的流程可以参考下面这个图,描述的十分清楚: