java - ThreadPoolExecutor详解

线程池种类有很多,我们以其中最经典的ThreadPoolExecutor为例,对他进行分析。

  • ThreadPoolExecutor

包含一个核心线程池,如果核心线程池未满时,新加入的任务都会被添加到核心线程池中执行,如果已满,会添加到等待队列queue,是否可以无限添加任务,取决于等待队列的实现方式:
如果是LinkedBlockingQueue,就属于无边界队列,理论是可以无限添加任务执行
如果是ArrayBlockingQueue,属于有边界队列,只能添加一定数量的任务执行

注意

这里需要先解释一下,ctl 是一个线程安全的AtomicInteger 统计值,内部实现采用了CAS多线程控制,既然ctl这个参数既要记录线程的数量,又要记录线程池的状态,那只能把int的 32位进行切割,现有的状态定义有5中状态,所以需要至少3位才能表示,其中高3位用于记录线程池的运行状态,剩余29位用于记录线程池的大小,所以边界值 (1 << COUNT_BITS) - 1,左移29位-1,其中 RUNNING状态最为特殊,高3位都为 1。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }
  • CAPACITY 表示线程池中最大的线程数量值,后28位 都为1,高4位 都为 0。
  • ctlof(...) 使用了 | 按位或,则既保留了高3位的值,也保留了 线程数量 低29位的值。
  • runStateOf(...),这个方法是 ctlof的反运算,& 按位与CAPACITY ,得到了线程数量值
  • workerCountOf(...),这个方法是 ctlof的反运算,& 按位与 (CAPACITY的取反值),则 得到了线程池的状态值。
创建过程

