03_线程池_3_核心流程

一:execute相关逻辑

public class ThreadPoolExecutor extends AbstractExecutorService {
    
    /**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     * 在将来的某个时间执行给定的任务。该任务可能执行在一个新的线程里或者一个已经缓存的线程里。
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     * 如果任务不能提交以供执行,要么因为此执行器已关闭,要么因为容量已满,
     * 该任务将被当前的RejectedExecutionHandler执行。
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         * 分3步进行:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         * 如果当前运行中的线程数小于corePoolSize,尝试使用给定的指令作为其第一个任务来启动一个新的线程,
         * 对addWorker的调用以原子方式检查runState和workerCount,
         * 因此,通过返回false来警告,可以防止在不应该添加线程时错误添加。
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         * 如果一个任务可以被成功的排队,然后,我们仍然需要再次检查是否应该添加一个线程
         * (因为自上次检查以来,已有线程已死亡),或者自进入此方法以来,池是否已关闭。
         * 因此,我们重新检查状态,如有必要,在停止排队时回滚排队,或者在一个线程都没有时启动一个新线程。
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         * 如果我们不能将任务排队,将尝试添加一个新的线程。
         * 如果添加失败,我们知道我们已经被关闭或饱和,因此拒绝这个任务。
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            //第一步:当前线程数量小于corePoolSize,则调用addWorker()直接创建新的线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        
        //第二步:到这里就说明线程数量大于等于corePoolSize了
        if (isRunning(c) && workQueue.offer(command)) {
            //当前状态是运行中,并且成功将任务加入workQueue队列。
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //再次检测状态,如果不在运行中了,则移除刚添加的任务,并执行拒绝处理
                //如果说offer进去了,然后shutdown或shutdownNow了,就要将这个任务移除。
                reject(command);
            else if (workerCountOf(recheck) == 0)
                /**
                 * 情况1: 是RUNNING状态,但是worker数量为0。
                 * 第一步判断当前线程数大于corePoolSize,但是执行到这里的时候核心核心线程全部超时终止了,
                 * 这时候如果不创建一个新的worker,那这个任务将一直停留在queue中,直到下次有提交任务触发创建worker。
                 * 
                 * 情况2:SHUTDOWN状态,但是remove(command)失败。
                 * 这个command还存在于任务队列中,SHUTDOWN是允许任务队列中已存在的任务被执行的,但是没有worker线程了,
                 * 就需要添加一个新的worker来将这个任务队列里的任务执行掉。
                 * 下面的addWorker逻辑其实也有这种状态的处理逻辑。
                 * 
                 * 注意,这里addWorker初始任务参数为null,因为该任务已经加到队列中了,
                 * 如果将command作为新的线程初始任务,加上队列里的任务,就会重复执行两次了。
                 * */
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            //第三步:直接创建新的线程,如果失败,执行拒绝处理
            reject(command);
    }

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     * 根据当前池状态和给定边界(核心或最大值)检查是否可以添加新的worker。
     * 如果可以,则相应的调整worker计数,并且如果可能的话,一个新的worker将创建并启动,
     * 将firstTask作为其第一个任务来运行。如果pool已关闭或者有资格关闭则该方法返回false。
     * 当线程工厂在请求创建一个线程失败时也将返回false。如果线程创建失败,
     * 或者是由于线程工厂返回null,或者是由于异常(通常是Thread.start()的OutOfMemoryError),
     * 我们将完全回滚。
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     * 新线程应该首先运行的任务(如果没有则为null)。
     * 当线程少于corePoolSize时(在这种情况下,我们总是启动一个线程),
     * 或者当队列已满时(在这种情况下,我们必须绕过队列),
     * 将使用初始的第一个任务(在方法execute()中)创建工作线程以绕过队列。
     * 最初空闲线程通常是通过prestartCoreThread创建的,或用于替换其他正在死亡的workers线程
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * 如果为true,则使用corePoolSize作为绑定,否则使用maximumPoolSize。
     * (这里使用boolean标志而不是值来确保在检查其他池状态后读取新值)
     *
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        //这里使用了标签逻辑,类似于goto
        retry:
        
        //这个for循环是用来检查状态和更新计数器的
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);//获取当前状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                    ! (rs == SHUTDOWN &&
                            firstTask == null &&
                            ! workQueue.isEmpty()))
                /**
                 * 这里和execute方法第二大步里的一种情况是对应的。
                 * 就是说当前状态是SHUTDOWN,但是任务队列不为空,可是没有可用的worker线程了,
                 * firstTask也为空,表示本次创建worker不是为了执行新提交的任务,
                 * 而是要将将任务队列里遗留的任务给处理掉,这时候是不能return掉的。
                 * */
                
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                        wc >= (core ? corePoolSize : maximumPoolSize))
                    //如果当前线程数大于等于限制数量,则不创建
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    //通过CAS操作来增加当前线程数,如果成功则跳出这两个for循环,继续往后执行了。
                    break retry;
                c = ctl.get();  // Re-read ctl 重新获取ctl
                if (runStateOf(c) != rs)
                    //新获取的状态和前面获取的rs不一致了,再次从头循环判断
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
                //compareAndIncrementWorkerCount 的CAS失败因为worker数量改变了,则再次执行内部循环
            }
        }

        //开始创建worker
        boolean workerStarted = false;//worker是否启动成功
        boolean workerAdded = false;//worker是否添加成功
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock. 在保持锁的同时重新检查
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    //在ThreadFactory出现故障或在获取锁之前关闭时退出。
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                            (rs == SHUTDOWN && firstTask == null)) {
                        //如果是RUNNING状态 或者 SHUTDOWN状态但是firstTask为null(要关闭,但是没有worker,但是任务队列不为空)
                        if (t.isAlive()) // precheck that t is startable
                            //如果新创建的已经被启动了,则抛出异常
                            throw new IllegalThreadStateException();
                        workers.add(w);//将新创建的worker加到workers集合中
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;//当前个数大于largestPoolSize,则更新之
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//启动线程
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                //如果线程启动失败,则之后失败处理逻辑
                addWorkerFailed(w);
        }
        return workerStarted;
    }
    
    /**
     * Removes this task from the executor's internal queue if it is
     * present, thus causing it not to be run if it has not already
     * started.
     * 如果存在此任务,则将其从执行器的内部队列中移除,从而导致尚未启动的任务无法运行。
     *
     * <p>This method may be useful as one part of a cancellation
     * scheme.  It may fail to remove tasks that have been converted
     * into other forms before being placed on the internal queue. For
     * example, a task entered using {@code submit} might be
     * converted into a form that maintains {@code Future} status.
     * However, in such cases, method {@link #purge} may be used to
     * remove those Futures that have been cancelled.
     * 此方法可作为取消方案的一部分使用。它可能无法删除在放入内部队列之前已转换为其他形式的任务。
     * 例如,使用submit输入的任务可能会转换为保持Future状态的形式。
     * 然而,在这种情况下,可以使用方法purge删除已取消的Futures。
     *
     * @param task the task to remove
     * @return {@code true} if the task was removed
     */
    public boolean remove(Runnable task) {
        boolean removed = workQueue.remove(task);
        tryTerminate(); //尝试终止,终止的主要逻辑,后面再细看。
        return removed;
    }
    
    /**
     * Invokes the rejected execution handler for the given command.
     * Package-protected for use by ScheduledThreadPoolExecutor.
     * 为给定命令调用被拒绝的执行处理程序。
     */
    final void reject(Runnable command) {
        handler.rejectedExecution(command, this);
    }
}

