线程池ThreadPoolExecutor源码解析(一)

1.线程池介绍

线程池是什么

基本思想还是一种对象池的思想,开辟一块内存空间,里面存放了众多(未死亡)的线程,池中线程执行调度由池管理器来处理。当有线程任务时,从池中取一个,执行完成后线程对象归池,这样可以避免反复创建线程对象所带来的性能开销,节省了系统的资源。

和线程池相关的一些问题

1.什么场景适合使用线程池,使用线程池有什么好处?
2.线程池有几种状态,它们之间又是怎么转换的?
3.线程池主要分为哪几类,有什么特点,适合什么场景。
4..线程池有哪些构造参数,分别有什么含义?
5.任务提交到线程池,经历了什么过程?
6.shutdown()和shutdownNow()方法有什么区别?

使用线程池有什么好处

线程池解决了两个不同的问题
1.通过执行大量的异步任务,避免频繁的创建和销毁线程,减少了每个任务的开销,提升了性能。
2.提供了限制和管理资源的手段,包括线程、执行任务时的消耗。

什么场景下适合使用线程池?

1.单个任务处理时间比较短
2.需要处理的任务数量很大

2.ThreadPoolExecutor

Java中的线程池是用ThreadPoolExecutor类来实现的.带着问题去看源码

一个重要的变量

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctl是一个AtomicInteger的变量,由两部分信息组成。workerCount runState
线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)。
为了把两部分信息包装到一个Integer类型里面,高3位保存runState,低29位保存workerCount。这样高三位就可以保存线程池的5个状态,且最大的线程数量(CAPACITY)为2^29-1.

部分重要的常量

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

线程池的5个状态。

RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
SHUTDOWN:关闭状态,不接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。
STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。
TIDYING:如果所有的任务都已终止了,有效线程数为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
TERMINATED:在terminated() 方法执行完后进入该状态。

那么这5个状态之间是怎么转换的呢?如图:
状态转换

一些包装和拆解ctl的方法

 // Packing and unpacking ctl
    // 获取线程池的运行状态;
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 获取线程池的活动线程数;
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    //根据运行状态和活动线程数拼装 ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

ThreadPoolExecutor构造方法

/**
     * Creates a new {@code ThreadPoolExecutor} with the given initial
     * parameters.
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     * @throws IllegalArgumentException if one of the following holds:<br>
     *         {@code corePoolSize < 0}<br>
     *         {@code keepAliveTime < 0}<br>
     *         {@code maximumPoolSize <= 0}<br>
     *         {@code maximumPoolSize < corePoolSize}
     * @throws NullPointerException if {@code workQueue}
     *         or {@code threadFactory} or {@code handler} is null
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

构造方法中的入参含义如下:
● corePoolSize:核心线程数量,即使是空闲的,也不会被回收。除非allowCoreThreadTimeOut被设置;
● maximumPoolSize:最大线程数量;
● unit keepAliveTime的单位;
● keepAliveTime:线程池维护线程所允许的空闲时间。如果线程池中的线程数量大于corePoolSize,那些超出的非核心线程不会立即销毁,而是会等待一定时间,直到等待的时间超过了keepAliveTime才会被销毁;
● workQueue:等待队列,当任务提交时,如果线程池中的线程数量大于等于corePoolSize的时候,把该任务放入等待队列;
● threadFactory:创建线程的工厂;
● handler:如果阻塞队列满了并且没有空闲的线程,这时如果继续提交任务,就需要采取一种策略处理该任务。
  线程池提供了4种策略:

  1. AbortPolicy:直接抛出异常,这是默认策略;
  2. CallerRunsPolicy:用调用者所在的线程来执行任务;
  3. CallerRunsPolicy:用调用者所在的线程来执行任务;
  4. DiscardPolicy:直接丢弃任务;

execute方法(任务的提交)

代码如下:

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

任务提交的流程如下:
1.如果workerCount < corePoolSize,新建核心线程,并把当前任务作为第一个任务,启动线程。
2.如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添加到该阻塞队列中;
任务添加队列成功之后,还需要做一个双重检查,因为线程池状态有可能在写入之后改变了,
      2.1 如果线程池不是RUNNING状态,就会去移除任务,当任务从队列中被移除成功,则直接拒绝任务。
      2.2 如果线程池是RUNNING状态,需要判断一下活跃线程数目是否为0,如果为0的话,则新建一个非核心线程。
      2.3如果线程池不是RUNNING状态且任务从队列中被移除失败,也会去判断一下活跃线程数目是否为0,如果为0的话,则新建一个非核心线程。
3.如果workerCount >= corePoolSize 且 workerCount < maximumPoolSize且线程池内的阻塞队列已满,则创建非核心线程并把当前任务作为第一个任务,启动线程。
4.如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

整体的大流程还是比较清晰的,double check那块可能会有点绕,看一下流程图可能会更加清晰:


image.png

未完待续

接下来会分析下Worker类相关的东西。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352

推荐阅读更多精彩内容