Java线程池

Java线程池

线程池测试代码

public static void main(String[] args) {
    /**
     * 核线程数
     * 最大线程数
     * 非核心线程最大存活时间
     * 非核心线程最大存存活时间单位
     * 任务阻塞队列
     * 线程工厂
     * 任务拒绝策略
     */
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        2,
        10,
        100,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        },
        new ThreadPoolExecutor.AbortPolicy()
    );

    for (int i = 0; i < 20; i++) {
        int finalI = i;
        threadPoolExecutor.execute(()->{
            System.out.println(finalI +"XXXXXXXXXXX"+Thread.currentThread().getName());
        });
    }

}

线程池的执行过程:

image.png

线程池的状态切换

image.png

核心类ThreadPoolExecutor

系统自带的线程池,通过 ThreadPoolExecutor 实现的。

线程池的状态
//ctl int 类型的数值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// ctl 表述了两个状态: 
// 1. 表示线程池当前的状态(高3位)
// 2. 表示线程池当前的工作线程个数(低29位)
// 29:  00000000 00000000 00000000 00011101 
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大数量 57:0000000 0000000 0000000 00111010
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池的5种状态: runState存储在高位
// 10000000 00000000 0000000 00000001 左移 29 位 : 11100000 00000000 00000000 00000000
// 111
private static final int RUNNING    = -1 << COUNT_BITS;

// 00000000 00000000 0000000 00000000 左移 29 位: 00000000 00000000 00000000 00000000
// 000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 00000000 00000000 0000000 00000001 左移 29 位 : 00100000 00000000 00000000 00000000
// 001
private static final int STOP       =  1 << COUNT_BITS;

// 00000000 00000000 0000000 00000010 左移 29 位 : 01000000 00000000 00000000 00000000
// 010
private static final int TIDYING    =  2 << COUNT_BITS;

// 00000000 00000000 00000000 00000011 左移 29 位 :01100000 00000000 00000000 00000000
// 011
private static final int TERMINATED =  3 << COUNT_BITS;

// 计算出当前线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 计算当前线程池的工作线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }
构造函数
public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize,  
                          long keepAliveTime,   
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
    
  • corePoolSize 核心线程数

  • maximumPoolSize 最大线程数

  • keepAliveTime 非核心线程等待任务时的超时时间

  • unit 非核心线程等待任务时的超时时间单位

  • workQueue 多余任务等待队列(当线程池中所有线程都在工作,新来的任务进入队列)

  • threadFactory 线程工厂(线程创建)

  • handler 拒绝策略(阻止执行的处理程序)

执行优先级:核心线程 -> 队列 -> 非核心线程

