并发编程之ScheduledThreadPoolExecutor(四)

一、ScheduledThreadPoolExecutor的爷爷类AbstractExecutorService

本篇把上一篇漏掉的一些如submit等方法的解析补回来,尽量给大家构建一个完整的体系。想要了解submit首先得了解FutureTask!

1、FutureTask类
1.1、FutureTask实现的接口,顶层是Runnable和Future
public class FutureTask<V> implements RunnableFuture<V> {···}
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}
1.2、FutureTask重要的成员变量
    private volatile int state;  //任务的状态
    /*可能的任务状态切换
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private static final int NEW          = 0;  //新建状态
    private static final int COMPLETING   = 1;  //正在执行状态
    private static final int NORMAL       = 2;  //正常可取结果状态
    private static final int EXCEPTIONAL  = 3;  //例外状态
    private static final int CANCELLED    = 4;  //取消状态
    private static final int INTERRUPTING = 5;  //正在中断
    private static final int INTERRUPTED  = 6;  //中断完成
    /** 内部组合了可返回结果的状态类 */
    private Callable<V> callable;
    /** 任务执行结果 */
    private Object outcome; 
    /** 执行callable的线程*/
    private volatile Thread runner;
    /**等待任务执行完成的线程队列*/
    private volatile WaitNode waiters;
1.3、FutureTask重要的成员方法
//用于获取任务结果,多线程竞争无所谓,只是读取
private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
/**取消任务方法,改变任务状态,中断执行任务的线程,唤醒等待队列中的线程并把等待队列线程节点设置为空*/
public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
/** 获取结果,如果当前任务还没完成,调用awaitDone等待*/
public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
/**定时等待任务完成,内部排队挂起*/
private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            //如果任务完成则把线程从阻塞队列中移除
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) 
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
/**任务执行入口*/
public void run() {
        //这里的cas操作设置了runner线程为当前线程
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))。
            return;
        try {
            //构造方法里设置的Runnable或者Callable
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //真正执行任务的call方法,返回执行结果,阻塞调用
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

在认识了FutureTask本质是Runnable和Callable任务之后感觉FutureTask真是没啥东西,所谓阻塞获取无非就是死循环取Callable.call()方法的结果,等待一定时间获取也是调用了LockSupport挂起线程一定时间而已。


2、最重要的方法submit:
public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task, result);
        execute(ftask);
        return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
}
2.1、submit方法解析:

submit前两种都调用了newTaskFor方法,该方法返回一个FutureTask<T>,其内部实现实际上调用了Executors的callable方法,Executor给Runnable和Callable提供了一个适配器,让Runnable有结果可以返回,这里暂且知道FutureTask内部是由Callable任务类型实现的就可以了。而真正执行任务的方法还是execute(ftask),该方法的调用链是这样的

execute(ftask)-->ThreadPoolExecutor.execute(ftask)-->ThreadPoolExecutor.addWorker()
-->t.start()-->(FutureTask)ftask.run()-->Callable.call()-->Executors.RunnableAdapter.call()-->(Runnable)task.run

这个调用链很长,最终的最终还是调用了task的run方法,中间过程为了保存线程执行结果使用了FutureTask的一些方法。
submit的第三种方法我们是很常用的,直接提交一个Callable任务给线程池执行,并返回一个可以获取线程执行结果的FutureTask。
以上就是对上一篇的补充,接着分析定时线程池


3、ScheduledThreadPoolExecutor类:

