flink涉及的基础知识 - CompletableFuture

public class CompletableFuture<T> implements Future<T>, CompletionStage<T>

根据其实现的接口可知,它有两个特点:

  1. Future,表示异步执行的结果
  2. CompletionStage,表示整个异步计算的一个阶段,它能被其它的stage触发,也能触发其它的stage。CompletionStage的组合可以分为3种(假设有3个CompletionStage,分别为1、2、3):
  • then,关系为 2依赖于1、3依赖于2,则1、2、3顺序执行
  • both,关系为 3依赖于1、3依赖于2,则1、2都执行完后,3再执行
  • either,关系为 3依赖于1、3依赖于2,则1、2只要有一个执行完后,3就执行
    根据上面的依赖关系,所以CompletionStage的方法也分为3类,方法名中有combine、both的方法表示both关系,方法名中有either的方法表示either关系,其余的就是then关系
volatile Object result; 

表示异步执行的结果,当result为null,表示该阶段还未执行完成,当result为AltResultAltResultThrowable ex不为null,表示该阶段执行异常,其它则为执行成功

1.1 构造CompletableFuture

new CompletableFutur<T>()
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

下面的4个方法会被直接提交并执行,Supplier是有返回值的,Runnable没有返回值

1.2 获取执行结果

public T get() throws InterruptedException, ExecutionException
public T join()

阻塞式的获取任务结果,并不推荐这么做,因为会阻塞调用线程,从异步变成同步了,那跟Future没什么区别了,需要使用回调函数来处理

1.3 触发回调函数

public boolean complete(T value)
public boolean completeExceptionally(Throwable ex)

还有其它的一些以complete开头的方法,最常用的就是上面2种,它会将result从null变为指定值,然后触发依赖任务的执行

1.4 设置回调函数

一个回调函数对应一个CompletableFuture

1.4.1 then关系

// 没有返回值
public CompletableFuture<Void> thenRun(
        Runnable action)
public CompletableFuture<Void> thenRunAsync(
        Runnable action) 
public CompletableFuture<Void> thenRunAsync(
        Runnable action, Executor executor) 
// 没有返回值
public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
                                                 Executor executor);
// 有返回值
public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) 
public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) 
public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) 
// 有返回值
public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) 
public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn, Executor executor)

这些方法都是实现的CompletionStage中定义的方法,他描述的是一种串行的关系

串行关系

  • 假设用户创建了CompletableFuture1,然后调用了2次thenApply方法,则上图描述了最后的依赖关系,即当CompletableFuture1完成后,则先触发函数2的执行,再触发则先触发函数1的执行,是一种栈的结构,此时获取最后结果要从CompletableFuture2中获取

1.4.2 both关系

除了串行的关系,CompletionStage中还定义了描述AND关系的方法

// 有返回值
public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) 
public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) 
public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) 

// 无返回值
public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action)
public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) 
public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor)

// 无返回值
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) 
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action) 
public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action, Executor executor) 

上面的方法都表示当调用者futureother执行完成后,才会执行传入的函数

1.4.3 either关系

CompletionStage中还定义了描述OR关系的方法

// 有返回值
public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn)
public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,
        Executor executor)

// 无返回值
public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action)
public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) 
public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action,
        Executor executor) 
// 无返回值
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action)
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action) 
public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action, Executor executor) 

上面的方法都表示当调用者futureother任意一个执行完成后,才会执行传入的函数

1.5 异常处理

下面的方法用来做异常处理,如果把回调函数比作try中的代码块,那么这个方法就是catch中的代码块

public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }

1.6 finally功能

下面的方法可以用来做收尾工作,相当于finally中的代码块,一定会被执行;当被依赖的任务报错,后续依赖任务的函数不会执行(除exceptionally),只是将对应的CompletableFuture的result被设为AltResult(被依赖任务的异常)

public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) 
public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) 
public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action, Executor executor)
public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)

1.7 执行线程

  • 所有方法名中带有Async的方法,如果不指定executor,则使用默认线程池ForkJoinPool#commonPool(),线程个数为核数(可通过-Djava.util.concurrent.ForkJoinPool.common.parallelism指定),但是如果线程个数小于2,则不使用,而是每一个函数或任务都会创建一个线程
  • 如果使用不带Async的方法,则会在设置了值(比如调用了complete、completeExceptionally)的线程中执行任务
  • 建议,对于执行时间可能较长的函数或任务,比如远程的RPC,尽量使用async + 指定Executor的方式,因为设置了Executor,则提交的任务在指定的线程池中运行,这样做的好处
    • 对于使用默认线程池,避免默认的线程池中有慢任务执行使线程耗尽,导致新提交的任务积压而无法快速执行
    • 对于使用不带Async的方法,避免影响调用线程中的代码执行
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容