我们看一下最根本的调用方法

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        // workQueue 阻塞队列,或者叫做等待队列,这个标志着ThreadPoolExecutor的特性,有无边界
        // threadFactory 任务创建器,传入的是一个runnable,需要使用threadFactory 对他进行封装
        // 默认可以使用 Executors.defaultThreadFactory()创建器
        // handler 当ThreadPoolExecutor,进入消亡时,再添加任务时,会被拒绝添加,
        // RejectedExecutionHandler 就是拒绝添加时的操作,是抛出错误呢,还是做特定的事。
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        // 
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
execute添加执行任务
/**
     * Executes the given task sometime in the future.  The task
     * may execute in a new thread or in an existing pooled thread.
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        // 取出存储的线程相关的信息
        int c = ctl.get();
       // workerCountOf 这个方法用于获取当前核心线程的数量
        if (workerCountOf(c) < corePoolSize) {
            // addWorker用于创建并添加到 核心线程中
           // 如果添加成功,直接返回
            if (addWorker(command, true))
                return;
           // 如果添加失败,重新获取一次 相关信息,(多线程操作)
            c = ctl.get();
        }
        // 添加失败,或者核心线程已经满了
        // 判断当前线程池是不是还处于running状态,并且在等待队列中添加新的工作任务
        if (isRunning(c) && workQueue.offer(command)) {
            // 再次获取 添加之后的  状态信息
            int recheck = ctl.get();
          // 如果当前线程池 不处于running 状态,并且移除等在队列中新加的工作任务成功
            if (! isRunning(recheck) && remove(command))
                // 拒绝操作
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 添加非核心线程工作任务
        else if (!addWorker(command, false))
            // 拒绝操作
            reject(command);
    }

这里我们可以总结一下:

  • 如果当前线程池中的线程数量小于corePoolSize核心线程数量,则新增的任务为核心线程任务
  • 如果核心线程池已满,或者添加核心任务失败,就会尝试添加到等待队列,如果当前线程池状态已经进入了销毁过程,执行拒绝任务操作。
  • 如果添加等待队列失败,尝试添加非核心队列,如果添加失败执行拒绝任务操作
addWorker
/**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread.start()), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        // 定义一个代码块,校验是否可以添加工作任务
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            // 判断当前的 线程池状态是否处于RUNNING的状态
            // 或者初始状态 SHUTDOWN 并且  firstTask 任务为空,workQueue等待队列为空
            // 也就是说,如果当前的线程池不满足这两个条件,是不能添加任何的 任务了
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            // 如果是 运行正常的状态
            for (;;) {
                int wc = workerCountOf(c);
                // 判断当前执行线程数量,是否超过了限制值
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 当前的线程数量 + 1,如果成功  并且退出判断流程,进入创建流程
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 如果线程数量 + 1,操作失败,需要更新ctl的值
                c = ctl.get();  // Re-read ctl
                // 如果更新之后的 ctl值,线程池运行状态不变,继续一个循环判断
                if (runStateOf(c) != rs)
                    // 如果运行状态发生改变,需要重新判断 线程池状态
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
           // 创建 工作任务
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 获取 ReentrantLock 重入锁,是的接下来的操作具有原子性,保证线程安全
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());
                    // 再次校验,线程池的运行状态
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                         // 如果线程已经在执行了,抛出异常
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 添加 执行任务
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    // 释放重入锁
                    mainLock.unlock();
                }
                // 如果工作任务已经添加完成, 启动线程,执行任务
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果添加任务启动失败,需要把ctl 值还原,另外当前的线程池状态需要检测
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  • retry: 意思就是定义一个代码块,可以直接操作循环,或者退出代码块,至于叫retry,这是命名而已,换成 A,B 都可以,这里的意思就是 第一个for 循环是定义的一个代码块,名字叫retry,这样的方便之处在于,如果循环很深,有一个退出循环,就可以直接使用代码块退出,方便直接,当然也可以把这段的写法替换成一个方法,也是可以的。

  • 添加任务都是添加到 工作任务中,如果添加成功了,就会立刻启动执行work 任务

runWorker
    /**
     * Main worker run loop.  Repeatedly gets tasks from queue and
     * executes them, while coping with a number of issues:
     *
     * 1. We may start out with an initial task, in which case we
     * don't need to get the first one. Otherwise, as long as pool is
     * running, we get tasks from getTask. If it returns null then the
     * worker exits due to changed pool state or configuration
     * parameters.  Other exits result from exception throws in
     * external code, in which case completedAbruptly holds, which
     * usually leads processWorkerExit to replace this thread.
     *
     * 2. Before running any task, the lock is acquired to prevent
     * other pool interrupts while the task is executing, and then we
     * ensure that unless pool is stopping, this thread does not have
     * its interrupt set.
     *
     * 3. Each task run is preceded by a call to beforeExecute, which
     * might throw an exception, in which case we cause thread to die
     * (breaking loop with completedAbruptly true) without processing
     * the task.
     *
     * 4. Assuming beforeExecute completes normally, we run the task,
     * gathering any of its thrown exceptions to send to afterExecute.
     * We separately handle RuntimeException, Error (both of which the
     * specs guarantee that we trap) and arbitrary Throwables.
     * Because we cannot rethrow Throwables within Runnable.run, we
     * wrap them within Errors on the way out (to the thread's
     * UncaughtExceptionHandler).  Any thrown exception also
     * conservatively causes thread to die.
     *
     * 5. After task.run completes, we call afterExecute, which may
     * also throw an exception, which will also cause thread to
     * die. According to JLS Sec 14.20, this exception is the one that
     * will be in effect even if task.run throws.
     *
     * The net effect of the exception mechanics is that afterExecute
     * and the thread's UncaughtExceptionHandler have as accurate
     * information as we can provide about any problems encountered by
     * user code.
     *
     * @param w the worker
     */
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
          // 当前线程任务执行完成之后,会尝试冲 等待队列中获取任务,然后复用work 继续执行等待中的任务
            while (task != null || (task = getTask()) != null) {
              // 将任务锁住,不允许其他人进行修改
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
               // 如果当前的线程池已经处于 STOP状态,并且当前的线程池并没有中断,则执行 中断interrupt操作
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                  // 当前线程池已经进入了销毁过程中,当前的执行线程就会 执行中断操作,并退出当前的while循环
                    wt.interrupt();
                try {
                    // 每个 任务都会 执行定义的  执行前操作
                    // 在线程外执行,可以做一些不耗时操作
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                      // 每个任务执行都在线程中执行
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                         // 不管任务执行成功与否,都会 执行定义的  执行后操作
                        // 在线程外执行,可以做一些不耗时操作
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 记录当前work 任务完成的任务数量
                    w.completedTasks++;
                    // 释放锁
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 执行退出当前任务操作
            // 如果是正常退出 work任务,则 completedAbruptly = false
            // 如果是异常退出 work任务,则 completedAbruptly = true,异常退出就需要查找一下原因
            processWorkerExit(w, completedAbruptly);
        }
    }
  • 核心任务如果执行完之后,会尝试从等待队列中获取runnable任务继续执行等待任务,并且会复用当前的work 对象,work 对象中会记录执行完成的任务数量。
