ThreadPoolExecutor + CountDownLatch 实际应用

记录下我自己工作中遇到的一个实际问题。

场景

遍历一个参数集合,并请求第三方API拉取数据,由于总数据量过大(过亿/天),所以运行时长接近30小时,不能满足业务方需求。为此,我使用线程池改造成并发拉取来提升拉取速度。

目标

1.多线程并发拉取,但是考虑到第三方有qps限制,不能一味增加线程数,所以要设定最大并发上限。
2.任务结束后记录拉取总数。

实现

多线程的介绍可以参见大神的博文 Java并发编程:线程池的使用.。

为了实现始终以不超过最大并发数目的线程数并发执行:
1、可以用线程池封装好的Executors.newFixedThreadPool(corePoolSize);
2、自定义:可以设置核心线程数corePoolSize为我期望的最大并发数,并且设置队列大小和maximumPoolSize不小于我的总任务数(直接设置为总任务数即可)。这很重要,根据线程池的特性,这里直接引用作者原话:

如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

如果队列长度设置小了,很容易满员,此时如果提交的任务数未超过maximumPoolSize,则线程池会继续创建新的线程,这就导致线程数超过我期望的最大并发数,因此队列大小必须大于等于我的总任务数。

还有,如果maximumPoolSize小于总任务数,可能会抛异常,原话:

如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

所以maximumPoolSize必须大于等于我的总任务数。

为了实现任务结束后记录总数,可以借助并发包工具类CountDownLatch,因为多线程是异步的,CountDownLatch可以阻塞当前线程,直到所有子线程完成。

下面是一个例子

package com.yzy.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main1 {
    public static void main(String[] args) throws InterruptedException {

        // 线程安全的计数器
        AtomicInteger totalRows = new AtomicInteger(0);

        // 创建线程池,其中核心线程10,也是我期望的最大并发数,最大线程数和队列大小都为30,即我的总任务数
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));

        //或者ExecutorService executor = Executors.newFixedThreadPool(corePoolSize);

        // 初始化CountDownLatch,大小为30
        CountDownLatch countDownLatch = new CountDownLatch(30);

        // 模拟遍历参数集合
        for (int i = 0; i < 30; i++) {
            // 往线程池提交任务
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    int times = 0;
                    // 模拟数据拉取过程可能需要分页
                    while (true) {
                        // 模拟每个任务需要分页5次
                        if (times >= 5) {
                            break;
                        }
                        times++;

                        // 模拟计数
                        totalRows.incrementAndGet();
                        try {
                            // 模拟耗时
                            Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 子线程完成,countDownLatch执行countDown
                    countDownLatch.countDown();
                }
            });
            // 打印线程池运行状态
            System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
                    executor.getQueue().size() + ",已执行结束的任务数目:" + executor.getCompletedTaskCount());
        }
        // 标记多线程关闭,但不会立马关闭
        executor.shutdown();

        // 阻塞当前线程,知道所有子线程都执行countDown方法才会继续执行
        countDownLatch.await();

        // 打印线程池运行状态
        System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
                executor.getQueue().size() + ",已执行结束的任务数目:" + executor.getCompletedTaskCount());

        // 打印计数
        System.out.println("结束:" + totalRows.get());
    }
}

控制台输出:

线程池中线程数目:1,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:7,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:8,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:9,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:1,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:2,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:3,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:4,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:6,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:7,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:8,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:9,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:10,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:11,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:12,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:13,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:14,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:15,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:16,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:17,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:18,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:19,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:20,已执行结束的任务数目:0
线程池中线程数目:0,队列中等待执行的任务数目:0,已执行结束的任务数目:30
结束:150

可以看出,线程池最大并发数为10,也是我所期望的数值。最终输出的计数也是正确的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 为什么使用线程池 当我们在使用线程时,如果每次需要一个线程时都去创建一个线程,这样实现起来很简单,但是会有一个问题...
    闽越布衣阅读 9,774评论 10 45
  • 小娟,感恩有你 小娟:生命中有你是我的福气,今天借着练习写作就写一封感谢信给你,借此机会来表达我的谢意! 说你名如...
    萍子2阅读 1,332评论 0 0
  • 我们的信任在现今的社会中正在被一点点的丢掉! 信任是一个非常有价值的无形的东西。一个人,一个公司,一个国家都需要信...
    大張冰阅读 721评论 4 6
  • by hfy 每日新闻一分钟:美国人最害怕什么健康问题?_国外媒体资讯 - 可可英语 A new study sh...
    123逍遥游阅读 864评论 0 0