线程池ThreadPoolExecutor实现原理及分析

关于线程池的类图如下


image.png

因为线程池好多方法都是ThreadPoolExecutor来实现的,这里我们先看下ThreadPoolExecutor
在文章开头我们先陈述下概念
在这篇文章中我们将我们需要提交给线程池执行的 统称为任务如:

 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        for (int i=0;i<20;i++){
          //new ThreadDemo("task-"+i)就是一个任务
            threadPoolExecutor.execute(new ThreadDemo("task-"+i));
        }

一、参数

/**记录线程池状态以及数量   前三位表示线程池状态 
**初始状态为                  1110  0000  0000  0000  0000  0000  0000  0000*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//线程数量统计位数29  Integer.SIZE为32
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池容量最大值 二进制     0001  1111  1111  1111  1111  1111  1111  1111 
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
/**线程池运行状态   JAVA -1二进制为32个1 (二进制首位为1代表)
  **所以二进制为              1110  0000  0000  0000  0000  0000  0000  0000 **/
private static final int RUNNING    = -1 << COUNT_BITS;
//线程池关闭状态 二进制为     0000  0000  0000  0000  0000  0000  0000  0000
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//线程池停止状态 二进制为     0010  0000  0000  0000  0000  0000  0000  0000
private static final int STOP       =  1 << COUNT_BITS;
//线程池整理状态 二进制为     0100  0000  0000  0000  0000  0000  0000  0000
private static final int TIDYING    =  2 << COUNT_BITS;
// 线程池终止状态二进制为     0110  0000  0000  0000  0000  0000  0000  0000
private static final int TERMINATED =  3 << COUNT_BITS;
//这里将一部分方法放到这里来看便于理解线程池的状态
/**返回线程池状态 ~CAPACITY取反操作   正数的补码为正数的原码
**CAPACITY的补码 ~CAPACITY 为 1110  0000  0000  0000  0000  0000  0000  0000 **/   
private static int runStateOf(int c)     { return c & ~CAPACITY; }
//返回线程个数 初始状态 ctl &CAPACITY  为0
private static int workerCountOf(int c)  { return c & CAPACITY; }
//
private static int ctlOf(int rs, int wc) { return rs | wc; }
//存放阻塞任务的队列
private final BlockingQueue<Runnable> workQueue;
//锁
private final ReentrantLock mainLock = new ReentrantLock();
//线程池中正在运行状态的线程(工作线程) 
private final HashSet<Worker> workers = new HashSet<Worker>();
//锁条件
private final Condition termination = mainLock.newCondition();
//
private int largestPoolSize;
//统计已经完成的任务
private long completedTaskCount; 
//线程工厂
private volatile ThreadFactory threadFactory;
//线程池拒绝策略
private volatile RejectedExecutionHandler handler;
//空闲线程存活时间
private volatile long keepAliveTime;
//默认为false,为false时,核心线程空闲时,仍然运行,为true 核心线程数空闲keepAliveTime时间将停止
private volatile boolean allowCoreThreadTimeOut;
//
private volatile int corePoolSize;
//线程池的最大容量,受CAPACITY影响
private volatile int maximumPoolSize;
//
private final AccessControlContext acc;

二、线程池状态及转换

在上面我们可以看到线程池有五种状态 运行、关闭、停止、整理、终止五种状态。在这里我们先简单画个状态转换图


image.png

线程池区别线程,对于线程池状态,只能单向转换,并不能像线程一样有些状态可以双向转换,我们平时操作的也是shutdown()、shutdownNow()方法,对于线程池状态由SHUTDOWN(关闭)或STOP(停止)转向TIDYING(整理)状态是由shutdown()或shutdownNow()内部进行自行整理。
shutdown()和shutdownNow()区别在于,shutdown()会等待阻塞任务队列执行完毕(能正常添加阻塞队列中任务到工作线程只可能是RUNNING 状态),shutdownNow会立即向正在运行的工作线程执行中断请求,并且不会将阻塞任务添加到工作线程。
在这里我们先看2个例子,有助于我们理解后续线程池方法(我们可以在后续很条件判断看到线程池状态判断 区分运行或关闭状态为一种情况,其他状态为另一种种情况)。
例1:

public class FixedThreadPoolDemo {

    static class ThreadDemo implements Runnable{

        private String threadName;

        public ThreadDemo(String name){
            threadName= name;
        }
        @Override
        public void run() {
            Date date = new Date();
            SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
            System.out.println("当前线程运行的是:"+ threadName+ ",开始时间为:"+dateFormat.format(date));

            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String []args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        for (int i=0;i<20;i++){
            threadPoolExecutor.execute(new ThreadDemo("task-"+i));
        }
        Thread.sleep(5000);
        System.out.println("开始执行shutdown");
        threadPoolExecutor.shutdown();
    }
}

因为我们执行的是shutdown(),所以这20个任务都会执行完毕。

当前线程运行的是:task-2,开始时间为:2019-05-18 :10:38:46
当前线程运行的是:task-0,开始时间为:2019-05-18 :10:38:46
当前线程运行的是:task-1,开始时间为:2019-05-18 :10:38:46
当前线程运行的是:task-4,开始时间为:2019-05-18 :10:38:46
当前线程运行的是:task-3,开始时间为:2019-05-18 :10:38:46
开始执行shutdown
当前线程运行的是:task-5,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-8,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-6,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-7,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-9,开始时间为:2019-05-18 :10:38:51
当前线程运行的是:task-10,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-11,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-13,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-14,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-12,开始时间为:2019-05-18 :10:38:56
当前线程运行的是:task-15,开始时间为:2019-05-18  :10:39:01
当前线程运行的是:task-18,开始时间为:2019-05-18  :10:39:01
当前线程运行的是:task-19,开始时间为:2019-05-18  :10:39:01
当前线程运行的是:task-17,开始时间为:2019-05-18  :10:39:01
当前线程运行的是:task-16,开始时间为:2019-05-18  :10:39:01

例2:

public class FixedThreadPoolDemo2 {

    static class ThreadDemo implements Runnable{

        private String threadName;

        public ThreadDemo(String name){
            threadName= name;
        }
        @Override
        public void run() {
            Date date = new Date();
            SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
            System.out.println("当前线程运行的是:"+ threadName+ ",开始时间为:"+dateFormat.format(date));

            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Date date1 = new Date();
            System.out.println("-----当前线程运行的是:"+ threadName + ",结束时间为:"+dateFormat.format(date1));
        }
    }

    public static void main(String []args) throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
        for (int i=0;i<20;i++){
            threadPoolExecutor.execute(new ThreadDemo("task-"+i));
        }
        Thread.sleep(6000);
        System.out.println("开始执行shutdownNow");
        threadPoolExecutor.shutdownNow();
    }
}

因为我们执行shutdownNow()方法,所以我们只会执行前10个任务,在前面
线程状态与方法我们说过interrupt()方法的用处,这里也可以看到当线程执行interrupt()将中断标记置为true,线程被sleep阻塞,则会抛出InterruptedException()异常

当前线程运行的是:task-4,开始时间为:2019-05-18  :10:43:43
当前线程运行的是:task-0,开始时间为:2019-05-18  :10:43:43
当前线程运行的是:task-3,开始时间为:2019-05-18  :10:43:43
当前线程运行的是:task-2,开始时间为:2019-05-18  :10:43:43
当前线程运行的是:task-1,开始时间为:2019-05-18  :10:43:43
-----当前线程运行的是:task-0,结束时间为:2019-05-18  :10:43:48
-----当前线程运行的是:task-3,结束时间为:2019-05-18  :10:43:48
-----当前线程运行的是:task-2,结束时间为:2019-05-18  :10:43:48
-----当前线程运行的是:task-4,结束时间为:2019-05-18  :10:43:48
-----当前线程运行的是:task-1,结束时间为:2019-05-18  :10:43:48
当前线程运行的是:task-5,开始时间为:2019-05-18  :10:43:48
当前线程运行的是:task-6,开始时间为:2019-05-18  :10:43:48
当前线程运行的是:task-8,开始时间为:2019-05-18  :10:43:48
当前线程运行的是:task-7,开始时间为:2019-05-18  :10:43:48
当前线程运行的是:task-9,开始时间为:2019-05-18  :10:43:48
开始执行shutdownNow
java.lang.InterruptedException: sleep interrupted
-----当前线程运行的是:task-5,结束时间为:2019-05-18  :10:43:49
-----当前线程运行的是:task-8,结束时间为:2019-05-18  :10:43:49
-----当前线程运行的是:task-7,结束时间为:2019-05-18  :10:43:49
-----当前线程运行的是:task-9,结束时间为:2019-05-18  :10:43:49
-----当前线程运行的是:task-6,结束时间为:2019-05-18  :10:43:49
    at java.lang.Thread.sleep(Native Method)
    at com.example.threaddemo.threadpool.FixedThreadPoolDemo2$ThreadDemo.run(FixedThreadPoolDemo2.java:31)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at com.example.threaddemo.threadpool.FixedThreadPoolDemo2$ThreadDemo.run(FixedThreadPoolDemo2.java:31)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

