线程池工作原理

什么是线程池

Java中,使用线程来异步执行任务。Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建和销毁将消耗大量的计算资源。针对这种情况,我们需要使用线程池来管理线程,带来的好处有3个:
① 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
② 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
③ 提高线程的可管理性。线程是稀缺资源,不能无限制创建,否则不但会消耗资源,还会降低系统的稳定性,而使用线程池可以进行统一分配、调优和监控。而这些离不开对线程池原理的深入了解。

我想了解的东西

1.线程池是如何做到线程复用的
2.线程复用是怎么让线程不被回收而去等待执行下一个任务的(如CacheThreadPool可以让线程保留60s)

线程池构造方法与参数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

corePoolSize:核心线程数
maximumPoolSize:最大线程数
keepAliveTime:等待时间
unit:时间单位
workQueue:阻塞任务队列
handler:拒绝策略

当线程池被创建出来后,通过调用其execute(Runnable command)方法将继承了Runnable接口的任务提交到线程池中。
1.当任务数量不足corePoolSize时,直接创建线程执行任务即可。
2.当任务数量达到corePoolSize时,将任务放入阻塞队列等待执行。
3.当阻塞队列也满员了,这时候会尝试创建线程执行新任务,当前线程数量<maximumPoolSize,则创建线程成功,否则执行拒绝策略拒绝执行任务。
4.当一个线程执行完毕后会尝试从阻塞队列中读取新任务执行。(这里就是线程复用的关键)
5.keepAliveTime等待时间是当线程数量大于corePoolSize并且当前没有任务执行时生效,意义为非核心线程在没有任务执行时保持多久自动销毁。(核心线程一旦创建是不会被销毁的,但是线程池调用allowCoreThreadTimeOut(boolean)时,该参数对核心线程也会生效。)

java内置的线程池

java内置了一些线程池的实现通过工厂模式方便开发人员调用。通过Executors的静态方法,直接得到一些线程池的默认实现。

//CachedThreadPool
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();

内置的线程池如下
1.newSingleThreadExecutor

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

创建一个单线程的线程池。这个线程池只有一个线程在工作(核心/最大线程为1),也就是相当于单线程串行执行所有任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。

2.FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

创建固定大小的线程池(核心=最大=n)。每次提交一个任务就创建一个线程,直到线程达到线程池的最大大小。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。

3.CachedThreadPool

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

创建一个可缓存的线程池(无核心线程,最大线程几乎无穷,由于没有核心线程,都是从任务队列里拉去任务执行,我们今天源码就分析它)。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。

4.ScheduledThreadPool

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

创建一个定长线程池,支持定时及周期性任务执行。

源码分析

重头戏来了,为了这一刻查阅好多东西,结合源码来看,终于弄懂了。

线程池提交任务execute分析

我们创建线程池之后,想让它实际运行,第一步就是向线程池提交任务。通过execute方法。源码如下:

public void execute(Runnable command) {
// (1)如果任务为null,抛出NullPointerException
if (command == null)
throw new NullPointerException();
//(2)获取当前线程池的状态和线程池中运行的线程个数
int c = ctl.get();
//(3)判断当前运行的线程是否小于核心线程数?
if (workerCountOf(c) < corePoolSize) {
//(3.1)小于核心线程数,执行addWorker(command, true)方法,让worker去执行,成功执行则返回,没成功说明在并发状态下,其他线程执行了execute方法导致核心线程满了,我们再次更新c,即当前运行的线程数。
if (addWorker(command, true))
return;
c = ctl.get();
}
//(4)判断线程池的状态(主要看有没有被shundown),正常的runnable则添加任务到阻塞队列(因为(3)没有命中,说明此时核心线程满了)
if (isRunning(c) && workQueue.offer(command)) {
//(4.1)再次检查线程池状态,没啥好说的
int recheck = ctl.get();
//(4.2)如果当前线程池状态不是runnable了,从队列中删除任务,并执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//(5)(4也未命中说明阻塞队列也满了,这时候尝试创建新线程,调用addWorker方法,注意此时参数未false,代表此线程为非核心线程)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//(6)(5也未命中说明当前线程数已经达到最大线程数了,直接执行拒绝策略)
else if (!addWorker(command, false))
reject(command);
}

我们再进一步,进入addWorker方法中一探究竟,因为在execute就是用它来执行的创建线程执行任务的。
源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
//(1)第一层死循环,检查线程池状态,不为runnable返回false
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
//(2)第二层死循环,通过CAS操作增加线程个数
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
//CAS增加线程个数,同时执行的只有一个成功
                if (compareAndIncrementWorkerCount(c))
                    break retry;
//CAS失败了,先看线程池状态,变化了就跳到外层循环去检查目前线程池状态,未变化则重新进入下一轮循环去CAS增加线程池个数。
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
//(3)到这里代表CAS成功了
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
//创建一个Worker(worker是任务的实际执行者,等会要重点分析的源码,注意它的属性),并将任务firstTask作为参数给它(firstTask就是execute提交来的command)
            w = new Worker(firstTask);
//获得worker里面的一个属性thread(它就是我们线程池里面的主角,线程池里的线程都是它)
            final Thread t = w.thread;
//如果线程不为空,直接用该线程执行任务
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
//(4)添加任务
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
//(5)执行线程,执行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

t.start()方法调用了t的run,而run是执行了runWorker(Worker)方法。

public void run() {
            runWorker(this);
        }

不明白?往下看
这是我们的线程池的实际执行者类Worker的构造方法

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;//任务最终到了Worker的属性中
            this.thread = getThreadFactory().newThread(this);//创建一个线程(所以在addWorker中的t不会为空的,因为在构造方法中就新建了)
        }

最终是我们的执行者runWorker登场!

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
//firstTask给task
        Runnable task = w.firstTask;
//将worker中的firstTask设为空,为什么?往下看
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
    // (1)如果当前task为空(task执行完了),或从阻塞队列中读取为空(无任务),跳出while循环
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
//(2)真正的执行任务!为什么是run?start在那?还记得addWorker中得t.start吗,
//t就是worker中的那个thread属性啊!!这里的run当作普通方法调用。t只有那一个,
//所以是一个线程执行了所有该线程接到任务的run()!是不是复用了线程!
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

关于1和2在注释中都说完了,还有一点,为什么cachepool能让线程等待60秒再销毁(同样,为啥threadpool的核心线程在创建后能一直维持而不被销毁,我们的任务都执行完了啊。)答案就在1上,阻塞队列,我们在线程执行下一个提交任务的时候是看当前的task是否执行完了,还有阻塞队列中是否有任务!而阻塞队列是可以让线程阻塞在while的判断中!!大胆猜测,cacheThreadpool用读取任务方式是BlockingQueue中的poll(60,TimeUnit.SECONDS)!

至此,整个线程池的工作原理剖析完了,问题也说明了,舒服,之后会补充个线程池工作流程图。

补充:

源码中一直出现的int c = ctl.get();是在干什么?
要说明这个问题首先我们要看线程池想要记录什么,我们的源码一直在程序中判断,线程池有没有被shundown?有没有超过核心/最大线程数?
所以线程池要记录两个数据,线程个数,线程池状态。
ctl就是用来记录这两个数据的,它是一个integer的原子变量,32位2进制表示,其中高三位表示线程状态,低29位表示线程个数。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

111 running
000 shundown
001 stop
010 tidying
011 terminated
获取前三位:

private static int runStateof(int c)

获取后29位

private static int workerCountof(int c)

参考
https://www.jianshu.com/p/cb0f2dc087e3

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352