总结:

  1. 总体步骤代码注释已经很清晰了。
  2. 需要格外注意的是shutdown状态,但是任务队列不为空,
    且没有worker线程了,这种情况下是允许创建新的worker的。

二: runWorker 任务、线程执行逻辑

execute创建的worker是一个Runnable,其run方法被委托给了runWorker方法。

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     * worker运行循环。重复从队列中获取任务并执行它们,同时处理许多问题:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     * 我们可以从一个初始任务开始,在这种情况下,我们不需要得到第一个任务。
     * 否则,只要池在运行,我们就可以从getTask获取任务。
     * 如果返回null,则worker将由于池状态或配置参数的更改而退出。
     * 其他退出源于外部代码中的异常抛出,在这种情况下completedAbruptly保持不变,
     * 这通常会导致processWorkerExit替换此线程。
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     * 在运行任何任务之前,都会获取锁以防止任务执行时发生其他池中断,
     * 然后我们会确保除非池停止,否则该线程不会设置其中断。
     * 
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     * 每个任务运行之前都会调用beforeExecute,这可能会引发异常,在这种情况下,
     * 会导致线程在不处理该任务的情况下死亡(使用CompletedThroughtly true中断循环)。
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     * 假设beforeExecute正常完成,我们运行任务,收集其抛出的任何异常以发送给afterExecute。
     * 我们分别处理RuntimeException、Error(这两个规范都保证我们可以捕获)和任意的Throwable。
     * 因为我们无法在Runnable.run中重新抛出Throwable,所以在退出时将它们包装在Error中(到线程的UncaughtExceptionHandler)。
     * 任何抛出的异常都会保守地导致线程死亡。
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     * task.run 完成后,我们调用afterExecute,其也会抛出一个异常,其也会导致线程死亡。
     * 根据JLS第14.20节,即使是task.run抛出的异常,此异常也将生效。
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     * 异常机制的最终效果是afterExecute和线程的UncaughtExceptionHandler具有尽可能准确的信息,
     * 我们可以提供有关用户代码遇到的任何问题的信息。
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        
        //state初始值为-1, unlock之后,state就从-1变成0了,就可以中断了
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;//用户任务是否抛出异常
        try {
            while (task != null || (task = getTask()) != null) {
                //执行任务的第一步,先获取锁,这个很重要。锁定状态表示该worker正在执行任务。
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 如果pool正在停止,确保线程已经被中断;否则,确保线程没有被中断。
                // 这需要在第二种情况下进行重新检查,以在清除中断的同时处理与shutdownNow的竞争
                // 啥意思呢? 就是说可能执行到这里的时候发现被shutdownNow,状态已经为STOP了,要重新中断线程。
                // 那为什么shutdownNow了,还能走到这里呢?总的来说是因为shutdownNow和runWorker在两个线程里几乎同时执行。
                // 原因1: shutdownNow只是中断已启动的线程,如果新的worker还未第一次unlock,state还是-1,就不会被中断。
                // 原因2: 在shutdownNow的drainQueue之前,getTask已经拿到了任务。
                if ((runStateAtLeast(ctl.get(), STOP) ||
                        (Thread.interrupted() && //该方法是会clear掉中断状态的
                                runStateAtLeast(ctl.get(), STOP))) &&
                        !wt.isInterrupted())
                    //重新中断线程
                    wt.interrupt();
                try {
                    //先执行钩子方法beforeExecute
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行任务,并捕获各种异常
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //用户任务执行完后,再次执行钩子方法afterExecute
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;//顺利执行到这里代表没有抛出异常
        } finally {
            //如果该worker没有再获取到task,则执行退出逻辑。
            processWorkerExit(w, completedAbruptly);
        }
    }


    /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 根据当前的配置设置,执行任务的阻塞或定时等待,如果由于以下任何原因必须退出此工作进程,则返回null:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     *    线程数超过了maximumPoolSize。
     *
     * 2. The pool is stopped.
     *    pool已停止。
     *
     * 3. The pool is shutdown and the queue is empty.
     *    pool被关闭并且queue是空的。
     *
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait, and if the queue is
     *    non-empty, this worker is not the last thread in the pool.
     *    此worker在等待任务时超时,超时的worker在超时等待前后都会终止,
     *    如果队列非空,则此工作线程不是池中的最后一个线程。
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     *         如果worker必须退出则返回null,这种情况下workerCount将递减
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out? 上次执行poll是否超时

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                //rs>=SHUTDOWN && workQueue为空,执行了shutdown,并且队列已经处理完,则返回null,让线程(worker)消亡。
                //rs=STOP 表示执行了shutdownNow,返回null,让线程(worker)消亡 
                decrementWorkerCount();//减少当前worker数量并返回null
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling? worker是否会被淘汰
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                    && (wc > 1 || workQueue.isEmpty())) {
                //(当前线程数>最大线程数 || (允许淘汰线程 && 已超时)) && (当前线程数>1 || 队列为空) 
                if (compareAndDecrementWorkerCount(c)) //减少当前worker数量并返回null
                    return null;
                continue;
            }

            try {
                //如果没有任务,线程在此处,阻塞到workQueue该阻塞队列上。
                // 如果该线程可以被淘汰,则使用带超时时间的poll方法,否则使用take一直阻塞到有任务添加到queue中。
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                //捕获中断异常,shutdown时会中断这些阻塞的线程。
                //被中断了,说明不是超时
                timedOut = false;
            }
        }
    }

    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     * 为即将死亡的worker进行清理和簿记。仅从worker线程调用。
     * 除非设置了completedAbruptly,否则假定workerCount已被调整以考虑退出。
     * 此方法从工作线程集中删除线程,如果由于用户任务异常而退出工作线程,
     * 或者如果正在运行的工作线程少于corePoolSize,或者队列为非空但没有工作线程,
     * 则可能终止池或替换工作线程。
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     *                           worker是否由于用户任务异常导致死亡
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();//尝试终止线程池,后面终止的逻辑再细看,知道这时候会调用该方法即可。

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            //状态小于STOP,代表是RUNNING或者SHUTDOWN
            if (!completedAbruptly) {
                //如果不是用户任务报错而执行worker的退出逻辑
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    //如果min==0,并且workerQueue不为空,则最小线程数为1,因为要有线程去执行queue中的任务呀
                    min = 1;
                if (workerCountOf(c) >= min)
                    //如果当前线程数大于等于min,则无须处理。
                    return; // replacement not needed
            }
            
            //走到这里就说名上面的return没有执行,要添加一个worker了,然后也不给初始任务。
           //情况1:用户任务报错执行了销毁,则创建一个新的worker来代替
           //情况2:RUNNING状态,核心线程允许超时销毁,并且也销毁完了,当前线程数是0,但是任务队列非空,要创建worker来执行。
           //情况3:shutdown状态,当前线程数是0,但是任务队列非空,要创建worker来执行。
           //后两种逻辑其实和execute方法中的第二大部的处理一个意思。
            addWorker(null, false);
        }
    }
}

