ThreadPoolExecutor 源码分析

Integer.SIZE = 32
COUNT_BITS = Integer.SIZE - 3 = 32 - 3 = 29

为了方便做如下假设
Integer.SIZE = 10 则
COUNT_BITS = 10 - 3 = 7

CAPACITY = (1 << COUNT_BITS) - 1
CAPACITY: 0001111111

RUNNING: 1111111111 -1<< 7 => 1110000000
SHUTDOWN: 0000000000 0 << 7 => 0000000000
STOP: 0000000001 1 << 7 => 0010000000
TIDYING: 0000000010 2 << 7 => 0100000000
TERMINATED: 0000000011 3 << 7 => 0110000000

runStateOf 线程池的运行状态,获取高三位
c & ~capacity
1110000000 running
& 1110000000
=
1110000000

workerCountOf 工作线程的容量,获取后7位
c & CAPACITY
1110000000
& 0001111111
=
0001111111

通过上面的内容可以看到高三位作为状态控制位,后面的作为capacity容量大小

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int ctlOf(int rs, int wc) { return rs | wc; }
|: 两个位只要有一个为1,那么结果就是1,否则就为0

ctl
1110000000
| 0000000000
= 1110000000 保留高3位的线程池的运行状态

The main pool control state, ctl, is an atomic integer packing two conceptual fields
 workerCount, indicating the effective number of threads
 runState,    indicating whether running, shutting down etc

ctl有两个作用, workerCount 有效的线程数, runState 运行状态running,shutting,STOP,TIDYING,TERMINATED

状态解释说明
The runState provides the main lifecycle control, taking on values:

RUNNING: Accept new tasks and process queued tasks
SHUTDOWN: Don't accept new tasks, but process queued tasks
STOP: Don't accept new tasks, don't process queued tasks,
and interrupt in-progress tasks
TIDYING: All tasks have terminated, workerCount is zero,
the thread transitioning to state TIDYING
will run the terminated() hook method
TERMINATED: terminated() has completed

ThreadPoolExecutor 核心参数

ThreadFactory线程工厂类,创建工作线程
RejectedExecutionHandler 拒绝策略类, 默认为AbortPolicy,其他还有CallerRunsPolicy,DiscardPolicy,DiscardOldestPolicy
keepAliveTime 线程的空闲时间,超过空闲时间,线程会被回收
allowCoreThreadTimeOut 允许核心线程超时,默认core线程不会被回收,当为true时,核心线程的Idle时间超过keepAliveTime也会被回收
corePoolSize 核心线程数
maximumPoolSize 最大线程数

线程池的工作原理

If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing.
If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread.
If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize,
 in which case, the task will be rejected.

如果正在运行的线程数少于corePoolSize,Executor添加新线程,而不是排队。
如果大于等于corePoolSize大小的线程数正在运行,Executor对请求进行排队,而不是添加新线程。
如果无法将请求放入队列中,则将创建一个新线程,除非创建的线程数超过了maximumPoolSize大小,在这种情况下,该任务将被拒绝。

ThreadPoolExecutor.execute(Runnable command)执行流程分析

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            //如果当前worker数量小于corePoolSize,尝试直接创建一个worker,创建成功后直接返回
            if (addWorker(command, true))
                return;
            //如果创建失败,重新获取ctl的值
            c = ctl.get();
        }
        //如果当前worker大于等于corePoolSize ,ThreadPoolExecutor状态是运行中,将task添加到queue中
        if (isRunning(c) && workQueue.offer(command)) {
            //重新检查ctl,
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                //当线程池不处于运行中状态,而且task被添加到队列中了,则使用拒绝策略来处理添加的task
                reject(command);
            else if (workerCountOf(recheck) == 0)
                 //当当前的worker数量为0,创建一个新的worker,firstTask为null,
                addWorker(null, false);
        }
       //当当前worker数量大于等于corePoolSize,并且queue队列已经满了,则直接创建新的worker
        else if (!addWorker(command, false))
             //如果线程池不是running,或者当前的worker的数量超过了maximumPoolSize
            reject(command);
    }

ThreadPoolExecutor.addWorker(Runnable command)执行流程分析

//core: 如果为true,以corePoolSize作为边界,否则以maximumPoolSize作为边界
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);
            //线程池的状态不是running,返回false
            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                 //worker count超过了边界,返回false
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //创建一个worker,在worker中会使用ThreadFactory创建一个线程
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                       //将worker添加到workers Set集合中
                        workers.add(w);
                        int s = workers.size();
                        //记录线程池的最大数量
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //调用Thread.start方法执行执行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker

//Worker继承AQS,所以Worker是线程安全的,
//Worker实现Runnable接口,作为参数传递给getThreadFactory().newThread()方法,当Thread.start()时,会执行Worker的run方法
private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        final Thread thread;
        Runnable firstTask;
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            setState(-1); 
            this.firstTask = firstTask;
            
            this.thread = getThreadFactory().newThread(this);
        }

        public void run() {
            runWorker(this);
        }
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            //如果firstTask不为null,或者从队列中取到了task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                try {
                   //钩子方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                       //task就是我们调用execute(Runnable command)提交的任务
                        task.run();
                    } finally {
                        //钩子方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
           //处理worker退出逻辑,主要是将worker从workers移除,中断worker相关的thread,worker count -1
            processWorkerExit(w, completedAbruptly);
        }
    }

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut为true或者wc大于corePoolSize核心线程数
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            //如果worker count大于maximumPoolSize, 并且worker queue为空, 返回null task,
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //如果timed为true,poll超时获取,如果超时,返回null
               //如果timed为false, (allowCoreThreadTimeOut为false, worker count小于corePoolSize),阻塞获取
               //从而保证小于corePoolSize的worker不会清理掉
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

核心方法是: addWorker,runWorker,getTask.

通过上面的分析可以回答一下几个问题

  1. 如果worker count大于corePoolSize,则通过workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS)超时获取task方法,如果返回的是null,则执行processWorkerExit方法做worker的清理工作
    2、如果worker count <= corePoolSize并且allowCoreThreadTimeOut为false,通过workQueue.take()方法阻塞获取task,从而不让processWorkerExit方法执行达到worker的保活的目的
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342