3.1、该类能够执行定时任务主要依赖于一个内部类:ScheduledFutureTask
//该类继承了FutureTask并实现了RunnableScheduledFuture(这个接口内部又继承了Delayed接口),接口不了解不要紧,只要知道必须实现两个接口方法isPeriodic和getDelayed即可
 private class ScheduledFutureTask<V>
            extends FutureTask<V> implements RunnableScheduledFuture<V> {
        //任务编号
        private final long sequenceNumber;
        //任务可以开始执行的时间
        private long time;
        //任务重复执行的时间间隔
        private final long period;
        /** reExecutePeriodic方法调用后重新入队的任务 */
        RunnableScheduledFuture<V> outerTask = this;
        // 支持快速取消,Delayed队列的索引值 
        int heapIndex;
        //这个比较方法是任务入队的规则,即队列按照这个比较方法对任务排序,实际上就是按照任务设定的开始时间创建的任务队列
        public int compareTo(Delayed other) {
            if (other == this) // compare zero if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
            return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
        }
        //任务的run方法,
        public void run() {
            //是否可重复执行
            boolean periodic = isPeriodic();
            //如果任务当前状态不可执行,那么取消该任务
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            //如果任务可重复执行并且状态也是可执行的,那么直接执行该任务并设置下一次要执行的时间
            else if (ScheduledFutureTask.super.runAndReset()) {
                setNextRunTime();
                //重新周期执行该任务,内部把任务放到待执行的队列中(如果可执行)或者取消任务
                reExecutePeriodic(outerTask);
            }
        }
        //取消任务方法,调用了ThreadPoolExecutor的取消方法
        public boolean cancel(boolean mayInterruptIfRunning) {
            boolean cancelled = super.cancel(mayInterruptIfRunning);
            //如果任务取消成功那么从任务队列中移除任务
            if (cancelled && removeOnCancel && heapIndex >= 0)
                remove(this);
            return cancelled;
        }

3.1小结:

定时线程池能够定时执行任务全靠内部类ScheduledFutureTask,该类维护了一个DelayedWorkQueue(按照自己指定顺序加入元素的队列),任务在队列中的顺序实际上是任务开始执行的时间的顺序。当任务执行的时候会调用ThreadPoolExecutor的执行方法runAndReset(线程执行完交出执行权),执行完会设置下一次将要执行的时间并把任务重新放到任务队列等待其他线程执行。判断当前任务是否可执行要看当前线程池是否处于SHUTDOWN并有残留任务或者处于RUNNING状态,如果线程池处于其他状态,那么当前任务是要取消的。

3.2、定时线程池最重要的方法schedule
//该方法对普通任务进行了装饰,即装饰城ScheduledFutureTask,真正执行任务的是delayedExecute
public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
}
//本方法真正执行定时任务
private void delayedExecute(RunnableScheduledFuture<?> task) {
        //如果线程池关闭,拒绝接受任务
        if (isShutdown())
            reject(task);
        else {
            //任务入队
            super.getQueue().add(task);
            //如果线程池关闭或者不能继续执行任务,从队列里移除任务并取消任务
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            //如果可以执行任务那么确保线程池各方面状态正常
            else
                ensurePrestart();
        }
}
//该方法在线程池中启动了一定数量的工作线程,addWorker方法我们前面讲过,该方法会自己取任务执行
void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

3.2小结:

分析至此我们可以明白定时任务的执行原理了,
首先:任务创建成功之后根据任务开始执行的时间先后顺序把任务加到任务队列里(比如队列里原来有个任务10点执行,你这个任务11点执行,那你就在它后面,如果再来一个任务10点半执行,那么10点半的任务在你俩中间)
然后:启动线程池,线程池的工人们来这个时序队列里取任务执行,如果在线程池关闭的时候恰好来了一个任务,那么丢弃该任务。线程池的线程执行完任务后会交出任务的使用权(runner=null),并把任务下一次执行时间设置成你创建任务时设置的时间间隔+原来的执行时间,并再一次把任务加到任务队列(如果线程池没关闭的话)。

总结:从AQS框架到ScheduledThreadPoolExecutor一层层的类和接口构成了层次清晰的知识树,解析到这里不得不佩服作者!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容

  • 译序 本指南根据 Jakob Jenkov 最新博客翻译,请随时关注博客更新:http://tutorials.j...
    高广超阅读 5,097评论 1 68
  • 一.线程安全性 线程安全是建立在对于对象状态访问操作进行管理,特别是对共享的与可变的状态的访问 解释下上面的话: ...
    黄大大吃不胖阅读 836评论 0 3
  • 早上和妈妈一起回忆了小时候,才发现原来父母为我们真的做了很多很多。因为二姐的出生,我们一家的命运都有所改变,至少妈...
    刘芷宁running阅读 216评论 0 0
  • 猴年说猴 开场:猴年到,猴年到,吉星福星高高照!小妹小妹开心笑,来到舞台乐淘淘。宋小妹给大家拜年了! 祝大家猴年行...
    土家霜妹阅读 401评论 0 0
  • 中医学认为脾胃是人体的“后天之本”、“气血生化之源”,是消化、吸收、转化人体所需气血精微的重要脏腑。 脾胃...
    海真阅读 2,506评论 0 1