总结:
runWorker执行的步骤大体如下几步:

  1. worker.unlock(),将worker的状态从-1变为0(从未启动变为已启动未锁定)。
  2. while(getTask()) 循环获取任务
    • 2.1:worker.lock() 先锁定,表示正在执行任务,其它业务不用中断我。
    • 2.2:beforeExecute() 执行钩子方法,如果报错,则不会继续执行,线程将跳出循环执行终止逻辑。
    • 2.3:task.run() 执行用户任务,捕获异常,并将异常传递给afterExecute方法。
    • 2.4:afterExecute(task, throw)执行钩子方法,如果报错,则不会继续执行,线程将跳出循环执行终止逻辑。
    • 2.5:worker.unlock() 释放锁,表示目前没有再执行业务。
  3. processWorkerExit 跳出了while循环,没有获取到任务(超时时间已过,或者已关闭),线程执行终止逻辑。

特别注意:worker.lock之后对于STOP状态的判断逻辑,注释写的很清楚喽。

getTask获取任务:

  1. 如果是SHUTDOWN状态 并且 workerQueue为空,说明执行了shutdown,并且任务队列也已经处理完了,
    则返回null,让线程消亡。
  2. 如果是STOP状态,表示执行了shutdownNow,返回null,让线程消亡。
  3. 根据该线程是否允许超时,决定使用pool(keepAliveTime)还是take方法。

