【死磕Java并发】-----J.U.C之线程池:ThreadPoolExecutor

原文出处http://cmsblogs.com/ 『chenssy』

作为Executor框架中最核心的类,ThreadPoolExecutor代表着鼎鼎大名的线程池,它给了我们足够的理由来弄清楚它。

下面我们就通过源码来一步一步弄清楚它。

内部状态

线程有五种状态:新建,就绪,运行,阻塞,死亡,线程池同样有五种状态:Running, SHUTDOWN, STOP, TIDYING, TERMINATED。

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static final int COUNT_BITS = Integer.SIZE - 3;

    private static final int CAPACITY  = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits

    private static final int RUNNING    = -1 << COUNT_BITS;

    private static final int SHUTDOWN  =  0 << COUNT_BITS;

    private static final int STOP      =  1 << COUNT_BITS;

    private static final int TIDYING    =  2 << COUNT_BITS;

    private static final int TERMINATED =  3 << COUNT_BITS;

    // Packing and unpacking ctl

    private static int runStateOf(int c)    { return c & ~CAPACITY; }

    private static int workerCountOf(int c)  { return c & CAPACITY; }

    private static int ctlOf(int rs, int wc) { return rs | wc; }

变量ctl定义为AtomicInteger ,其功能非常强大,记录了“线程池中的任务数量”和“线程池的状态”两个信息。共32位,其中高3位表示"线程池状态",低29位表示"线程池中的任务数量"。

RUNNING            -- 对应的高3位值是111。

SHUTDOWN      -- 对应的高3位值是000。

STOP                  -- 对应的高3位值是001。

TIDYING              -- 对应的高3位值是010。

TERMINATED    -- 对应的高3位值是011。

RUNNING:处于RUNNING状态的线程池能够接受新任务,以及对新添加的任务进行处理。

SHUTDOWN:处于SHUTDOWN状态的线程池不可以接受新任务,但是可以对已添加的任务进行处理。

STOP:处于STOP状态的线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。

TIDYING:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。

TERMINATED:线程池彻底终止的状态。

各个状态的转换如下:

线程池状态转换

创建线程池

我们可以通过ThreadPoolExecutor构造函数来创建一个线程池:

    public ThreadPoolExecutor(int corePoolSize,

                              int maximumPoolSize,

                              long keepAliveTime,

                              TimeUnit unit,

                              BlockingQueue<Runnable> workQueue,

                              ThreadFactory threadFactory,

                              RejectedExecutionHandler handler) {

        if (corePoolSize < 0 ||

            maximumPoolSize <= 0 ||

            maximumPoolSize < corePoolSize ||

            keepAliveTime < 0)

            throw new IllegalArgumentException();

        if (workQueue == null || threadFactory == null || handler == null)

            throw new NullPointerException();

        this.corePoolSize = corePoolSize;

        this.maximumPoolSize = maximumPoolSize;

        this.workQueue = workQueue;

        this.keepAliveTime = unit.toNanos(keepAliveTime);

        this.threadFactory = threadFactory;

        this.handler = handler;

    }

共有七个参数,每个参数含义如下:

corePoolSize

线程池中核心线程的数量。当提交一个任务时,线程池会新建一个线程来执行任务,直到当前线程数等于corePoolSize。如果调用了线程池的prestartAllCoreThreads()方法,线程池会提前创建并启动所有基本线程。

maximumPoolSize

线程池中允许的最大线程数。线程池的阻塞队列满了之后,如果还有任务提交,如果当前的线程数小于maximumPoolSize,则会新建线程来执行任务。注意,如果使用的是无界队列,该参数也就没有什么效果了。

keepAliveTime

线程空闲的时间。线程的创建和销毁是需要代价的。线程执行完任务后不会立即销毁,而是继续存活一段时间:keepAliveTime。默认情况下,该参数只有在线程数大于corePoolSize时才会生效。

unit

keepAliveTime的单位。TimeUnit

workQueue

用来保存等待执行的任务的阻塞队列,等待的任务必须实现Runnable接口。我们可以选择如下几种:

ArrayBlockingQueue:基于数组结构的有界阻塞队列,FIFO。【死磕Java并发】----J.U.C之阻塞队列:ArrayBlockingQueue

LinkedBlockingQueue:基于链表结构的有界阻塞队列,FIFO。

SynchronousQueue:不存储元素的阻塞队列,每个插入操作都必须等待一个移出操作,反之亦然。【死磕Java并发】----J.U.C之阻塞队列:SynchronousQueue

PriorityBlockingQueue:具有优先界别的阻塞队列。【死磕Java并发】----J.U.C之阻塞队列:PriorityBlockingQueue

threadFactory

用于设置创建线程的工厂。该对象可以通过Executors.defaultThreadFactory(),如下:

    public static ThreadFactory defaultThreadFactory() {

        return new DefaultThreadFactory();

    }

返回的是DefaultThreadFactory对象,源码如下:

    static class DefaultThreadFactory implements ThreadFactory {

        private static final AtomicInteger poolNumber = new AtomicInteger(1);

        private final ThreadGroup group;

        private final AtomicInteger threadNumber = new AtomicInteger(1);

        private final String namePrefix;

        DefaultThreadFactory() {

            SecurityManager s = System.getSecurityManager();

            group = (s != null) ? s.getThreadGroup() :

                                  Thread.currentThread().getThreadGroup();

            namePrefix = "pool-" +

                          poolNumber.getAndIncrement() +

                        "-thread-";

        }

        public Thread newThread(Runnable r) {

            Thread t = new Thread(group, r,

                                  namePrefix + threadNumber.getAndIncrement(),

                                  0);

            if (t.isDaemon())

                t.setDaemon(false);

            if (t.getPriority() != Thread.NORM_PRIORITY)

                t.setPriority(Thread.NORM_PRIORITY);

            return t;

        }

    }

ThreadFactory的左右就是提供创建线程的功能的线程工厂。他是通过newThread()方法提供创建线程的功能,newThread()方法创建的线程都是“非守护线程”而且“线程优先级都是Thread.NORM_PRIORITY”。

handler

RejectedExecutionHandler,线程池的拒绝策略。所谓拒绝策略,是指将任务添加到线程池中时,线程池拒绝该任务所采取的相应策略。当向线程池中提交任务时,如果此时线程池中的线程已经饱和了,而且阻塞队列也已经满了,则线程池会选择一种拒绝策略来处理该任务。

线程池提供了四种拒绝策略:

AbortPolicy:直接抛出异常,默认策略;

CallerRunsPolicy:用调用者所在的线程来执行任务;

DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

DiscardPolicy:直接丢弃任务;

当然我们也可以实现自己的拒绝策略,例如记录日志等等,实现RejectedExecutionHandler接口即可。

线程池

Executor框架提供了三种线程池,他们都可以通过工具类Executors来创建。

FixedThreadPool

FixedThreadPool,可重用固定线程数的线程池,其定义如下:

    public static ExecutorService newFixedThreadPool(int nThreads) {

        return new ThreadPoolExecutor(nThreads, nThreads,

                                      0L, TimeUnit.MILLISECONDS,

                                      new LinkedBlockingQueue<Runnable>());

    }

corePoolSize 和 maximumPoolSize都设置为创建FixedThreadPool时指定的参数nThreads,意味着当线程池满时且阻塞队列也已经满时,如果继续提交任务,则会直接走拒绝策略,该线程池不会再新建线程来执行任务,而是直接走拒绝策略。FixedThreadPool使用的是默认的拒绝策略,即AbortPolicy,则直接抛出异常。

keepAliveTime设置为0L,表示空闲的线程会立刻终止。

workQueue则是使用LinkedBlockingQueue,但是没有设置范围,那么则是最大值(Integer.MAX_VALUE),这基本就相当于一个无界队列了。使用该“无界队列”则会带来哪些影响呢?当线程池中的线程数量等于corePoolSize 时,如果继续提交任务,该任务会被添加到阻塞队列workQueue中,当阻塞队列也满了之后,则线程池会新建线程执行任务直到maximumPoolSize。由于FixedThreadPool使用的是“无界队列”LinkedBlockingQueue,那么maximumPoolSize参数无效,同时指定的拒绝策略AbortPolicy也将无效。而且该线程池也不会拒绝提交的任务,如果客户端提交任务的速度快于任务的执行,那么keepAliveTime也是一个无效参数。

