JAVA并发(13)— ThreadPoolExecutor的实现原理(源码分析)

测试代码

/**
 * @program: springbootclient2
 * @description: 自定义线程池,扩展任务
 * @create: 2020-02-27 10:46
 */
@Slf4j
public class MyThreadPoolExecutor extends ThreadPoolExecutor {

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    //装饰线程池中任务
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        //打印线程信息(当前线程是什么?)
        log.info("执行前的方法..." + Thread.currentThread());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        log.info("执行后的方法..." + Thread.currentThread());
    }
}
/**
 * @program: springbootclient2
 * @description: 探索Java线程池的实现原理
 * @create: 2020-02-26 10:33
 */
@Slf4j
public class TestThreadPoolExecutor {

    public static void main(String[] args) {
        //创建了一个线程池
        ThreadPoolExecutor pool =
                new MyThreadPoolExecutor(1, 3,
                        200, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        //打印日志(核心线程执行)
        pool.execute(() -> {
            try {
                Thread.sleep(1000 * 200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            log.info("日志打印...哈哈哈");
        });
        //再次打印日志(阻塞队列执行)
        pool.execute(() -> {
            log.info("日志再打印...");
        });
        //再次打印日志(最大线程执行)
        pool.execute(() -> {
            log.info("日志再打印...");
        });
    }
}

实现原理

用户向线程池提交一个任务(实现Runnable接口)后

  1. 若小于核心线程数,那么直接开启一个线程执行;
  2. 若大于核心线程数,则将任务放入阻塞队列中;
  3. 若阻塞队列满了,则会使用最大线程数,继续开启线程执行任务;
  4. 若最大线程数满了,那么采取拒绝策略去拒绝任务;

而执行任务的流程:创建Worker线程,将传入的任务交给Worker线程去执行,而Worker线程是ThreadFactory产生的,若传入的任务为null,则会通过getTask()去workQueue中获取任务去执行。

源码中的细节:

  • 维护了一个AtomicInteger的ctl,前3位标识线程池的状态,后29位表示线程池中线程的数量。使用CAS来修改参数;
  • Worker线程使用装饰器的模式增强传入的任务,并预留两个钩子方法扩展。
  • 使用HashSet来保存Worker线程;
  • 核心线程数存活时间的实现:使用CAS+自旋,首先去队列中获取元素,若超时后,修改标识位。再次循环,若线程池中存在线程且任务队列中没有任务,那么通过CAS修改ctl中当前线程的数量,跳出方法去销毁线程。

源码分析

ThreadPoolExecutor关键属性

//存放当前运行的worker数量以及线程池状态
//int是32位的,这里把int的高3位拿来充当线程池状态标识位,后29位拿来充当当前运行的worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//存放工作线程(worker)的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//核心线程数量
private volatile int corePoolSize;
//最大线程的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;
public void execute(Runnable command) {  
    if (command == null)  
        throw new NullPointerException();  
    int c = ctl.get();  
   //workerCountOf(c)会获取当前正在运行的worker数量。
    if (workerCountOf(c) < corePoolSize) {  
      //如果小于核心线程数,创建一个worker然后直接执行该任务。
        if (addWorker(command, true))  
            return;  
        c = ctl.get();  
    }  
    //当核心线程数满的时候,会执行该判断,判断线程池的状态为运行态时
    //会将任务放入到queue中(若queue满了,会返回false)
    if (isRunning(c) && workQueue.offer(command)) {  
        int recheck = ctl.get();  
        if (! isRunning(recheck) && remove(command))  
            reject(command);  
        else if (workerCountOf(recheck) == 0)  
            addWorker(null, false);  
    }  
    //使用最大线程去执行,若执行失败,返回false,将执行拒绝策略
    else if (!addWorker(command, false))  
        reject(command);  
}  

根据传入的任务,创建worker工作线程,并运行:

//firstTask是传入线程池的任务,core是使用核心线程去执行,还是最大线程去执行
private boolean addWorker(Runnable firstTask, boolean core) {  
    retry:  
    for (;;) {  
        //先获取线程池的状态
        int c = ctl.get();  
        //rs即线程池的状态(ctl的前3位表示)
        int rs = runStateOf(c);  
  
        // 如果线程池是关闭的(SHUTDOWN=0),或者workQueue队列非空,直接返回false,跳出方法
        if (rs >= SHUTDOWN &&  
            ! (rs == SHUTDOWN &&  
               firstTask == null &&  
               ! workQueue.isEmpty()))  
            return false;  
  
        for (;;) {  
            //获取worker的数量
            int wc = workerCountOf(c);
           //第一个判断是:wc>=536870911  
           //根据入参core,判断当前线程与核心线程数/最大线程数去比较。
            if (wc >= CAPACITY ||  
                wc >= (core ? corePoolSize : maximumPoolSize))  
                return false; 
           // 尝试修改ctl的workerCount的值(+1),这里使用的是CAS,如果失败,继续下一次重试,直到获取成功为止。
            if (compareAndIncrementWorkerCount(c))  
              //如果设置成功就跳出外层的for循环
                break retry;  
           //重读一次ctl,判断如果线程池的状态改变,会重新循环一次。
            c = ctl.get();  // Re-read ctl  
            if (runStateOf(c) != rs)  
                continue retry;  
        }  
    }  
    //执行到此处时,线程的数量+1,但实际上未开启新线程,下面是创建新worker线程。
    boolean workerStarted = false;  
    boolean workerAdded = false;  
    Worker w = null;  
    try {  
        //创建一个worker,将提交上来的任务直接交给worker。
        w = new Worker(firstTask); 
        //获取Worker线程中的thread对象。 
        final Thread t = w.thread;  
        if (t != null) {  
            final ReentrantLock mainLock = this.mainLock;  
            //新增Worker线程并存入HashSet中(加锁,该动作是串行画的)
            mainLock.lock();  
            try {  
               //获取线程池的状态
                int rs = runStateOf(ctl.get());  
               //线程池没有中断或者线程池已经中断,但是线程还持有任务均往下执行(否则该方法不会做操作)。
                if (rs < SHUTDOWN ||  
                    (rs == SHUTDOWN && firstTask == null)) {  
                    if (t.isAlive()) //如果worker的线程线程已经启动,抛异常
                        throw new IllegalThreadStateException();  
                   //添加新建的worker到HashSet<Worker>中
                    workers.add(w);  
                    int s = workers.size();  
                    //更新历史worker数量的最大值(和Worker Set容量进行对比)
                    if (s > largestPoolSize)  
                        largestPoolSize = s;  
                    //设置新增标志位
                    workerAdded = true;  
                }  
            } finally {  
                mainLock.unlock();  
            }  
           //如果worker是新增的,就启动该线程。
            if (workerAdded) {  
                t.start();  
                //成功启动线程,设置对应的标志位
                workerStarted = true;  
            }  
        }  
    } finally {  
        //启动失败,就会触发相应的方法。
        if (! workerStarted)  
            addWorkerFailed(w);  
    }  
    return workerStarted;  
}  

2. Worker结构

Worker是ThreadPoolExecutor内部定义的一个内部类

//实现了Runnable接口,所以可以作为线程使用
private final class Worker  
    extends AbstractQueuedSynchronizer  
    implements Runnable  
{  

    private static final long serialVersionUID = 6138294804551838833L;  
   //运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker。
    final Thread thread;  
    //当一个worker刚创建时,就会尝试执行这个任务
    Runnable firstTask;  
    //记录完成任务的数量
    volatile long completedTasks;  
  
    Worker(Runnable firstTask) {  
        setState(-1); // inhibit interrupts until runWorker  
        this.firstTask = firstTask;  
         //创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行这个worker。
        this.thread = getThreadFactory().newThread(this);  
    }  
    //使用装饰器的模式去扩展了run()方法。
    public void run() {  
       //调用了ThreadPoolExecutor的runWorker方法;
        runWorker(this);  
    }  
  ...
}  

当worker工作线程执行run方法时,实际上会执行该方法:
worker会判读自己是否持有任务,若未持有任务,会通过getTask()方法去workQueue中获取任务(Runnable任务)

final void runWorker(Worker w) {  
    //获取到当前线程
    Thread wt = Thread.currentThread();  
    //获取到worker中的任务
    Runnable task = w.firstTask;  
    w.firstTask = null;
    //执行unlock方法,允许其他线程来中断自己  
    w.unlock(); // allow interrupts  
    boolean completedAbruptly = true;  
    try {  
        //如果前面firstTask有值,那么直接执行这个任务;
        //如果没有具体的任务,就执行getTask()方法从队列中获取任务;
        while (task != null || (task = getTask()) != null) {  
        //执行任务前先锁住,这里的作用就是给shutdown()判断是否有worker在执行中。
        //shutdown方法会尝试给线程加锁,如果线程在执行,就不会中断它。
            w.lock();  
           //详见下面分析
            if ((runStateAtLeast(ctl.get(), STOP) ||  
                 (Thread.interrupted() &&  
                  runStateAtLeast(ctl.get(), STOP))) &&  
                !wt.isInterrupted())  
                wt.interrupt();  
            try {  
               //执行任务前被调用,Spring预留的方法,可扩展
                beforeExecute(wt, task);  
                Throwable thrown = null;  
                try {  
                    //正常调用run()方法。
                    task.run();  
                } catch (RuntimeException x) {  
                    thrown = x; throw x;  
                } catch (Error x) {  
                    thrown = x; throw x;  
                } catch (Throwable x) {  
                    thrown = x; throw new Error(x);  
                } finally {  
                    //执行任务后调用,预留的方法,可扩展
                    afterExecute(task, thrown);  
                }  
            } finally {  
                task = null;  
                //记录完成的任务数量
                w.completedTasks++;  
                w.unlock();  
            }  
        }  
        completedAbruptly = false;  
    } finally {  
        processWorkerExit(w, completedAbruptly);  
    }  
}  

线程的中断:

  • stop方法:是一个过时的方法,不应该在使用这种方法去中断线程;
  • isInterrupted方法:是Thread类的普通方法,会返回调用方法的线程类状态。
  • interrupted方法:是Thread类的静态方法,返回的是调用方法的线程的状态,interrupted方法会清除线程的中断状态。
  • interrupt方法:用来中断线程,也就是将中断标志设置为true的方法;

所谓的中断,只是将中断标识设置了下,并没有真正的中断线程的运行,一般我们需要自己检查线程的中断状态,并设计如何中断。

方法sleep()wait以及join会对中断标识有所处理,当线程中断标识为true时,会抛出异常。

关闭线程池的方法:

  • shutdown方法:告诉线程池拒绝接受新的任务,但是已经开始执行的以及进入队列中的任务将会完成执行;
  • shutdownNow方法:也是告诉线程池拒绝新的任务,但是它会试图将已经开始执行的任务以及队列中的任务取消。这种取消是通过中断线程来实现的。也就是说我们的任务中没有针对线程中断做处理的情况下,在实际的使用体验上,shutdownNow与shutdown效果是相同的。

当调用shutdownNow()方法时,线程池会变为stop状态。

  1. runStateAtLeast(ctl.get(), STOP)为true(即线程池被shutdownNow),那么如果线程没有中断,确保线程被中断

  2. runStateAtLeast(ctl.get(), STOP)为false(线程池没有被shutdownNow)。在线程没有被中断的情况下,不去中断线程。当然这种情况需要重新检查shutdownNow的风险,当清理中断线程时。

代码分析:

if ((runStateAtLeast(ctl.get(), STOP) ||
    (Thread.interrupted() &&
     runStateAtLeast(ctl.get(), STOP))) &&
    !wt.isInterrupted())
     //中断worker线程。
    wt.interrupt();

在阻塞队列中获取任务

在上面的java.util.concurrent.ThreadPoolExecutor#runWorker方法中task的获取是Worker中的firstTask属性或者getTask()方法来完成获取的。

private Runnable getTask() {  
    boolean timedOut = false; // Did the last poll() time out?  
  
    for (;;) {  
        int c = ctl.get();  
       //获取到线程池的状态
        int rs = runStateOf(c);  
  
        //如果返回null,那么上面runWorker()方法会跳出while循环,然后执行销毁worker线程。
        //SHUTDOWN:表示执行了shutdown()方法;
        //STOP:表示执行了shutdownNow()方法;
        //如果执行了shutdown方法且workQueue为空,那么ctl线程数量-1;
        //如果执行了shutdownNow方法,那么ctl线程数量也-1;
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {  
            decrementWorkerCount();  
            return null;  
        }  
        //获取当前正在运行中的worker数量
        int wc = workerCountOf(c);  
  
        // 是否允许核心线程超时或者当前运行的线程数超过了核心线程数。
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;  
        //timeOut默认false。若配置keepTimeOut并未获取到任务时,会置为true。此时,若存在工作线程,且队列为null,那么就销毁该线程。
        if ((wc > maximumPoolSize || (timed && timedOut))  
            && (wc > 1 || workQueue.isEmpty())) {  
            //通过CAS来设置workerCount,如果存在多个线程竞争,只有一个可以设置成功。
           //如果没有设置后才能给,就进入下一次循环。
            if (compareAndDecrementWorkerCount(c))  
                return null;  
            continue;  
        }  
  
        try {  
            //在workQueue中取任务,poll方法存在等待的超时时间keepAliveTime,但是在规定时间内没有在阻塞队列中获取任务,那么timedOut会被置为true。
           //take()方法会一直等待。
            Runnable r = timed ?  
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :  
                workQueue.take();  
            if (r != null)  
                return r;  
            //如果r为null,就设置timedOut为true(注意,方法并未跳出,开始自旋);
            timedOut = true;  
        } catch (InterruptedException retry) {  
            timedOut = false;  
        }  
    }  
}  

推荐阅读

Java线程池实现原理详解

历史文章

JAVA并发(1)—java对象布局
JAVA并发(2)—PV机制与monitor(管程)机制
JAVA并发(3)—线程运行时发生GC,会回收ThreadLocal弱引用的key吗?
JAVA并发(4)— ThreadLocal源码角度分析是否真正能造成内存溢出!
JAVA并发(5)— 多线程顺序的打印出A,B,C(线程间的协作)
JAVA并发(6)— AQS源码解析(独占锁-加锁过程)
JAVA并发(7)—AQS源码解析(独占锁-解锁过程)
JAVA并发(8)—AQS公平锁为什么会比非公平锁效率低(源码分析)
JAVA并发(9)— 共享锁的获取与释放
JAVA并发(10)—interrupt唤醒挂起线程
JAVA并发(11)—AQS源码Condition阻塞和唤醒
JAVA并发(12)— Lock实现生产者消费者

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,417评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,921评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,850评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,945评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,069评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,188评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,239评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,994评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,409评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,735评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,898评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,578评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,205评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,916评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,156评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,722评论 2 363
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,781评论 2 351