深入理解线程池

老生常谈的问题:什么是线程池?

线程池就是创建若干个可执行的线程放入一个池(容器)中,有任务需要处理时,会提交到线程池中的任务队列,处理完之后线程并不会被销毁,而是仍然在线程池中等待下一个任务。

为什么要用线程池?

1.创建/销毁线程伴随着系统开销,使用多线程过于频繁的创建/销毁线程,会很大程度上影响处理效率;这里线程池可以复用线程,线程池可以避免性能降低。
2.线程并发数量过多,抢占系统资源从而导致阻塞;这里线程池可以显示最大线程数量。
3.提高线程的可管理性:线程是稀缺资源,如果无限制的创建,不仅会
消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的
分配,监控和调优。
4.提高响应速度:当任务到达时,可以不需要等待线程创建就能立即执
行。

讲线程池原理之前,先讲一讲线程池原理要涉及到的阻塞队列。

阻塞队列:

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这里会有两个阻塞动作:在队列为空时,获取元素的线程会阻塞来等待队列变为非空。当队列满时,存储元素的线程会阻塞来等待队列可用。

JDK提供了一个阻塞队列的接口类BlockingQueue:
插入移除有三对方法,既有阻塞方法,也有非阻塞方法。

add(e)与remove() ,如果遇到阻塞,会抛出异常
offer(e)与poll(),如果遇到阻塞,offer返回false,poll返回null
put(e)与take() ,如果遇到阻塞,会一直阻塞

阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

7个阻塞队列:

ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列,可以对元素类实现comparator,依据此进行排序。
DelayQueue:一个支持延时获取元素的无界阻塞队列,如果一个元素的延迟时间没有到,元素是获取不到的。
SynchronousQueue:一个不存储元素的阻塞队列。

有界表示队列长度有限,无界表示长度无限,可以无限放入元素。这个容量会在构造方法传入capacity值来初始化最大容量。

源码分析:

这里是ThreadPoolExecutor的构造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
参数意义:

int corePoolSize, 核心线程数
int maximumPoolSize, 最大线程数
long keepAliveTime, 设置空闲线程的存活时间
TimeUnit unit, 设置空闲线程的存活时间单位
BlockingQueue<Runnable> workQueue, 保存任务的阻塞队列
ThreadFactory threadFactory, 创建线程的工厂,给新建的线程赋予名字
RejectedExecutionHandler handler) 饱和/拒绝策略,内置4种策略

拒绝策略:
  • AbortPolicy :直接抛出异常,默认;
  • CallerRunsPolicy:用调用者所在的线程来执行任务
  • DiscardOldestPolicy:丢弃阻塞队列里最老的任务,队列里最靠前的任务
  • DiscardPolicy :当前任务直接丢弃
线程池工作机制图:
线程池运行机制:

1.当程序生产了任务之后,提交进线程池。
2.线程池会创建出corePoolSize数量的线程执行任务。
3.如果任务数超出了corePoolSize,那么阻塞队列派上用场了,会将任务放入到阻塞队列中。
4.如果任务数继续增大,阻塞队列也满了,这个时候才会继续新起线程执行任务。
5.而如果线程总数达到了maximumPoolSize,那么这个时候饱和策略就会执行了,这时线程池就会抛出 RejectedExecutionException 异常,线程池默认的拒绝策略就是丢弃任务。

ExecutorService类结构:
关闭线程池

shutdown:线程池执行 shutdown() 后,就会拒绝接收新的任务,但是会等待线程池中正在执行的任务和已经进入阻塞队列的任务都执行完之后才最终关闭线程池。
shutdownNow:会拒绝接收新的任务,同时还会中断线程池中正在执行的任务,已经进入阻塞队列的任务也被剥夺了执行的机会。

线程池提交任务的方式
ExecutorService.execute(Runnable command) 或者 
ExecutorService.sumbit(Runnable command) 或

1.excute(runnable)方法提交,提交的任务不管返回结果
2.submit(runnable,callback)方法提交,提交的任务可以得到任务执行完成的返回结果。

创建自定义线程池示例:
    public void createThreadPool() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2, // 核心线程数
                16, // 最大线程数
                60, TimeUnit.MILLISECONDS, // 线程空闲时间
                new LinkedBlockingDeque<>(1024), // 使用有界阻塞队列
                // 自定义拒绝策略,一般和降级策略配合使用
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                         // 队列已满,拒绝执行
                        throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + executor.toString());
                    }
                }
        );
        threadPoolExecutor.submit(() -> System.out.println("线程池开始执行任务"));
    }
线程池是一种生产者 - 消费者模式

线程池的设计,普遍采用的都是生产者 - 消费者模式。线程池的使用方是生产者,线程池本身是消费者。

原理实现大致如下:

/**
 * 自实现简单线程池
 */
public class ThreadPoolExecutor {

    // 利用阻塞队列实现生产者-消费者模式
    BlockingQueue<Runnable> workQueue;
    //管理内部工作线程
    final List<Thread> threads = new ArrayList<>();

    public ThreadPoolExecutor(int coreSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;

        //创建核心线程
        for (int i=0;i<coreSize;i++) {
            WorkerThread workerThread = new WorkerThread();
            workerThread.start();
            threads.add(workerThread);
        }
    }

