线程池核心类ThreadPoolExecutor源码解析

成员变量

BlockingQueue 任务阻塞队列
corePoolSize 核心线程数
maximumPoolSize 最大线程数
allowCoreThreadTimeOut 是否允许核心线程超时退出
keepAliveTime 非核心线程超时时间
ThreadFactory 线程池工厂,线程从该工厂中new
RejectedExecutionHandler 拒绝策略
AbortPolicy 直接抛出异常
DiscardPolicy 直接抛弃
DiscardOldestPolicy 抛弃最老的任务
CallerRunsPolicy 调用线程池线程去执行。
自定义 实现RejectedExecutionHandler,并自己定义策略模式

其中很重要的一个变量ctl
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
是原子整型变量,主要存线程池状态和工作线程数
高三位存储线程池状态,低29位存储工作线程数
private static final int RUNNING = -1 << COUNT_BITS; 运行态
private static final int SHUTDOWN = 0 << COUNT_BITS; 关闭
private static final int STOP = 1 << COUNT_BITS; 已停止
private static final int TIDYING = 2 << COUNT_BITS; 整理
private static final int TERMINATED = 3 << COUNT_BITS; 结束

线程池状态转换

未命名文件.png

初始是running状态,能够接受新提交的任务,同时也能够处理阻塞队列中的任务
调用shutdown方法,转换成shutdown状态,不接收新任务,只能够继续处理阻塞队列中的任务
调用shutdownnow方法,转换成stop状态,不接收新任务,同时也不能够继续处理新任务,中断线程。
调用shutdown 或者shutdownnow方法,线程池从running状态转换成shutdown状态或者stop状态后,会调用tryTerminate方法,该方法中,等所有线程执行完之后,线程池状态会转换成TIDYING 状态,随后会调用terminated方法,完成后
最后finally方法中线程池状态转换成TERMINATED 状态。
附上 tryTerminate 代码

final void tryTerminate() {
    for (;;) {
        int c = ctl.get();
        if (isRunning(c) ||
            runStateAtLeast(c, TIDYING) ||
            (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
            return;
        if (workerCountOf(c) != 0) { // Eligible to terminate
            interruptIdleWorkers(ONLY_ONE);
            return;
        }

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
// 等到线程池线程个数为0时转换成TIDYING状态
            if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                try {
                    terminated();
                } finally {
//最后转换成TERMINATED状态
                    ctl.set(ctlOf(TERMINATED, 0));
                    termination.signalAll();
                }
                return;
            }
        } finally {
            mainLock.unlock();
        }
        // else retry on failed CAS
    }
}

方法解析

execute

线程池入口在execute函数 主要流程如下:
1、新进来一个runnable 检查工作线程是否已经超过核心线程,如果没有,启动核心线程去执行addwokder方法
2、如果已经超过核心线程个数,runnable放到阻塞队列中去
3、如果阻塞队里已满,会启动非核心线程执行addwoker
4、如果启动非核心线程去执行失败,执行拒绝策略

核心代码如下

int c = ctl.get();
// 步骤1
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
// 步骤二
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
// 步骤三
else if (!addWorker(command, false))
// 步骤四
    reject(command);

addWorker

该函数主要功能是封装runnable为Woker,然后去执行。
先了解下woker,是ThreadPoolExecutor的内部类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
继承了aqs和Runnable
主要成员变量
/** Thread this worker is running in. Null if factory fails. /
final Thread thread; 工作线程
/
* Initial task to run. Possibly null. */
Runnable firstTask; 封装任务

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        //这里主要是一些线程池状态校验,和线程数校验
          int c = ctl.get();
          int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 获取工作线程个数,校验是否超过核心线程或者最大线程
                int wc = workerCountOf(c); 
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        //封装woker
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 这里查看线程池状态,如果是running状态可以添加worker,或者是SHUTDOWN 状态,不是新增加的任务(shuntdown状态不接收新任务,只接受阻塞队列任务)
                 //这种情况会在processWorkerExit方法中可见,具体看该方法解析
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 启动woker线程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 如果失败从workers中移除
            addWorkerFailed(w);
    }
    return workerStarted;
}

runWorker

woker继承runnable接口,run方法的实现是该函数,即线程执行执行该方法

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 获取第一个任务
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 允许中断
    w.unlock(); // allow interrupts
    // 是否因为异常退出循环
    boolean completedAbruptly = true;
    try {
        // 如果task为空,则通过getTask从阻塞队列中来获取任务
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 如果是STOP状态,中断线程,不在继续执行。
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                //空函数,可被重写
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    //任务执行
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //空函数,可被重写
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }

总结一下runWorker方法的执行过程:
while循环不断地通过getTask()方法获取任务;
getTask()方法从阻塞队列中取任务;
如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态;
调用task.run()执行任务;
如果task为null则跳出循环,执行processWorkerExit()方法;
runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

