01 CompletionService 简介
在上一篇内容中,介绍了 Future 的使用,其中也涉及到了 Future 的不足,就是当通过 get() 方法获取线程的返回值的时候,会导致阻塞,由于阻塞很多时候这回倒置性能问题。而 JDK 中提供的另外一个工具类能够帮助我们缓解或是解决阻塞的问题。
CompletionService 的作用:
CompletionService 接口解决 Future 阻塞的问题。Completion 的实现类有 ExecutorCompletionService。
CompletionService 实现内能够一边处理 submit 的线程的任务,一边处理已完成任务的结果。这样就可以将执行任务与处理任务分离开来进行处理。
使用 submit() 执行任务,使用 take()取得已完成的任务(Future),并按照完成这些任务的时间顺序处理它们的结果。
02 常用方法
-
submit(Runnable task, V result)
作用是来执行线程任务。 -
take()
作用是获取先完成任务的 Future 对象,take()获取到的 Future 对象在调用 get()方法的时候还是呈现阻塞特性。 -
poll()
作用是获取并移除表示下一个已完成任务的 Future,如果不存在这样的任务,则返回 null,poll() 方法没有阻塞效果。 -
poll(long timeout, TimeUnit unti)
作用是等待指定的时间(timeout + unti),如果在 timeout 时间内获得值时立即向下继续执行,如果超过时也立即向下执行。 -
异常处理
。在某个线程内抛出异常,如果只是通过 take() 方法获取到 Future 对象,那么无法在 console 中得到相关的异常信息。只有调用 Future 对象的 get() 方法才能获得异常栈信息。
03 案例
下面是一个例子,用来演示,take()方法返回的顺序是按照执行玩的先后顺序。
Worker.java
public class Worker implements Callable<String> {
private final int index;
public Worker(int index) {
this.index = index;
}
@Override
public String call() throws Exception {
String threadName = String.format("name-%d", index);
System.out.println(threadName + " is working.");
Thread.sleep(1000 * index);
return threadName + " is Done";
}
}
Company.java
public class Company {
public static void main(String[] args) {
List<Worker> workers = new ArrayList<>();
for (int i = 6; i >= 1; i--) {
workers.add(new Worker(i));
}
ExecutorService executorService = Executors.newCachedThreadPool();
CompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
workers.forEach(completionService::submit);
workers.forEach(worker -> {
try {
String result = completionService.take().get();
System.out.println("result is " + result);
} catch (InterruptedException | ExecutionException e) {
System.out.println("Getting result fail.");
}
});
}
}