一、场景描述
例如: 需要指定3个线程来处理1000条数据, 其中线程数、数据量是可变的。
二、 思路
- 针对数据量进行分片, 片的数量即线程数。
- 控制线程数可通过CountDownLatch来处理。
三、代码
1. 程序执行类
/**
* @ClassName : LimitThreadNumApp
* @Description : 限定线程的执行数,去执行大数据
* @Author : hack2012
* @Date: 2020-12-16 22:53
*/
public class LimitThreadNumApp {
Logger log = LoggerFactory.getLogger(getClass());
static final ExecutorService executorService = Executors.newFixedThreadPool(3);
@Test
public void testProcess() throws Exception{
//数据量
int size = 100;
//定义执行任务的线程数
int threadNum = 3;
process(threadNum, size);
}
public void process(int threadNum, int size) throws InterruptedException {
List<TaskModel> datas = initData(size);
//计算每个分片的大小
int shardSize = datas.size() / threadNum + 1;
CountDownLatch latch = new CountDownLatch(threadNum);
int cnt = 0;
for (int i = 0; i < threadNum; i++) {
List<TaskModel> processDatas = shardTask(datas, i, shardSize);
cnt += processDatas.size();
executorService.submit(new TaskCallable(processDatas, latch));
}
latch.await();
log.info("预期数据量:{};实际执行数据量:{}", size, cnt);
}
/** 任务分片 */
private List<TaskModel> shardTask(List<TaskModel> datas, int tnum, int size){
int start = tnum * size;
int end = (tnum + 1) * size;
if (end > datas.size()) {
end = datas.size();
}
return datas.subList(start, end);
}
private List<TaskModel> initData(int size){
List<TaskModel> datas = Lists.newArrayList();
for (int i = 1; i <= size; i++) {
TaskModel data = new TaskModel();
data.setId(String.valueOf(i));
datas.add(data);
}
return datas;
}
}
2. 定义任务模型
/**
* @ClassName : TaskModel
* @Description : 数据模型
* @Author : hack2012
* @Date: 2020-12-16 22:55
*/
public class TaskModel {
private String id ;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
}
3. 定义任务执行类
/**
* @ClassName : TaskCallable
* @Description : 任务执行器
* @Author : hack2012
* @Date: 2020-12-16 22:58
*/
public class TaskCallable implements Callable<Boolean> {
Logger log = LoggerFactory.getLogger(getClass());
List<TaskModel> datas;
CountDownLatch latch;
TaskCallable(List<TaskModel> datas, CountDownLatch latch) {
this.datas = datas;
this.latch = latch;
}
@Override
public Boolean call() throws Exception {
try {
int cnt = 0;
for (TaskModel data : datas) {
cnt++;
}
log.info("[{}]-线程执行数量:{}", Thread.currentThread().getName(), cnt);
latch.countDown();
return true;
} catch (Exception e) {
log.error(Thread.currentThread().getName() + "线程执行失败:{}", e.getMessage());
}
return false;
}
}
四、测试结果
1. 线程3, 数量 98
[2020-12-1623:36:55下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-1]-线程执行数量:33
-[2020-12-1623:36:55下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-3]-线程执行数量:32
-[2020-12-1623:36:55下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-2]-线程执行数量:33
-[2020-12-1623:36:55下午]:INFOthread.LimitThreadNumApp.process(LimitThreadNumApp.java:44)预期数据量:98;实际执行数据量:98
1. 线程3, 数量 99
[2020-12-1623:37:40下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-3]-线程执行数量:31
-[2020-12-1623:37:40下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-1]-线程执行数量:34
-[2020-12-1623:37:40下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-2]-线程执行数量:34
-[2020-12-1623:37:40下午]:INFOthread.LimitThreadNumApp.process(LimitThreadNumApp.java:44)预期数据量:99;实际执行数据量:99
1. 线程3, 数量 100
[2020-12-1623:38:18下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-1]-线程执行数量:34
-[2020-12-1623:38:18下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-2]-线程执行数量:34
-[2020-12-1623:38:18下午]:INFOthread.TaskCallable.call(TaskCallable.java:35)[pool-1-thread-3]-线程执行数量:32
-[2020-12-1623:38:18下午]:INFOthread.LimitThreadNumApp.process(LimitThreadNumApp.java:44)预期数据量:100;实际执行数据量:100