线程池源码解析 Jdk 1.8

基础

学习一个类,我们应该先从其字段开始。首先看看ThreadPoolExecutor对应的属性有哪些。

private volatile int corePoolSize; // 核心线程数,线程池在阻塞获取任务时可以保持永久存活的线程的最大值。当线程池内的线程超过此值的线程会通过poll(keepAliveTime)获取任务
private volatile int maximumPoolSize; // 线程池中允许的最大的线程数,这里使用volatile修饰,保证多线程下的可见性
private volatile long keepAliveTime; // Woker从workQueue获取任务的最大等待时间,超过这个时间后,worker会被回收掉(run方法执行完毕,线程不可复生)
private final BlockingQueue<Runnable> workQueue; // 提交的任务的排队队列,这是一个接口,通过不同的策略实现不同的线程池机制
private int largestPoolSize; // 线程池中最大的pool size,只会增加不会减少,其是一个统计信息
private final HashSet<Worker> workers = new HashSet<Worker>(); // 内部运行的Worker存放的地方,通过mainLock保证线程安全
private final ReentrantLock mainLock = new ReentrantLock(); //内部的一个独占锁,主要保证线程池的一些统计信息(最大的线程数、完成的任务数)和worker添加到集合的安全性
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //线程安全类型,最高位为符号位,次高3位为状态值,低28位为当前的线程数
private volatile boolean allowCoreThreadTimeOut; // 是否允许核心线程从阻塞队列获取任务时销毁。默认为false
private volatile ThreadFactory threadFactory; // 内部为worker提供任务执行的线程的生成工厂。我们通过自定义的工厂来使得业务日志更为清晰或者执行不同的业务逻辑
private volatile RejectedExecutionHandler handler; // 拒绝策略,默认拒绝策略为抛出异常。线程池的拒绝策略是策略模式在JDK中的一个应用点。可以自定义拒绝策略,在生产者的速度远远大于消费者时将超出的任务持久化到外部存储。

其中corePoolSize、maximumPoolSize、keepAliveTime等变量使用volatile修饰,是因为线程池提供了public的set方法让我们可以对其进行修改,这里需要使用volatile来使得修改对多线程可见。
其他属性的修改在mainLock的控制下进行。

线程池状态

了解线程池必须了解其状态机制。线程池内部使用AtomicInteger类型的clt属性来进行状态控制。其中次高三位分别表示running、shutdown、stop、tidying、teminated这5种状态

线程池状态图.png

常用的方法

  1. 任务提交
public void execute(Runnable command) {
        // NPE检查,线程池不允许提交NULL任务
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get(); // 获取当前的clt,AtomicInteger类型保证线程安全
        if (workerCountOf(c) < corePoolSize) { //如果当前运行的线程数小于核心线程数
            if (addWorker(command, true)) //如果添加核心线程数成功则方法返回
                return;
            c = ctl.get();//执行到这里必定是添加核心线程失败,重新读取最新的clt
        }
        /**
         * 这里分析一下添加核心态worker失败的几种场景:
         * 1、线程池为shutdown以上的状态
         * 2、当前线程池中运行的worker的数量超过其本身最大限制(2^29  -1 )
         * 3、当前线程池中运行的worker的数量超过corePoolSize
         */
        // 如果线程池处于running状态,则将当前提交的任务提交到内部的阻塞队列进行排队等待worker处理
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            /**
             * double check是否线程池仍在运行中
             * 如果线程池不在running状态则将刚才进行排队的任务移除,并拒绝此次提交的任务
             * 如果此时在线程池中运行的worker数量减少到0(corePoolSize为0的线程池在并发的情况下会出现此场景)
             * 则添加一个不携带任何任务的非核心态的worker去处理刚才排队成功的任务
             */
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))//如果排队失败(有界的阻塞队列)则添加一个非核心态的worker
         //添加失败:当前运行的worker数量超过maximumPoolSize或者本身最大的限制;线程池状态在shutdown以上
            reject(command);
    }
  1. 新增处理线程(worker)