这里再注意一点,只有线程池处于RUNNING (运行)状态时,才可以往阻塞任务队列添加任务,在这里我们要区分阻塞任务队列和工作线程。
这里看个例子

public class FixedThreadPoolDemo3 {

    static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());

    static class ThreadDemo implements Runnable{

        private String threadName;

        public ThreadDemo(String name){
            threadName = name;
        }
        @Override
        public void run() {
            Date date = new Date();
            SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
            System.out.println("当前线程运行的是:"+ threadName + ",开始时间为:"+dateFormat.format(date));
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class AddThread implements Runnable{
        @Override
        public void run() {
            //往线程池添加任务进入
            for (int i=0;i<20;i++){
                threadPoolExecutor.execute(new ThreadDemo("task-"+i));
                System.out.println("-------------添加task-"+i);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void main(String []args) throws InterruptedException {
        Thread thread = new Thread(new AddThread());
        thread.start();
        Thread.sleep(8000);
        System.out.println("开始执行shutdown");
        threadPoolExecutor.shutdown();
    }
}

因为只有线程池处于RUNNING (运行)状态时,才可以将多余任务放入阻塞任务队列,处于关闭状态时,线程池只能从阻塞任务队列去轮询,而不能去添加新的阻塞任务。
结果如下

-------------添加task-0
当前线程运行的是:task-0,开始时间为:2019-05-18  :01:59:38
-------------添加task-1
当前线程运行的是:task-1,开始时间为:2019-05-18  :01:59:39
-------------添加task-2
当前线程运行的是:task-2,开始时间为:2019-05-18  :01:59:40
-------------添加task-3
当前线程运行的是:task-3,开始时间为:2019-05-18  :01:59:41
-------------添加task-4
当前线程运行的是:task-4,开始时间为:2019-05-18  :01:59:42
-------------添加task-5
当前线程运行的是:task-5,开始时间为:2019-05-18  :01:59:43
-------------添加task-6
当前线程运行的是:task-6,开始时间为:2019-05-18  :01:59:44
-------------添加task-7
当前线程运行的是:task-7,开始时间为:2019-05-18  :01:59:45
开始执行shutdown
Exception in thread "Thread-0" java.util.concurrent.RejectedExecutionException: Task com.example.threaddemo.threadpool.FixedThreadPoolDemo3$ThreadDemo@426b98be rejected from java.util.concurrent.ThreadPoolExecutor@763f3e2e[Shutting down, pool size = 4, active threads = 4, queued tasks = 0, completed tasks = 4]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
    at com.example.threaddemo.threadpool.FixedThreadPoolDemo3$AddThread.run(FixedThreadPoolDemo3.java:44)
    at java.lang.Thread.run(Thread.java:748)

三、内部类

1、Worker

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**序列化ID**/
        private static final long serialVersionUID = 6138294804551838833L;

        final Thread thread;
     
        Runnable firstTask;

        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1);
            this.firstTask = firstTask;
           /**默认采用Executors中内部类DefaultThreadFactory 默认情况下会创建一个相同的线程组,
              这里这个this 把自身传入进去,这是实现线程复用的关键**/
            this.thread = getThreadFactory().newThread(this);
        }
        //这部分重写run方法,因为上面构造thread传入this,所以当thread进行start()方法时,会调用外层重写的这个run()
        public void run() {
            runWorker(this);
        }

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
         //因为我们刚开始将state状态置为-1,所以shutdown加锁会不成功
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }
        //这里后续shutdownNow方法会调用到
        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

四、方法

1、ThreadPoolExecutor

线程池的构造方法都是通过这个构造方法实现的

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

2、execute

提交任务,如果可以线程池可以开启新的任务,则执行,否则这个任务会放到阻塞任务队列,或者拒绝任务。

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
         //获取线程池状态以及数量 初始状态为线程池运行状态
        int c = ctl.get();
        /**在这里进行三步判断
        *   1.如果线程池中线程数量小于线程池核心线程数,则去尝试新增线程
        *  这里我们考虑个问题Q1 :为什么这里没有对线程池状态进行判断**/
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
         /**2.如果线程池处于运行状态并且可以将任务添加进入阻塞任务队列 
        *offer为各个队列实现的方法,后续文章讲解**/
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
           /**如果线程不是运行状态并且可以从阻塞队列移除任务(加锁操作)
            *这里我们考虑个问题Q2:如果线程池在上面一层判断是处于运行状态,且
           *已经成功加入到阻塞队列,中间调用shutdown方法,线程池状态变为关闭
           *状态,那么此时将从阻塞队列移除掉这个任务吗? **/
            if (! isRunning(recheck) && remove(command))
                reject(command);
           //当前线程池中工作线程数为0
            else if (workerCountOf(recheck) == 0)
             //这里因为上面已经将任务添加到阻塞任务队列,所以传null进入,等待线程轮询任务
                addWorker(null, false);
        }
        /**3.添加新增线程失败(对比maximumPoolSize ) 则执行拒绝策略**/
        else if (!addWorker(command, false))
            reject(command);
    }

