深入理解scheduledthreadpoolexecutor

scheduledthreadpool是JDK自带的一个定时调度任务的实现,通过它可以实现定时的循环调度,最近在看线程池的源码,顺便也把它看了一下,发现里面的实现真的是很精彩,干货很多。

首先,scheduledthreadpool是继承自ThreadPoolExecutor,也就是说它具有线程池的一些特性,它也正是利用线程池实现了任务的调度。

    /**
     * Creates a new {@code ScheduledThreadPoolExecutor} with the
     * given core pool size.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @throws IllegalArgumentException if {@code corePoolSize < 0}
     */
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

可以发现ScheduledThreadPoolExecutor只是用了核心线程池,同时它的任务队列是采用了DelayedWorkQueue去实现。对于ThreadPoolExecutor不熟悉的,可以翻翻我之前的笔记,有对线程池很详细的解释。
接下来看任务提交的代码,以定时循环调度为例:

    /**
     * @throws RejectedExecutionException {@inheritDoc}
     * @throws NullPointerException       {@inheritDoc}
     * @throws IllegalArgumentException   {@inheritDoc}
     */
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        delayedExecute(t);
        return t;
    }

command表示要提交的线程。
initialDelay表示初始化时的延迟。
period表示调度周期。
unit为时间单位。
首先将command等参数包装成ScheduledFutureTask类,这个类是任务调度的基本单位。
triggerTime方法主要是用来计算下次被调度的时间:

    /**
     * Returns the trigger time of a delayed action.
     */
    private long triggerTime(long delay, TimeUnit unit) {
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    }
    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) {
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    }

我们一会儿在回头看ScheduledFutureTask,先看任务的提交delayedExecute(t):

    /**
     * Main execution method for delayed or periodic tasks.  If pool
     * is shut down, rejects the task. Otherwise adds task to queue
     * and starts a thread, if necessary, to run it.  (We cannot
     * prestart the thread to run the task because the task (probably)
     * shouldn't be run yet,) If the pool is shut down while the task
     * is being added, cancel and remove it if required by state and
     * run-after-shutdown parameters.
     *
     * @param task the task
     */
    private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            super.getQueue().add(task);
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
    }

这里主要做了一个任务入队的操作, super.getQueue().add(task);这个Queue就是我们刚才看到的DelayedWorkQueue,接下来ensurePrestart()是判断当前核心线程池的大小,如果过少,那么增加核心线程。保证任务及时运行。
程序很短,但是我们并没有看到任务是怎么跑起来,怎么被调度的。那这里其实有个前提的知识就是,当任务添加到Queue之中后,那么线程池中的线程会不断的去Queue中获取任务,那么我们看一下Queue的offer方法和take方法,是怎么放入、怎么取出的:

        public boolean offer(Runnable x) {
            if (x == null)
                throw new NullPointerException();
            RunnableScheduledFuture e = (RunnableScheduledFuture)x;
            final ReentrantLock lock = this.lock;
            lock.lock();
            try {
                int i = size;
                if (i >= queue.length)
                    grow();//扩容
                size = i + 1;
                if (i == 0) {//如果是第一个元素
                    queue[0] = e;
                    setIndex(e, 0);
                } else {//如果不是,那么进行上滤操作,保证堆有序
                    siftUp(i, e);
                }
                //如果当前新增加的元素在堆顶,那么可能是最新要执行的
                if (queue[0] == e) {
                    leader = null;
                    available.signal();//发一个信号,通知take的时候的线程,赶紧检测新加入的task
                }
            } finally {
                lock.unlock();
            }
            return true;
        }

