使用教程
为保持简洁,以下不介绍带Executor参数的的方法。
当然,在CompletableFuture使用过程中还是要自定义线程池。
开始任务
这两个方法,根据需要选取其中之一
- 异步执行Runnable,无返回值
public static CompletableFuture<Void> runAsync(Runnable runnable);
- 异步执行supplier,有返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
执行下一个任务
在前一个CompletableFuture对象执行之后:
- 执行Runnable
public CompletableFuture<Void> thenRun(Runnable action);
- 执行Consumer,前一个CompletableFuture的执行结果用于消费
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
- 执行Function,前一个CompletableFuture的执行结果用于消费,并提供一个新的返回值。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
结果处理
- CompletableFuture 执行的过程中不会抛出异常,可通过此方法去处理异常。
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
- 多了CompletableFuture对象的返回值当作入参,用于BiConsumer。
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
- 通过BiFunction可自定义返回的CompletableFuture。
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
获取结果
常用的获取CompletableFuture结果方式有以下几种。
- join() :阻塞当前线程,只会抛出RuntimeException的异常,不用特意try catch 比较方便。
- get() :阻塞当前线程,InterruptedException,ExecutionException。
- get(long timeout, TimeUnit unit):阻塞当前线程,相比get()多设置个超时时间,当然也会多个TimeoutException。
- getNow(T valueIfAbsent):立刻获取结果,无阻塞,任务未完成返回默认值。
并发编排
可以大致分为三种类型
- allOf()、anyOf()。CompletableFuture的静态方法,合并多个CompletableFuture为一个新的CompletableFuture。
// 当所有传入的CompletableFuture都完成后,返回一个新的CompletableFuture
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
// 当传入的CompletableFuture其中之一完成后(产生异常异常也算完成),返回一个新的CompletableFuture
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
- public 方法,如xxxBoth , xxxEither ,针对当前CompletableFuture对象和传人的CompletableFuture,传入传入的执行方法,最终返回一个新的CompletableFuture。
- 为当前CompletableFuture对象以及入参数CompletableFuture完成之后执行
// 执行 BiFunction
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
// 执行 BiConsumer
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
// 执行 Runnable
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action)
- 当前CompletableFuture对象以及入参数CompletableFuture其中之一完成之后执行
// 执行Function
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
// 执行Consumer
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
// 执行Runnable
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
使用场景
A任务返回1,B任务返回2,C任务返回3,D任务返回4。
AB并行,返回结果汇总
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture = CompletableFuture
.supplyAsync(() -> 1)
.thenCombine(CompletableFuture.supplyAsync(() -> 2), Integer::sum);
Integer sum = completableFuture.join();
System.out.println(sum);
}
AB并行,返回执行快的任务的结果
public static void main(String[] args) {
CompletableFuture<Integer> completableFuture = CompletableFuture
.supplyAsync(() -> 1)
.applyToEither(CompletableFuture.supplyAsync(() -> 2), Function.identity());
Integer first = completableFuture.join();
System.out.println(first);
}
多任务复杂场景
任务1: 执行A,之后并发执行BC,之后再执行C
任务2: 执行A,之后执行BC,BC中其中有一个完成后之后再执行C
当两个任务完成之后执行D,任务总超时时间设置为1s,超时返回0
image.png
public static void main(String[] args) {
// 任务1
CompletableFuture<Integer> completableFuture1 = CompletableFuture
.supplyAsync(() -> 1)
.thenApplyAsync(cf -> {
CompletableFuture<Integer> completableFuture = CompletableFuture
.supplyAsync(() -> 2)
.thenCombine(CompletableFuture.supplyAsync(() -> 3), (cf1, cf2) -> cf1 + cf2 + cf);
return completableFuture.join();
})
.thenApplyAsync(cf -> CompletableFuture.supplyAsync(() -> 3 + cf).join());
// 任务2
CompletableFuture<Integer> completableFuture2 = CompletableFuture
.supplyAsync(() -> 1)
.thenApplyAsync(cf -> {
CompletableFuture<Integer> completableFuture = CompletableFuture
.anyOf(CompletableFuture.supplyAsync(() -> 2), CompletableFuture.supplyAsync(() -> 3))
.thenApply(cf2 -> (Integer) cf2 + cf);
return completableFuture.join();
})
.thenApplyAsync(cf -> CompletableFuture.supplyAsync(() -> 3 + cf).join());
// 等待1 2 完成
CompletableFuture<Integer> sumCF = completableFuture1.thenCombine(completableFuture2, Integer::sum);
try {
System.out.println(sumCF.get(1, TimeUnit.SECONDS));
} catch (InterruptedException | ExecutionException | TimeoutException e) {
System.out.println(0);
e.printStackTrace();
}
}