processWorkerExit
    /**
     * Performs cleanup and bookkeeping for a dying worker. Called
     * only from worker threads. Unless completedAbruptly is set,
     * assumes that workerCount has already been adjusted to account
     * for exit.  This method removes thread from worker set, and
     * possibly terminates the pool or replaces the worker if either
     * it exited due to user task exception or if fewer than
     * corePoolSize workers are running or queue is non-empty but
     * there are no workers.
     *
     * @param w the worker
     * @param completedAbruptly if the worker died due to user exception
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // 如果是异常退出,当前的work 数量 -1 
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        // 获取重入锁,保证原子性,为了线程安全,也为了 completedTaskCount 完成的任务数的统计线程安全
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 统计总共完成的任务数量
            completedTaskCount += w.completedTasks;
            // 移除当前的 work任务
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 尝试去 销毁当前的线程池
        tryTerminate();
      
       // 检测当前的线程池状态
        int c = ctl.get();
        // 当前的线程池状态处于 Running,Shut Down状态
        if (runStateLessThan(c, STOP)) {
          // 判断当前的 work 是否是正常退出
            if (!completedAbruptly) {
              // 正常退出, allowCoreThreadTimeOut  默认是false
              // 如果设置成了 true,意思表明 核心线程,最终会全部归零,也就是最终核心work 都会被销毁
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果当前的 work任务数 >= min, 代表work任务数已经满了, 不需要再添加work任务了
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            // 如果不满足,则添加  初始的 null 空任务的 非核心 work 任务
            addWorker(null, false);
        }
    }

好了,上面已经把线程池中的任务添加到执行分析完了,需要注意的有几点:

  • 添加核心任务,只有在线程池数量小于核心线程池大小的时,才会是核心的任务
  • 线程池复用,是 Work 类的复用,只是去等待队列中获取等待任务执行
    接下来我们分析如何停止线程池,或者线程任务。
shutdown
/**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted.
     * Invocation has no additional effect if already shut down.
     *
     * <p>This method does not wait for previously submitted tasks to
     * complete execution.  Use {@link #awaitTermination awaitTermination}
     * to do that.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查是否有权限关闭 当前线程池以及当前的工作线程
            checkShutdownAccess();
            // 把当前的线程池状态先 设置成 SHUTDOWN
            // 不会再有新加的任务了
            advanceRunState(SHUTDOWN);
          // 依次遍历workers列表,将对应的线程任务停止
          // 
            interruptIdleWorkers();
            // 这个方法是一个空方法,用于继承扩展
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
       // 尝试销毁
        tryTerminate();
    }
interruptIdleWorkers
    /**
     * Interrupts threads that might be waiting for tasks (as
     * indicated by not being locked) so they can check for
     * termination or configuration changes. Ignores
     * SecurityExceptions (in which case some threads may remain
     * uninterrupted).
     *
     * @param onlyOne If true, interrupt at most one worker. This is
     * called only from tryTerminate when termination is otherwise
     * enabled but there are still other workers.  In this case, at
     * most one waiting worker is interrupted to propagate shutdown
     * signals in case all threads are currently waiting.
     * Interrupting any arbitrary thread ensures that newly arriving
     * workers since shutdown began will also eventually exit.
     * To guarantee eventual termination, it suffices to always
     * interrupt only one idle worker, but shutdown() interrupts all
     * idle workers so that redundant workers exit promptly, not
     * waiting for a straggler task to finish.
     */
    private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 遍历整个 worker队列
            for (Worker w : workers) {
                Thread t = w.thread;
                // 尝试获取 线程锁,并且将没有 中断的 线程中断
                // 如果 当前的worker线程正在执行,就无法中断操作
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                       // 中断线程执行
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                // 如果只关闭一个 线程,则退出循环
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

    /**
     * Common form of interruptIdleWorkers, to avoid having to
     * remember what the boolean argument means.
     */
    private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
tryTerminate
    /**
     * Transitions to TERMINATED state if either (SHUTDOWN and pool
     * and queue empty) or (STOP and pool empty).  If otherwise
     * eligible to terminate but workerCount is nonzero, interrupts an
     * idle worker to ensure that shutdown signals propagate. This
     * method must be called following any action that might make
     * termination possible -- reducing worker count or removing tasks
     * from the queue during shutdown. The method is non-private to
     * allow access from ScheduledThreadPoolExecutor.
     */
    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            // 检测当前线程池的状态,如果正在运行中,或者 已经处于销毁的过程中, 或者 处于 SHUTDOWN并且等待队列不为空 
          // 代表当前的 线程池的销毁条件不满足,直接返回
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            // 如果当前的worker 任务 不为空
            if (workerCountOf(c) != 0) { // Eligible to terminate
                // 将重要的  一个任务暂停掉即可返回
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 先将当前的线程池 设置成 TIDYING,工作线程数量设置成 0 ,表明正在 销毁
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 这个方法是空的 用于用户扩展
                        terminated();
                    } finally {
                      // 最终 将线程池 设置成 已销毁的状态 TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

注意:调用了 shutdown 就一定能关闭当前线程池的 所有线程,并把线程池 设置到 销毁状态(TIDYING)吗?答案是否定的,shutdown 方法的确把任务的添加入口给 关掉了,但是对于现有的worker任务处理时,只会取关闭当前没有任务执行的worker任务,对于正在执行的 worker任务时不处理的。而且最后在 tryTerminate中会去检测,如果处于了SHUTDOWN 状态,但是 等待队列(workQueue)中还有任何任务未执行完成时,销毁线程池是不允许的。

shutdownNow
    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks
     * that were awaiting execution. These tasks are drained (removed)
     * from the task queue upon return from this method.
     *
     * <p>This method does not wait for actively executing tasks to
     * terminate.  Use {@link #awaitTermination awaitTermination} to
     * do that.
     *
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so any task that
     * fails to respond to interrupts may never terminate.
     *
     * @throws SecurityException {@inheritDoc}
     */
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 检查是否有权限关闭 当前线程池以及当前的工作线程
            checkShutdownAccess();
            // 把当前的线程池状态先 设置成 STOP
            // 这样可以把后续的添加任务动作给阻止掉,不会再有新加的任务了
            advanceRunState(STOP);
            // 依次遍历workers列表,将对应的线程任务停止
            interruptWorkers();
            // 将等待队列的 任务全部拷贝移除,并返回回去
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
      // 尝试销毁 
        tryTerminate();
        return tasks;
    }
interruptWorkers
    /**
     * Interrupts all threads, even if active. Ignores SecurityExceptions
     * (in which case some threads may remain uninterrupted).
     */
    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 遍历 worker
            for (Worker w : workers)
               // 中断当前 worker 任务执行
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

        void interruptIfStarted() {
            Thread t;
            // 只要当前的 worker 线程已经开始运行了,并且没有 中断
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    // 不管有没有 其他的任务执行,都会 直接中断掉当前的线程,比较的暴力
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }

注意:调用shutdownNow就比较暴力了,但是同样的问题,一定能关闭当前的线程池吗?答案当然是在权限正常的情况下,没有Exception的基础上,是的,可以关闭当前线程池,但是这种暴力关闭,会是你想要的吗,可以看到,对于worker 任务处理时,不管有没有任务正在执行,都会强行对worker线程进行中断,这会造成worker中的正在执行的任务执行不明,对于等待队列(workQueue)直接清空返回。

好了,线程池的创建,添加任务,销毁线程池已经弄完了。有错误之处欢迎留言指正。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,907评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,987评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,298评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,586评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,633评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,488评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,275评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,176评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,619评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,819评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,932评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,655评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,265评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,871评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,994评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,095评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,884评论 2 354

推荐阅读更多精彩内容

  • 我们总觉得如果一切重新来过,结局会不会不一样,可是不管重新来过多少遍,你依旧是你,不会有任何改变,你的选择,你要遇...
    春风十里不如你lyy阅读 200评论 0 1
  • blairbellac阅读 227评论 0 0
  • 之前一直在唐人街的影院看到《末代公主》的宣传海报,一直想找时间去看,都没抽出时间去看,昨天突然在网路影院看到,便立...
    安迩東阅读 29,141评论 5 18
  • 遇到好多人,都问过我同样的问题“你是哪里人?”奇怪的是,无论我说自己是哪儿的人,都会被否认说不像呀!或许吧!一个习...
    我说璐璐啊阅读 232评论 0 1