Java线程池

Java线程池

线程池测试代码

public static void main(String[] args) {
    /**
     * 核线程数
     * 最大线程数
     * 非核心线程最大存活时间
     * 非核心线程最大存存活时间单位
     * 任务阻塞队列
     * 线程工厂
     * 任务拒绝策略
     */
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        2,
        10,
        100,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<>(10),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r);
            }
        },
        new ThreadPoolExecutor.AbortPolicy()
    );

    for (int i = 0; i < 20; i++) {
        int finalI = i;
        threadPoolExecutor.execute(()->{
            System.out.println(finalI +"XXXXXXXXXXX"+Thread.currentThread().getName());
        });
    }

}

线程池的执行过程:

image.png

线程池的状态切换

image.png

核心类ThreadPoolExecutor

系统自带的线程池,通过 ThreadPoolExecutor 实现的。

线程池的状态
//ctl int 类型的数值
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// ctl 表述了两个状态: 
// 1. 表示线程池当前的状态(高3位)
// 2. 表示线程池当前的工作线程个数(低29位)
// 29:  00000000 00000000 00000000 00011101 
private static final int COUNT_BITS = Integer.SIZE - 3;
// 线程池的最大数量 57:0000000 0000000 0000000 00111010
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 线程池的5种状态: runState存储在高位
// 10000000 00000000 0000000 00000001 左移 29 位 : 11100000 00000000 00000000 00000000
// 111
private static final int RUNNING    = -1 << COUNT_BITS;

// 00000000 00000000 0000000 00000000 左移 29 位: 00000000 00000000 00000000 00000000
// 000
private static final int SHUTDOWN   =  0 << COUNT_BITS;

// 00000000 00000000 0000000 00000001 左移 29 位 : 00100000 00000000 00000000 00000000
// 001
private static final int STOP       =  1 << COUNT_BITS;

// 00000000 00000000 0000000 00000010 左移 29 位 : 01000000 00000000 00000000 00000000
// 010
private static final int TIDYING    =  2 << COUNT_BITS;

// 00000000 00000000 00000000 00000011 左移 29 位 :01100000 00000000 00000000 00000000
// 011
private static final int TERMINATED =  3 << COUNT_BITS;

