一、概述
CompletionService是JDK8中引入的的接口,目的是解决Future的缺点,因为在获取提交给定义ExecutorService线程池的批量任务结果时的返回值Future的get()方法是阻塞的,一旦前一个任务执行比较耗时,后续的任务调用get()方法就需要阻塞,从而形成排队等待的情况。而CompletionService是对定义ExecutorService进行了包装,把任务提交到一个队列中,先完成的任务结果,先保存到一个阻塞队列中,通过CompletionService的take()方法可以获取最先完成的任务结果。
二、示例说明
package com.www.pool;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class FutureDemo {
public static void sleep(long time){
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//定义ExecutorService
ExecutorService executor = Executors.newCachedThreadPool();
//定义批量任务,每个任务的耗时不等
final List<Callable<Integer>> tasks = Arrays.asList(
() -> {
sleep(30L);
System.out.println("Task 30 completed done.");
return 30;
},
() -> {
sleep(10L);
System.out.println("Task 10 completed done.");
return 10;
},
() -> {
sleep(20L);
System.out.println("Task 20 completed done.");
return 20;
}
);
//批量提交执行异步任务,
try {
List<Future<Integer>> futures = executor.invokeAll(tasks);
futures.forEach(future -> {
try {
System.out.println("返回结果: " + future.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行结果:
Task 10 completed done.
Task 20 completed done.
Task 30 completed done.
返回结果: 30
返回结果: 10
返回结果: 20
Process finished with exit code 0
有以上执行结果可知,我们无法优先获取最先完成的任务的结果,而是等到耗时(30S)最长的任务接收后才可以返回结果。通过CompletionService接口就可以解决上面的问题。
示例如下:
package com.www.pool;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceDemo {
public static void sleep(long time){
try {
TimeUnit.SECONDS.sleep(time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
//定义ExecutorService
ExecutorService executor = Executors.newCachedThreadPool();
//定义批量任务,每个任务的耗时不等
final List<Callable<Integer>> tasks = Arrays.asList(
() -> {
sleep(30L);
System.out.println("Task 30 completed done.");
return 30;
},
() -> {
sleep(10L);
System.out.println("Task 10 completed done.");
return 10;
},
() -> {
sleep(20L);
System.out.println("Task 20 completed done.");
return 20;
}
);
//批量提交执行异步任务,
try {
CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);
tasks.forEach(completionService::submit);
for (int i = 0; i < tasks.size(); i++){
try {
System.out.println("返回结果: " + completionService.take().get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
运行结果如下:
Task 10 completed done.
返回结果: 10
Task 20 completed done.
返回结果: 20
Task 30 completed done.
返回结果: 30
有运行结果可知,最先完成的任务结果最先返回。
三、原理分析
1、CompletionService接口方法
Future<V> submit(Callable<V> task); 提交有返回结果的Callable类型任务
Future<V> submit(Runnable task, V result); 提交无返回结果Runnable类型任务,返回结果是通过传入result引用获取结果。
Future<V> take() throws InterruptedException; 获取结果,如果队列中没有则阻塞
Future<V> poll(); 获取结果,如果队列中没有则返回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException; 在等待指定时间内没有结果则返回null。
2、CompletionService实现类
此接口只有一个实现了为ExecutorCompletionService。此实现类很简单,
构造方法
public ExecutorCompletionService(Executor executor);
和
ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue);
有构造方法可知,真正执行任务的还是Executor类,通过阻塞队列存储结果。
默认队列为 LinkedBlockingQueue。
在ExecutorCompletionService中定义了QueueingFuture内部类,此类又继承了FutureTask,在此类中复写了类中的 done()方法。如下所示
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
在done()方法中把,执行结果放入了队列中。
从此而实现了优先执行完的任务,最先放入到队列中。这样就可以通过对了的take()或poll()方法就可以获取最先完成的任务结果。