java并发编程(3):ScheduledThreadPoolExecutor源码详解

1、基本概念

ScheduledThreadPoolExecutor继承于ThreadPoolExecutor,其功能主要和ThreadPoolExecutor类似,也是基于线程池的任务调度。但ScheduledThreadPoolExecutor的不同点在于其可以基于时延及周期时间进行任务调度,而其实现这些功能是基于一些特殊的机制的。

1.1、ScheduledExecutorService接口

ScheduledExecutorService接口定义了一些基于时延及时间间隔的任务提交接口,用户可以基于这些接口进行任务提交。

public interface ScheduledExecutorService extends ExecutorService {
    
    //在给定的时延后调度任务command
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);

    //在给定的时延后调度任务Callable
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
    //在给定的时延initialDelay后调度任务command,并且每隔period调度一次
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    //在给定的时延initialDelay后调度任务command,并且每隔period调度一次
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit);

}

1.2、ScheduledFutureTask任务

ScheduledFutureTask为ScheduledExecutorService中的任务封装,其继承于FutureTask类,并实现了RunnableScheduledFuture接口。

其主要实现有对延时时间及周期间隔的保存;实现了compareTo接口,可以基于时间对任务进行排序;重新实现run方法,可以对延时或周期任务进行调度执行并进行状态恢复等。其是ScheduledExecutorService可以时间基于时间的任务调度的关键。

1.3、DelayedWorkQueue基于时间排序的阻塞队列

DelayedWorkQueue是ScheduledExecutorService中的阻塞队列,而且是唯一的阻塞队列实现。其中保存的任务是基于时间戳及序列号的排序任务。越早要执行的任务,在队列中排的越靠前,每次从队列取得的任务至少是达到其要执行时间的任务。DelayedWorkQueue是任务基于时间调度的保障。

2、ScheduledFutureTask源码详解

ScheduledFutureTask的继承结构如下:

ScheduledFutureTask的继承结构.png

ScheduledFutureTask源码如下:

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {
    //任务添加的序列号
    private final long sequenceNumber;

    //ns格式的时间戳,是此任务要时间的时间
    private long time;
    
    //周期任务的时间间隔;
    //大于0:表示固定周期执行的任务,即每隔固定时间执行一次;
    //等于0:表示非周期任务,只执行一次就完结任务;
    //小于0:表示固定时延的任务,即任务执行完成后在固定的时延后才再次执行;
    private final long period;

    RunnableScheduledFuture<V> outerTask = this;

    //此任务在阻塞队列中的排序位置;
    int heapIndex;

    //
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
        super(r, result);
        this.time = ns;
        this.period = period;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    ScheduledFutureTask(Callable<V> callable, long ns) {
        super(callable);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }

    //获取任务的时延,即任务距离下个执行时间的间隔
    public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
    }

    //任务比较,基于下个执行时间及序列号进行排序
    public int compareTo(Delayed other) {
        if (other == this) // compare zero if same object
            return 0;
        if (other instanceof ScheduledFutureTask) {
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            //基于时间进行排序
            long diff = time - x.time;
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            //时间相等时基于添加是的序列号排序    
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }
        //基于时延排序
        long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
        return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
    }

    //是否为周期任务,固定时间间隔的任务或固定时延的任务
    public boolean isPeriodic() {
        return period != 0;
    }
    
    //设置任务的下个执行时间点
    private void setNextRunTime() {
        long p = period;
        //基于时间间隔的任务
        if (p > 0)
            time += p;
        //基于固定时延的任务    
        else
            time = triggerTime(-p);
    }

    //取消任务
    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean cancelled = super.cancel(mayInterruptIfRunning);
        if (cancelled && removeOnCancel && heapIndex >= 0)
            remove(this);
        return cancelled;
    }
    
    //执行任务
    public void run() {
        boolean periodic = isPeriodic();
        //若为周期任务,在线程池状态为SHUTDOWN时,默认不会取消当前任务的执行;
        //若不为周期任务,在线程池状态为SHUTDOWN时,默认会取消当前任务的执行;
        if (!canRunInCurrentRunState(periodic))
            cancel(false);
        //非周期任务,则直接调度运行    
        else if (!periodic)
            ScheduledFutureTask.super.run();
        //周期任务,   任务执行完成后会对任务的执行时间进行处理,并将任务重新入队
        else if (ScheduledFutureTask.super.runAndReset()) {
            setNextRunTime();
            reExecutePeriodic(outerTask);
        }
    }
}

3、DelayedWorkQueue源码详解

DelayedWorkQueue类继承关系如下:

DelayedWorkQueue继承结构.png

3.1、主要属性

//初始队列容量
private static final int INITIAL_CAPACITY = 16;
//RunnableScheduledFuture的数组任务
private RunnableScheduledFuture<?>[] queue =
    new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
//锁
private final ReentrantLock lock = new ReentrantLock();
//任务的数量
private int size = 0;

//等待队首任务的线程,基于Leader-Follower模式,减少线程不必要的等待;
//当有任务入队后,只会唤醒leader线程,其他线程继续等待
private Thread leader = null;

 //信号量,用于线程任务调度的通知
private final Condition available = lock.newCondition();

3.2、主要方法解析

任务入队offer():

public boolean offer(Runnable x) {
    if (x == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        int i = size;
        //若队列空间不够,即数组queue的长度不够,则调用grow()对数组进行扩容
        if (i >= queue.length)
            grow();
        size = i + 1;
        //若队列为空,则直接入队
        if (i == 0) {
            queue[0] = e;
            setIndex(e, 0);
        
        //队列不为空,则调用siftUp将任务入队 ,并根据任务执行时间,按任务时间排序       
        } else {
            siftUp(i, e);
        }
        //若入队后队首即为当前任务,则唤醒等待队列中的线程获取任务执行
        if (queue[0] == e) {
            leader = null;
            available.signal();
        }
    } finally {
        lock.unlock();
    }
    return true;
}

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

