本部分介绍Java 8 中提供的具备异步回调能力的工具类——CompletableFuture,该类实现了Future接口,还具备函数式编程能力。
CompletableFuture详解
CompletableFuture类实现了Future和CompletionStage两个接口,该类的实例作为一个异步任务。
CompletionStage接口
CompletionStage代表某个同步或者异步计算的一个阶段,或者一些列异步任务中的一个子任务。
每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。这三个函数式接口特点如下:
- Function接口的特点是有输入、有输出。
- Runnable接口的特点是无输入、无输出。
- Consumer接口的特点是有输入、无输出。
多个CompletionStage构成了一条任务流水线,多个子任务之间可以使用链式调用,下面是个简单的例子:
oneStage.thenApply(x -> square(x))
.thenAccept(y -> System.out.println(y))
.thenRun(() -> System.out.println())
上例子说明如下:
- oneStage是一个CompletionStage子任务。
-
x -> square(x)
是一个Function类型的Lambda表达式,被thenApply方法包装成了CompletionStage子任务,它又包含输入和输出。 -
y -> System.out.println(y)
是一个Consumer类型的Lambda表达式,被thenAccept包装成了一个CompletionStage子任务,它只有输入(即上个任务的输出)。 -
() -> System.out.println()
是一个Runnable类型的Lambda表达式,被thenRun方法包装成了一个CompletionStage子任务,它没有输入输出。
使用runAsync和supplyAsync创建子任务
CompletableFuture定义了一组用于创建CompletionStage子任务的方法。
// 子任务包装一个Runnable实例,并调用ForkJoinPool。commonPool线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable)
// 子任务包装一个Runnable实例,并调用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
// 子任务包装一个Supplier实例,并调用ForkJoinPool。commonPool线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
// 子任务包装一个Supplier实例,并调用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
执行子任务时,如果没有执行Executor线程池,默认情况下会使用公共的ForkJoinPool线程池。
设置子任务回调钩子
可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的钩子。
设置子任务回调钩子的主要函数如下:
// 设置子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throeable> action)
// 设置子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
// 设置子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
// 设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(Fuction<Throwable, ? extends T> fn)
下面看个简单例子:
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("抛出异常");
throw new RuntimeException("发生异常");
});
// 设置异步任务执行完成后的回调钩子
future.whenComplete(new BiConsumer<Void, Throwable>() {
@Override
public void accept(Void aVoid, Throwable throwable) {
System.out.println("执行完成");
}
});
future.exceptionally(new Function<Throwable, Void>() {
@Override
public Void apply(Throwable t) {
System.out.println("执行失败:" + t.getMessage());
return null;
}
});
future.get();
}
}
运行结果:
抛出异常
执行完成
执行失败:java.lang.RuntimeException: 发生异常
调用cancel方法取消CompletableFuture时,任务被视为异常完成,completeExceptionally方法所设置的异常回调钩子也会被执行。
如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:
- 在调用get方法启动任务时,如果遇到内部异常,get方法就会抛出ExecutionException。
- 在调用join和getNow启动任务时(大多数情况下都是如此),如果遇到内部异常,会抛出CompletionException。
调用handle方法统一处理异常和结果
除了分别通过whenComplete、exceptionally设置完成钩子、异常钩子之外,还可以调用handle方法统一处理结果和异常。
handle方法有3个重载版本:
// 在执行任务的同一个线程中处理异常和结果
public<U> CompletionStage<U> handle (BiFunction<? super T, Throwable, ? extends U> fn);
// 可能不再执行任务的同一个线程中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
// 在指定线程池executor中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
将前面的例子改成handle版本。
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("抛出异常");
throw new RuntimeException("发生异常");
});
future.handle(new BiFunction<Void, Throwable, Void>() {
@Override
public Void apply(Void input, Throwable throwable) {
if(throwable == null) {
System.out.println("没有发生异常");
} else {
System.out.println("发生了异常");
}
return null;
}
});
future.get();
}
}
线程池的使用
默认情况下,通过静态方法runAsync和supplyAsync创建的CompletableFuture任务会使用公共的ForkJoinPool线程池。默认的线程数是CPU的核数。
如果所有CompletableFuture任务共享一个线程池,那么一旦有任务执行一些很慢的IO操作,会导致所有线程阻塞,造成线程饥饿。所以建议大家根据不同的业务类型创建不同的线程池。
异步任务的串行执行
如果两个异步任务需要串行,可以通过CompletionStage接口的thenApply、thenAccept、thenRun和thenCompose方法实现。
theApply方法
theApply方法有三个重载版本,声明如下:
// 后一个任务与前一个任务在同一个线程中执行
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)
// 后一个任务与前一个任务不在同一个线程中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)
// 后一个任务在指定的executor线程池中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)
参数fn表示要串行执行的第二个异步任务,泛型参数T是上一个任务所返回结果的类型,泛型参数U是当前任务的返回值类型。
看个简单例子。
public class ThenApplyDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long firstStep = 10L + 10L;
System.out.println("first step outcome is " + firstStep);
return firstStep;
}
}).thenApplyAsync(new Function<Long, Long>() {
@Override
public Long apply(Long aLong) { // 参数是第一步的结果
long secondStep = aLong * 2;
System.out.println("Second outcome is " + secondStep);
return secondStep;
}
});
long result = future.get();
System.out.println("outcome is " + result);
}
}
thenRun方法
thenRun方法不关心任务的处理结果,只需要前一个任务执行完成,就开始执行后一个串行任务。
thenApply方法也有三个重载版本,声明如下:
// 后一个任务与前一个任务在同一个线程中执行
public CompletionStage<Void> thenRun(Runnable action);
// 后一个任务与前一个任务不再同一个线程中执行
public CompletionStage<Void> thenRunAsync(Runnable action);
// 后一个任务在executor线程池中执行
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);
thenAccept方法
thenAccept方法接收前一个任务的处理结果,但是没有输出。
thenAccept方法有三个重载版本,声明如下:
// 后一个任务与前一个任务在同一个线程中执行
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
// 后一个任务与前一个任务不再同一个线程中执行
public CompletionStage<Void> thenRunAsync(Consumer<? super T> action);
// 后一个任务在executor线程池中执行
public CompletionStage<Void> thenRunAsync(Consumer<? super T> action, Executor executor);
thenCompose方法
thenCompose方法在第一个任务操作完成时,将它的结果作为参数传递给第二个任务。
thenCompose方法有3个重载版本,声明如下:
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U> > fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U> > fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U> > fn, Executor executor);
thenCompose方法要求第二个任务的返回值是一个CompletionStage异步实例。
将前面的例子改成theCompose版本:
public class ThenComposeDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long firstStep = 10L + 10L;
System.out.println("first step outcome is " + firstStep);
return firstStep;
}
}).thenCompose(new Function<Long, CompletionStage<Long>>() {
@Override
public CompletionStage<Long> apply(Long firstStepOutcome) {
return CompletableFuture.supplyAsync(new Supplier<Long>() {
@Override
public Long get() {
long secondStep = firstStepOutcome * 2;
System.out.println("Second outcome is " + secondStep);
return secondStep;
}
});
}
});
long result = future.get();
System.out.println("outcome is " + result);
}
}
异步任务的合并执行
对两个异步任务合并可以通过CompletionStage接口的thenCombine、runAfterBoth和thenAcceptBoth三个方法来实现。
thenCombine方法
thenCombine会在两个任务都执行完成后,把两个任务的结果一起交给thenCombine来处理。
public <U, V> CompletionStage<V> thenCombine(
CompletionStage<? extends U> other, // 待合并实例
BiFunction<? super T, ? super U, ? extends V> fn);
public <U, V> CompletionStage<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn
);
public <U, V> CompletionStage<V> thenCombineAsync(
CompletionStage<? extends U> other,
BiFunction<? super T, ? super U, ? extends V> fn,
Executor executor
);
下面看一个使用thenCombine分三步计算(10+10)*(10+10)
的例子:
public class ThenCombineDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future1 =
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
Integer firstStep = 10 + 10;
System.out.println("firstStep outcome is " + firstStep);
return firstStep;
}
});
CompletableFuture<Integer> future2 =
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
Integer secondStep = 10 + 10;
System.out.println("secondStep outcome is " + secondStep);
return secondStep;
}
});
CompletableFuture<Integer> future3 = future1.thenCombine(future2,
new BiFunction<Integer, Integer, Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) {
return integer * integer2;
}
});
Integer result = future3.get();
System.out.println(" outcome is " + result);
}
}
runAfterBoth方法
runAfterBoth方法不关心没异步任务的输入参数和处理结果。
public CompletionStage<Void> runAfterBoth(
CompletionStage<?> other, Runnable action
);
public CompletionStage<Void> runAfterBothAsync(
CompletionStage<?> other, Runnable action
);
public CompletionStage<Void> runAfterBothAsync(
CompletionStage<?> other, Runnable action, Executor executor
);
thenAcceptBoth方法
thenAcceptBoth方法可以接收前两个任务的处理结果,但是第三个任务却不返回结果。
public <U> CompletionStage<Void> thenAcceptBoth(
CompletionStage<? extends U> other,
BiConusmer<? super T, ? super U> action
);
public <U> CompletionStage<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConusmer<? super T, ? super U> action
);
public <U> CompletionStage<Void> thenAcceptBothAsync(
CompletionStage<? extends U> other,
BiConusmer<? super T, ? super U> action,
Executor executor
);
allOf等待所有的任务结束
allOf会等待所有任务结束,以合并所有任务。
异步任务的选择执行
对有两个异步任务的选择可以通过CompletionStage接口的applyToEither、runAfterEither和acceptEither三个方法来实现。
applyToEither方法
两个CompletionStage谁返回结果的速度快,applyToEither方法就用这个结果进行下一步操作。
// 和other任务返回较快的结果用于执行fn回调函数
public <U> CompletionStage<U> applyToEither(
CompletionStage<? extends T> other, Function<? super T, U> fn
);
// 功能与上一个相同,但是不一定在同一个线程执行fn
public <U> CompletionStage<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn
);
// 功能与上一个相同,在指定线程执行fn
public <U> CompletionStage<U> applyToEitherAsync(
CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor
);
看个例子。
public class ApplyToEitherDemo {
public static void main(String[] args) throws Exception{
CompletableFuture<Integer> future1 =
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
Integer firstStep = 10 + 10;
System.out.println("firstStep outcome is " + firstStep);
return firstStep;
}
});
CompletableFuture<Integer> future2 =
CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
Integer secondStep = 100 + 100;
System.out.println("secondStep outcome is " + secondStep);
return secondStep;
}
});
CompletableFuture<Integer> future3 =
future1.applyToEither(future2,
new Function<Integer, Integer>() {
@Override
public Integer apply(Integer integer) {
return integer;
}
});
Integer result = future3.get();
System.out.println("outcome is " + result);
}
}
runAfterEither方法
runAfterEither方法的功能为前面两个CompletionStage实例,任何一个执行完成都会执行第三部回调。
// 和other任务返回较快的结果用于执行fn回调函数
public CompletionStage<Void> runAfterEither(
CompletionStage<?> other, Runnable fn
);
// 功能与上一个相同,但是不一定在同一个线程执行fn
public CompletionStage<Void> runAfterEitherAsync(
CompletionStage<?> other, Runnable fn
);
// 功能与上一个相同,在指定线程执行fn
public CompletionStage<Void> runAfterEitherAsync(
CompletionStage<?> other, Runnable fn, Executor executor
);
acceptEither方法
acceptEither用哪个最快的CompletionStage的结果作为下一步的输入,但是第三步没有输出。
// 和other任务返回较快的结果用于执行fn回调函数
public CompletionStage<Void> acceptEither(
CompletionStage<?> other, Consumer<? super T> fn
);
public CompletionStage<Void> acceptEitherAsync(
CompletionStage<?> other, Consumer<? super T> fn
);
public CompletionStage<Void> acceptEitherAsync(
CompletionStage<?> other, Consumer<? super T> fn, Executor executor
);
CompletableFuture的综合案例
使用CompletableFuture实现喝茶案例。
public class CompletableFutureDemo2 {
private static final int SLEEP_GAP = 3;
public static void main(String[] args) {
// 洗水壶->烧开水
CompletableFuture<Boolean> hotJob =
CompletableFuture.supplyAsync(() -> {
System.out.println("洗好水壶");
System.out.println("烧开水");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("水开了");
return true;
});
// 洗茶杯->那茶叶
CompletableFuture<Boolean> washJob =
CompletableFuture.supplyAsync(() -> {
System.out.println("洗茶杯");
try {
Thread.sleep(SLEEP_GAP);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("洗完了");
return true;
});
CompletableFuture<String> drinkJob =
hotJob.thenCombine(washJob, (hotOK, washOk) -> {
if(hotOK && washOk) {
System.out.println("泡茶喝,茶喝完");
return "茶喝完了";
}
return "没有喝到差";
});
System.out.println(drinkJob.join());
}
}