getTask

该方法主要是从阻塞队列中来获取任务

private Runnable getTask() {
    // timeOut变量的值表示上次从阻塞队列中取任务时是否超时
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        /*
         * 如果线程池状态rs >= SHUTDOWN,也就是非RUNNING状态,再进行以下判断:
         * 1. rs >= STOP,线程池是否正在stop;
         * 2. 阻塞队列是否为空。
         * 如果以上条件满足,则将workerCount减1并返回null。
         * 因为如果当前线程池状态的值是SHUTDOWN或以上时,不允许再向阻塞队列中添加任务。
         */
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // timed变量用于判断是否需要进行超时控制。
        // allowCoreThreadTimeOut默认是false,也就是核心线程不允许进行超时;
        // wc > corePoolSize,表示当前线程池中的线程数量大于核心线程数量;
        // 对于超过核心线程数量的这些线程,需要进行超时控制 ,即非核心线程有超时限制
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        /*
         * wc > maximumPoolSize的情况是因为可能在此方法执行阶段同时执行了setMaximumPoolSize方法;
         * timed && timedOut 如果为true,表示当前操作需要进行超时控制,并且上次从阻塞队列中获取任务发生了超时
         * 接下来判断,如果有效线程数量大于1,或者阻塞队列是空的,那么尝试将workerCount减1;
         * 如果减1失败,则返回重试。
         * 如果wc == 1时,也就说明当前线程是线程池中唯一的一个线程了。
         */
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            /*
             * 根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null,针对非核心线程。
             * 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空。
             * 
             */
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            // 如果 r == null,说明已经超时,timedOut设置为true
            timedOut = true;
        } catch (InterruptedException retry) {
            // 如果获取任务时当前线程发生了中断,则设置timedOut为false并返回循环重试
            timedOut = false;
        }
    }
}

如果返回null runwoker方法会跳出循环 执行processWorkerExit方法

processWorkerExit

该方法是线程退出,修改线程池监控信息,包括增加完成任务数,减少工作线程数

private void processWorkerExit(Worker w, boolean completedAbruptly) {
    // completedAbruptly线程执行时是否发生异常。
    // 如果completedAbruptly值为true,则说明线程执行时出现了异常,需要将workerCount减1;
    // 如果线程执行时没有出现异常,说明在getTask()方法中已经已经对workerCount进行了减1操作,这里就不必再减了。  
    if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
        decrementWorkerCount();

    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //统计完成的任务数
        completedTaskCount += w.completedTasks;
        // 从workers中移除,也就表示着从线程池中移除了一个工作线程
        workers.remove(w);
    } finally {
        mainLock.unlock();
    }

    // 根据线程池状态进行判断是否结束线程池
    tryTerminate();

    int c = ctl.get();
    /*
     * 当线程池是RUNNING或SHUTDOWN状态时,如果worker是异常结束,那么会直接addWorker;
     * 如果allowCoreThreadTimeOut=true,并且等待队列有任务,至少保留一个worker;
     * 如果allowCoreThreadTimeOut=false,workerCount不少于corePoolSize。
     */
    if (runStateLessThan(c, STOP)) {
        if (!completedAbruptly) {
            int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
            if (min == 0 && ! workQueue.isEmpty())
                min = 1;
            if (workerCountOf(c) >= min)
                return; // replacement not needed
        }
        addWorker(null, false);
    }
}

tryTerminate

该函数是尝试去终止线程池

final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            /**
            * 如果是运行状态,退出返回
            * 如果是TIDYING状态,说明已经正在终止,不必继续尝试终止,返回
            * 如果是SHUTDOWN状态,但是阻塞队列不为空,说明还有些任务没执行完,返回。
            **/
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            if (workerCountOf(c) != 0) { // Eligible to terminate
                interruptIdleWorkers(ONLY_ONE);
                return;
            }

            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 加锁设置状态TIDYING,
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        //结束动作,可重写
                        terminated();
                    } finally {
                        //设置最终状态TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        //唤醒termination Condition等待队列,相应的等待操作在awaitTermination函数
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // else retry on failed CAS
        }
    }

shutdown

该方法主要就是把线程池状态设置成SHUTDOWN,然后尝试终止线程池

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

shutdownNow

该方法主要就是把线程池状态设置成STOP,然后终止运行中线程,返回阻塞队列剩余任务

    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers();
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
    }

finalize

ThreadPoolExecutor重写了finalize方法,即在垃圾回收的时候会执行,调用shutdown方法,保证线程池垃圾回收时,线程池没有线程了

    protected void finalize() {
        SecurityManager sm = System.getSecurityManager();
        if (sm == null || acc == null) {
            shutdown();
        } else {
            PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
            AccessController.doPrivileged(pa, acc);
        }
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容