其运行图如下(参考《Java并发编程的艺术》):

FixedThreadPool

SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor,定义如下:

    public static ExecutorService newSingleThreadExecutor() {

        return new FinalizableDelegatedExecutorService

            (new ThreadPoolExecutor(1, 1,

                                    0L, TimeUnit.MILLISECONDS,

                                    new LinkedBlockingQueue<Runnable>()));

    }

作为单一worker线程的线程池,SingleThreadExecutor把corePool和maximumPoolSize均被设置为1,和FixedThreadPool一样使用的是无界队列LinkedBlockingQueue,所以带来的影响和FixedThreadPool一样。

SingleThreadExecutor

CachedThreadPool

CachedThreadPool是一个会根据需要创建新线程的线程池 ,他定义如下:

    public static ExecutorService newCachedThreadPool() {

        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

                                      60L, TimeUnit.SECONDS,

                                      new SynchronousQueue<Runnable>());

    }

CachedThreadPool的corePool为0,maximumPoolSize为Integer.MAX_VALUE,这就意味着所有的任务一提交就会加入到阻塞队列中。keepAliveTime这是为60L,unit设置为TimeUnit.SECONDS,意味着空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。阻塞队列采用的SynchronousQueue,而我们在【死磕Java并发】----J.U.C之阻塞队列:SynchronousQueue中了解到SynchronousQueue是一个没有元素的阻塞队列,加上corePool = 0 ,maximumPoolSize = Integer.MAX_VALUE,这样就会存在一个问题,如果主线程提交任务的速度远远大于CachedThreadPool的处理速度,则CachedThreadPool会不断地创建新线程来执行任务,这样有可能会导致系统耗尽CPU和内存资源,所以在使用该线程池是,一定要注意控制并发的任务数,否则创建大量的线程可能导致严重的性能问题。

CachedThreadPool

任务提交

线程池根据业务不同的需求提供了两种方式提交任务:Executor.execute()、ExecutorService.submit()。其中ExecutorService.submit()可以获取该任务执行的Future。

我们以Executor.execute()为例,来看看线程池的任务提交经历了那些过程。

定义:

public interface Executor {

    void execute(Runnable command);

}

ThreadPoolExecutor提供实现:

    public void execute(Runnable command) {

        if (command == null)

            throw new NullPointerException();

        int c = ctl.get();

        if (workerCountOf(c) < corePoolSize) {

            if (addWorker(command, true))

                return;

            c = ctl.get();

        }

        if (isRunning(c) && workQueue.offer(command)) {

            int recheck = ctl.get();

            if (! isRunning(recheck) && remove(command))

                reject(command);

            else if (workerCountOf(recheck) == 0)

                addWorker(null, false);

        }

        else if (!addWorker(command, false))

            reject(command);

    }

执行流程如下:

如果线程池当前线程数小于corePoolSize,则调用addWorker创建新线程执行任务,成功返回true,失败执行步骤2。

如果线程池处于RUNNING状态,则尝试加入阻塞队列,如果加入阻塞队列成功,则尝试进行Double Check,如果加入失败,则执行步骤3。

如果线程池不是RUNNING状态或者加入阻塞队列失败,则尝试创建新线程直到maxPoolSize,如果失败,则调用reject()方法运行相应的拒绝策略。

在步骤2中如果加入阻塞队列成功了,则会进行一个Double Check的过程。Double Check过程的主要目的是判断加入到阻塞队里中的线程是否可以被执行。如果线程池不是RUNNING状态,则调用remove()方法从阻塞队列中删除该任务,然后调用reject()方法处理任务。否则需要确保还有线程执行。

addWorker

当线程中的当前线程数小于corePoolSize,则调用addWorker()创建新线程执行任务,当前线程数则是根据ctl变量来获取的,调用workerCountOf(ctl)获取低29位即可:

    private static int workerCountOf(int c)  { return c & CAPACITY; }

