看了这篇文章,还不理解线程池执行流程,过来找我要钱

实际编程中,频繁创建和销毁线程开销很大,所以一般使用线程的方式是线程池。

很方便的,java给我们提供了现成的线程池创建函数ThreadPoolExecutor,这个创建函数也成了不少公司面试必考题,当然,要想彻底理清线程池执行过程,需要剖析源码,这里我们就来仔细分析分析。

首先是线程池创建函数。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

可以把线程池类比成一个小公司,公司有少量正式员工,执行平时的一些工作量。如果工作量太大,正式员工忙不过来,会雇佣部分外包人员。
原则上,如果任务量突然变多,先把任务临时缓存起来,等正式员工有空闲时,交由正式员工由处理(节约成本嘛)。如果缓存队列满了,这时候就要考虑找外包人员了。
公司总预算有限,所以正式员工数量是固定的,且雇佣的外包人员也有最大人数限制。
如果工作量变少,为了节约成本,就要释放部分外包人员。
如果工作量实在太大了,正式员工、外包人员也达到最大预算人数,且所有人都在拼命完成工作任务,这时候,就要拒绝一部分任务了。

接下来这几个参数就好理解了。

  • corePoolSize: 核心线程数量,可以类比正式员工数量,常驻线程数量。
  • maximumPoolSize: 最大的线程数量,公司最多雇佣员工数量(包含外包人员)。常驻+临时线程数量。
  • workQueue:任务等待队列,所有的正式员工都在处理任务,再来任务就先放到队列吧,队列如果也满了,那就要找外包了。
  • keepAliveTime:非核心线程空闲时间,就是外包人员等了多久,如果还没有活干,就被解雇了。
  • threadFactory: 创建线程的工厂,在这个地方可以统一处理创建的线程的属性。比如每个员工的名字,工号不一致,方便区分和安排任务。
  • handler:线程池拒绝策,什么意思呢?就是当任务实在是太多,人也不够,需求池也排满了,还有任务咋办?默认是不处理,抛出异常告诉任务提交者,我这忙不过来了。

我们使用线程池的时候,一般是直接调用execute,提交一个任务,对应到线程池里是怎么处理的呢?

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {  //关键步骤1
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {  //关键步骤2
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))  //关键步骤3
        reject(command);
}

首先得理解这个ctl是什么意思?

线程池里为了充分利用int型的每一位,使用一个AtomicInteger的ctl来记录线程池中线程的数量及当前线程池的状态。低29bit位表示线程池中线程的数量,高3bit位用来记录线程池的状态是RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED中的一种。

添加任务步骤

  • 获取线程池中线程数量。

  • 线程数量小于核心线程数吗。

    • 小于核心线程数,添加一个核心线程,添加成功的话直接返回。
    • 如果添加核心线程失败,因为是多线程同时执行,再获取一遍线程池中线程数量,继续下一步。
  • 线程数量大于等于核心线程数或上面添加核心线程失败。

    • 线程池还在运行且添加任务到任务队列成功。
      • 重新检查线程池是否还在运行
        • 线程池不在运行,且从任务队列删除任务成功,拒绝该任务。
        • 线程池在运行,但线程池中没有线程(核心线程数也可以设置成0),添加一个非核心线程。
      • 线程池不在运行或添加任务到任务队列失败
        • 尝试添加非核心线程去处理该任务。
        • 如果添加非核心线程失败,拒绝该任务。

整个过程有点绕,可以对比图形,再理解一遍。

在这里插入图片描述

添加任务最关键的函数就是addWorker。

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

要理解这个问题,首先要知道java里的标签是怎么用的,有点类似goto的意思,可以参考:https://blog.csdn.net/chanllenge/article/details/90266538

addWorker的核心思想是

  • 添加线程时时,区分核心和非核心线程,并可指定该线程的第一个处理任务。
  • 判断线程池状态及工作队列,线程数量等参数的合法性。
  • 通过CAS自旋方式,增加线程数量。
  • 加悲观锁,并结合参数合法性,添加一个线程worker,到线程队列workers。
  • 线程添加成功且正常启动,返回true,其他情况,添加线程失败,移除该线程,并线程数量减1,返回false。

线程池中,通过一个set来存储所有的线程。

/**
 * Set containing all worker threads in pool. Accessed only when
 * holding mainLock.
 */
private final HashSet<Worker> workers = new HashSet<Worker>();

线程添加成功之后,该线程首先会执行指定的第一个处理任务,然后从工作队列的队首依次取任务去执行。

源代码:

    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

看到这里,线程池的创建和执行你理解了吗?欢迎留言讨论!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,245评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,749评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,960评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,575评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,668评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,670评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,664评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,422评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,864评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,178评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,340评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,015评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,646评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,265评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,494评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,261评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,206评论 2 352