关于问题Q2.我们可以做个例子验证下,在这个例子中我们要把ThreadPoolExecutor源码修改如下(采取将源码复制出来修改部分代码,后续会放上代码链接),具体如下

/**对于ThreadPoolExecutor类中execute方法修改如下,不想直接改源码,所以采用
*的将源码复制出来**/
if (isRunning(c) && workQueue.offer(command)) {
            System.out.println("第一次判断线程池状态是否运行状态:"+isRunning(c));
            //休眠一段时间,保证后续状态改变
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            int recheck = ctl.get();
            System.out.println("第二次判断线程池状态是否运行状态:"+isRunning(recheck));
            if (! isRunning(recheck) && remove(command)) {
                System.out.println("####任务被拒绝");
                reject(command);
            } else if (workerCountOf(recheck) == 0) {
                addWorker(null, false);
            }
        }

public class FixedThreadPoolDemo4 {

    static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 5,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>());

    static class ThreadDemo implements Runnable{

        private String threadName;

        public ThreadDemo(String name){
            threadName = name;
        }
        @Override
        public void run() {
            Date date = new Date();
            SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
            System.out.println("当前线程运行的是:"+ threadName + ",开始时间为:"+dateFormat.format(date));
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Date date1 = new Date();
            System.out.println("-----当前线程运行的是:"+ threadName + ",结束时间为:"+dateFormat.format(date1));
        }
    }

    static class AddThread implements Runnable{
        @Override
        public void run() {
            //往线程池添加任务进入
            for (int i=0;i<20;i++){
                System.out.println("-------------添加task-"+i+"开始");
                threadPoolExecutor.execute(new ThreadDemo("task-"+i));
                System.out.println("-------------添加task-"+i+"结束");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
             }
        }
    }

    static class UpdateThread implements Runnable{

        @Override
        public void run() {
            Date date = new Date();
            SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd :hh:mm:ss");
            System.out.println("执行线程池关闭开始时间:"+dateFormat.format(date));
            threadPoolExecutor.shutdown();
            Date date1 = new Date();
            System.out.println("执行线程池关闭关闭时间:"+dateFormat.format(date));
        }
    }
    public static void main(String []args) throws InterruptedException {
        Thread thread = new Thread(new AddThread());
        thread.start();
        Thread.sleep(6000);
        Thread thread1 = new Thread(new UpdateThread());
        thread1.start();
    }
}

