谈谈ThreadPoolExecutor的实现

概述

​ 线程作为系统稀缺资源,如果在应用中进行频繁的创建和销毁,会为我们的应用带来灾难性的体验,增大系统负荷,降低效率。池化技术为该问题的解决提供了一种有效的思路,通过建立一个线程池,每次线程的时候从池中取出一个空闲的线程,这样就省去了线程创建和销毁。java的线程池实现是在jdk1.5开始引入的,本文将对其中最常用的ThreadPoolExecutor的实现进行详细的介绍,系统可以通过本文了解到如何去实现一个线程池,并向Doug Lea大神致敬。

使用

​ 我们先看下面的线程池使用的例子,在该例子中我声明一个核心线程数是2,最大线程数是5,非核心线程线程存活时间1s,阻塞队列大小为1,拒绝策略为AbortPolicy,我们会输出程序执行过程中的线程池达到的最大线程数以及在所有任务执行结束后线程池中线程的数量。代码如下:

/**
 * Created by yuanqiongqiong on 2019/4/10.
 */
public class ThreadPoolExecutorTest {

    private static Logger LOGGER = LoggerFactory.getLogger(ThreadPoolExecutorTest.class);

    //声明一个线程池
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 5, 1,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue(1),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.AbortPolicy());

    public static void main(String []args) {
        for (int i = 0; i< 7;i++) {
            String runnableName = "test" + i;
            PersonRunnable personRunnable = new PersonRunnable(runnableName);
            try {
                threadPoolExecutor.execute(personRunnable);
            } catch (Exception e) {
                LOGGER.error("执行{}任务异常", runnableName, e);
            }
        }
        try {
            Thread.sleep(500);
            LOGGER.info("线程池当前线程数目 = {}", threadPoolExecutor.getPoolSize());
            Thread.sleep(2000);
        } catch (Exception e) {
            LOGGER.error(e.getMessage());
        }
        LOGGER.info("线程池中达到的最大线程数目 = {}", threadPoolExecutor.getLargestPoolSize());
        LOGGER.info("线程池当前线程数目 = {}", threadPoolExecutor.getPoolSize());
        LOGGER.info("线程池已经完成的任务数量 = {}", threadPoolExecutor.getCompletedTaskCount());
    }
    static class PersonRunnable implements Runnable {
        private String name;

        public PersonRunnable(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            LOGGER.info("我是" + name + "我在线程" + Thread.currentThread().getName());
            try {
                Thread.sleep(100);
            } catch (Exception e) {
                LOGGER.error("任务{}执行异常", Thread.currentThread().getName(), e);
            }
        }
    }
}

​ 输出结果如下:

20:22:26.571 [pool-1-thread-3] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test3我在线程pool-1-thread-3
20:22:26.571 [pool-1-thread-2] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test1我在线程pool-1-thread-2
20:22:26.571 [pool-1-thread-4] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test4我在线程pool-1-thread-4
20:22:26.571 [pool-1-thread-5] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test5我在线程pool-1-thread-5
20:22:26.571 [pool-1-thread-1] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test0我在线程pool-1-thread-1
20:22:26.576 [main] ERROR com.meituan.campaign.ThreadPoolExecutorTest - 执行test6任务异常
java.util.concurrent.RejectedExecutionException: Task com.meituan.campaign.ThreadPoolExecutorTest$PersonRunnable@46f7f36a rejected from java.util.concurrent.ThreadPoolExecutor@421faab1[Running, pool size = 5, active threads = 5, queued tasks = 1, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at com.meituan.campaign.ThreadPoolExecutorTest.main(ThreadPoolExecutorTest.java:30)
20:22:26.681 [pool-1-thread-5] INFO com.meituan.campaign.ThreadPoolExecutorTest - 我是test2我在线程pool-1-thread-5
20:22:27.082 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 线程池当前线程数目 = 5
20:22:29.084 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 线程池中达到的最大线程数目 = 5
20:22:29.084 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 线程池当前线程数目 = 2
20:22:29.085 [main] INFO com.meituan.campaign.ThreadPoolExecutorTest - 线程池已经完成的任务数量 = 6

​ 由于我们代码设置了最大线程数是5个,并且阻塞队列大小是1,所以同一时间最多会有6个任务被执行,其中1个任务放在阻塞队列中。线程池达到的最大线程数目是5个,因为线程池设置了maximumPoolSize=5。非核心线程会在1s空闲后被回收,因此最终线程池线程数目还是2个。

实现分析

​ 抛开ThreadPoolExecutor,我们先想下实现一个线程池需要哪些成员变量,个人感觉以下变量是必不可少的:(1) 一个存放线程的容器或数组;(2) 一个队列用来在线程池线程不足是存放排队的任务;(3) 一个状态字段表示线程池的状态,用来表示线程池不同生命周期状态。下面,我们看下ThreadPoolExecutor的成员变量:

//表示线程状态和线程数,高三位代表线程状态,低29位代表线程数
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//值为29
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最大线程数,大概为5亿,可以肯定不会达到这么多线程
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
//线程池处于运行状态可以接收新任务并执行任务队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS;
//该状态下线程池不再接收新任务,但是会把任务队列中的任务执行完成,调用shutDown()会进入该状态
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//该状态下线程池不接受新任务并抛弃任务队列中的任务中断所有正在执行的线程,调用shutDownNoW()会进入该状态
private static final int STOP       =  1 << COUNT_BITS;
//已经没有任务可以执行,会从SHUTDOWN和STOP状态变换为该状态
private static final int TIDYING    =  2 << COUNT_BITS;
//在执行完terminated()操作后会进入该状态
private static final int TERMINATED =  3 << COUNT_BITS;
//任务阻塞队列,存放排队任务
private final BlockingQueue<Runnable> workQueue;
//存放线程的hashset
private final HashSet<Worker> workers = new HashSet<Worker>();
//线程工厂,生成新线程
private volatile ThreadFactory threadFactory;
//拒绝策略
private volatile RejectedExecutionHandler handler;
//线程池核心线程数
private volatile int corePoolSize;
//线程池最大线程数
private volatile int maximumPoolSize;

​ 上述代码的注释给出了线程池各个状态的含义,我们看下各个状态之间的状态转换关系,具体如下:

(1) RUNNING -> SHUTDOWN:调用了shutdown()函数;

(2) (RUNNING or SHUTDOWN) -> STOP:调用了shutdownNow();

(3)SHUTDOWN -> TIDYING:当线程池线程为空并者任务队列为空;

(4)STOP -> TIDYING:当线程池线程为空;

(5)TIDYING -> TERMINATED:当调用了terminated()方法;

​ 如上示例,我们把一个任务放入线程池的execute()函数中,线程池会为我们选择一个线程来执行我们提交的任务。在这个选择线程的过程中,如果线程池中线程数量小于corePoolSize,那么将创建新线程执行任务;当线程池数量大于等于corePoolSize并且小于maximumPoolSize,线程池会把任务放到阻塞队列workQueue中直到workQueue满了去创建新线程;当线程池线程数量等于maximumPoolSize并且workQueue满时会执行拒绝策略。下面我们通过execute()函数的逻辑来理解上述过程:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //如果线程数小于核心线程数,那么创建一个新的线程来执行任务command
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程数大于等于核心线程数,线程数处于RUNNING状态(可以将任务加入阻塞队列)并且加入阻塞队列成功(即阻塞队列未满),那么任务就被加入阻塞队列等待空闲线程。
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            //再次检查线程池状态,如果不是RUNNING状态,从阻塞队列中移除任务,执行拒绝策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //线程处RUNNING状态并且线程数是0,则创建个空闲新线程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //如果线程数大于核心线程并且阻塞队列已满,则以maximumPoolSize为线程数最大值进行处理
        else if (!addWorker(command, false))
            //线程池中线程达到最大线程数并且阻塞队列已经满执行拒绝策略
            reject(command);
    }

