ScheduledFutureTask
run
public void run() {
// 首先判断是否是周期性任务
boolean periodic = isPeriodic();
// 如果该任务不能再当前的线程池状态下运行,那么取消任务
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不是周期性任务,那么该任务就是个普通的FutureTask,直接调用父类的run
else if (!periodic)
ScheduledFutureTask.super.run();
// 到这里,说明是周期性任务,那么执行且重置任务
else if (ScheduledFutureTask.super.runAndReset()) {
// 计算下个周期
setNextRunTime();
//
reExecutePeriodic(outerTask);
}
}
reExecutePeriodic
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
// 如果当前线程池状态允许执行周期性任务
if (canRunInCurrentRunState(true)) {
// 将该任务重新再假如到任务列表中
super.getQueue().add(task);
// 如果现在线程池状态又不允许了,那么从任务列表中移除该任务,且取消执行
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
// 否则新增工作线程,worker会自己去拿任务
ensurePrestart();
}
}
delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果线程池已经SHUTDOWN,那么执行拒绝策略
if (isShutdown())
reject(task);
else {
// 将任务加到任务列表中
super.getQueue().add(task);
// 如果线程池是SHUTDOWN,且任务不能再SHUTDOWN之后继续执行
// 那么这个任务没有存在的意义,从队列中删除该任务,并尝试取消任务执行
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 添加工作线程,worker会自己去拿任务
ensurePrestart();
}
}
ensurePrestart
void ensurePrestart() {
// 拿到工作线程数
int wc = workerCountOf(ctl.get());
// 如果小于核心线程数,那么增加工作线程,worker会自己去拿任务
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
DelayedWorkQueue
offer
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// 如果超过queue的长度,那么扩容
if (i >= queue.length)
grow();
// 否则长度加一
size = i + 1;
// 如果当前队列为空,那么该任务作为首节点
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
// 否则做堆上浮的操作,将该任务放在二叉堆中合适的位置
} else {
siftUp(i, e);
}
// 经过上面的操作,如果当前队列的首节点是当前任务,那么唤醒一个等待线程开始处理
// 如果首节点,不是当前任务的话,那么这里只是插入,不做唤醒
if (queue[0] == e) {
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
take
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
// 先锁队列
lock.lockInterruptibly();
try {
for (;;) {
// 拿到首节点
RunnableScheduledFuture<?> first = queue[0];
// 如果首节点为空,那么等待,直到有任务为止
// Leader-Follower pattern
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// 如果首节点已经超时,说明到了该任务的执行时间了,那么返回该任务
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// 如果leader不为空,说明有其他线程在执行任务,那么这里无限等待
if (leader != null)
available.await();
else {
// 否则,说明leader现在是空挡
Thread thisThread = Thread.currentThread();
// 将当前线程作为新的leader
leader = thisThread;
try {
// 等待任务
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
finishPoll
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
int s = --size;
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
if (s != 0)
// 将最后节点放到堆顶,做堆下沉,重新整理二叉堆
siftDown(0, x);
setIndex(f, -1);
return f;
}