processWorkerExit任务退出逻辑:

  1. 要尝试调用tryTerminate() 终止线程池。
  2. 要判断下是否需要再新建一个不带初始任务的Worker。

三:关闭逻辑shutdown

public class ThreadPoolExecutor extends AbstractExecutorService {

    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     * 启动有序的关闭,执行以前已提交的任务,但不接受新任务。
     * 如果调用时已经关闭,没有其它影响。
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     * 此方法不会等待以前提交的任务完成执行。
     * 使用awaitTermination来完成此操作。
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //检查是否有权限关闭线程
            checkShutdownAccess();
            //状态改为SHUTDOWN
            advanceRunState(SHUTDOWN);
            //中断空闲的worker(注意,正在执行任务的worker是不会被中断的)
            interruptIdleWorkers();
            // 该类中onShutdown为空,啥都不执行
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();//尝试终止
    }

    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }

    /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes. Ignores
     * SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     * 中断可能正在等待任务的线程(表示未被锁定--如果在执行任务,worker首先会将自己进行锁定,runWorker中的逻辑),
     * 以便它们可以检查终止或配置更改。
     * 忽略SecurityExceptions(在这种情况下,某些线程可能会保持不中断)。
     *
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers.  In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     * 如果是true,最多中断一个worker。
     * 只有在以其他方式启用终止但仍有其他worker时,才从tryTerminate调用此函数。
     * 在这种情况下,如果所有线程当前都在等待,
     * 则最多会中断一个等待的工作线程来传播关闭信号(这个传播关闭信号的很关键,后面在tryTerminate中就会调用体现)。
     * 中断任意线程可以确保自关闭开始以来新到达的worker最终也会退出。
     * 为了保证最终的终止,总是只中断一个空闲的工作进程就足够了,
     * 但是shutdown()会中断所有空闲的工作进程,
     * 这样多余的工作进程就会迅速退出,而不是等待一个散乱的任务完成。
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                if (!t.isInterrupted() && w.tryLock()) {
                    //未被中断,并且tryLock成功表示未在执行任务(runWorker中执行任务的时候会先lock的)
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    //如果是就循环处理一次,break出去不再继续循环,但是可不一定就能成功中断一个线程,比如说所有的worker都在执行任务,tryLock都失败。
                    //这时候其实也就没有传播了。
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     * 如果(状态为SHUTDOWN且池和队列为空)或(状态为STOP且池为空),则转换为TERMINATED状态。
     * 如果有资格终止,但workerCount不为零,则中断一个空闲worker,以确保关闭信号传播。
     * 必须在可能导致终止的任何操作之后调用此方法 -- 减少worker数量或者关闭期间从queue中移除任务。
     * 该方法是非私有的,允许从ScheduledThreadPoolExecutor进行访问。
     * 
     * 该方法是最终终止线程池的方法,
     * 状态变化从SHUTDOWN->TIDYING->TERMINATED 或者 STOP->TIDYING->TERMINATED
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            if (isRunning(c) ||
                    runStateAtLeast(c, TIDYING) ||
                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                //这里的三个if条件表明不能终止或不用继续执行:
                //1. isRunning(c) 如果是RUNNING状态,则返回退出,不终止
                //2. runStateAtLeast(c,TIDYING) 如果状态至少是TIDYING,就是说状态为TIDYING或者TERMINATED,则返回退出。
                // 如果为TIDYING表示已经有一个线程调用该方法走到后面的逻辑了,该线程就退出不用处理了;如果为TERMINATED表示已经终止了,直接退出。
                //3. runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty() SHUTDOWN状态是允许将队列中的任务执行完的,所以不能终止,则返回退出。
                return;
            
            //过了上面的if走到这里,表明可以终止了
            //原因1:调用shutdown,目前状态为SHUTDOWN,并且queue为空了。
            //原因2:调用shutdownNow,目前状态是STOP(STOP时queue肯定空了,看shutdownNow方法)。
            
            
            if (workerCountOf(c) != 0) { // Eligible to terminate
                /**
                 * 当前worker数量不为0,哪种场景呢?
                 * shutdown时,调用了interruptIdleWorkers(中断并关闭了空闲的worker),
                 * 那不空闲(正在执行任务,已经被lock)的worker还会继续存在,这里就可能不为0了。
                 * 
                 * 传播终止信号(propagate shutdown signals)的逻辑。
                 * 这里的ONLY_ONE是true, interruptIdleWorkers的注释描述ONLY_ONE时说,用于传播终止的信号。
                 * 该方法tryTerminate是唯一一个执行终止的方法,但是这里就要return出去不继续执行了,
                 * 那么就需要传播给其它方法来再次调用此方法来进行终止,这就是传播的概念。
                 * 有那些方法会调用该方法呢?注释写的很清楚:可能导致终止的任何操作之后调用此方法 -- 减少worker数量或者关闭期间从queue中移除任务。
                 * 有一下几个方法:
                 * 1. shutdown() --关闭
                 * 2. shutdownNow() --立即关闭
                 * 3. addWorkerFailed(Worker) --添加worker失败
                 * 4. processWorkerExit(Worker, boolean) --worker执行完退出
                 * 5. remove(Runnable) --移除任务
                 * 6. purge() --移除任务
                 * 
                 * 1. 剩余的worker都已经空闲,目前阻塞在queue.take(),则interruptIdleWorkers会中断其中一个,
                 *    然后再调用到这里,然后再去中断其中一个worker。直到所有都中断死亡,再跳过该if往后执行。
                 * 2. 剩余的worker都还在处理任务,interruptIdleWorkers一个都中断不成功,这里也就返回了,
                 *    shutdown方法也就执行完了,当前状态就处在SHUTDOWN。那何时才能真正终止完成呢?
                 *    runWorker()中,当worker执行完当前任务,再次getTask()获取任务时,当前状态会导致getTask()返回null,
                 *    会继续执行到processWorkerExit中,再被执行到本方法。直到所有的worker都按这套路走一遭,最后一个就跨过这个if往后执行了。
                 * 3. 剩余的worker有的已经空闲,有的还在处理,那就看interruptIdleWorkers循环的第一个worker是什么状态了。
                 * 
                 * */
                
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //状态改为TIDYING
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();//钩子方法,该方法本类中为空方法,子类可以自己实现
                    } finally {
                        //状态改为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //给所有等待在该终止Condition上的线程发信号。
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }


    //这个方法就是其它线程可以来等待线程池终止,然后await在Condition上,
    //等着终止完被唤醒。
    public boolean awaitTermination(long timeout, TimeUnit unit)
            throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (;;) {
                if (runStateAtLeast(ctl.get(), TERMINATED))
                    return true;
                if (nanos <= 0)
                    return false;
                nanos = termination.awaitNanos(nanos);
            }
        } finally {
            mainLock.unlock();
        }
    }
}