执行过程execute
public void execute(Runnable command) {
    //非空判断
    if (command == null)
        throw new NullPointerException();
    
    /**
     * 分为三步:
     * 1.如果运行的线程数少于核心线程数,添加一个核心线程启动该任务
     * 2.第一步的条件已不成立,进行第二步:加入阻塞队列进行排队
     * 3.如果前两步都不成立:尝试添加一个新线程;如果它失败了,我们知道我们已经关闭或饱和,因此拒绝该任务。
     **/
    //获取线程池状态
    int c = ctl.get();
    
    //工作线程个数 是否小于 核心线程数
    if (workerCountOf(c) < corePoolSize) { 
        //添加核心线程成功,返回
        if (addWorker(command, true)) 
            return;
        // 如果在并发情况下,添加核心线程失败的线程,重新获取线程池状态
        c = ctl.get();
    }
    
    // 当线程池为运行状态,将任务件加到工作队列  成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 添加任务到工作队列成功
        // 再次获取 ctl
        int recheck = ctl.get();
        // 判断线程池是否 RUNNING 状态,如果不是 RUNNING ,需要将任务从工作队列移除
        if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略(线程池状态不正确,执行拒绝策略)
            reject(command);
        
        // 判断工作线程个数是否为 0 (这时工作队列存在任务)
        else if (workerCountOf(recheck) == 0)
            // 工作线程数为0, 但是工作队列中有任务排队
            // 添加一个空任务非核心线程,为了处理在工作队列排队的任务
            addWorker(null, false);
    }
    
    // 任务添加到工作队列失败,添加非核心线程去执行当前任务
    else if (!addWorker(command, false))
        //添加非核心线程失败,执行拒绝策略
        reject(command);
}
线程池的执行 addWorker
// 添加工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
    // ========== 对线程池状态的判断,以及对线程数量的判断 =========
    
    // 外层for循环标识
    retry:
    for (;;) {
        //获取ctl值
        int c = ctl.get();
        //获取线程池的运行状态
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果线程池的状态不是 RUNNING ,就再次做后续判断,查看当前任务是否可以不处理
        if (rs >= SHUTDOWN &&
            // 线程池状态为 SHUTDOWN ,并且任务为空 , 并且工作队列任务不为空
            // 如果满足了这三个要求,那就是要处理工作队列当前任务,否则结束
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        
        for (;;) {
            //基础 ctl 获取当前工作线程数量 
            int wc = workerCountOf(c);
            //判断工作线程是否大于最大值
            if (wc >= CAPACITY ||
                // 如果是核心线程,是否大于设置的 corePoolSize (核心线程数)
                // 如果非核心线程,是否大于设置的 maximumPoolSize(最大线程数)
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            
            // 以 CAS 的方式,对工作线程 +1 ,如果成功
            if (compareAndIncrementWorkerCount(c))
                //直接跳出外层 for 循环
                break retry;
            //否则:重新获取 ctl 
            c = ctl.get();  // Re-read ctl
            // 基于新获取的 ctl 拿到线程池的状态,判断是否和之前的 rs 状态一致
            if (runStateOf(c) != rs)
                // 说明并发操作导致线程池的状态变化,需要重新判断状态
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // ========== 添加工作线程,并启动工作线程 =========
    
    //工作线程是否启动了
    boolean workerStarted = false;
    //工作线程是否添加了
    boolean workerAdded = false;
    // worker 就是工作线程
    Worker w = null;
    try {
        // new Worker 构建工作线程,将任务扔到了 worker 中
        w = new Worker(firstTask);
        // 拿到 Worker 中绑定的 Thread 线程
        final Thread t = w.thread;
        // 肯定不为null(健壮性判断)
        if (t != null) {
            // 加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
             
                //保持锁定时重新检查。如果ThreadFactory出现故障或在获取锁之前关闭,请退出。
                
                //基于重新获取的 ctl,拿到线程池的状态
                int rs = runStateOf(ctl.get());
                
                //如果满足线程池的状态为 RUNNING,就添加工作线程
                if (rs < SHUTDOWN ||
                    // 如果满足线程池的状态为 SHUTDOWN,并且传入的任务为 null
                    (rs == SHUTDOWN && firstTask == null)) {
                    //开始添加工作线程
                    //判断当前线程是否处于 run 状态 (健壮性判断)
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //将构建好的 Worker 对象添加到了 workers
                    workers.add(w);
                    //获取工作线程个数
                    int s = workers.size();
                    //如果现在的工作线程数,大于历史最大的工作线程数,就重新赋值给 largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //将工作线程添加的标识设置为 true
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //添加共组线程成功,启动线程
                t.start();
                //将工作线程启动标识设置为 true
                workerStarted = true;
            }
        }
    } finally {
        //如果启动工作线程失败
        if (! workerStarted)
            // 移除 workers 中的工作线程,将工作线程数 - 1 ,尝试修改线程池的状态为 TIDYING
            addWorkerFailed(w);
    }
    return workerStarted;
}

//启动工作线程失败,做的补救操作
private void addWorkerFailed(Worker w) {
    //加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //判断之前创建工作线程是否成功
        if (w != null)
            //如果成功,就将workers中的当前工作线程移除
            workers.remove(w);
        //将工作线程数 - 1
        decrementWorkerCount();
        //尝试修改线程池的状态为 TIDYING
        tryTerminate();
    } finally {
        //释放锁
        mainLock.unlock();
    }
}

常见的三种线程池的创建

ExecutorService executorService  = Executors.newCachedThreadPool(); 
ExecutorService executorService1 = Executors.newFixedThreadPool(10);
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
newCachedThreadPool

缓存线程池,设置最大线程数,当线程空闲超过60秒,自动销毁线程。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
newFixedThreadPool

固定线程池,核心线程数等于最大线程数,不存在自动销毁空闲线程。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor

单个线程的线程池,线程池中只存在一个线程。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

为什么不推荐使用自带的线程池?

  1. newCachedThreadPool

    maximumPoolSize : Integer.MAX_VALUE 任务有多少就需要创建多少 worker 。

    创建线程数太多,占用资源,可能造成 CPU 100%;

  2. newFixedThreadPool 和 newSingleThreadExecutor

    LinkedBlockingQueue 默认大小 Integer.MAX_VALUE

    当任务足够多时,都放入了队列,可能造成内存溢出。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容

  • 转载:https://www.jianshu.com/p/47e903ab1bec 一、概述 在我们的开发中经常会...
    郭某人1阅读 1,083评论 1 0
  • 引导 要求:线程资源必须通过线程池提供,不允许在应用自行显式创建线程;说明:使用线程池的好处是减少在创建和销毁线程...
    Hughman阅读 198评论 0 1
  • 前言 掌握线程池是后端程序员的基本要求,相信大家求职面试过程中,几乎都会被问到有关于线程池的问题。我在网上搜集了几...
    Jay_Wei阅读 973评论 0 0
  • 更多 Java 并发编程方面的文章,请参见文集《Java 并发编程》 线程组 Thread Group 线程的集合...
    专职跑龙套阅读 1,357评论 0 3
  • 线程池的优点 ①降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;②提高系统响应速度,当有任务...
    三季人阅读 305评论 0 1