private void siftDown(int k, RunnableScheduledFuture<?> key) {
    int half = size >>> 1;
    while (k < half) {
        int child = (k << 1) + 1;
        RunnableScheduledFuture<?> c = queue[child];
        int right = child + 1;
        if (right < size && c.compareTo(queue[right]) > 0)
            c = queue[child = right];
        if (key.compareTo(c) <= 0)
            break;
        queue[k] = c;
        setIndex(c, k);
        k = child;
    }
    queue[k] = key;
    setIndex(key, k);
}

任务出队take()及poll():

public RunnableScheduledFuture<?> take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            //若队首为空,则阻塞等待,直到有任务入队
            if (first == null)
                available.await();
            else {
                //队首任务距离当前时间的执行时间差,<=0表示要立即执行队首任务
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0)
                    //从队列中移除任务并返回
                    return finishPoll(first);
                first = null; // don't retain ref while waiting
                //若有leader线程在等待任务,则当前线程阻塞等待
                if (leader != null)
                    available.await();
                else {
                    //当前无leader线程,当前线程阻塞等待delay后再尝试获取任务
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        //若leader为空,但队列还有任务,则唤醒下个等待的线程获取任务执行
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            RunnableScheduledFuture<?> first = queue[0];
            if (first == null) {
                //队首为空,等待超时时间到达,则直接返回null;
                //否则继续等待nanos时间;
                if (nanos <= 0)
                    return null;
                else
                    nanos = available.awaitNanos(nanos);
            } else {
                long delay = first.getDelay(NANOSECONDS);
                //队首任务到达执行时间,则返回任务
                if (delay <= 0)
                    return finishPoll(first);
                //队首任务未到达执行时间,但超时等待时间到达,则返回null    
                if (nanos <= 0)
                    return null;
                first = null; // don't retain ref while waiting
                //未到达等待的超时时间,并且有leader线程,则继续阻塞等待
                if (nanos < delay || leader != null)
                    nanos = available.awaitNanos(nanos);
                else {
                    //无leader线程,但队首任务未达到执行时间,则等待delay时间
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        long timeLeft = available.awaitNanos(delay);
                        nanos -= delay - timeLeft;
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {
        if (leader == null && queue[0] != null)
            available.signal();
        lock.unlock();
    }
}

4、ScheduledThreadPoolExecutor源码解析

ScheduledThreadPoolExecutor类继承结构如下:

ScheduledThreadPoolExecutor类继承结构.png

4.1、任务提交

任务提交主要提供:schedule、 scheduleAtFixedRate、scheduleWithFixedDelay这三个方法,其他方法都是基于这三个方法的调用。

public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    //将command包装成ScheduledFutureTask                            
    delayedExecute(t);
    return t;
}

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}


public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                              long initialDelay,
                                              long period,
                                              TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (period <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(period));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                 long initialDelay,
                                                 long delay,
                                                 TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    if (delay <= 0)
        throw new IllegalArgumentException();
    ScheduledFutureTask<Void> sft =
        new ScheduledFutureTask<Void>(command,
                                      null,
                                      triggerTime(initialDelay, unit),
                                      unit.toNanos(-delay));
    RunnableScheduledFuture<Void> t = decorateTask(command, sft);
    sft.outerTask = t;
    delayedExecute(t);
    return t;
}

4.2、任务重入队列

当周期任务执行完成后,重置任务的状态等数据,并将任务重新入队。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    //如果线程池已经关闭,则拒绝执行任务
    if (isShutdown())
        reject(task);
    else {
        //将任务入队
        super.getQueue().add(task);
        //若线程池已关闭,并且不允许线程在关闭状态执行,则移除任务并取消任务
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
        //确保有线程可以执行任务
            ensurePrestart();
    }
}

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    //当前线程池状态允许执行任务,则将任务添加到队列中
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        //重新检查线程池状态,若不允许允许任务,则移除任务并取消任务
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
        //确保有线程可以执行任务
            ensurePrestart();
    }
}

4.3、关闭线程池

ScheduledThreadPoolExecutor重写了线程池关闭的钩子函数onShutdown(),用于对队列数组中的任务进行取消并清除;

void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    //shutdown是否仍然执行延时任务。
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    // shutdown是否仍然执行周期任务。    
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    // 如果两者皆不可则对队列中所有RunnableScheduledFuture调用cancel取消并清空队列。
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                /*
                 * 不需要执行的任务删除并取消。
                 * 已经取消的任务也需要从队列中删除。
                 * 所以这里就判断下是否需要执行或者任务是否已经取消。
                 */    
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    // 因为任务被从队列中清理掉,所以这里需要调用tryTerminate尝试跃迁executor的状态
    tryTerminate();
}

4.4、关闭线程池状态下的任务执行控制

//true:线程池在关闭状态下可以继续执行周期任务;
//false:线程池在关闭状态下不能执行周期任务
//默认值为:false
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

//true:线程池在关闭状态下可以继续执行非周期任务(固定延迟任务);
//false:线程池在关闭状态下不可继续执行非周期任务(固定延迟任务);
//默认值:true
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

//true:ScheduledFutureTask.cancel需要将任务从队列中移除;否则不移除
//默认值:false
private volatile boolean removeOnCancel = false;
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351

推荐阅读更多精彩内容