​ 看到上面的代码逻辑,我们会发现主要的逻辑还是在addWorker里,这个函数主要功能就是为任务分配线程并执行,我们在看这块逻辑之前需要取看一个重要的Worker类。该类封装了线程及任务,可以在内部执行任务,具体定义如下:

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        private static final long serialVersionUID = 6138294804551838833L;
        //具体线程
        final Thread thread;
        //线程要执行的任务
        Runnable firstTask;
        //线程完成的任务数
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //调用线程工厂创建新的线程,threadFactory由我们的线程池构造函数传入,没有指定则使用默认的,这块会创建一个新的线程
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }
        //可以看出Worker实现了AQS,其本身也是不可重入锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

​ 思考为什么Worker要继承AQS实现一个独占锁?这个问题我们后面分析。

​ 了解了worker的构成,我们就可以具体看下addWorker函数的执行逻辑了,具体如下:

//core为true,那么创建线程是以corePoolSize作为线程数最大值,否则以maximumPoolSize作为线程数最大值
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
          
            // 线程状态是非RUNNING状态不再进行任务提交处理,其中SHUTDOWN状态下已经提交进行任务和阻塞队列         中的任务要继续处理
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //线程池中线程大于最大线程数或者大于要求的阈值,返回失败
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //符合要求后,CAS增大线程数,跳出自旋,走下面的线程创建逻辑
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;//标记线程是否启动
        boolean workerAdded = false;//标记线程是否添加成功
        Worker w = null;
        try {
            //创建新的线程并封装为一个Work对象
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    //对线程池创建的线程状态进行检查
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //如果新线程检查成功,将新线程加入workers中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            //更新全局变量,线程池达到的最大线程数,该值可以输出作为线程池参数设定的指标
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //这块重要了,线程创建成功后,开始执行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

​ 我们继续跟着上述代码思路走,看下任务如何执行,t.start()的会调用Worker类run()方法,而该方法会调用runWorker来从任务队列中获取任务,执行任务,具体看下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果Worker中创建时存在任务,则执行;否则,调用getTask从阻塞队列中获取任务,当阻塞队列中没有任务并且线程不应该被回收时,线程会一直阻塞等待获取任务,具体在getTask方法中分析
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                // 上面的英文注释很清楚了,这块为了处理调用shutdownNow时需要停止所有的线程
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //这个函数可以自己实现,默认为空
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //执行具体任务的业务逻辑
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                       //这个函数可以自己实现,默认为空
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //执行线程销毁过程
            processWorkerExit(w, completedAbruptly);
        }
    }

​ 在getTask方法中, Worker线程会一直循环的从阻塞队列中获取任务,直到遇到以下情况会返回null,进而执行上面的线程销毁过程processWorkerExit:

(1) 线程池状态为SHUTDOWN并且任务队列为空;

(2) 线程数状态变大于SHUTDOWN (STOP TIDYING TERMINATED);

(3) 线程池线程数大于最大线程数或者线程超时未获取任务的情况下,任务队列为空或者工作线程数大于1;

这块逻辑具体代码如下:

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 线程池状态为STOP或者(状态为SHUTDOWN&&任务队列为空),这个时候无需在执行任务
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);
            //设置了允许核心线程超过keepAliveTime空闲后销毁线程 或者 线程数大于核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
               //从阻塞队列中获取任务,如果进行超时控制,则调用poll方法,否则调用take一直阻塞到队列中有任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

​ 以上就是线程池中任务执行的大致过程,接下来我们对线程池结束及其中实现的一些细节进行分析。

原文

袁琼琼的技术博客,欢迎指针
http://yuanqiongqiong.cn/2019/04/10/%E8%B0%88%E8%B0%88ThreadPoolExecutor%E7%9A%84%E5%AE%9E%E7%8E%B0/

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352