CompletionService实际上可以看做是Executor和BlockingQueue的结合体。
CompletionService 依赖于一个单独的 Executor 来实际运行任务。
内部管理了一个堵塞队列来,在调用submit方法时,会向创建一个新的RunnableFuture,然后异步运行该RunnableFuture。当其状态变为done后,加入CompletionService的堵塞队列中,外部通过调用take()(堵塞)或者poll()(非堵塞,为空返回null)方法获取运行结果。
CompletionService接口,唯一实现类ExecutorCompletionService。
按照执行结果完成的先后顺序take。
CompletionService实现原理
ExecutorCompletionService在构造函数中会创建一个BlockingQueue(使用的基于链表的无界队列LinkedBlockingQueue),该BlockingQueue的作用是保存Executor执行的结果。
当计算完成时,调用FutureTask的done方法。
当提交一个任务到ExecutorCompletionService时,首先将任务包装成QueueingFuture,它是FutureTask的一个子类,然后改写FutureTask的done方法,之后把Executor执行的计算结果放入BlockingQueue中。
QueueingFuture的源码如下:
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
...
}
ExecutorCompletionService主要是增强executor线程池的。
Task包装后被塞入completionQueue,当Task结束,其Future就可以从completionQueue中获取到。
ExecutorCompletionService原理图
CompletionService接口源码:
public interface CompletionService<V> {
// 提交
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
// 获取
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
例:向CompletionService中提交10个Task,当Task有任务返回则会优先从CompletionService内部的队列中获取到Task的Future。
package async;
import java.util.Random;
import java.util.concurrent.*;
/**
* CompletionService
* 应用举例1
*/
public class Main5 {
public static void main(String[] args) {
final ExecutorService pool = Executors.newFixedThreadPool(5);
CompletionService<Integer> completionService
= new ExecutorCompletionService<Integer>(pool);
for (int i=0;i<10;i++) {
completionService.submit(new Task(i));
}
System.out.println("begin get value....");
// 异步非阻塞队列获取结果
for (int i=0;i<10;i++) {
try {
Integer res = completionService.take().get();
System.out.println(res);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
System.out.println("end get value....");
pool.shutdown();
}
static class Task implements Callable<Integer> {
Integer value = 0;
public Task(int i) {
this.value = i;
}
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(new Random().nextInt(5) + 1);
System.out.println(Thread.currentThread().getName() + " finished..."
+ new Random().nextInt(5) + 1);
return new Random().nextInt(5) + 1;
}
}
}
public interface Executor {
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
}
public interface CompletionService<V> {
}
public class ExecutorCompletionService<V> implements CompletionService<V> {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
}