记录下我自己工作中遇到的一个实际问题。
场景
遍历一个参数集合,并请求第三方API拉取数据,由于总数据量过大(过亿/天),所以运行时长接近30小时,不能满足业务方需求。为此,我使用线程池改造成并发拉取来提升拉取速度。
目标
1.
多线程并发拉取,但是考虑到第三方有qps限制,不能一味增加线程数,所以要设定最大并发上限。
2.
任务结束后记录拉取总数。
实现
多线程的介绍可以参见大神的博文 Java并发编程:线程池的使用.。
为了实现始终以不超过最大并发数目的线程数并发执行:
1、可以用线程池封装好的Executors.newFixedThreadPool(corePoolSize);
2、自定义:可以设置核心线程数corePoolSize
为我期望的最大并发数,并且设置队列大小和maximumPoolSize
不小于我的总任务数(直接设置为总任务数即可)。这很重要,根据线程池的特性,这里直接引用作者原话:
如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
如果队列长度设置小了,很容易满员,此时如果提交的任务数未超过maximumPoolSize
,则线程池会继续创建新的线程,这就导致线程数超过我期望的最大并发数,因此队列大小必须大于等于我的总任务数。
还有,如果maximumPoolSize
小于总任务数,可能会抛异常,原话:
如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
所以maximumPoolSize
必须大于等于我的总任务数。
为了实现任务结束后记录总数,可以借助并发包工具类CountDownLatch
,因为多线程是异步的,CountDownLatch
可以阻塞当前线程,直到所有子线程完成。
下面是一个例子
package com.yzy.test;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Main1 {
public static void main(String[] args) throws InterruptedException {
// 线程安全的计数器
AtomicInteger totalRows = new AtomicInteger(0);
// 创建线程池,其中核心线程10,也是我期望的最大并发数,最大线程数和队列大小都为30,即我的总任务数
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
//或者ExecutorService executor = Executors.newFixedThreadPool(corePoolSize);
// 初始化CountDownLatch,大小为30
CountDownLatch countDownLatch = new CountDownLatch(30);
// 模拟遍历参数集合
for (int i = 0; i < 30; i++) {
// 往线程池提交任务
executor.execute(new Runnable() {
@Override
public void run() {
int times = 0;
// 模拟数据拉取过程可能需要分页
while (true) {
// 模拟每个任务需要分页5次
if (times >= 5) {
break;
}
times++;
// 模拟计数
totalRows.incrementAndGet();
try {
// 模拟耗时
Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 子线程完成,countDownLatch执行countDown
countDownLatch.countDown();
}
});
// 打印线程池运行状态
System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
executor.getQueue().size() + ",已执行结束的任务数目:" + executor.getCompletedTaskCount());
}
// 标记多线程关闭,但不会立马关闭
executor.shutdown();
// 阻塞当前线程,知道所有子线程都执行countDown方法才会继续执行
countDownLatch.await();
// 打印线程池运行状态
System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
executor.getQueue().size() + ",已执行结束的任务数目:" + executor.getCompletedTaskCount());
// 打印计数
System.out.println("结束:" + totalRows.get());
}
}
控制台输出:
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:7,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:8,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:9,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:1,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:2,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:3,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:4,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:6,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:7,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:8,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:9,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:10,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:11,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:12,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:13,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:14,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:15,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:16,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:17,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:18,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:19,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:20,已执行结束的任务数目:0
线程池中线程数目:0,队列中等待执行的任务数目:0,已执行结束的任务数目:30
结束:150
可以看出,线程池最大并发数为10,也是我所期望的数值。最终输出的计数也是正确的。