    /**
     * 生产者  提交任务
     * @param runnable
     */
    public void execute(Runnable runnable) {
        try {
            // 队列已满,put 会一直等待
            workQueue.put(runnable);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * 工作线程负责消费任务,并执行任务
     */
    class WorkerThread extends Thread {
        @Override
        public void run() {
            while(true) {
                try {
                    Runnable runnable = workQueue.take();
                    runnable.run();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    public static void createThreadPool() {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, new LinkedBlockingDeque<>());
        threadPoolExecutor.execute(() -> System.out.println("线程池开始执行任务"));
    }
}
线程池如何配置合理线程数

I/O 密集型程序和 CPU 密集型程序,计算最佳线程数的方法是不同的。
(1)CPU密集型:
定义:CPU 密集型计算大部分场景下都是纯 CPU 计算,而没有阻塞,CPU一直全速运行。

多线程本质上是提升多核 CPU 的利用率,所以对于一个 4 核的 CPU,每个核一个线程,理论上创建 4 个线程就可以了,再多创建线程也只是增加线程切换的成本。
配置线程数:CPU核数+1 (多一个用于线程卡顿的备用)

(2)IO密集型:
定义:IO密集型,即该任务需要大量的IO,读写文件读写流,即大量的阻塞。
在单线程上运行IO密集型任务会导致浪费大量的CPU运算能力浪费在等待。
所以IO密集型任务中使用多线程可以大大的加速程序运行,即使在单核CPU上,这种加速主要利用了被浪费掉的阻塞时间。
配置线程数:CPU核数 * 2

CPU核心数获取方法:

 Runtime.getRuntime().availableProcessors()

另外一方面考虑,在一些核心业务中,核心线程数设置为CPU核心数,最大线程数可根据公式 最佳线程数 =CPU 核数 * [ 1 +(I/O 耗时 / CPU 耗时)] 来计算。在一些非核心业务中,我们可以将核心线程数设置小一些,最大线程数量设置为CPU核心数量。

四种线程池创建
Executors

考虑到 ThreadPoolExecutor 的构造函数实在是有些复杂,所以 Java 并发包里提供了一个线程池的静态工厂类 Executors,利用 Executors 你可以快速创建线程池。包含四种常用线程池。

1.创建缓存线程池:

创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

内部实现方式:

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

2.创建单线程池:
创建一个线程的线程池,保证任务执行顺序和添加任务顺序一致。核心线程和最大线程只有一个。

        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

内部实现方式:

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

3.创建定长线程池
这种方式可以指定线程池中的线程数。核心线程数和最大线程数是传的参数。

        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);

内部实现方式:

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

4.创建周期线程池
该线程池支持定时,以及周期性的任务执行,我们可以延迟任务的执行时间,也可以设置一个周期性的时间让任务重复执行。

        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(3);
验证各种线程池特性:

创建执行的任务:

public class MyRunable implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        System.out.println(Thread.currentThread().getName() + "  " + "处理任务");
    }
}

1.循环创建任务放入CachedThreadPool中执行:

    public static void cachedThreadPool() {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i=0;i<5;i++) {
            executor.execute(new MyRunable());
        }
    }

打印结果:实现了线程的复用,并没有创建多余的线程。

pool-2-thread-1  处理任务
pool-2-thread-2  处理任务
pool-2-thread-1  处理任务
pool-2-thread-3  处理任务
pool-2-thread-3  处理任务

2.循环创建任务放入SingleThreadPool中执行:

       ExecutorService executor = Executors.newFixedThreadPool(3);
        for(int i=0;i<5;i++){
            executor.execute(new MyRunable());
        }

当前只有一个线程处理任务,任务按照加入顺序依序执行:

pool-2-thread-1  处理任务
pool-2-thread-1  处理任务
pool-2-thread-1  处理任务
pool-2-thread-1  处理任务
pool-2-thread-1  处理任务

3.循环创建任务放入FixedThreadPool中执行:
定长线程池,设置最大线程数为3;

        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
        for(int i=0;i<10;i++){
            fixedThreadPool.execute(new MyRunable());
        }

最多能有3个线程同时处理任务:

pool-2-thread-1  处理任务
pool-2-thread-3  处理任务
pool-2-thread-2  处理任务
pool-2-thread-1  处理任务
pool-2-thread-3  处理任务

4.ScheduledThreadPool执行周期性任务:

        ScheduledExecutorService executor = Executors.newScheduledThreadPool(3);
        executor.scheduleWithFixedDelay(() -> System.out.println(Thread.currentThread().getName() + "  " + "处理任务"),
                4, 4, TimeUnit.SECONDS);

打印方式是每4秒,打印一次任务执行:

pool-2-thread-1  处理任务
pool-2-thread-1  处理任务
pool-2-thread-2  处理任务
pool-2-thread-2  处理任务
pool-2-thread-3  处理任务
线程池多余的线程是如何回收的?

参考:https://blog.csdn.net/xmtblog/article/details/116725086

四种线程池分别用在什么样的场景?

1.SIngleThreadPool
因为线程池里只有一个线程,就确保所有的任务都在同一个线程中顺序执行,这样就不需要处理线程同步的问题。这类线程池适用于多个任务顺序执行的场景
2.FixedThreadPool
里面全是核心线程,没有非核心线程,也没有超时机制。即使是空闲状态,线程不会被回收,除非线程池被关闭。由于该线程池线程数固定,且不被回收,线程与线程池的生命周期同步,所以适用于任务量比较固定但耗时长的任务

  1. CachedThreadPool
    如果线程池中的线程数量过大,它可以有效的回收多余的线程,如果线程数不足,那么它可以创建新的线程。这类线程池的空闲线程都是有超时机制的,keepAliveTime在这里是有效的,时长为60秒,超过60秒的空闲线程就会被回收,所以CachedThreadPool比较适合任务量大但耗时少的任务
  2. ScheduledThreadPool
    这类线程池适用于执行定时任务和具体固定周期的重复任务。
参考:

https://blog.csdn.net/hollis_chuang/article/details/102740058
https://www.cnblogs.com/chiangchou/p/thread-pool.html

Github代码地址:

https://github.com/running-libo/JavaPrinciples

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容