addWorker(Runnable firstTask, boolean core)方法用于创建线程执行任务,源码如下:

    private boolean addWorker(Runnable firstTask, boolean core) {

        retry:

        for (;;) {

            int c = ctl.get();

            // 获取当前线程状态

            int rs = runStateOf(c);

            if (rs >= SHUTDOWN &&

                    ! (rs == SHUTDOWN &&

                            firstTask == null &&

                            ! workQueue.isEmpty()))

                return false;

            // 内层循环,worker + 1

            for (;;) {

                // 线程数量

                int wc = workerCountOf(c);

                // 如果当前线程数大于线程最大上限CAPACITY  return false

                // 若core == true,则与corePoolSize 比较,否则与maximumPoolSize ,大于 return false

                if (wc >= CAPACITY ||

                        wc >= (core ? corePoolSize : maximumPoolSize))

                    return false;

                // worker + 1,成功跳出retry循环

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                // CAS add worker 失败,再次读取ctl

                c = ctl.get();

                // 如果状态不等于之前获取的state,跳出内层循环,继续去外层循环判断

                if (runStateOf(c) != rs)

                    continue retry;

            }

        }

        boolean workerStarted = false;

        boolean workerAdded = false;

        Worker w = null;

        try {

            // 新建线程:Worker

            w = new Worker(firstTask);

            // 当前线程

            final Thread t = w.thread;

            if (t != null) {

                // 获取主锁:mainLock

                final ReentrantLock mainLock = this.mainLock;

                mainLock.lock();

                try {

                    // 线程状态

                    int rs = runStateOf(ctl.get());

                    // rs < SHUTDOWN ==> 线程处于RUNNING状态

                    // 或者线程处于SHUTDOWN状态,且firstTask == null(可能是workQueue中仍有未执行完成的任务,创建没有初始任务的worker线程执行)

                    if (rs < SHUTDOWN ||

                            (rs == SHUTDOWN && firstTask == null)) {

                        // 当前线程已经启动,抛出异常

                        if (t.isAlive()) // precheck that t is startable

                            throw new IllegalThreadStateException();

                        // workers是一个HashSet<Worker>

                        workers.add(w);

                        // 设置最大的池大小largestPoolSize,workerAdded设置为true

                        int s = workers.size();

                        if (s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded = true;

                    }

                } finally {

                    // 释放锁

                    mainLock.unlock();

                }

                // 启动线程

                if (workerAdded) {

                    t.start();

                    workerStarted = true;

                }

            }

        } finally {

            // 线程启动失败

            if (! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

判断当前线程是否可以添加任务,如果可以则进行下一步,否则return false;

rs >= SHUTDOWN ,表示当前线程处于SHUTDOWN ,STOP、TIDYING、TERMINATED状态

rs == SHUTDOWN , firstTask != null时不允许添加线程,因为线程处于SHUTDOWN 状态,不允许添加任务

rs == SHUTDOWN , firstTask == null,但workQueue.isEmpty() == true,不允许添加线程,因为firstTask == null是为了添加一个没有任务的线程然后再从workQueue中获取任务的,如果workQueue == null,则说明添加的任务没有任何意义。

内嵌循环,通过CAS worker + 1

获取主锁mailLock,如果线程池处于RUNNING状态获取处于SHUTDOWN状态且 firstTask == null,则将任务添加到workers Queue中,然后释放主锁mainLock,然后启动线程,然后return true,如果中途失败导致workerStarted= false,则调用addWorkerFailed()方法进行处理。

在这里需要好好理论addWorker中的参数,在execute()方法中,有三处调用了该方法:

第一次:workerCountOf(c) < corePoolSize ==> addWorker(command, true),这个很好理解,当然线程池的线程数量小于 corePoolSize ,则新建线程执行任务即可,在执行过程core == true,内部与corePoolSize比较即可。

第二次:加入阻塞队列进行Double Check时,else if (workerCountOf(recheck) == 0) ==>addWorker(null, false)。如果线程池中的线程==0,按照道理应该该任务应该新建线程执行任务,但是由于已经该任务已经添加到了阻塞队列,那么就在线程池中新建一个空线程,然后从阻塞队列中取线程即可。

第三次:线程池不是RUNNING状态或者加入阻塞队列失败:else if (!addWorker(command, false)),这里core == fase,则意味着是与maximumPoolSize比较。

在新建线程执行任务时,将讲Runnable包装成一个Worker,Woker为ThreadPoolExecutor的内部类

Woker内部类

Woker的源码如下:

    private final class Worker extends AbstractQueuedSynchronizer

            implements Runnable {

        private static final long serialVersionUID = 6138294804551838833L;

        // task 的thread

        final Thread thread;

        // 运行的任务task

        Runnable firstTask;

        volatile long completedTasks;

        Worker(Runnable firstTask) {

            //设置AQS的同步状态private volatile int state,是一个计数器,大于0代表锁已经被获取

            setState(-1);

            this.firstTask = firstTask;

            // 利用ThreadFactory和 Worker这个Runnable创建的线程对象

            this.thread = getThreadFactory().newThread(this);

        }

        // 任务执行

        public void run() {

            runWorker(this);

        }

    }

从Worker的源码中我们可以看到Woker继承AQS,实现Runnable接口,所以可以认为Worker既是一个可以执行的任务,也可以达到获取锁释放锁的效果。这里继承AQS主要是为了方便线程的中断处理。这里注意两个地方:构造函数、run()。构造函数主要是做三件事:1.设置同步状态state为-1,同步状态大于0表示就已经获取了锁,2.设置将当前任务task设置为firstTask,3.利用Worker本身对象this和ThreadFactory创建线程对象。

当线程thread启动(调用start()方法)时,其实就是执行Worker的run()方法,内部调用runWorker()。

runWorker

    final void runWorker(Worker w) {

        // 当前线程

        Thread wt = Thread.currentThread();

        // 要执行的任务

        Runnable task = w.firstTask;

        w.firstTask = null;

        // 释放锁,运行中断

        w.unlock(); // allow interrupts

        boolean completedAbruptly = true;

        try {

            while (task != null || (task = getTask()) != null) {

                // worker 获取锁

                w.lock();

                // 确保只有当线程是stoping时,才会被设置为中断,否则清楚中断标示

                // 如果线程池状态 >= STOP ,且当前线程没有设置中断状态,则wt.interrupt()

                // 如果线程池状态 < STOP,但是线程已经中断了,再次判断线程池是否 >= STOP,如果是 wt.interrupt()

                if ((runStateAtLeast(ctl.get(), STOP) ||

                        (Thread.interrupted() &&

                                runStateAtLeast(ctl.get(), STOP))) &&

                        !wt.isInterrupted())

                    wt.interrupt();

                try {

                    // 自定义方法

                    beforeExecute(wt, task);

                    Throwable thrown = null;

                    try {

                        // 执行任务

                        task.run();

                    } catch (RuntimeException x) {

                        thrown = x; throw x;

                    } catch (Error x) {

                        thrown = x; throw x;

                    } catch (Throwable x) {

                        thrown = x; throw new Error(x);

                    } finally {

                        afterExecute(task, thrown);

                    }

                } finally {

                    task = null;

                    // 完成任务数 + 1

                    w.completedTasks++;

                    // 释放锁

                    w.unlock();

                }

            }

            completedAbruptly = false;

        } finally {

            processWorkerExit(w, completedAbruptly);

        }

    }

运行流程

根据worker获取要执行的任务task,然后调用unlock()方法释放锁,这里释放锁的主要目的在于中断,因为在new Worker时,设置的state为-1,调用unlock()方法可以将state设置为0,这里主要原因就在于interruptWorkers()方法只有在state >= 0时才会执行;

通过getTask()获取执行的任务,调用task.run()执行,当然在执行之前会调用worker.lock()上锁,执行之后调用worker.unlock()放锁;

在任务执行前后,可以根据业务场景自定义beforeExecute() 和 afterExecute()方法,则两个方法在ThreadPoolExecutor中是空实现;

如果线程执行完成,则会调用getTask()方法从阻塞队列中获取新任务,如果阻塞队列为空,则根据是否超时来判断是否需要阻塞;

task == null或者抛出异常(beforeExecute()、task.run()、afterExecute()均有可能)导致worker线程终止,则调用processWorkerExit()方法处理worker退出流程。

getTask()

    private Runnable getTask() {

        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {

            // 线程池状态

            int c = ctl.get();

            int rs = runStateOf(c);

            // 线程池中状态 >= STOP 或者 线程池状态 == SHUTDOWN且阻塞队列为空,则worker - 1,return null

            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {

                decrementWorkerCount();

                return null;

            }

            int wc = workerCountOf(c);

            // 判断是否需要超时控制

            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {

                if (compareAndDecrementWorkerCount(c))

                    return null;

                continue;

            }

            try {

                // 从阻塞队列中获取task

                // 如果需要超时控制,则调用poll(),否则调用take()

                Runnable r = timed ?

                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

                        workQueue.take();

                if (r != null)

                    return r;

                timedOut = true;

            } catch (InterruptedException retry) {

                timedOut = false;

            }

        }

    }

timed == true,调用poll()方法,如果在keepAliveTime时间内还没有获取task的话,则返回null,继续循环。timed == false,则调用take()方法,该方法为一个阻塞方法,没有任务时会一直阻塞挂起,直到有任务加入时对该线程唤醒,返回任务。

在runWorker()方法中,无论最终结果如何,都会执行processWorkerExit()方法对worker进行退出处理。

processWorkerExit()

    private void processWorkerExit(Worker w, boolean completedAbruptly) {

        // true:用户线程运行异常,需要扣减

        // false:getTask方法中扣减线程数量

        if (completedAbruptly)

            decrementWorkerCount();

        // 获取主锁

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            completedTaskCount += w.completedTasks;

            // 从HashSet中移出worker

            workers.remove(w);

        } finally {

            mainLock.unlock();

        }

        // 有worker线程移除,可能是最后一个线程退出需要尝试终止线程池

        tryTerminate();

        int c = ctl.get();

        // 如果线程为running或shutdown状态,即tryTerminate()没有成功终止线程池,则判断是否有必要一个worker

        if (runStateLessThan(c, STOP)) {

            // 正常退出,计算min:需要维护的最小线程数量

            if (!completedAbruptly) {

                // allowCoreThreadTimeOut 默认false:是否需要维持核心线程的数量

                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;

                // 如果min ==0 或者workerQueue为空,min = 1

                if (min == 0 && ! workQueue.isEmpty())

                    min = 1;

                // 如果线程数量大于最少数量min,直接返回,不需要新增线程

                if (workerCountOf(c) >= min)

                    return; // replacement not needed

            }

            // 添加一个没有firstTask的worker

            addWorker(null, false);

        }

    }

首先completedAbruptly的值来判断是否需要对线程数-1处理,如果completedAbruptly == true,说明在任务运行过程中出现了异常,那么需要进行减1处理,否则不需要,因为减1处理在getTask()方法中处理了。然后从HashSet中移出该worker,过程需要获取mainlock。然后调用tryTerminate()方法处理,该方法是对最后一个线程退出做终止线程池动作。如果线程池没有终止,那么线程池需要保持一定数量的线程,则通过addWorker(null,false)新增一个空的线程。

addWorkerFailed()

在addWorker()方法中,如果线程t==null,或者在add过程出现异常,会导致workerStarted == false,那么在最后会调用addWorkerFailed()方法:

    private void addWorkerFailed(Worker w) {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            // 从HashSet中移除该worker

            if (w != null)

                workers.remove(w);

            // 线程数 - 1

            decrementWorkerCount();

            // 尝试终止线程

            tryTerminate();

        } finally {

            mainLock.unlock();

        }

    }

整个逻辑显得比较简单。

tryTerminate()

当线程池涉及到要移除worker时候都会调用tryTerminate(),该方法主要用于判断线程池中的线程是否已经全部移除了,如果是的话则关闭线程池。

    final void tryTerminate() {

        for (;;) {

            int c = ctl.get();

            // 线程池处于Running状态

            // 线程池已经终止了

            // 线程池处于ShutDown状态,但是阻塞队列不为空

            if (isRunning(c) ||

                    runStateAtLeast(c, TIDYING) ||

                    (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

                return;

            // 执行到这里,就意味着线程池要么处于STOP状态,要么处于SHUTDOWN且阻塞队列为空

            // 这时如果线程池中还存在线程,则会尝试中断线程

            if (workerCountOf(c) != 0) {

                // /线程池还有线程,但是队列没有任务了,需要中断唤醒等待任务的线程

                // (runwoker的时候首先就通过w.unlock设置线程可中断,getTask最后面的catch处理中断)

                interruptIdleWorkers(ONLY_ONE);

                return;

            }

            final ReentrantLock mainLock = this.mainLock;

            mainLock.lock();

            try {

                // 尝试终止线程池

                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {

                    try {

                        terminated();

                    } finally {

                        // 线程池状态转为TERMINATED

                        ctl.set(ctlOf(TERMINATED, 0));

                        termination.signalAll();

                    }

                    return;

                }

            } finally {

                mainLock.unlock();

            }

        }

    }

在关闭线程池的过程中,如果线程池处于STOP状态或者处于SHUDOWN状态且阻塞队列为null,则线程池会调用interruptIdleWorkers()方法中断所有线程,注意ONLY_ONE== true,表示仅中断一个线程。

interruptIdleWorkers

    private void interruptIdleWorkers(boolean onlyOne) {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            for (Worker w : workers) {

                Thread t = w.thread;

                if (!t.isInterrupted() && w.tryLock()) {

                    try {

                        t.interrupt();

                    } catch (SecurityException ignore) {

                    } finally {

                        w.unlock();

                    }

                }

                if (onlyOne)

                    break;

            }

        } finally {

            mainLock.unlock();

        }

    }

onlyOne==true仅终止一个线程,否则终止所有线程。

线程终止

线程池ThreadPoolExecutor提供了shutdown()和shutDownNow()用于关闭线程池。

shutdown():按过去执行已提交任务的顺序发起一个有序的关闭,但是不接受新任务。

shutdownNow() :尝试停止所有的活动执行任务、暂停等待任务的处理,并返回等待执行的任务列表。

shutdown

    public void shutdown() {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            checkShutdownAccess();

            // 推进线程状态

            advanceRunState(SHUTDOWN);

            // 中断空闲的线程

            interruptIdleWorkers();

            // 交给子类实现

            onShutdown();

        } finally {

            mainLock.unlock();

        }

        tryTerminate();

    }

shutdownNow

    public List<Runnable> shutdownNow() {

        List<Runnable> tasks;

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            checkShutdownAccess();

            advanceRunState(STOP);

            // 中断所有线程

            interruptWorkers();

            // 返回等待执行的任务列表

            tasks = drainQueue();

        } finally {

            mainLock.unlock();

        }

        tryTerminate();

        return tasks;

    }

与shutdown不同,shutdownNow会调用interruptWorkers()方法中断所有线程。

    private void interruptWorkers() {

        final ReentrantLock mainLock = this.mainLock;

        mainLock.lock();

        try {

            for (Worker w : workers)

                w.interruptIfStarted();

        } finally {

            mainLock.unlock();

        }

    }

同时会调用drainQueue()方法返回等待执行到任务列表。

    private List<Runnable> drainQueue() {

        BlockingQueue<Runnable> q = workQueue;

        ArrayList<Runnable> taskList = new ArrayList<Runnable>();

        q.drainTo(taskList);

        if (!q.isEmpty()) {

            for (Runnable r : q.toArray(new Runnable[0])) {

                if (q.remove(r))

                    taskList.add(r);

            }

        }

        return taskList;

    }

欢迎扫一扫我的公众号关注 — 及时得到博客订阅哦!

个人微信公众号

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,001评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,210评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,874评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,001评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,022评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,005评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,929评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,742评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,193评论 1 309
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,427评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,583评论 1 346
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,305评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,911评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,564评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,731评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,581评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,478评论 2 352

推荐阅读更多精彩内容