因为我们在第二次读取线程池状态前进行了shutdown操作,所以导致线程池直接拒绝任务,结果如下(对于其中的空指针异常可以忽略,因为在复制出来的ThreadPoolExecutor类代码部分要reject()引用jre原来的类,为了编译通过将传递对象置为空)。

-------------添加task-0开始
-------------添加task-0结束
当前线程运行的是:task-0,开始时间为:2019-05-18 :02:25:10
-------------添加task-1开始
-------------添加task-1结束
当前线程运行的是:task-1,开始时间为:2019-05-18 :02:25:11
-------------添加task-2开始
-------------添加task-2结束
当前线程运行的是:task-2,开始时间为:2019-05-18 :02:25:12
-------------添加task-3开始
-------------添加task-3结束
当前线程运行的是:task-3,开始时间为:2019-05-18 :02:25:13
-------------添加task-4开始
-------------添加task-4结束
当前线程运行的是:task-4,开始时间为:2019-05-18 :02:25:14
-------------添加task-5开始
第一次判断线程池状态是否运行状态:true
执行线程池关闭开始时间:2019-05-18 :02:25:16
执行线程池关闭关闭时间:2019-05-18 :02:25:16
第二次判断线程池状态是否运行状态:false
####任务被拒绝
Exception in thread "Thread-0" java.lang.NullPointerException
-----当前线程运行的是:task-0,结束时间为:2019-05-18 :02:25:20
    at com.example.threaddemo.util.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:1859)
    at com.example.threaddemo.util.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:563)
    at com.example.threaddemo.util.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1138)
    at com.example.threaddemo.threadpool.FixedThreadPoolDemo4$AddThread.run(FixedThreadPoolDemo4.java:50)
    at java.lang.Thread.run(Thread.java:748)
-----当前线程运行的是:task-1,结束时间为:2019-05-18 :02:25:21
-----当前线程运行的是:task-2,结束时间为:2019-05-18 :02:25:22
-----当前线程运行的是:task-3,结束时间为:2019-05-18 :02:25:23
-----当前线程运行的是:task-4,结束时间为:2019-05-18 :02:25:24

关于execute方法流程图如下


image.png

3、addWorker

addWorker向线程池新增线程,返回true代表线程新增并且启动成功。
以下几种情况会添加失败

  • 1.线程池处于停止、整理、终止状态
  • 2.线程池状态为关闭状态 , firstTask 不为空或者workQueue为空
  • 3.线程池线程数大于核心线程数或者最大线程数 (core决定)
 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            /***  condition1
              ** 1.rs>=SHUTDOWN  线程池处于者停止、整理、终止、关闭状态
             **   2.! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty())
             **    这个表达式会为true是以下几种情况:
             **    --线程池处于停止、整理、终止状态  
             **    --  firstTask 不为空,线程池状态为 关闭状态   
             **    -- workQueue 为空,线程池状态为 关闭状态   
             **  整体表达式成立的话是以下三种情况中一种或者多种
             **    --线程池处于停止、整理、终止状态  
             **    --firstTask 不为空 ,线程池状态为关闭状态      
             **    --workQueue 为空,线程池状态为关闭状态   
             **  整体表达式不成立是以下几种情况之一
             **   --线程池处于运行状态
             **   --线程池处于关闭状态且firstTask =null且workQueue不为空
             **  我们之前在二、线程池状态及转换中简单介绍了线程池状态,对于可以
             **  添加到工作线程,只可能是线程是处于运行状态或者线程池关闭状态
             **对于关机状态而言,我们只能添加阻塞队列中的任务到工作线程,这也
            ** 印证了我们之前的问题 Q1
             ***/
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
             //判断是否超出工作线程最大值
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //CAS操作成功,则跳出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  
               //如果线程池状态发生改变,则去检查是否符合condition1
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //线程池处于运行中状态或者线程池处于关闭状态并且firstTask 为null
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) 
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();            
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                  //因为前面判断了核心线程数和最大线程数,所以只有有限个核心线程数或者最大线程数线程可以启动
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
               //处理失败线程
                addWorkerFailed(w);
        }
        return workerStarted;
    }