总结:
shutdown分为一下几步:

  1. mainLock.lock() 上锁
  2. 校验是否有权限关闭线程。
  3. 将运行状态改为SHUTDOWN。
  4. 中断空闲的worker,非空闲的(被lock的)不会被中断。
  5. 执行onShutdown()钩子方法。
  6. mainLock.unlock() 解锁
  7. 执行tryTerminate()终止方法,该方法返回后可不一定线程池就终止了。

tryTerminate方法还是比较重要的,东西不多,直接看代码里的注释吧。

awaitTermination允许其它线程等待线程池终止时被通知。

四:关闭逻辑shutdownNow

public class ThreadPoolExecutor extends AbstractExecutorService {
    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     * 尝试停止所有正在执行的任务,停止正在等待的任务的处理,
     * 并返回正在等待执行的任务的列表。从该方法返回时,
     * 这些任务将从任务队列中排出(删除)。
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     * 此方法不会等待主动执行的任务终止。使用waittermination完成此操作。
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     * 除了尽最大努力尝试停止处理积极执行的任务之外,没有其他保证。
     * 此实现通过{@link-Thread#interrupt}取消任务,
     * 因此任何未能响应中断的任务可能永远不会终止。
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //检查权限,和shutdown一样
            checkShutdownAccess();
            //状态改为STOP
            advanceRunState(STOP);
            interruptWorkers();
            //排空queue,并返回出去,所以一旦shutdownNow了,状态改为STOP了,queue就空了。
            //execute的时候也就不会再往里offer了,即使临界状态offer进去了,也会再次判断给移除掉。
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        //尝试终止,和shutdown一样,上面已经分析过了
        tryTerminate();
        //返回所有queue中未执行的任务
        return tasks;
    }

    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     * 中断所有的线程,尽管时活跃的。忽视SecurityExceptions。
     * (这种情况下一些线程可能仍然未中断,可能阻塞在一些未想用中断的方法上,比如说AQS的acquireQueued就会吞掉中断响应)
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                //只中断已启动的线程,这里就和runWorker中的再次判断是否时STOP状态的逻辑呼应上了。
                //可能有些线程这时还未启动,然后立即又被启动了,就是shutdownNow和runWorker方法几乎同时执行导致。
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
}

总结:
shutdownNow和shutdown差别不大。

  1. 将状态改为STOP。
  2. 中断所有的已启动的线程(不管是否空闲),但是未启动的中断不了,所以再runWorker方法中有一段判断STOP状态的逻辑。
  3. 会将任务队列中的未执行的任务返回出去。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,546评论 6 507
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,224评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,911评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,737评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,753评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,598评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,338评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,249评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,696评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,888评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,013评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,731评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,348评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,929评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,048评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,203评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,960评论 2 355

推荐阅读更多精彩内容