线程池你真的懂了吗

阅读任何源码,我们都应该带着几个问题去阅读,从源码中找出这些问题的答案,这样才能彻底搞明白某个知识点。

下面我们就带着这样几个问题,一起看一下ThreadPoolExecutor的源码

  1. 为什么要用线程池
  2. 为什么不推荐使用juc直接创建的线程池
  3. 线程池的几个核心参数
  4. 线程池是什么时候创建线程的?
  5. 线程池是如何重复利用线程的?
  6. 任务提交的顺序和执行的顺序是一样的吗?

1、为什么要使用线程池

这个其实可以写一个简单的程序去跑一下,比如使用线程池去跑1000个task和开1000个线程去跑这1000个task,线程池的效率会高出很多倍,原因是线程池能够重复利用线程,没有创建和销毁线程的开销。
其实池化的技术在很多地方都会用到比如数据库的连接池,字符串常量池,netty的对象池等等

2、为什么不推荐使用juc直接创建线程池的方式

我们找两个Exectors创建线程池的源码

a、newCachedTreadPool的源码

 public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

可以看到这里的最大线程数是0xffff个,这个最大线程数在大并发提交任务的情况下会创建大量线程,会导致CPU100%

b、newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

//看一下LinkedBlockingQueue的实现
 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

会初始化一个容量为0xffff的队列,由于这个队列太大,如果我们提交的任务数很多并且自定义的线程里的对象又很大的话,就很容易发生oom的问题。

从这个工具类创建线程的参数我们可以看到底层调用的都是ThreadPoolExecutor的构造方法,所以我们建议根据具体业务的规模设置合适的线程池参数。

new ThreadPoolExecutor()

3、线程池的几个核心参数

ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) 

corePoolSize:最大线程数
maximumPoolSize:最大线程数
keepAiveTime:线程存活时间
TimeUnit:存活时间的参数
BlockingQueue:线程池的任务队列

task投递到线程池中的整个过程如下


image.png

看一下具体的代码

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();

        //这里判断线程数量是否小于corePoolSize,如过小于corePoolSize则直接创建worker线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //到这个if说明worker线程数量大于了corePoolSize了,这里直接添加到任务队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //到这个if说明任务加入队列失败,队列满了,则再创建线程worker线程,如果创建失败则执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

看到这里从宏观上我们就能看到整个task投递到线程池的一个过程,其实这里最主要的方法是addWorker,其实addworker里才是线程池的精华,里面有如何创建线程及start的逻辑,如何回收过期的线程,如何重复利用创建的线程去运行task的逻辑

4、线程池是什么时候创建线程的

线程池的线程不是线程池创建的时候创建的,线程池的线程是在调用addWorker方法,并且addWorker执行成功才会创建
下面我们一起分析一下addworker代码,这个方法很长,其实要看明白这个方法我们要先看一下Worker这个类,可以看到这个类实际上实现了Runnable接口,其实线程池中运行的runnable任务都会被包装成Worker对象

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
     
        private static final long serialVersionUID = 6138294804551838833L;
        //当前worker会绑定一个线程
        final Thread thread;
       
        //当前worker处理的第一个任务
        Runnable firstTask;
        volatile long completedTasks;
        //创建worker时就会生成 一个线程及赋值firstTask,
        //注意线程池的线程就是在这里被创建的
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            //注意看这里,这个线程创建的时候传的是this,也就意味着一会this.thread.start时,
            //执行的是worker对象的run方法
            //这个大家想一下,回味一下
            this.thread = getThreadFactory().newThread(this);
        }
}

下面在具体分析一下addWorker是如何start线程及重复利用线程运行任务的

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            //获取worker数量
            int c = ctl.get();
            //获取线程池状态
            int rs = runStateOf(c);

            // 如果线程池状态为SHUTDOWN就不接收任务了直接returnfalse
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //这里会判断一下worker数量
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        
        //如果代码运行到了这里,就表示线程池状态正常,任务达到了创建worker线程的条件
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //这里会创建一个worker对象,
            //结合上面的代码,worker对象中会包含一个线程和一个firstTask
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //下面这一部分是用来判断需不需要将worker线程进行缓存,给其他的任务使用
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //注意看这个workers对象,这是一个hashset,用来缓存创建好的worker对象
                        //注意在强调一下这个worker对象包含一个Thread引用及Runnable引用
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }

                //缓存好worker线程后,这里会执行线程的start逻辑
                //注意线程池的任务就是在这里开启start使任务真正进入runnable状态的
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

下面我们看一下start的具体逻辑:
刚刚我们已经看到了,thread是worker对象的一个属性,实际上创建线程时,传的参数就是worker对象本身,所以线程start执行的逻辑就是worker对象的run方法

//worker的run方法很简单,下面我们看一下runWorker方法
public void run() {
            runWorker(this);
        }

看完这个我们就基本上看完了线程池的核心逻辑了,但是还有一些细节没有仔细看,后面我会提到,留给大家思考

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //获取一下fisrtTask
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
           //这个task什么时候不为空?
           //注意这里会有一个隐含的问题,我们提交的任务存放的顺序是 核心worker->队列->最大线程worker
          //这里的getTask是从队列里获取的任务
          //这里task的执行顺序就变成了,核心worker->最大线程worker->队列
          //不知道大家有没有get到我的点,可以做一个实验就是为task编一个号,比如安1-10的顺序提交任务,最终执行的结果却是 1,2,3,4,8,9,10,5,6,7这种顺序,这里就是出现这种提交顺序和执行顺序不一样的原理
            while (task != null || (task = getTask()) != null) {
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //这里会最终调用task的run方法,到此线程池的创建到运行任务就看结束了
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {

            //这里会销毁线程,在队列为空的时候,会销毁线程,让线程数量停留在corePoolsize范围内,什么时候会执行到这,当task为空的时候,什么时候task为空,看getTask的逻辑,getTask会判断队列是否为空及活跃线程的时间来返回task的值,这里就不细讲了
            processWorkerExit(w, completedAbruptly);
        }
    }

5、线程池是如何重复利用线程的

看完上面的分析,其实就能回答这个问题了,当任务提交时会创建worker对象,这个对象里会有绑定一个线程,同时会将worker对象放入workers Set中,创建成功后,会立马调用worker.thread.start方法启动线程,这个线程的run方法进行了包装,首先判断fisrtTask是否为空如果不为空则直接运行,否则会while循环拿缓存队列中的任务,知道缓存队列为空,或者空闲线程超过了keepalive时间就会销毁线程,以保证线程维持在corePoolSize的大小

6、任务提交的顺序和执行的顺序是一样的吗?

不一样,提交顺序是 核心worker线程->队列->非核心worker线程,
但是执行的顺序却是核心worker任务->非核心worker任务->队列任务
如果能理解这个,线程池就真的理解的差不多了

7、其他

上面几个问题有些是我在看源码时的困惑点,有些是我看完源码之后的一些想法,除了这些问题外,线程池还有很多精妙的地方比如,
a、线程池的状态和核心线程数其实是用一个4个字节int表示的,为什么要这么表示
b、线程池中使用到的设计模式有哪些
c、线程池本身就是并发场景下提交任务的,那它自己的安全性是如何保证的,execute方法是如何保证安全性的

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容