1 CompletionService作用
使用Future和Callable可以获取线程执行结果,但获取方式确实阻塞的,根据添加到线程池中的线程顺序,依次获取,获取不到就阻塞。
为了解决这种情况,可以采用轮询的做法。
CompletionService可以异步非阻塞获取并行任务执行结果
2 案例
public class CompletionCase {
private final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
private final int TOTAL_TASK = Runtime.getRuntime().availableProcessors();
// 方法一,自己写集合来实现获取线程池中任务的返回结果
public void testByQueue() throws Exception {
long start = System.currentTimeMillis();
//统计所有任务休眠的总时长
AtomicInteger count = new AtomicInteger(0);
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
//容器存放提交给线程池的任务,list,map,
BlockingQueue<Future<Integer>> queue =
new LinkedBlockingQueue<Future<Integer>>();
// 向里面扔任务
for (int i = 0; i < TOTAL_TASK; i++) {
Future<Integer> future = pool.submit(new WorkTask("ExecTask" + i));
queue.add(future);//i=0 先进队列,i=1的任务跟着进
}
// 检查线程池任务执行结果
for (int i = 0; i < TOTAL_TASK; i++) {
int sleptTime = queue.take().get();///i=0先取到,i=1的后取到
System.out.println(" slept "+sleptTime+" ms ...");
count.addAndGet(sleptTime);
}
// 关闭线程池
pool.shutdown();
System.out.println("-------------tasks sleep time "+count.get()
+"ms,and spend time "
+(System.currentTimeMillis()-start)+" ms");
}
// 方法二,通过CompletionService来实现获取线程池中任务的返回结果
public void testByCompletion() throws Exception {
long start = System.currentTimeMillis();
AtomicInteger count = new AtomicInteger(0);
// 创建线程池
ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
CompletionService<Integer> cService = new ExecutorCompletionService<>(pool);
// 向里面扔任务
for (int i = 0; i < TOTAL_TASK; i++) {
cService.submit(new WorkTask("ExecTask" + i));
}
// 检查线程池任务执行结果
for (int i = 0; i < TOTAL_TASK; i++) {
int sleptTime = cService.take().get();
System.out.println(" slept "+sleptTime+" ms ...");
count.addAndGet(sleptTime);
}
// 关闭线程池
pool.shutdown();
System.out.println("-------------tasks sleep time "+count.get()
+"ms,and spend time "
+(System.currentTimeMillis()-start)+" ms");
}
public static void main(String[] args) throws Exception {
CompletionCase t = new CompletionCase();
t.testByQueue();
t.testByCompletion();
}
}
运行结果:
slept 6 ms ...
slept 825 ms ...
slept 792 ms ...
slept 264 ms ...
slept 896 ms ...
slept 783 ms ...
slept 549 ms ...
slept 117 ms ...
slept 287 ms ...
slept 642 ms ...
slept 769 ms ...
slept 425 ms ...
-------------tasks sleep time 6355ms,and spend time 899 ms
slept 10 ms ...
slept 71 ms ...
slept 180 ms ...
slept 186 ms ...
slept 205 ms ...
slept 368 ms ...
slept 373 ms ...
slept 429 ms ...
slept 497 ms ...
slept 548 ms ...
slept 693 ms ...
slept 775 ms ...
-------------tasks sleep time 4335ms,and spend time 776 ms
可以看出,使用了CompletionService,线程池返回的结果,是先返回则先获取,节省了资源,提高了效率。
而只使用线程池和Future的线程池执行结果则是按顺序阻塞获取的