线程池(3)终止线程池原理

终止线程池

一、终止线程池方法

1、 shutdown() 安全的终止线程池

- checkShutdownAccess()  核查当前线程是否有权限intercept()
- advanceRunState(SHUTDOWN) 设置CAS方式设置线程池shutdown状态
- interruptIdleWorkers()中断空闲线程
- onShutdown()钩子函数
- tryTerminate()尝试中断线程池

2、 shutdownNow() 强制终止线程池

- checkShutdownAccess()  核查当前线程是否有权限intercept()
- advanceRunState(STOP) 设置CAS方式设置线程池Stop状态
- interruptWorkers() 中断所有线程(不会尝试获取锁w.tryLock,直接intercept)
- tasks = drainQueue(); //将workQueue中的元素放入一个List并返回
- tryTerminate();尝试中断线程池

3、 awaitTermination(timeout) 等待线程池终止

- termination.awaitNanos(nanos);
private final Condition termination = mainLock.newCondition();
- 这个方法最长会等待timeout时间,返回线程池是否已经终止,如果超过timeout时间会返回false

shutdown()和shutdownNow()区别

和shutdownNow()区别:

1、interruptIdleWorker() 会通过w.tryLock()来判断worker是否空闲。

2、interruptWorkers() 只判断worker的state>0,即线程是start状态,就直接停止,不管是否有task正在运行。

3、tasks = drainQueue(); //将workQueue中的元素放入一个List并返回

源码解析

一、shutdown()

(1)shutdown()

/**
 * Initiates an orderly shutdown in which previously submitted
 * tasks are executed, but no new tasks will be accepted.
 * Invocation has no additional effect if already shut down.
 * 开始一个有序的关闭,在关闭中,之前提交的任务会被执行(包含正在执行的,在阻塞队列中的),但新任务会被拒绝
 * 如果线程池已经shutdown,调用此方法不会有附加效应
 *
 * <p>This method does not wait for previously submitted tasks to
 * complete execution.  Use {@link #awaitTermination awaitTermination}
 * to do that.
 * 当前方法不会等待之前提交的任务执行结束,可以使用awaitTermination()
 *
 * @throws SecurityException {@inheritDoc}
 */
public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上锁
     
    try {
        //判断调用者是否有权限shutdown线程池
        checkShutdownAccess();
         
        //CAS+循环设置线程池状态为shutdown
        advanceRunState(SHUTDOWN);
         
        //中断所有空闲线程
        interruptIdleWorkers();
         
        onShutdown(); // hook for ScheduledThreadPoolExecutor
    } 
    finally {
        mainLock.unlock(); //解锁
    }
     
    //尝试终止线程池
    tryTerminate();
}

shutdown()执行流程:

1、上锁,mainLock是线程池的主锁,是可重入锁,当要操作workers set这个保持线程的HashSet时,需要先获取mainLock,还有当要处理largestPoolSize、completedTaskCount这类统计数据时需要先获取mainLock

2、判断调用者是否有权限shutdown线程池

3、使用CAS操作将线程池状态设置为shutdown,shutdown之后将不再接收新任务

4、中断所有空闲线程 interruptIdleWorkers()

5、onShutdown(),ScheduledThreadPoolExecutor中实现了这个方法,可以在shutdown()时做一些处理

6、解锁

7、尝试终止线程池 tryTerminate()

(2)interruptIdleWorkers()

那么interruptIdleWorkers()是怎么中断线程的呢?

/**
 * Interrupts threads that might be waiting for tasks (as
 * indicated by not being locked) so they can check for
 * termination or configuration changes. Ignores
 * SecurityExceptions (in which case some threads may remain
 * uninterrupted).
 * 中断在等待任务的线程(没有上锁的),中断唤醒后,可以判断线程池状态是否变化来决定是否继续
 *
 * @param onlyOne If true, interrupt at most one worker. This is
 * called only from tryTerminate when termination is otherwise
 * enabled but there are still other workers.  In this case, at
 * most one waiting worker is interrupted to propagate shutdown
 * signals in case(以免) all threads are currently waiting.
 * Interrupting any arbitrary thread ensures that newly arriving
 * workers since shutdown began will also eventually exit.
 * To guarantee eventual termination, it suffices to always
 * interrupt only one idle worker, but shutdown() interrupts all
 * idle workers so that redundant workers exit promptly, not
 * waiting for a straggler task to finish.
 * 
 * onlyOne如果为true,最多interrupt一个worker
 * 只有当终止流程已经开始,但线程池还有worker线程时,tryTerminate()方法会做调用onlyOne为true的调用
 * (终止流程已经开始指的是:shutdown状态 且 workQueue为空,或者 stop状态)
 * 在这种情况下,最多有一个worker被中断,为了传播shutdown信号,以免所有的线程都在等待
 * 为保证线程池最终能终止,这个操作总是中断一个空闲worker
 * 而shutdown()中断所有空闲worker,来保证空闲线程及时退出
 */
