1 CompletionService接口
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
2 ExecutorCompletionService.QueueingFuture内部类
ExecutorCompletionService.QueueingFuture继承了FutureTask。
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
// 将task对象(之前包装好的FutureTask对象)包装成QueueingFuture对象
// 执行QueueingFuture对象中的run方法-->执行QueueingFuture对象中的callable中的call方法,将call方法的返回值保存到QueueingFuture对象中的outcome
// 执行QueueingFuture对象中的callable中的call方法-->执行task对象(之前包装好的FutureTask对象)中的run方法,返回null
super(task, null);
this.task = task;
}
// 重写FutureTask中的done方法
// 将task对象(之前包装好的FutureTask对象)添加到completionQueue,此时提交的任务执行完毕
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
3 ExecutorCompletionService
ExecutorCompletionService实现了CompletionService接口。
3.1 ExecutorCompletionService中的字段
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
3.2 ExecutorCompletionService中的构造方法
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
// aes只有两钟可能
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
// aes只有两钟可能
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
3.3 ExecutorCompletionService中的submit方法
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
// 将提交的任务包装成FutureTask对象
RunnableFuture<V> f = newTaskFor(task);
// 执行execute方法-->在executor中执行创建的QueueingFuture对象中的run方法
// 执行创建的QueueingFuture对象中的run方法-->执行创建的QueueingFuture对象中的callable中的call方法,将call方法的返回值保存到创建的QueueingFuture对象中的outcome
// 执行创建的QueueingFuture对象中的callable中的call方法-->执行f对象中的run方法,返回null
// 执行f对象中的run方法-->执行f对象中的callable中的call方法,将call方法的返回值保存到f对象中的outcome,执行done方法将f对象添加到completionQueue
// 执行f对象中的callable中的call方法-->执行task对象中的call方法
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
// 将提交的任务包装成FutureTask对象
RunnableFuture<V> f = newTaskFor(task, result);
// 执行execute方法-->在executor中执行创建的QueueingFuture对象中的run方法
// 执行创建的QueueingFuture对象中的run方法-->执行创建的QueueingFuture对象中的callable中的call方法,将call方法的返回值保存到创建的QueueingFuture对象中的outcome
// 执行创建的QueueingFuture对象中的callable中的call方法-->执行f对象中的run方法,返回null
// 执行f对象中的run方法-->执行f对象中的callable中的call方法,将call方法的返回值保存到f对象中的outcome,执行done方法将f对象添加到completionQueue
// 执行f对象中的callable中的call方法-->执行task对象中的run方法,返回给定的result
executor.execute(new QueueingFuture(f));
return f;
}
3.3.1 ExecutorCompletionService中的newTaskFor方法
// 将提交的任务包装成FutureTask对象
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
// 将提交的任务包装成FutureTask对象
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
3.3.1.1 AbstractExecutorService中的newTaskFor方法
// 将提交的任务包装成FutureTask对象
// 子类中可以重写newTaskFor方法
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
// 将提交的任务包装成FutureTask对象
// 子类中可以重写newTaskFor方法
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
3.4 ExecutorCompletionService中的take方法
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
3.5 ExecutorCompletionService中的poll方法
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}