private boolean addWorker(Runnable firstTask, boolean core) {
        //自旋进行线程状态check
        retry:
        for (;;) {
            int c = ctl.get(); //读取最新的clt,其本身具有可见性
            int rs = runStateOf(c);
            // 检查线程池状态是否在shutdown以上
            if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
                return false;
            /**
             * 自旋进行worker数量自增
             * 如果当前新增的是核心态的worker则与corePoolSize进行比较
             * 如果当期新增的是非核心态的worker则与maximumPoolSize进行比较
             * 不满足数量限制则直接添加失败,进入后续的排队 or 拒绝流程
             */
            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                /**
                 * 通过CAS进行worker数量+1。为什么不直接调用AtomicInteger提供的incrementAndGet() 方法?
                 * 因为我们是需要将worker数量+1,而后者并不能提供单纯的+1功能。将c-> c+1而不是变成c -> c + N
                 */
                if (compareAndIncrementWorkerCount(c))
                    break retry; //如果CAS成功则跳出自旋
                c = ctl.get();  // 重新读clt,代码执行到这里意味着clt的值必定被其他线程修改,本次读会从主存读取最新的值到工作内存
                if (runStateOf(c) != rs)// 如果线程池状态发生变化(只有running状态才接受新任务),则跳到外层循环执行拒绝
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }
        // 代码执行到此处,意味着worker的数量成功+1,则可以进行worker的构造过程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // new 一个worker,将本次提交的任务封装到其内部
            w = new Worker(firstTask);
            final Thread t = w.thread; // worker内部真正用来执行任务的线程
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                /**
                 * 进行线程池状态检查,thread状态检查,进行运行的最大线程数(largestPoolSize)统计
                 * 将worker添加到wokrers容器(HashSet)中
                 * 修改workerAdded为true
                 */
                try {
                   ...省略此处代码
                } finally {
                    mainLock.unlock();
                }
                //在这里workerAdded为false:thread已经调用该start方法;线程池状态为shutdown以上
                if (workerAdded) {
                    // 启动worker内部的线程,其会调用worker内部的run方法
                    t.start();
                    // 添加成功
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
  1. 执行任务 (worker的工作流程)

什么是worker?

 private final class Worker extends AbstractQueuedSynchronizer implements Runnable
      Worker(Runnable firstTask) {
         setState(-1);
         this.firstTask = firstTask; //外部提交的任务
         this.thread = getThreadFactory().newThread(this); // 真实执行任务的线程
       }

从这里我们可以看出其实际是一个Runnable,并且是AQS的子类,那么我们可以简单的猜测到其能够进行并发的控制(lock、unlock)

  final void runWorker(Worker w) {
        //在添加worker的流程中执行thread.start()之后真实执行的方法
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask; // 获取当前worker携带的任务
        w.firstTask = null;
        /**
         * 直接unlock???在unlock之前一定要lock吗?从这里我们可以看出不一定
         */
        w.unlock(); // 修改state为0,将占用锁的线程设为null(第一次执行之前没有线程占用)
        boolean completedAbruptly = true;
        try {
            // 自旋。先执行自己携带的任务,然后从阻塞队列中获取一个任务直到无法获取任务
            while (task != null || (task = getTask()) != null) {
                // 将state修改为1,设置占有锁的线程为自己
                w.lock();
                /**
                 * check线程池的状态,如果状态为stop以上(stop以上不执行任务),则中断当前线程
                 * 如果当前线程已被中断(其他线程并发的调用线程池的shutdown()或shutdownNow()方法),则check线程池状态是否为stop以上
                 * 最后如果当前线程未被中断则中断当前线程(不可能!笔者还未想到此种场景)
                 */
                if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);// 空方法,留给子类实现
                    Throwable thrown = null;
                    try {
                        task.run(); //执行外部提交的任务,通过try-catch来保证异常不会影响线程池本身的功能
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);// 空方法,留给子类实现
                    }
                } finally {
                    task = null;
                    w.completedTasks++; //已完成任务数量统计
                    w.unlock();
                }
            }
            // 如果执行到这里代表非核心线程在keepAliveTime内无法获取任务而退出
            completedAbruptly = false;
        } finally {
            /**
             * 从上面可以看出如果实际业务(外部提交的Runnable)出现异常会导致当前worker终止
             * completedAbruptly 此时为true意味着worker是突然完成,不是正常退出
             */
            processWorkerExit(w, completedAbruptly);// 执行worker退出收尾工作
        }
    }
  1. 获取任务
  private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?
        // 自旋获取任务(因为是多线程环境)
        for (;;) {
            int c = ctl.get();// 读取最新的clt
            int rs = runStateOf(c);
            /**
             * 1、线程池状态为shutdown并且任务队列为空
             * 2、线程池状态为stop状态以上
             * 这2种情况直接减少worker数量,并且返回null从而保证外部获取任务的worker进行正常退出
             */
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            int wc = workerCountOf(c);
            /**
             * 1、允许核心线程退出
             * 2、当前的线程数量超过核心线程数
             * 这时获取任务的机制切换为poll(keepAliveTime)
             */
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            /**
             * 1、线程数大于maximumPoolSize(什么时候会出现这种情况? 当maximumPoolSize初始设置为0或者其他线程通过set方法对其进行修改)
             * 2、线程数未超过maximumPoolSize但是timed为true(允许核心线程退出或者线程数量超过核心线程)
             * 并且上次获取任务超时(没获取到任务,我们推测本次依旧会超时)
             * 3、在满足条件1或者条件2的情况下进行check:运行线程数大于1或者任务队列没有任务
             */
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c)) // CAS进行worker数量-1,成功则返回null进行worker退出流程,失败则继续自旋
                    return null;
                continue;
            }
            try {
                // 如果允许超时退出,则调用poll(keepAliveTime)获取任务,否则则通过tack()一直阻塞等待直到有任务提交到队列
                Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;// 当等待超过keepAliveTime时间未获取到任务时,标记为true。在下次自旋时会进入销毁流程
            } catch (InterruptedException retry) {
                // 什么时候会抛出异常?当调用shutdown或者shutdownNow方法触发worker内的Thread调用interrupt方法时会执行到此处
                timedOut = false;
            }
        }
    }
  1. 关闭线程池
  public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        // 利用排它锁进行上锁,保证只有一个线程执行关闭流程
        mainLock.lock();
        try {
            // 安全检查
            checkShutdownAccess();
            // 内部通过自旋+CAS修改线程池状态为shutdown
            advanceRunState(SHUTDOWN);
            // 遍历所有的worker,进行线程中断通知
            interruptIdleWorkers();
            // 钩子函数
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        // 进行最后的整理工作
        tryTerminate();
    }
  public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        ...和shutdown类似,将状态修改为stop并返回在任务队列排队的任务 ...
        return tasks;
    }

总结

线程池能为我们减少线程创建的开销,但是相应参数的设置需要不断测试从而到达一个相对最优的配置

  1. 过大的线程数可能导致CPU切换过于频繁从而导致效率降低
  2. 过小的线程数可能导致CPU利用率不高
  3. 有界队列可以防止资源耗尽,但是我们需要考虑在生产速度大于消费速度时提交任务带来的拒绝问题
  4. 无界队列在消费速度小于生产队列时可能导致频繁的GC从而降低系统响应速度

以上所述都是个人学习源码之中的一点心得体会,如果不实之处,望大家谅解和指正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容