private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上锁
    try {
        for (Worker w : workers) { 
            Thread t = w.thread;
             //w.tryLock非常关键,还记得execute()分析时,我们在runWorker时执行task.run()之前所做的吗,是的w.lock(),
             //每一个任务执行前都会先加上AQS不可重入锁。
             //所以在这如果tryLock()获取锁成功,那么意味着线程中并没有task正在执行。也就是空闲线程了。
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock(); //解锁
    }
}

分析:

1、w.tryLock非常关键

  • runWorker(),在task执行前都会调用w.lock()先AQS不可重入锁锁住。
  • 它的目的就是通过判断是否能获取锁,来确定线程中是否有task正在运行。如果没有task运行,那么tryLock()就能获取锁也就意味着线程处于空闲状态。这点是利用了不可重入锁AQS的特性。

2、Worker 是怎么实现AQS,不可重入的呢?

  • Worker本身继承了AQS(AbstractQueuedSynchronizer)
  • 并实现了tryAcuire()方法,这个方法实际就是tryLock()的真实调用方法。也就是说tryLock()时会调用tryAcuire()方法。
  • tryAcuire(),利用CAS方式设置0->1 ,只有当前值为0时才能设置成功。正因为这个特性才做到的不可重入。
public boolean tryLock()  { return tryAcquire(1); }
protected boolean tryAcquire(int unused) {
    //CAS方式设置0->1 ,只有当前值为0时才能设置成功。正因为这个特性才做到的不可重入。
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

(3)tryTerminate()

/**
 * Transitions to TERMINATED state if either (SHUTDOWN and pool
 * and queue empty) or (STOP and pool empty).  If otherwise
 * eligible to terminate but workerCount is nonzero, interrupts an
 * idle worker to ensure that shutdown signals propagate. This
 * method must be called following any action that might make
 * termination possible -- reducing worker count or removing tasks
 * from the queue during shutdown. The method is non-private to
 * allow access from ScheduledThreadPoolExecutor.
 * 
 * 在以下情况将线程池变为TERMINATED终止状态
 * shutdown 且 正在运行的worker 和 workQueue队列 都empty
 * stop 且  没有正在运行的worker
 * 
 * 这个方法必须在任何可能导致线程池终止的情况下被调用,如:
 * 减少worker数量
 * shutdown时从queue中移除任务
 * 
 * 这个方法不是私有的,所以允许子类ScheduledThreadPoolExecutor调用
 */
final void tryTerminate() {
    //这个for循环主要是和进入关闭线程池操作的CAS判断结合使用的
    for (;;) {
        int c = ctl.get();
         
        /**
         * 线程池是否需要终止
         * 如果以下3中情况任一为true,return,不进行终止
         * 1、还在运行状态
         * 2、状态是TIDYING、或 TERMINATED,已经终止过了
         * 3、SHUTDOWN 且 workQueue不为空
         */
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
         
        /**
         * 只有shutdown状态 且 workQueue为空,或者 stop状态能执行到这一步
         * 如果此时线程池还有线程(正在运行任务,正在等待任务)
         * 中断唤醒一个正在等任务的空闲worker
         * 唤醒后再次判断线程池状态,会return null,进入processWorkerExit()流程
         */
        if (workerCountOf(c) != 0) { // Eligible to terminate 资格终止
            interruptIdleWorkers(ONLY_ONE); //中断workers集合中的空闲任务,参数为true,只中断一个
            return;
        }
 
        /**
         * 如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated
         */
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //CAS:将线程池的ctl变成TIDYING(所有的任务被终止,workCount为0,为此状态时将会调用terminated()方法),期间ctl有变化就会失败,会再次for循环
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated(); //需子类实现
                } 
                finally {
                    ctl.set(ctlOf(TERMINATED, 0)); //将线程池的ctl变成TERMINATED
                    termination.signalAll(); //唤醒调用了 等待线程池终止的线程 awaitTermination() 
                }
                return;
            }
        }
        finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
        // 如果上面的CAS判断false,再次循环
    }
}

tryTerminate() 执行流程:

1、判断线程池是否需要进入终止流程(只有当shutdown状态+workQueue.isEmpty 或 stop状态,才需要)

2、判断线程池中是否还有线程,有则 interruptIdleWorkers(ONLY_ONE) 尝试中断一个空闲线程(正是这个逻辑可以再次发出中断信号,中断阻塞在获取任务的线程)

3、如果状态是SHUTDOWN,workQueue也为空了,正在运行的worker也没有了,开始terminated

会先上锁,将线程池置为tidying状态,之后调用需子类实现的 terminated(),最后线程池置为terminated状态,并唤醒所有等待线程池终止这个Condition的线程

二、shutdownNow() -- 强制终止线程池

(1)shutdownNow()