// 计算出当前线程池的状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 计算当前线程池的工作线程个数
private static int workerCountOf(int c)  { return c & CAPACITY; }
构造函数
public ThreadPoolExecutor(int corePoolSize, 
                          int maximumPoolSize,  
                          long keepAliveTime,   
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
    
  • corePoolSize 核心线程数

  • maximumPoolSize 最大线程数

  • keepAliveTime 非核心线程等待任务时的超时时间

  • unit 非核心线程等待任务时的超时时间单位

  • workQueue 多余任务等待队列(当线程池中所有线程都在工作,新来的任务进入队列)

  • threadFactory 线程工厂(线程创建)

  • handler 拒绝策略(阻止执行的处理程序)

执行优先级:核心线程 -> 队列 -> 非核心线程

执行过程execute
public void execute(Runnable command) {
    //非空判断
    if (command == null)
        throw new NullPointerException();
    
    /**
     * 分为三步:
     * 1.如果运行的线程数少于核心线程数,添加一个核心线程启动该任务
     * 2.第一步的条件已不成立,进行第二步:加入阻塞队列进行排队
     * 3.如果前两步都不成立:尝试添加一个新线程;如果它失败了,我们知道我们已经关闭或饱和,因此拒绝该任务。
     **/
    //获取线程池状态
    int c = ctl.get();
    
    //工作线程个数 是否小于 核心线程数
    if (workerCountOf(c) < corePoolSize) { 
        //添加核心线程成功,返回
        if (addWorker(command, true)) 
            return;
        // 如果在并发情况下,添加核心线程失败的线程,重新获取线程池状态
        c = ctl.get();
    }
    
    // 当线程池为运行状态,将任务件加到工作队列  成功
    if (isRunning(c) && workQueue.offer(command)) {
        // 添加任务到工作队列成功
        // 再次获取 ctl
        int recheck = ctl.get();
        // 判断线程池是否 RUNNING 状态,如果不是 RUNNING ,需要将任务从工作队列移除
        if (! isRunning(recheck) && remove(command))
            // 执行拒绝策略(线程池状态不正确,执行拒绝策略)
            reject(command);
        
        // 判断工作线程个数是否为 0 (这时工作队列存在任务)
        else if (workerCountOf(recheck) == 0)
            // 工作线程数为0, 但是工作队列中有任务排队
            // 添加一个空任务非核心线程,为了处理在工作队列排队的任务
            addWorker(null, false);
    }
    
    // 任务添加到工作队列失败,添加非核心线程去执行当前任务
    else if (!addWorker(command, false))
        //添加非核心线程失败,执行拒绝策略
        reject(command);
}
线程池的执行 addWorker
// 添加工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
    // ========== 对线程池状态的判断,以及对线程数量的判断 =========
    
    // 外层for循环标识
    retry:
    for (;;) {
        //获取ctl值
        int c = ctl.get();
        //获取线程池的运行状态
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 如果线程池的状态不是 RUNNING ,就再次做后续判断,查看当前任务是否可以不处理
        if (rs >= SHUTDOWN &&
            // 线程池状态为 SHUTDOWN ,并且任务为空 , 并且工作队列任务不为空
            // 如果满足了这三个要求,那就是要处理工作队列当前任务,否则结束
            ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
            return false;

        
        for (;;) {
            //基础 ctl 获取当前工作线程数量 
            int wc = workerCountOf(c);
            //判断工作线程是否大于最大值
            if (wc >= CAPACITY ||
                // 如果是核心线程,是否大于设置的 corePoolSize (核心线程数)
                // 如果非核心线程,是否大于设置的 maximumPoolSize(最大线程数)
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            
            // 以 CAS 的方式,对工作线程 +1 ,如果成功
            if (compareAndIncrementWorkerCount(c))
                //直接跳出外层 for 循环
                break retry;
            //否则:重新获取 ctl 
            c = ctl.get();  // Re-read ctl
            // 基于新获取的 ctl 拿到线程池的状态,判断是否和之前的 rs 状态一致
            if (runStateOf(c) != rs)
                // 说明并发操作导致线程池的状态变化,需要重新判断状态
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // ========== 添加工作线程,并启动工作线程 =========
    
    //工作线程是否启动了
    boolean workerStarted = false;
    //工作线程是否添加了
    boolean workerAdded = false;
    // worker 就是工作线程
    Worker w = null;
    try {
        // new Worker 构建工作线程,将任务扔到了 worker 中
        w = new Worker(firstTask);
        // 拿到 Worker 中绑定的 Thread 线程
        final Thread t = w.thread;
        // 肯定不为null(健壮性判断)
        if (t != null) {
            // 加锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
             
                //保持锁定时重新检查。如果ThreadFactory出现故障或在获取锁之前关闭,请退出。
                
                //基于重新获取的 ctl,拿到线程池的状态
                int rs = runStateOf(ctl.get());
                
                //如果满足线程池的状态为 RUNNING,就添加工作线程
                if (rs < SHUTDOWN ||
                    // 如果满足线程池的状态为 SHUTDOWN,并且传入的任务为 null
                    (rs == SHUTDOWN && firstTask == null)) {
                    //开始添加工作线程
                    //判断当前线程是否处于 run 状态 (健壮性判断)
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    //将构建好的 Worker 对象添加到了 workers
                    workers.add(w);
                    //获取工作线程个数
                    int s = workers.size();
                    //如果现在的工作线程数,大于历史最大的工作线程数,就重新赋值给 largestPoolSize
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    //将工作线程添加的标识设置为 true
                    workerAdded = true;
                }
            } finally {
                //释放锁
                mainLock.unlock();
            }
            if (workerAdded) {
                //添加共组线程成功,启动线程
                t.start();
                //将工作线程启动标识设置为 true
                workerStarted = true;
            }
        }
    } finally {
        //如果启动工作线程失败
        if (! workerStarted)
            // 移除 workers 中的工作线程,将工作线程数 - 1 ,尝试修改线程池的状态为 TIDYING
            addWorkerFailed(w);
    }
    return workerStarted;
}

//启动工作线程失败,做的补救操作
private void addWorkerFailed(Worker w) {
    //加锁
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        //判断之前创建工作线程是否成功
        if (w != null)
            //如果成功,就将workers中的当前工作线程移除
            workers.remove(w);
        //将工作线程数 - 1
        decrementWorkerCount();
        //尝试修改线程池的状态为 TIDYING
        tryTerminate();
    } finally {
        //释放锁
        mainLock.unlock();
    }
}

常见的三种线程池的创建

ExecutorService executorService  = Executors.newCachedThreadPool(); 
ExecutorService executorService1 = Executors.newFixedThreadPool(10);
ExecutorService executorService2 = Executors.newSingleThreadExecutor();
newCachedThreadPool

缓存线程池,设置最大线程数,当线程空闲超过60秒,自动销毁线程。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
newFixedThreadPool

固定线程池,核心线程数等于最大线程数,不存在自动销毁空闲线程。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor

单个线程的线程池,线程池中只存在一个线程。

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

为什么不推荐使用自带的线程池?

  1. newCachedThreadPool

    maximumPoolSize : Integer.MAX_VALUE 任务有多少就需要创建多少 worker 。

    创建线程数太多,占用资源,可能造成 CPU 100%;

  2. newFixedThreadPool 和 newSingleThreadExecutor

    LinkedBlockingQueue 默认大小 Integer.MAX_VALUE

    当任务足够多时,都放入了队列,可能造成内存溢出。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 转载:https://www.jianshu.com/p/47e903ab1bec 一、概述 在我们的开发中经常会...
    郭某人1阅读 1,088评论 1 0
  • 引导 要求:线程资源必须通过线程池提供,不允许在应用自行显式创建线程;说明:使用线程池的好处是减少在创建和销毁线程...
    Hughman阅读 213评论 0 1
  • 前言 掌握线程池是后端程序员的基本要求,相信大家求职面试过程中,几乎都会被问到有关于线程池的问题。我在网上搜集了几...
    Jay_Wei阅读 999评论 0 0
  • 更多 Java 并发编程方面的文章,请参见文集《Java 并发编程》 线程组 Thread Group 线程的集合...
    专职跑龙套阅读 1,387评论 0 3
  • 线程池的优点 ①降低系统资源消耗,通过重用已存在的线程,降低线程创建和销毁造成的消耗;②提高系统响应速度,当有任务...
    三季人阅读 311评论 0 1