

A ThreadPoolExecutor that can additionally schedule commands to 
run after a given delay, or to execute periodically. This class is 
preferable to Timer when multiple worker threads are needed, or when 
the additional flexibility or capabilities of ThreadPoolExecutor (which 
this class extends) are required.

Delayed tasks execute no sooner than they are enabled, but without 
any real-time guarantees about when, after they are enabled, they will 
commence. Tasks scheduled for exactly the same execution time are 
enabled in first-in-first-out (FIFO) order of submission.

When a submitted task is cancelled before it is run, execution is 
suppressed. By default, such a cancelled task is not automatically
removed from the work queue until its delay elapses. While this
enables further inspection and monitoring, it may also cause 
unbounded retention of cancelled tasks. To avoid this, set 
setRemoveOnCancelPolicy(boolean) to true, which causes tasks to 
be immediately removed from the work queue at time of cancellation.
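
The effect of the policy can be observed directly on the work queue. The following is a minimal sketch, not part of the JDK: the class name RemoveOnCancelDemo and its helper method are made up for illustration, and only public ScheduledThreadPoolExecutor methods are used.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class RemoveOnCancelDemo {

    /** Schedules a far-future task, cancels it, and reports how many tasks remain queued. */
    static int queueSizeAfterCancel(boolean removeOnCancel) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            pool.setRemoveOnCancelPolicy(removeOnCancel);
            ScheduledFuture<?> future = pool.schedule(() -> { }, 1, TimeUnit.HOURS);
            future.cancel(false);
            // With the default policy the cancelled task stays queued until its delay elapses.
            return pool.getQueue().size();
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("default policy: " + queueSizeAfterCancel(false));
        System.out.println("removeOnCancel: " + queueSizeAfterCancel(true));
    }
}
```

With the default policy the cancelled task is still counted in the queue; with the policy enabled the queue is empty right after cancel returns.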

Successive executions of a task scheduled via scheduleAtFixedRate 
or scheduleWithFixedDelay do not overlap. While different executions 
may be performed by different threads, the effects of prior executions 
happen-before those of subsequent ones.
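
To see the non-overlap guarantee in action, the sketch below schedules a task whose run time (30 ms) is longer than its period (10 ms) on a pool with several threads, and records the highest number of simultaneous executions. The class NoOverlapDemo and the chosen timings are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class NoOverlapDemo {

    /** Returns the largest number of executions of one periodic task seen running at once. */
    static int maxConcurrentRuns() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(4);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxSeen = new AtomicInteger();
        CountDownLatch fiveRuns = new CountDownLatch(5);
        try {
            pool.scheduleAtFixedRate(() -> {
                int now = running.incrementAndGet();
                maxSeen.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(30);            // deliberately longer than the period
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                    fiveRuns.countDown();
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            fiveRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return maxSeen.get();
    }

    public static void main(String[] args) {
        System.out.println("max concurrent runs: " + maxConcurrentRuns());
    }
}
```

Even though four threads are available, the next execution is only queued after the previous one finishes, so the maximum stays at one.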

While this class inherits from ThreadPoolExecutor, a few of the 
inherited tuning methods are not useful for it. In particular, because it 
acts as a fixed-sized pool using corePoolSize threads and an 
unbounded queue, adjustments to maximumPoolSize have no useful 
effect. Additionally, it is almost never a good idea to set corePoolSize 
to zero or use allowCoreThreadTimeOut because this may leave the 
pool without threads to handle tasks once they become eligible to run.
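
The fixed-size behaviour can be checked with getLargestPoolSize. This is a small sketch under the assumption that each task sleeps briefly so that work piles up in the queue; the class FixedSizeDemo is invented for the example.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class FixedSizeDemo {

    /** Submits more tasks than core threads and reports the largest pool size reached. */
    static int largestPoolSize(int corePoolSize, int tasks) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(corePoolSize);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        Thread.sleep(20);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            pool.shutdown();                          // already queued tasks still run by default
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return pool.getLargestPoolSize();
    }

    public static void main(String[] args) {
        System.out.println("largest pool size: " + largestPoolSize(2, 8));
    }
}
```

Because the queue is unbounded, extra tasks wait in the queue instead of causing threads beyond corePoolSize to be created.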

Extension notes: This class overrides the execute and submit 
methods to generate internal ScheduledFuture objects to control
per-task delays and scheduling. To preserve functionality, any further
overrides of these methods in subclasses must invoke superclass 
versions, which effectively disables additional task customization. 
However, this class provides alternative protected extension method 
decorateTask (one version each for Runnable and Callable) that can 
be used to customize the concrete task types used to execute 
commands entered via execute, submit, schedule, 
scheduleAtFixedRate, and scheduleWithFixedDelay. By default, a 
ScheduledThreadPoolExecutor uses a task type extending 
FutureTask. However, this may be modified or replaced using 
subclasses of the form







 public class CustomScheduledExecutor extends ScheduledThreadPoolExecutor {

   static class CustomTask<V> implements RunnableScheduledFuture<V> { ... }

   protected <V> RunnableScheduledFuture<V> decorateTask(
                Runnable r, RunnableScheduledFuture<V> task) {
       return new CustomTask<V>(r, task);
   }

   protected <V> RunnableScheduledFuture<V> decorateTask(
                Callable<V> c, RunnableScheduledFuture<V> task) {
       return new CustomTask<V>(c, task);
   }
   // ... add constructors, etc.
 }

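
A lighter use of the same hook is plain instrumentation. The sketch below only counts calls to decorateTask and returns the task unchanged; CountingScheduledExecutor and its decorated counter are hypothetical names, and a real decorator would normally wrap the task instead.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical subclass used only to show when decorateTask is invoked.
class CountingScheduledExecutor extends ScheduledThreadPoolExecutor {
    final AtomicInteger decorated = new AtomicInteger();

    CountingScheduledExecutor(int corePoolSize) {
        super(corePoolSize);
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(
            Runnable r, RunnableScheduledFuture<V> task) {
        decorated.incrementAndGet();
        return task;                     // keep the default task type
    }

    @Override
    protected <V> RunnableScheduledFuture<V> decorateTask(
            Callable<V> c, RunnableScheduledFuture<V> task) {
        decorated.incrementAndGet();
        return task;
    }

    /** Sends one task each through execute, submit and schedule, then reports the counter. */
    static int decorateCallsForThreeSubmissions() {
        CountingScheduledExecutor pool = new CountingScheduledExecutor(1);
        try {
            pool.execute(() -> { });
            pool.submit(() -> "done");
            pool.schedule(() -> { }, 1, TimeUnit.MILLISECONDS);
            return pool.decorated.get();
        } finally {
            pool.shutdownNow();
        }
    }
}
```

All three entry points pass through decorateTask, which is why overriding it is enough to intercept every submitted task.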
This class specializes ThreadPoolExecutor implementation by

1. Using a custom task type ScheduledFutureTask, even for tasks
   that don't require scheduling because they are submitted
   using ExecutorService rather than ScheduledExecutorService
   methods, which are treated as tasks with a delay of zero.

2. Using a custom queue (DelayedWorkQueue), a variant of
   unbounded DelayQueue. The lack of capacity constraint and
   the fact that corePoolSize and maximumPoolSize are
   effectively identical simplifies some execution mechanics
   (see delayedExecute) compared to ThreadPoolExecutor.

3. Supporting optional run-after-shutdown parameters, which
   leads to overrides of shutdown methods to remove and cancel
   tasks that should NOT be run after shutdown, as well as
   different recheck logic when task (re)submission overlaps
   with a shutdown.

4. Task decoration methods to allow interception and
   instrumentation, which are needed because subclasses cannot
   otherwise override submit methods to get this effect. These
   don't have any impact on pool control logic though.
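
Point 1 can be observed from the outside. The following sketch assumes the OpenJDK implementation discussed here, where the Future returned by submit is the internal ScheduledFutureTask; the class ZeroDelayDemo is made up for the example.

```java
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; relies on OpenJDK implementation behaviour.
class ZeroDelayDemo {

    /** Returns true if a plain submit() produced a scheduled task whose delay has already elapsed. */
    static boolean submitReturnsZeroDelayScheduledFuture() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        try {
            Future<Integer> future = pool.submit(() -> 42);
            return future instanceof ScheduledFuture
                && ((ScheduledFuture<?>) future).getDelay(TimeUnit.NANOSECONDS) <= 0;
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitReturnsZeroDelayScheduledFuture());
    }
}
```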


    public ScheduledThreadPoolExecutor(int corePoolSize,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue(), threadFactory, handler);
    }



3.1 schedule: one-shot tasks

    public void execute(Runnable command) {
        schedule(command, 0, NANOSECONDS);
    }

    public Future<?> submit(Runnable task) {
        return schedule(task, 0, NANOSECONDS);
    }

    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(Executors.callable(task, result), 0, NANOSECONDS);
    }

    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, NANOSECONDS);
    }

    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<V> t = decorateTask(callable,
            new ScheduledFutureTask<V>(callable,
                                       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    }

    /**
     * Main execution method for delayed or periodic tasks.  If pool
     * is shut down, rejects the task. Otherwise adds task to queue
     * and starts a thread, if necessary, to run it.  (We cannot
     * prestart the thread to run the task because the task (probably)
     * shouldn't be run yet.)  If the pool is shut down while the task
     * is being added, cancel and remove it if required by state and
     * run-after-shutdown parameters.
     *
     * @param task the task
     */
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

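  • Step 1: if the pool has already been shut down, reject the task.
  • Step 2: otherwise add the task to the queue. A thread cannot be prestarted to run the task directly, because the task is probably not due yet.
  • Step 3: if the pool was shut down while the task was being added, and the run state and run-after-shutdown parameters do not allow it to run, remove and cancel the task. Otherwise ensurePrestart makes sure a worker thread exists.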
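
Step 1 is easy to trigger from user code. The sketch below assumes the default rejection handler (AbortPolicy), which reports the rejection as a RejectedExecutionException; the class RejectAfterShutdownDemo is hypothetical.

```java
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class RejectAfterShutdownDemo {

    /** Returns true if scheduling on an already shut down pool is rejected. */
    static boolean rejectedAfterShutdown() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        pool.shutdown();
        try {
            pool.schedule(() -> { }, 1, TimeUnit.SECONDS);
            return false;                       // not expected with the default handler
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected: " + rejectedAfterShutdown());
    }
}
```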


    void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

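3.2 scheduleAtFixedRate and scheduleWithFixedDelay: repeating tasks

The scheduling core of scheduleAtFixedRate and scheduleWithFixedDelay is the same as that of schedule: all of them go through delayedExecute(t). The only difference is how the ScheduledFutureTask is constructed.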

    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(-delay));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }


    /**
     * Returns true if can run a task given current run state
     * and run-after-shutdown parameters.
     *
     * @param periodic true if this task periodic, false if delayed
     */
    boolean canRunInCurrentRunState(boolean periodic) {
        return isRunningOrShutdown(periodic ?
                                   continueExistingPeriodicTasksAfterShutdown :
                                   executeExistingDelayedTasksAfterShutdown);
    }

    /** False if should cancel/suppress periodic tasks on shutdown. */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /** False if should cancel non-periodic tasks on shutdown. */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
        continueExistingPeriodicTasksAfterShutdown = value;
        if (!value && isShutdown())
            onShutdown();
    }

    public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
        executeExistingDelayedTasksAfterShutdown = value;
        if (!value && isShutdown())
            onShutdown();
    }
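
The difference between the two settings of the delayed-task policy shows up after shutdown(). This is a minimal sketch with an invented class name (ShutdownPolicyDemo) and an arbitrary 50 ms delay.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class, not part of the JDK.
class ShutdownPolicyDemo {

    /** Returns true if a task scheduled 50 ms ahead still ran after shutdown(). */
    static boolean delayedTaskRunsAfterShutdown(boolean executeExistingDelayedTasks) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(executeExistingDelayedTasks);
        AtomicBoolean ran = new AtomicBoolean();
        pool.schedule(() -> ran.set(true), 50, TimeUnit.MILLISECONDS);
        pool.shutdown();     // with the policy off, the pending task is cancelled here
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println("policy true:  " + delayedTaskRunsAfterShutdown(true));
        System.out.println("policy false: " + delayedTaskRunsAfterShutdown(false));
    }
}
```

With the default value (true) the pool only terminates after the pending one-shot task has run; with false the task is dropped as part of shutdown().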



5.1 Fields

        /** Sequence number to break ties FIFO */
        private final long sequenceNumber;

        /** The time the task is enabled to execute in nanoTime units */
        private long time;

        /**
         * Period in nanoseconds for repeating tasks.  A positive
         * value indicates fixed-rate execution.  A negative value
         * indicates fixed-delay execution.  A value of 0 indicates a
         * non-repeating task.
         */
        private final long period;

        /** The actual task to be re-enqueued by reExecutePeriodic */
        RunnableScheduledFuture<V> outerTask = this;

        /**
         * Index into delay queue, to support faster cancellation.
         */
        int heapIndex;
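  • sequenceNumber is the sequence number used to break ties in FIFO order.
  • time is the time at which the task becomes eligible to run, in nanoTime units.
  • period is the period of a repeating task: a positive value means fixed-rate execution, a negative value means fixed-delay execution, and 0 means a non-repeating task.
  • outerTask is the actual task that reExecutePeriodic puts back into the queue.
  • heapIndex is the task's index in the delay queue, which supports faster cancellation.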

5.2 Constructors


new ScheduledFutureTask<V>(callable, triggerTime(delay, unit))


        /**
         * Creates a one-shot action with given nanoTime-based trigger time.
         */
        ScheduledFutureTask(Callable<V> callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

new ScheduledFutureTask<Void>(command,
                              null,
                              triggerTime(initialDelay, unit),
                              unit.toNanos(period))

new ScheduledFutureTask<Void>(command,
                              null,
                              triggerTime(initialDelay, unit),
                              unit.toNanos(-delay))


        /**
         * Creates a periodic action with given nano time and period.
         */
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

5.3 compareTo uses sequenceNumber to break ties when time is equal

        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
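
The same ordering rule can be reproduced with an ordinary PriorityQueue. The sketch below is a simplified stand-in, not the JDK code: the Entry class is hypothetical, and it compares times with Long.compare instead of the subtraction used above, which is only safe here because the sample values are small.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of "order by time, then by submission sequence".
class TieBreakSketch {

    static final class Entry implements Comparable<Entry> {
        final String name;
        final long time;
        final long seq;

        Entry(String name, long time, long seq) {
            this.name = name;
            this.time = time;
            this.seq = seq;
        }

        @Override
        public int compareTo(Entry other) {
            int byTime = Long.compare(time, other.time);
            return byTime != 0 ? byTime : Long.compare(seq, other.seq);
        }
    }

    /** Queues four entries, two pairs sharing a time, and returns the order they come out in. */
    static List<String> drainOrder() {
        AtomicLong sequencer = new AtomicLong();
        PriorityQueue<Entry> queue = new PriorityQueue<>();
        queue.add(new Entry("a", 200, sequencer.getAndIncrement()));
        queue.add(new Entry("b", 100, sequencer.getAndIncrement()));
        queue.add(new Entry("c", 100, sequencer.getAndIncrement()));
        queue.add(new Entry("d", 200, sequencer.getAndIncrement()));
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty())
            order.add(queue.poll().name);
        return order;
    }

    public static void main(String[] args) {
        System.out.println(drainOrder());   // prints [b, c, a, d]
    }
}
```

Entries with equal time leave the queue in the order they were submitted, which is the FIFO guarantee described at the top of this post.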


5.4 setNextRunTime


        /**
         * Sets the next time to run for a periodic task.
         */
        private void setNextRunTime() {
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }
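  • p > 0 means the task runs at a fixed rate, so time is set to the previous scheduled start time plus p.
  • p < 0 means the task runs with a fixed delay, so time is set to now(), the moment the current run finished, plus the delay -p.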
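
The two rules above boil down to a few lines of arithmetic. The helper below is a simplified sketch, not the JDK method: the name nextRunTime is made up, the current time is passed in as a parameter, and the overflow handling of triggerTime is left out.

```java
// Hypothetical helper mirroring the arithmetic of setNextRunTime (overflow handling omitted).
class NextRunTimeSketch {

    /**
     * @param scheduledTime the time the finished run was scheduled for
     * @param period        positive for fixed rate, negative for fixed delay
     * @param now           the time at which the finished run completed
     */
    static long nextRunTime(long scheduledTime, long period, long now) {
        if (period == 0)
            throw new IllegalArgumentException("not a periodic task");
        if (period > 0)
            return scheduledTime + period;   // fixed rate: relative to the previous schedule
        return now + (-period);              // fixed delay: relative to completion
    }

    public static void main(String[] args) {
        // A run scheduled for t=1000 finishes at t=1250.
        System.out.println(nextRunTime(1000, 100, 1250));    // fixed rate  -> 1100
        System.out.println(nextRunTime(1000, -100, 1250));   // fixed delay -> 1350
    }
}
```

In the fixed-rate case the next time (1100) is already in the past when the run finishes at 1250, so the task becomes eligible again immediately; in the fixed-delay case there is always a full delay between the end of one run and the start of the next.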

5.5 heapIndex speeds up cancel

        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }


    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); // In case SHUTDOWN and now empty
        return removed;
    }


        public boolean remove(Object x) {
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = indexOf(x);
                if (i < 0)
                    return false;

                setIndex(queue[i], -1);
                int s = --size;
                RunnableScheduledFuture<?> replacement = queue[s];
                queue[s] = null;
                if (s != i) {
                    siftDown(i, replacement);
                    if (queue[i] == replacement)
                        siftUp(i, replacement);
                }
                return true;
            } finally {
                lock.unlock();
            }
        }

        private int indexOf(Object x) {
            if (x != null) {
                if (x instanceof ScheduledFutureTask) {
                    int i = ((ScheduledFutureTask) x).heapIndex;
                    // Sanity check; x could conceivably be a
                    // ScheduledFutureTask from some other pool.
                    if (i >= 0 && i < size && queue[i] == x)
                        return i;
                } else {
                    for (int i = 0; i < size; i++)
                        if (x.equals(queue[i]))
                            return i;
                }
            }
            return -1;
        }

5.6 The core run method

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

    protected boolean runAndReset() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return false;
        boolean ran = false;
        int s = state;
        try {
            Callable<V> c = callable;
            if (c != null && s == NEW) {
                try {
                    c.call(); // don't set result
                    ran = true;
                } catch (Throwable ex) {
                    setException(ex);
                }
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
        return ran && s == NEW;
    }

    /**
     * Requeues a periodic task unless current run state precludes it.
     * Same idea as delayedExecute except drops task rather than rejecting.
     *
     * @param task the task
     */
    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(true)) {
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }
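  • For a one-shot task, FutureTask.run is called to execute it.
  • For a periodic task, FutureTask.runAndReset is called. If that run completes normally, setNextRunTime computes the next trigger time and reExecutePeriodic puts the task back into the queue.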
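
One consequence of the periodic branch is that a run which throws is never re-enqueued, because runAndReset returns false after recording the exception. The sketch below demonstrates this from the outside; the class PeriodicFailureDemo and its timings are assumptions for illustration.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class PeriodicFailureDemo {

    /** Returns how many times a periodic task ran when every run throws. */
    static int runsBeforeSuppression() {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(1);
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = pool.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                throw new IllegalStateException("boom");
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                future.get(5, TimeUnit.SECONDS);
            } catch (ExecutionException expected) {
                // the failure of the first run is reported through the future
            } catch (TimeoutException e) {
                throw new IllegalStateException(e);
            }
            Thread.sleep(100);   // several periods: time for further runs, if there were any
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("runs: " + runsBeforeSuppression());
    }
}
```

A periodic task that must survive failures therefore has to catch its own exceptions inside the Runnable.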



A DelayedWorkQueue is based on a heap-based data structure
like those in DelayQueue and PriorityQueue, except that
every ScheduledFutureTask also records its index into the
heap array. This eliminates the need to find a task upon
cancellation, greatly speeding up removal (down from O(n)
to O(log n)), and reducing garbage retention that would
otherwise occur by waiting for the element to rise to top
before clearing. But because the queue may also hold
RunnableScheduledFutures that are not ScheduledFutureTasks,
we are not guaranteed to have such indices available, in
which case we fall back to linear search. (We expect that
most tasks will not be decorated, and that the faster cases
will be much more common.)

All heap operations must record index changes -- mainly
within siftUp and siftDown. Upon removal, a task's
heapIndex is set to -1. Note that ScheduledFutureTasks can
appear at most once in the queue (this need not be true for
other kinds of tasks or work queues), so are uniquely
identified by heapIndex.
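
The idea of storing each element's position inside the element itself can be shown with a small stand-alone heap. This is a simplified sketch, not the DelayedWorkQueue code: the class IndexedHeapSketch and its Node type are hypothetical, it orders plain long keys, and it is not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical min-heap whose nodes remember their own index, allowing O(log n) removal.
class IndexedHeapSketch {

    static final class Node {
        final long key;
        int heapIndex = -1;                    // -1 means "not in the heap"
        Node(long key) { this.key = key; }
    }

    private final List<Node> heap = new ArrayList<>();

    void add(Node n) {
        heap.add(n);
        siftUp(heap.size() - 1, n);
    }

    Node poll() {
        return heap.isEmpty() ? null : removeAt(0);
    }

    /** Removes n without searching: its recorded index says where it is. */
    boolean remove(Node n) {
        int i = n.heapIndex;
        if (i < 0 || i >= heap.size() || heap.get(i) != n)
            return false;
        removeAt(i);
        return true;
    }

    private Node removeAt(int i) {
        Node removed = heap.get(i);
        removed.heapIndex = -1;
        Node last = heap.remove(heap.size() - 1);
        if (last != removed) {                 // fill the hole with the last node
            siftDown(i, last);
            if (heap.get(i) == last)
                siftUp(i, last);
        }
        return removed;
    }

    private void siftUp(int k, Node n) {
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Node p = heap.get(parent);
            if (n.key >= p.key)
                break;
            set(k, p);
            k = parent;
        }
        set(k, n);
    }

    private void siftDown(int k, Node n) {
        int half = heap.size() >>> 1;
        while (k < half) {
            int child = 2 * k + 1;
            int right = child + 1;
            if (right < heap.size() && heap.get(right).key < heap.get(child).key)
                child = right;
            Node c = heap.get(child);
            if (n.key <= c.key)
                break;
            set(k, c);
            k = child;
        }
        set(k, n);
    }

    /** Every move records the new index, as siftUp and siftDown must do in the real queue. */
    private void set(int k, Node n) {
        heap.set(k, n);
        n.heapIndex = k;
    }

    /** Adds 5, 3, 8, 1, removes the node with key 3 by index, then drains the heap. */
    static String demo() {
        IndexedHeapSketch h = new IndexedHeapSketch();
        Node three = new Node(3);
        h.add(new Node(5));
        h.add(three);
        h.add(new Node(8));
        h.add(new Node(1));
        h.remove(three);
        StringBuilder out = new StringBuilder();
        for (Node n = h.poll(); n != null; n = h.poll())
            out.append(out.length() == 0 ? "" : ",").append(n.key);
        return out.toString();
    }
}
```

Removal costs one index lookup plus one sift, instead of a linear scan followed by a sift.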



6.1 Enqueue and dequeue


        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();
                size = i + 1;
                if (i == 0) {
                    queue[0] = e;
                    setIndex(e, 0);
                } else {
                    siftUp(i, e);
                }
                if (queue[0] == e) {
                    leader = null;
                    available.signal();
                }
            } finally {
                lock.unlock();
            }
            return true;
        }


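    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                // ... lock the worker and run the task
            }
            // ...
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

    // Excerpt from getTask(): a worker either polls with a timeout or blocks in take()
            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                // ...
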
        public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }

6.2 Leader-Follower pattern

See also: Concurrent containers: BlockingQueue, DelayQueue and the Leader-Follower pattern

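  • When a thread becomes the leader, it waits with a timeout until the next delayed element expires, while the other threads wait indefinitely.
  • Before returning from take/poll, the leader must signal another waiting thread, unless some other thread has become the leader in the meantime.
  • Whenever the head of the queue is replaced by an element that expires earlier, leader is reset to null. In offer, when the new element ends up at the head (queue[0] == e here, q.peek() == e in DelayQueue), leader is set to null and a waiting thread is signalled so that a new leader is elected. Threads in a timed wait must therefore be prepared to lose leadership while they wait.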
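
The three rules above can be sketched as a stripped-down delay queue. This is an illustration modelled on the structure of take() and offer(), not the JDK class: LeaderFollowerSketch and its Item type are invented names, and the queue holds plain strings with a deadline.

```java
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical minimal delay queue using the leader-follower waiting scheme.
class LeaderFollowerSketch {

    private static final class Item implements Comparable<Item> {
        final String value;
        final long deadlineNanos;
        Item(String value, long deadlineNanos) {
            this.value = value;
            this.deadlineNanos = deadlineNanos;
        }
        @Override
        public int compareTo(Item other) {
            return Long.signum(deadlineNanos - other.deadlineNanos);
        }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition available = lock.newCondition();
    private final PriorityQueue<Item> queue = new PriorityQueue<>();
    private Thread leader;                       // guarded by lock

    void offer(String value, long delay, TimeUnit unit) {
        Item item = new Item(value, System.nanoTime() + unit.toNanos(delay));
        lock.lock();
        try {
            queue.add(item);
            if (queue.peek() == item) {          // new head: the old leader's timeout is stale
                leader = null;
                available.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    String take() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            for (;;) {
                Item first = queue.peek();
                if (first == null) {
                    available.await();           // nothing queued: wait indefinitely
                    continue;
                }
                long delay = first.deadlineNanos - System.nanoTime();
                if (delay <= 0)
                    return queue.poll().value;
                if (leader != null) {
                    available.await();           // follower: wait until signalled
                } else {
                    Thread self = Thread.currentThread();
                    leader = self;               // leader: timed wait for the head element
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == self)
                            leader = null;
                    }
                }
            }
        } finally {
            if (leader == null && queue.peek() != null)
                available.signal();              // hand over to a follower before leaving
            lock.unlock();
        }
    }

    /** Offers a later and an earlier item, then takes both; returns them joined in take order. */
    static String demo() {
        LeaderFollowerSketch q = new LeaderFollowerSketch();
        q.offer("late", 60, TimeUnit.MILLISECONDS);
        q.offer("early", 20, TimeUnit.MILLISECONDS);
        try {
            return q.take() + "," + q.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }
}
```

Only one thread pays for a timed wait at any moment; every other waiting thread sleeps until it is signalled, which avoids waking all waiters each time an element expires.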


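  • The pool itself: execute and submit are overridden so that every task goes through the scheduling path, and run-after-shutdown parameters decide what happens to delayed and periodic tasks once the pool has been shut down.
  • Threads still use Worker, which extends AbstractQueuedSynchronizer and holds its lock while running a task, so that a worker busy with a task is not interrupted as if it were idle.
  • The blocking queue is the custom DelayedWorkQueue, a priority queue. Each ScheduledFutureTask records its index in the heap array, which removes the need to search for the task on cancellation and makes removal much faster, at the cost of extra index bookkeeping in siftUp and siftDown.
  • Tasks are ScheduledFutureTask instances, which extend FutureTask and implement compareTo (based on time and the sequence number) so they can be ordered in DelayedWorkQueue. The period field distinguishes one-shot tasks from periodic ones, and setNextRunTime uses the sign of period to distinguish scheduleAtFixedRate from scheduleWithFixedDelay.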