关于此方法流程图如下

image.png

4、runWorker

前面在内部类说了下,创建Worker时构建thread会将自身传入进去,所以当thread调用start方法,会调用Worker中的run方法,run方法调用runWorker()方法。也就是说我们创建了多少个线程(取决于corePoolSize、maximumPoolSize),就会执行多少次runWorker()方法。
我们提交给线程池的任务就是firstTask
这里我们强调一个概念,虽然我们提交给线程池的任务一般都是实现Runnable接口或者继承Thread类,但是我们没有start()这些实例,所以这些任务还是单纯的实例,并不是线程,我们执行任务的run()方法,和执行普通方法没有什么差别。这样我们才能在方法中轮询任务(如果不是这样的话,我们每次在方法中新启线程,对于新的线程我们是没法做到栈同步)。
换句话说,如果我们任务实现的Runnable接口包含一个test方法,如果下面源码中task.run(),换成task.test(),我们只需要在我们任务重写test方法中实现我们的逻辑,提交到线程池也是会正常执行的(这个例子有个前提是线程start方法也会调用test方法)。

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
               /**Thread.interrupted() 保证清除线程中断状态
                **这个表达式保证线程池处于运行或关闭状态 中断状态标记清除
                **线程池处于停止、整理、终止状态 线程中断
                **/
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                       //这个和执行普通方法没有什么区别,并不是线程start方法后需要调用 run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            //判断线程有没有发生异常
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

5、getTask

获取阻塞队列中的任务
这个方法中包含对keepAliveTime判断,如果当前阻塞队列没有任务,我们设置了keepAliveTime,则阻塞队列会在有限时间内做出返回,如果我们不设置,则会一直等待直到有新的任务加入(在此期间会一直执行在 workQueue.take(),导致线程中代码不往下执行,所以线程不会被回收),

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            /** 有2种情况会不通过,也就是我们上面说的 线程状态分为2种 大多数情况下
              **1.线程处于运行状态
              **2.线程处于关闭状态而且阻塞队列不为空*/
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);

            // 判断当前工作线程数大于核心线程数或allowCoreThreadTimeOut 
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
             // 当前工作线程数大于最大线程数
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    /**阻塞队列各自实现类实现  ,后续讲解(这些方法会在阻塞队列中移除掉任务)
                      *poll 如果当前队列为空,则等待keepAliveTime时间返回
                      *take队列为空,则一直等待
                      *这个是实现线程池工作线程空闲时间基础
                      * 如果我们没有设置keepAliveTime,则当任务执行完毕,线程池中线程* 
                      *会一直等待新的任务**/                     
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

6、processWorkerExit

当线程中没有任务时执行整理线程

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
      //线程发生异常,线程数减一
       if (completedAbruptly) 
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //统计任务完成数
            completedTaskCount += w.completedTasks;
           // 
           workers.remove(w);
        } finally {
            mainLock.unlock();
        }
       //整理线程池
        tryTerminate();

        int c = ctl.get();
        //线程池处于运行或者关闭状态去检查存活线程数量,维持一个空的线程
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return;
            }
            addWorker(null, false);
        }
    }

7、tryTerminate

    final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
             /**排除 1、线程池处于运行状态   2、线程池处于整理状态 
               *     3、线程池处于终止状态   4、线程池处于关闭状态但是阻塞队列不为空**/
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { 
               //尝试去中断工作线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
           //提供一些方法以供后续子类使用
            try {
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        ctl.set(ctlOf(TERMINATED, 0));
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
        }
    }

8、shutdown

这里我们看下前面说shutdown方法和shutdownNow方法区别

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //检查权限
            checkShutdownAccess();
          //线程池状态置为 关闭状态
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
           //子类实现
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

  private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
    }
//尝试去中断线程
  private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                /**注意这里调用内部类tryLock方法 ,因为我们刚开始将state状态置为-1,所 
                    *以shutdown加锁会不成功,直到队列中任务执行完毕,线程被删除**/
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }


9、shutdownNow

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
           //线程池状态置为停止
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

    private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }

    void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    //直接中断线程
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
 

示例代码地址https://github.com/endlesswork/thread-demo

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352