以前对线程池的理解大多处于理论阶段,知道为什么要用以及怎么用,各个参数的含意设置及原理,也明白submit和excute的区别,但是在实际工作时遇到对异步任务处理时却不知道怎么先批量提交到线程池,利用线程池批量处理后再批量获取返回结果,通过同事指点再知道invokeAll方法,因此看了一下源码并写了一个简单的demo
invokeAll是AbstractExecutorService抽象类下的一个方法,源码如下, 该方法主要是接受一个异步任务列表,这段代码主要是有两个for循环,第一个for循环是将列表中的任务添加到线程池运行起来,第二个for循环是通过f.get来阻塞所有未完成的任务直到完成,最后finally里面尝试取消出现异常完成的任务。
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
使用线程池调用invokeAll的demo:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class ThreadPoolDemo {
public static void main(String[] args) {
new ThreadPoolDemo().execute();
}
public void execute(){
ThreadPoolExecutor executor = new ThreadPoolExecutor(12, 12, 2000, TimeUnit.MICROSECONDS,
new ArrayBlockingQueue<Runnable>(1000));
List<Handler> list = new ArrayList<>();
for(int i = 0; i < 100; i++){
list.add(new Handler(String.valueOf(i)));
}
try {
//会按照添加到List的次序返回,该方法阻塞至所有任务完成
List<Future<String>> results = executor.invokeAll(list);
results.stream().forEach( future -> {
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
//关闭线程池
executor.shutdown();
}
private class Handler implements Callable<String> {
//模拟传参,实际可为对象类型
String value;
public Handler(String value){
this.value = value;
}
public String call(){
try {
//模拟任务的处理过程0-1000ms
Thread.sleep((int)(Math.random()*1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
//模拟返回参数,实际可为对象类型
System.out.println(value);
return value + "value";
}
}
}
运行结果:
如上图所示,所有任务添加到线程池后执行并没有次序,但最后返回的结果是按照最初添加list的顺序,即利用了线程池并发处理的特点,又有序获取到了所有异步任务的最终执行结果。