线程池ThreadPoolExecutor使用注意事项

1. 线程池的作用:

  1. 重复利用已经创建好的线程, 降低创建线程和销毁线程的性能开销
  2. 合理的设置线程池大小可以避免因为线程数超出硬件资源瓶颈带来的问题,类似起到了限流的作用

2. 线程池的创建

2.1 原生线程池 ThreadPoolExecutor(推荐)
ThreadPoolExecutor(int corePoolSize,  
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
image.png
2.2 JDK工具类 Executors(不推荐

1. newFixedThreadPool
工作线程控制在固定的数量上,但任务队列是无界的

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

2 newCachedThreadPool
最大线程数无上限

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

3 使用注意事项

3.1 不要使用Executors提供的工具方法创建线程池,会导致OOM,原因已在上面分析
3.2 线程池参数的配置

1. corePoolSize、maximumPoolSize 的选择
因为实际很难界定系统是IO密集型还是CPU密集型,并且tasks、taskcost也不是一成不变的,所以实际使用中先根据经验估算出一个经验值,然后再通过压测验证经验值, 这时如果能直观的观察到线程池的内部状态就非常有必要了

参数 备注
根据任务类型估算:
CPU密集型: N+1
IO密集型: corePoolSize 1, maximumPoolSize: 2N
《Java多线程变成实战指南》
IO corePoolSize 为1, 是为了减少IO操作引起的上下文切换
根据tasks、taskcost、responsetime估算 https://www.lagou.com/lgeduarticle/18336.html
动态修改线程池配置 https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html

1.2 BlockingQueue的选择
通常认为LinkedBlockingQueue吞吐量会比ArrayBlockingQueueu高[https://www.jianshu.com/p/5b85c1794351]
i. ArrayBlockingQueue: 存储空间是预先分配的, 一把全局锁
ii. LinkedBlockingQueue: 存取两把锁,用于存储队列元素的存储空间是在其使用过程中动态分配的,因此它可能会增加JVM垃圾回收的负担。
1.3 拒绝策略的选择

图片来源于《Java线程池实现原理及其在美团业务中的实践》

ThreadPoolExecutor的扩展
因为ThreadPoolExecutor为了通用,只有在阻塞队列满的时候,才继续创建线程,那有没有办法corePoolSize达到时,继续创建线程,直到达到了maximumPoolSize才放到阻塞队列中,可以参考唯品会开源的QueuableCachedThreadPool

3.3 线程池的监控

线程池监控主要为以下几方面提供帮助:
1.为线程池调优提供参考
1.帮助定位问题以及监控报警

ThreadPoolExecutor提供一些方法暴露内部状态,可以将指标记录到监控中

    private void doMetric() {
        // 当前线程池中运行的线程总数
        MetricClient.record("thread.pool.pool.size", threadPoolExecutor.getPoolSize());
        // 历史峰值线程数
        MetricClient.record("thread.pool.largest.pool.size", threadPoolExecutor.getLargestPoolSize());
        // 当前任务队列中积压任务的总数
        MetricClient.record("thread.pool.queue.size", threadPoolExecutor.getQueue().size());
        // 当前活跃线程数
        MetricClient.record("thread.pool.active.size", threadPoolExecutor.getActiveCount());
    }
3.4 ThreadPoolExecutor需要在全局声明

否则会重复创建多个线程池, 而且核心线程池不会被销毁, 一个反例如下

    public static void main(String[] args) throws Exception {

        FixedThreadPool fixedThreadPool = new FixedThreadPool();

        for (int i = 0; i < 5; i++) {
            fixedThreadPool.test();
        }

        TimeUnit.SECONDS.sleep(100);
        System.out.println("Finished");
    }

    private void test() {
        ExecutorService threadPoolExecutor = new ThreadPoolExecutor(
                1, 10,
                0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(5),
                new ThreadFactoryBuilder().setNameFormat("threadPool-%d").build());

        threadPoolExecutor.submit(() -> System.out.println(Thread.currentThread().getName()));
    }

因为调用test函数5次,最终会有5个线程,且状态为WAIT。


image.png
3.5 避免线程泄露

线程泄露是指线程池中的工作者意外终止,使得线程池中实际可用的工作者线程变少
如等待网络I/O, 而该任务有没有对这个等待指定时间限制,如果外部资源一直没有返回该任务所等待的结果,就会导致该线程一直处于等待状态而无法执行其他任务。

3.6 线程池隔离
  1. 1 避免相互影响
    应用中通常配置多个线程池,要根据任务的“轻重缓急”来指定线程池的核心参数,包括线程数、回收策略和任务队列,避免相互之间影响。
  2. 1 避免引发死锁
    提交给同一线程池实例执行的任务是相互独立的,而不是彼此有依赖关系的任务
    如下是一个引起死锁的极端例子, 任务A依赖任务B的执行结果,等待B的执行,任务B因为资源限制,位于阻塞队列中,等待任务A执行结束,造成死锁
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(5));

        Runnable taskA = () -> {
            System.out.println("Task A start.");
            Runnable taskB = () -> System.out.println("Process Task B");

            Future result = threadPoolExecutor.submit(taskB);

            try {
                result.get();
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("Task A finished");
        };

        Future future = threadPoolExecutor.submit(taskA);

        future.get();

参考文献:

  1. Java多线程编程实战指南
  2. Java线程池实现原理及其在美团业务中的实践
  3. ArrayBlockingQueue与LinkedBlockingQueue
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容