这是DelayedWorkQueue的offer方法,DelayedWorkQueue里面是使用数组去维护任务队列的,那么数组是怎么保证任务有序呢?
其实仔细看代码,我们能发现,这里的实现是用一个二叉堆去对数组元素进行排序。确切的说是小顶堆。
首先判断容量,如果容量不够就扩容,接着判断是不是第一个元素,如果是,那么直接放在index为0的位置,不是的话进行上滤操作。接下来判断添加的元素是不是在堆顶,如果是那么需要进行优先调度,那么进行signal。
其实这里又引申出一个问题,那么就是是靠什么排序的?这个时候我们看一下任务的实体ScheduledFutureTask,它复写了compareTo方法:

        //比较方法
        public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                //根据time去比较,time是在创建任务的时候计算出来的,指下一次运行的时间
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
        }

里面用time去判断大小,time便是下一次调度的时间点,那么显然越小的离现在越近,越要放在前面。
看了offer方法我们再看看take方法,这个方法是用来获取任务的:

        public RunnableScheduledFuture take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();//阻塞
            //队列的存储采用数组,优先级排序采用二叉堆实现。
            try {
                for (;;) {
                    //最大的一个是最先应该被执行的
                    RunnableScheduledFuture first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        //获取第一个任务,是距离当前最近的任务,可能会有一点延迟
                        long delay = first.getDelay(TimeUnit.NANOSECONDS);
                        if (delay <= 0)//要立即执行
                            return finishPoll(first);
                        else if (leader != null)
                            available.await();//拿不到leader的线程全部await
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);//如果此时线程唤醒了,那么其他线程将不能进入同步块
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();//唤醒所有的await线程
                lock.unlock();
            }
        }

毫无疑问,take中直接获取queue[0],它是距离目前最近的要被执行的任务,先检测一下还有多长时间,任务会被执行,如果小于0,那么立刻弹出,并且做一个下滤操作,重新找出堆顶元素。如果不小于0,那么证明时间还没到,那么available.awaitNanos(delay);等到delay时间后自动唤醒,或者因为添加了一个更加紧急的任务即offer中的signal被调用了,那么唤醒,重新循环获取最优先执行的任务,如果delay小于0,那么直接弹出任务。
至此任务调度的逻辑分析完了,但是还有周期执行是怎么实现的呢?其实是在ScheduledFutureTask的run中实现的:

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            boolean periodic = isPeriodic();//判断是不是定时周期调度的
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {//设置当前状态是可重复执行的
                setNextRunTime();//计算下一次执行时期
                reExecutePeriodic(outerTask);//加入队列
            }
        }

判断是不是周期调度的任务,如果是等待执行完毕之后,重新设置下一次执行时间,并且将此任务重新offer到queue中,这样就实现了周期调度。

问题:
其实这里是有一个问题的,就是如果当核心线程池比较少,但是执行的任务又有很多阻塞性的任务,那么就会导致在任务到期改执行的时候,而没有线程去执行任务。这样就会导致任务调度时间不准,同时后面的任务也可能被影响,所以在设置的时候可以将自己的核心线程池调大一点,避免这种问题。

整体的流程可以参考下面这个图,描述的十分清楚:

Paste_Image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容

  • 深入理解Java线程池 线程池初探 所谓线程池,就是将多个线程放在一个池子里面(所谓池化技术),然后需要线程的时候...
    程序员七哥阅读 1,302评论 0 16
  • 从哪说起呢? 单纯讲多线程编程真的不知道从哪下嘴。。 不如我直接引用一个最简单的问题,以这个作为切入点好了 在ma...
    Mr_Baymax阅读 2,740评论 1 17
  • NSThread 第一种:通过NSThread的对象方法 NSThread *thread = [[NSThrea...
    攻城狮GG阅读 795评论 0 3
  • 这里陆陆续续添加一些自己总结或借鉴的erlang代码规约 [强制] [推荐] [建议] =============...
    kamfon阅读 1,838评论 0 0
  • 没有星星,没有月亮,天黑得仿佛不会再天亮了…… 日子一天天飞过,平静得似乎被定格,伴着欢笑、伴着打闹,小奇和伙伴们...
    Victorfy阅读 416评论 4 4