ThreadPoolExecutor 除了execute 方法用来提交任务,还有submit 方法,他们的区别就是后者有返回值,其原理是在前者的基础上使用了Future 接口。因此,我之前的示例改用submit 方法来实现会更加简单。原因有二:
1、submit 方法提交的任务有返回值,方便判断每个任务的最终运行结果,无需引入状态标识变量
2、future 的get方法是阻塞的,无需引入CountDownLatch 并发包工具也可以等待所有线程运行完毕
直接上代码
package com.yzy.test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class Main2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 线程安全的计数器
AtomicInteger totalRows = new AtomicInteger(0);
// 创建线程池,其中核心线程10,也是我期望的最大并发数,最大线程数和队列大小都为30,即我的总任务数
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));
// 创建 Future 集合,用来存储Future 实例
List<Future<Integer>> futureList = new ArrayList<>();
for (int i = 0; i < 30; i++) {
final int index = i;
// 这里创建 Callable 实例,返回整型
Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
// 模拟异常的线程,返回0
if (index % 5 == 0) {
return 0;
}
int times = 0;
boolean loop = true;
// 模拟数据拉取过程可能需要分页
while (loop) {
// 模拟每个任务需要分页5次
if (times >= 5) {
break;
}
times++;
// 模拟计数
totalRows.incrementAndGet();
// 模拟耗时
try {
// 打印运行情况
System.out.println(Thread.currentThread().getName() + ":" + index + ":" + times);
Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 成功返回1
return 1;
}
});
futureList.add(future);
}
// 标记多线程关闭,但不会立马关闭
executor.shutdown();
boolean flag = true;
for (Future<Integer> future : futureList) {
// 获取每一个线程的返回结果,如果该线程未完成,会阻塞
if (future.get() == 0) {
flag = false;
}
}
if (flag) {
System.out.println("全部成功了,计数:" + totalRows.get());
} else {
System.out.println("失败了");
}
}
}
运行结果:
//省略若干行
pool-1-thread-7:24:5
pool-1-thread-3:29:3
pool-1-thread-3:29:4
pool-1-thread-5:28:3
pool-1-thread-6:26:4
pool-1-thread-6:26:5
pool-1-thread-5:28:4
pool-1-thread-5:28:5
pool-1-thread-3:29:5
失败了
然后注释这部分代码
if (index % 5 == 0) {
return 0;
}
再次运行:
//省略若干行
pool-1-thread-7:24:4
pool-1-thread-7:24:5
pool-1-thread-1:26:5
pool-1-thread-8:25:4
pool-1-thread-8:25:5
pool-1-thread-3:29:4
pool-1-thread-3:29:5
全部成功了,计数:150