这几天稍微花时间学习了一下 java 8 提供的处理异步调用的一个 future 类 CompletableFuture
.
这是一个针对函数式的 future 对象.用来让我们更加优雅的使用异步调用.
简述
CompletableFuture
是Future
接口的实现类.内部只有两个属性:result
和stack
两个值,分别表示对象所存储的内容以及对象现在的完成状况.
由于CompletableFuture
也是 future 对象的一部分,所以它也有 future 的相关特性.博主在这里认为它最终要的内容就是他的执行时间,就是:
CompletableFuture 只在对象创建的时候才开始尝试执行
创建对象
CompletableFuture
有两类最基本的创建对象的静态方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
- supplyAsync 表示需要传入一个
Supplier
类型的 lambda 表达式来创建一个新的对象- runAsync 方法表示需要传入一个
Runnable
类型的对象来创建一个新的对象- supplyAsync 返回的对象带有生成值,而 runAsync 方法返回的对象只能用来确认该操作是否已经完成而不能获取到内容.在这一点上和
Runnable
的情况有些类似- 两个方法都有对应的重写方法, 需要传入线程池,如果不传入线程池,则会默认使用
ForkJoinPool
线程池
获取对象内容
CompletableFuture
有几个相关方法,包括:
public T get() throws InterruptedException, ExecutionException;
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
public T join();
public T getNow(T valueIfAbsent);
- get 方法会使当前线程阻塞,并且等待直到 future 完成,并且将返回 future 的值
- 带参数的 get 方法需要传入对应的时间长度,一旦超出这个时间,会抛出
TimeoutException
其中
TimeUnit
是一个表示时间类型的枚举,最小单位:纳秒,最大单位:日
[NANOSECONDS(纳秒), MICROSECONDS(微秒), MILLISECONDS(毫秒), SECONDS(秒), MINUTES(分), HOURS(时), DAYS(日)]
- join 方法与 get 并无太大差别,但 join 方法不能被打断.(暂时没搞清楚)
- getNow 方法不会阻塞线程,而是立即返回值,如果该 future 当前没有完成,则会立刻返回该方法的传入参数.
- get 和 join 方法会在正常的情况下返回值,在遇到异常的情况下将异常抛出
完成对象
CompletableFuture
有两个完成方法:
public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);
public boolean cancel(boolean mayInterruptIfRunning);
- complete 方法将future状态置为已完成,并且将参数注入,但如果这个future已经完成了,则不会产生任何变化
- completeExceptionally 方法将future状态置为已完成,并且将异常参数注入,并且在get的时候回获取这个异常,但如果这个future已经完成了,则不会产生任何变化
- cancel 方法会取消 future 对象,并且置入一个 CancellationException. 参数表示是否会打断 future 的执行(目前没有测试过)
检验方法
public boolean isDone();
public boolean isCancelled();
public boolean isCompletedExceptionally();
- isDone 用来返回 future 对象是否已经完成
- isCancelled 用来返回 future 对象是否已经被取消
- isCompletedExceptionally 用来返回 future 是否出现了异常
then 方法
CompletableFuture
有一系列延续方法,大部分都以 then 开头.
- 都表示在 future 对象已完成之后进行某种操作.
- 方法都会返回一个新的
CompletableFuture
对象 - 方法的
Executor
参数表示是否需要传入一个线程池来开启新线程操作 -
CompletableFuture
对象可以嵌套 - lambda中可以抛出 RuntimeException .不可以抛出 Exception.
//该方法的传入一个 T -> U 的Function<T,U>, 并且返回 <U> 的CompletableFuture
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
//传入一个 Consumer 函数来完成. 返回值没有内容
public CompletableFuture<Void> thenAccept(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
//与 thenAccept 方法相同,但是传入参数为 Runnable
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor);
//thenCombine 针对两个 future 进行操作.
//需要传入另一个 future 以及 一个 BiFunction<T,U,V> ,返回一个新的 future.
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor);
//thenAcceptBoth 同时等待两个 future ,并且返回一个空的CompletableFuture,
//Bi lambda 的两个参数分别接受的两个参数 第一个接受 this , 第二个接受第一个参数返回
public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
//和thenAcceptBoth方法相同,但是传入参数为 Runnable
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
//同时等待两个 future 同时完成 ,并且对第二个返回的 future 进行处理.
public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn);
public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T, U> fn,Executor executor);
//同时等待两个 future ,并且对第二个 future 的返回值进行消费.
public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action);
public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor);
//和 acceptEitherAsync 相同,并且传入 Runnable
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other, Runnable action);
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
//thenCompose 接受一个 Function 并且返回一个新的 CompletableFuture 对象.
//所收集到的对象是没有嵌套的 CompletableFuture 对象. 类似 flatMap 的效果
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor);
//whenComplete 接受 future 中抛出的异常和返回
//在返回或者异常之后对返回值进行消费,或者返回正常结果
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action);
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor);
//handleAsync 接受一个 function<返回值,异常,结果> 并在返回之后对 future 的相应进行处理
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);
//exceptionally 会返回一个新的 CompletableFuture, 并且该方法会阻塞当前线程
//当原有方法发生了异常,exceptionally 的 function 会执行.
//当原有方法正常完成,则会返回一个相同的结果
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
处理多个CompletableFuture
对于多个相互独立的CompletableFuture
,我们提供了两个方法来处理它们
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs);
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs);
allOf方法
- CompletableFuture.allOf 静态方法,传入多个独立的 future ,并且返回一个新的CompletableFuture.
- 当所有的 future 完成时,新的 future 同时完成
- 不能传入空值
- 当某个方法出现了异常时,新 future 会在所有 future 完成的时候完成,并且包含一个异常.
anyOf方法
- CompletableFuture.anyOf 静态方法,传入多个独立的 future ,并且返回一个新的CompletableFuture
- 当任何一个方法完成时,新的 future 完成,并且返回该值
- 不能传入空值
- 当某个方法出现了异常时,新 future 会立刻完成,并且携带一个异常
completedFuture 方法
- completedFuture 方法是能够直接将值放入 future 对象,常用于测试
- CompletableFuture可以嵌套
public static <U> CompletableFuture<U> completedFuture(U value);
栗子
创建对象
@Test
public void base01() throws ExecutionException, InterruptedException {
/**
* CompletableFuture的静态方法supplyAsync(Supplier<U> supplier)返回一个带有
* ForkJoinPool 线程池的 CompletableFuture 对象.
* CompletableFuture 对象可以调用get方法,这样会使线程阻塞.
* Supplier<T> 接口是一个 () -> T 的函数
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt());
System.out.println(future.get());
}
@Test
public void base02() throws ExecutionException, InterruptedException {
/**
* CompletableFuture的静态方法supplyAsync(Supplier<U> supplier, Executor executor)
* 返回一个带有自定义的线程池的CompletableFuture对象.
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt(), Executors.newCachedThreadPool());
System.out.println(future.get());
}
@Test
public void base03() throws ExecutionException, InterruptedException {
/**
* CompletableFuture的静态方法 runAsync(Runnable runnable)返回一个带有
* ForkJoinPool线程池的CompletableFuture对象.但是这个对象没有返回值
*/
CompletableFuture future = CompletableFuture.runAsync(getRandomIntRunnable());
Assert.assertEquals(null,future.get());
}
@Test
public void base04() throws ExecutionException, InterruptedException {
/**
* CompletableFuture的静态方法 runAsync(Runnable runnable, Executor executor)
* 返回一个带有自定义的线程池的CompletableFuture对象.但是这个对象没有返回值
*/
CompletableFuture future = CompletableFuture.runAsync(getRandomIntRunnable(), Executors.newCachedThreadPool());
Assert.assertEquals(null,future.get());
}
获取对象内容
@Test
public void base06() throws ExecutionException, InterruptedException, TimeoutException {
/**
* get(long,TimeUnit) 方法阻塞并且等待返回结果,TimeUnit表示时间长度单位,最小是纳秒.
* 当超时未返回信息的时候回抛出TimeoutException异常
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt());
System.out.println(future.get(10, TimeUnit.MILLISECONDS));
}
@Test
public void base07() throws ExecutionException, InterruptedException {
/**
* join 方法与 get 方法无基本差别
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt());
System.out.println(future.join());
}
@Test
public void base08() throws ExecutionException, InterruptedException {
/**
* getNow(obj) 方法立即返回结果,如果无结果就返回传入参数
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt());
System.out.println(future.getNow("1bca"));
System.out.println(future.get());
}
完成对象
@Test
public void base09() throws ExecutionException, InterruptedException {
/**
* complete 方法将future状态置为已完成,并且将参数注入,但如果这个future已经完成了,则
* 不会产生任何变化
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt());
future.complete("123");
System.out.println(future.get());
future.complete("345");
System.out.println(future.get());
}
@Test
public void base10() throws ExecutionException, InterruptedException {
/**
* completeExceptionally 方法将future状态置为已完成,并且将异常参数注入,
* 并且在get的时候回获取这个异常
* 但如果这个future已经完成了,则不会产生任何变化
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt());
future.complete("123");
System.out.println(future.get());
future.complete("345");
System.out.println(future.get());
future.completeExceptionally(new Exception("abc"));
System.out.println(future.get());
future = CompletableFuture.supplyAsync(getRandomInt());
future.completeExceptionally(new Exception("abc"));
System.out.println(future.get());
}
检验方法
@Test
public void base20() throws ExecutionException, InterruptedException {
/**
* cancel 方法会取消 future 对象,并且置入一个 CancellationException
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt());
System.out.println("done?"+future.isDone()+" cancel?"+future.isCancelled()+" exp?"+future.isCompletedExceptionally());
// future.get();
future.cancel(false);
System.out.println("done?"+future.isDone()+" cancel?"+future.isCancelled()+" exp?"+future.isCompletedExceptionally());
future.get();
}
then 方法
@Test
public void base11() throws ExecutionException, InterruptedException {
/**
* thenApplyAsync 方法会在前一个future *已完成* 的情况下开始执行方法中的lambda函数.
* 并且返回一个新的future对象.
* 该方法的传入一个 T -> U 的Function<T,U>, 并且返回 <U> 的CompletableFuture
* 方法的参数表示是否需要传入一个线程池来开启新线程操作
*/
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(getRandomInt());
System.out.println(new Date().getTime());
CompletableFuture<Integer> newFuture = future.thenApplyAsync(x -> {
System.out.println(new Date().getTime());
return x * 100;
});
//获取两个future的结果
System.out.println(future.get());
System.out.println(newFuture.get());
}
@Test
public void base12() throws ExecutionException, InterruptedException {
/**
* thenAcceptAsync 传入一个 Consumer 函数来完成.
* 其它与 thenApplyAsync 相同
* thenRunAsync 方法与此方法相同,但是传入是一个 Runnable
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt());
CompletableFuture future1 = future.thenAcceptAsync(num -> {
System.out.println(num);
});
future.get();
System.out.println(future1.get());
}
@Test
public void base13() throws ExecutionException, InterruptedException {
/**
* thenCombine 针对两个 future 进行操作.
* 需要传入另一个 future 以及 一个 BiFunction<T,U,V> ,返回一个新的 future.
*/
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(getRandomInt());
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(getRandomInt(3000l));
CompletableFuture c = a.thenCombine(b,(a1,b1) -> {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return a1+b1;
});
System.out.println("a:"+a.get());
System.out.println("b:"+b.get());
System.out.println("c:"+c.get());
}
@Test
public void base14() throws ExecutionException, InterruptedException {
/**
* thenAcceptBoth 同时等待两个 future ,并且返回一个空的CompletableFuture,
* Bi lambda 的两个参数分别接受的两个参数 第一个接受 this , 第二个接受第一个参数返回
* runAfterBothAsync 方法和此方法相同,但是传入参数为 Runnable
*/
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(getRandomInt());
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(getRandomInt(3000l));
a.thenAcceptBoth(b,(a1,b1) -> {
System.out.println(a1);
System.out.println(b1);
});
a.get();
b.get();
}
@Test
public void base15() throws ExecutionException, InterruptedException {
/**
* applyToEitherAsync 同时等待两个 future 同时完成 ,并且对第二个返回的 future 进行处理.
* acceptEitherAsync 同时等待两个 future ,并且对第二个 future 的返回值进行消费.
* runAfterEitherAsync 和 acceptEitherAsync 相同,并且传入 Runnable
*/
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(getRandomInt());
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(getRandomInt(3000l));
CompletableFuture c = a.applyToEitherAsync(b,b1 -> {
return b1*100;
});
a.get();
b.get();
System.out.println(c.get());
}
@Test
public void base16() throws ExecutionException, InterruptedException {
/**
* whenComplete 接受 future 中抛出的异常和返回, 方法中可以抛出 RuntimeException .不可以抛出 Exception.
* 在返回或者异常之后对返回值进行消费,或者返回正常结果
* handleAsync 接受一个 function<返回值,异常,结果> 并在返回之后对 future 的相应进行处理
*/
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
int a = new Random().nextInt(10);
if(a > 5){
throw new RuntimeException("too big");
}
return a;
});
future.whenComplete((res,ex) -> {
System.out.println("result:"+res+", ex:"+ex);
});
}
@Test
public void base17() throws ExecutionException, InterruptedException {
/**
* exceptionally 会返回一个新的 CompletableFuture , 该方法会阻塞,
* 当原有方法发生了异常,exceptionally 的 function 会执行.
* 当原有方法正常完成,则会返回一个相同的结果
*/
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
throw new RuntimeException("except");
});
CompletableFuture exfuture = future.exceptionally(ex -> {return new Exception(((RuntimeException) ex).getMessage());});
System.out.println(exfuture.get());
}
@Test
public void base21() throws ExecutionException, InterruptedException {
/**
* thenCompose 接受一个 Function 并且返回一个新的 CompletableFuture 对象.
* 所收集到的对象是没有嵌套的 CompletableFuture 对象. 类似 flatMap 的效果
*/
CompletableFuture future = CompletableFuture.supplyAsync(getRandomInt());
CompletableFuture newFuture = future.thenCompose(num -> {
return CompletableFuture.completedFuture(num);
});
System.out.println(newFuture.get());
}
处理多个CompletableFuture
allOf方法
@Test
public void base18() throws ExecutionException, InterruptedException {
/**
* CompletableFuture.allOf 静态方法,传入多个独立的 future ,并且返回一个新的CompletableFuture
* 当所有的 future 完成时,新的 future 同时完成,
* 不能传入空值
* 当某个方法出现了异常时,新 future 会在所有 future 完成的时候完成,并且包含一个异常.
*/
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(getRandomInt());
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(getRandomInt(3000l));
CompletableFuture ex = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("runtime");
});
System.out.println("is a done?" +a.isDone()+". Is b done?"+b.isDone()+".");
CompletableFuture future = CompletableFuture.allOf(a,b,ex);
System.out.println("is a done?" +a.isDone()+". Is b done?"+b.isDone()+".");
CompletableFuture exFuture = future.exceptionally( exp -> {
return ((RuntimeException) exp).getMessage();
});
System.out.println("done?"+future.isDone()+" cancel?"+future.isCancelled()+" exp?"+future.isCompletedExceptionally());
System.out.println("is a done?" +a.isDone()+". Is b done?"+b.isDone()+".");
System.out.println(exFuture.get());
System.out.println("done?"+future.isDone()+" cancel?"+future.isCancelled()+" exp?"+future.isCompletedExceptionally());
}
anyOf方法
@Test
public void base19() throws ExecutionException, InterruptedException {
/**
* CompletableFuture.anyOf 静态方法,传入多个独立的 future ,并且返回一个新的CompletableFuture
* 当任何一个方法完成时,新的 future 完成,并且返回该值
* 不能传入空值
* 当某个方法出现了异常时,新 future 会立刻完成,并且携带一个异常
*/
CompletableFuture<Integer> a = CompletableFuture.supplyAsync(getRandomInt());
CompletableFuture<Integer> b = CompletableFuture.supplyAsync(getRandomInt(3000l));
CompletableFuture ex = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException("runtime");
});
System.out.println("is a done?" +a.isDone()+". Is b done?"+b.isDone()+".");
CompletableFuture future = CompletableFuture.anyOf(a,b,ex);
System.out.println("is a done?" +a.isDone()+". Is b done?"+b.isDone()+".");
CompletableFuture exFuture = future.exceptionally( exp -> {
return ((RuntimeException) exp).getMessage();
});
System.out.println("done?"+future.isDone()+" cancel?"+future.isCancelled()+" exp?"+future.isCompletedExceptionally());
System.out.println("is a done?" +a.isDone()+". Is b done?"+b.isDone()+".");
System.out.println(exFuture.get());
System.out.println("done?"+future.isDone()+" cancel?"+future.isCancelled()+" exp?"+future.isCompletedExceptionally());
}
completedFuture 方法
@Test
public void base22() throws ExecutionException, InterruptedException {
/**
* completedFuture 方法是能够直接将值放入 future 对象,常用于测试
* CompletableFuture可以嵌套
*/
CompletableFuture future = CompletableFuture.completedFuture(1);
CompletableFuture future1 = CompletableFuture.completedFuture(future);
CompletableFuture future2 = CompletableFuture.completedFuture(future1);
System.out.println(future2.get());
}
这篇解析写到这里,下一次我会简单说明一下有关 java 8 中的几种 lambda
欢迎关注我的博客: 既然来了就坐坐吧
小站刚开始起步,欢迎您的驾到.