public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
根据其实现的接口可知,它有两个特点:
- Future,表示异步执行的结果
- CompletionStage,表示整个异步计算的一个阶段,它能被其它的stage触发,也能触发其它的stage。CompletionStage的组合可以分为3种(假设有3个CompletionStage,分别为1、2、3):
- then,关系为 2依赖于1、3依赖于2,则1、2、3顺序执行
- both,关系为 3依赖于1、3依赖于2,则1、2都执行完后,3再执行
- either,关系为 3依赖于1、3依赖于2,则1、2只要有一个执行完后,3就执行
根据上面的依赖关系,所以CompletionStage的方法也分为3类,方法名中有combine、both的方法表示both关系,方法名中有either的方法表示either关系,其余的就是then关系
volatile Object result;
表示异步执行的结果,当result为null,表示该阶段还未执行完成,当result为AltResult且AltResult的Throwable ex不为null,表示该阶段执行异常,其它则为执行成功
1.1 构造CompletableFuture
new CompletableFutur<T>()
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
下面的4个方法会被直接提交并执行,Supplier是有返回值的,Runnable没有返回值
1.2 获取执行结果
public T get() throws InterruptedException, ExecutionException
public T join()
阻塞式的获取任务结果,并不推荐这么做,因为会阻塞调用线程,从异步变成同步了,那跟Future没什么区别了,需要使用回调函数来处理
1.3 触发回调函数
public boolean complete(T value)
public boolean completeExceptionally(Throwable ex)
还有其它的一些以complete开头的方法,最常用的就是上面2种,它会将result从null变为指定值,然后触发依赖任务的执行
1.4 设置回调函数
一个回调函数对应一个CompletableFuture
1.4.1 then关系
// 没有返回值
public CompletableFuture<Void> thenRun(
Runnable action)
public CompletableFuture<Void> thenRunAsync(
Runnable action)
public CompletableFuture<Void> thenRunAsync(
Runnable action, Executor executor)
// 没有返回值
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
Executor executor);
// 有返回值
public <U> CompletableFuture<U> thenApply(
Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(
Function<? super T,? extends U> fn, Executor executor)
// 有返回值
public <U> CompletableFuture<U> thenCompose(
Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(
Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)
这些方法都是实现的CompletionStage中定义的方法,他描述的是一种串行的关系

串行关系
- 假设用户创建了CompletableFuture1,然后调用了2次thenApply方法,则上图描述了最后的依赖关系,即当CompletableFuture1完成后,则先触发函数2的执行,再触发则先触发函数1的执行,是一种栈的结构,此时获取最后结果要从CompletableFuture2中获取
1.4.2 both关系
除了串行的关系,CompletionStage中还定义了描述AND关系的方法
// 有返回值
public <U,V> CompletableFuture<V> thenCombine(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T,? super U,? extends V> fn, Executor executor)
// 无返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConsumer<? super T, ? super U> action, Executor executor)
// 无返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action)
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
Runnable action, Executor executor)
上面的方法都表示当调用者future与other执行完成后,才会执行传入的函数。
1.4.3 either关系
CompletionStage中还定义了描述OR关系的方法
// 有返回值
public <U> CompletableFuture<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn,
Executor executor)
// 无返回值
public CompletableFuture<Void> acceptEither(
CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(
CompletionStage<? extends T> other, Consumer<? super T> action,
Executor executor)
// 无返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
Runnable action, Executor executor)
上面的方法都表示当调用者future与other任意一个执行完成后,才会执行传入的函数。
1.5 异常处理
下面的方法用来做异常处理,如果把回调函数比作try中的代码块,那么这个方法就是catch中的代码块
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn) {
return uniExceptionallyStage(fn);
}
1.6 finally功能
下面的方法可以用来做收尾工作,相当于finally中的代码块,一定会被执行;当被依赖的任务报错,后续依赖任务的函数不会执行(除exceptionally),只是将对应的CompletableFuture的result被设为AltResult(被依赖任务的异常)
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor)
public <U> CompletableFuture<U> handle(
BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(
BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
1.7 执行线程
- 所有方法名中带有Async的方法,如果不指定executor,则使用默认线程池
ForkJoinPool#commonPool(),线程个数为核数(可通过-Djava.util.concurrent.ForkJoinPool.common.parallelism指定),但是如果线程个数小于2,则不使用,而是每一个函数或任务都会创建一个线程 - 如果使用不带Async的方法,则会在设置了值(比如调用了complete、completeExceptionally)的线程中执行任务
- 建议,对于执行时间可能较长的函数或任务,比如远程的RPC,尽量使用async + 指定
Executor的方式,因为设置了Executor,则提交的任务在指定的线程池中运行,这样做的好处- 对于使用默认线程池,避免默认的线程池中有慢任务执行使线程耗尽,导致新提交的任务积压而无法快速执行
- 对于使用不带Async的方法,避免影响调用线程中的代码执行