/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution. These tasks are drained (removed)
 * from the task queue upon return from this method.
 * 尝试停止所有活动的正在执行的任务,停止等待任务的处理,并返回正在等待被执行的任务列表
 * 这个任务列表是从任务队列中排出(删除)的
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate.  Use {@link #awaitTermination awaitTermination} to
 * do that.
 * 这个方法不用等到正在执行的任务结束,要等待线程池终止可使用awaitTermination()
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks.  This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 * 除了尽力尝试停止运行中的任务,没有任何保证
 * 取消任务是通过Thread.interrupt()实现的,所以任何响应中断失败的任务可能永远不会结束
 *
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock(); //上锁
     
    try {
        //判断调用者是否有权限shutdown线程池
        checkShutdownAccess();
         
        //CAS+循环设置线程池状态为stop
        advanceRunState(STOP);
         
        //中断所有线程,包括正在运行任务的
        interruptWorkers();
         
        tasks = drainQueue(); //将workQueue中的元素放入一个List并返回
    } 
    finally {
        mainLock.unlock(); //解锁
    }
     
    //尝试终止线程池
    tryTerminate();
     
    return tasks; //返回workQueue中未执行的任务
}

shutdownNow() 和 shutdown()的大体流程相似,差别是:

  • 1、将线程池更新为stop状态

  • 2、调用 interruptWorkers() 中断所有线程,包括正在运行的线程

  • 3、将workQueue中待处理的任务移到一个List中,并在方法最后返回,说明shutdownNow()后不会再处理workQueue中的任务

(2)interruptWorkers()

/**
 * Interrupts all threads, even if active. Ignores SecurityExceptions
 * (in which case some threads may remain uninterrupted).
 */
private void interruptWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)
            w.interruptIfStarted();
    } finally {
        mainLock.unlock();
    }
}
  • 1、和interruptIdleWorkers()不同,这里并不会w.tryLock()尝试获取锁。所以它是忽略lock锁来直接操作的。

  • 2、但是 interruptIfStarted(),其中会判断worker的AQS state是否大于0,即worker是否已经开始运作,再调用Thread.interrupt()。这里在worker分析时已经说过了。worker创建时state为-1,当在addWorker()中t.start()时,在run()方法中会重置state = 0。只有state >= 0的时候说明t.start()线程启动了。否则没必要去intercept()

  • 3、需要注意的是,对于运行中的线程调用Thread.interrupt()并不能保证线程被终止,task.run()内部可能捕获了InterruptException,没有上抛,导致线程一直无法结束

(3) tasks = drainQueue();

todo

三、awaitTermination() -- 等待线程池终止

private final Condition termination = mainLock.newCondition();

public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    long nanos = unit.toNanos(timeout);
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //开启自旋
        for (;;) {
            //判断线程是否在termination状态之后,如果是则说明线程池已经结束了。
            if (runStateAtLeast(ctl.get(), TERMINATED))
                return true;
            //超过等待时间就返回false,告知线程池还没停止。
            if (nanos <= 0)
                return false;
            //通过condition.awaitNanos()方法,睡眠等待直到超过nanos时间,
            //仍没有结果则返回0。即下次循环直接return false了。
            nanos = termination.awaitNanos(nanos);
        }
    } finally {
        mainLock.unlock();
    }
}

参数:

timeout:超时时间
unit:     timeout超时时间的单位

返回:

true:线程池终止
false:超过timeout指定时间

在发出一个shutdown请求后,在以下3种情况发生之前,awaitTermination()都会被阻塞

1、所有任务完成执行

2、到达超时时间

3、当前线程被中断

awaitTermination() 循环的判断线程池是否terminated终止 或 是否已经超过超时时间,然后通过termination这个Condition阻塞等待一段时间

termination.awaitNanos() 是通过 LockSupport.parkNanos(this, nanosTimeout)实现的阻塞等待

阻塞等待过程中发生以下具体情况会解除阻塞(对上面3种情况的解释):

1、如果发生了 termination.signalAll()(内部实现是 LockSupport.unpark())会唤醒阻塞等待,且由于ThreadPoolExecutor只有在 tryTerminated()尝试终止线程池成功,
将线程池更新为terminated状态后才会signalAll(),故awaitTermination()再次判断状态会return true退出

2、如果达到了超时时间 termination.awaitNanos() 也会返回,此时nano==0,再次循环判断return false,等待线程池终止失败

3、如果当前线程被 Thread.interrupt(),termination.awaitNanos()会上抛InterruptException,awaitTermination()继续上抛给调用线程,会以异常的形式解除阻塞

故终止线程池并需要知道其是否终止可以用如下方式:

executorService.shutdown();
try{
    while(!executorService.awaitTermination(500, TimeUnit.MILLISECONDS)) {
        LOGGER.debug("Waiting for terminate");
    }
} 
catch (InterruptedException e) {
    //中断处理
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352

推荐阅读更多精彩内容