Future模式之CompletableFuture

CompletableFuture 是Java 8 新增加的Api,该类实现,Future和CompletionStage两个接口,提供了非常强大的Future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法。

一、主动完成计算

  • public T get()

    该方法为阻塞方法,会等待计算结果完成

  • public T get(long timeout,TimeUnit unit)

    有时间限制的阻塞方法

  • public T getNow(T valueIfAbsent)

    立即获取方法结果,如果没有计算结束则返回传的值

  • public T join()

    和 get() 方法类似也是主动阻塞线程,等待计算结果。和get() 方法有细微的差别

public class ThreadUtil {
    public static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }
}

public static void main(String[] args) {

     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.sleep(200);
            return 10 / 0;
    });
//       System.out.println(future.join());
//       System.out.println(future.get());
    System.out.println(future.getNow(10));
}
  • public boolean complete(T value)

    立即完成计算,并把结果设置为传的值,返回是否设置成功

    如果 CompletableFuture 没有关联任何的Callback、异步任务等,如果调用get方法,那会一直阻塞下去,可以使用complete方法主动完成计算

public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> future = new CompletableFuture<>();
//        future.get();
    future.complete(10);
}
  • public boolean completeExceptionally(Throwable ex)
    立即完成计算,并抛出异常

二、执行异步任务

创建一个异步任务

  • public static <U> CompletableFuture<U> completedFuture(U value)

    创建一个有初始值的CompletableFuture

  • public static CompletableFuture<Void> runAsync(Runnable runnable)

  • public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

  • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

  • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

    以上四个方法中,以 Async 结尾并且没有 Executor 参数的,会默认使用 ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
    以run开头的,因为以 Runable 类型为参数所以没有返回值。示例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("runAsync"));
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "supplyAsync");
        
        System.out.println(future1.get());
        System.out.println(future2.get());
    }

结果:

runAsync
null
supplyAsync

三、计算完成时对结果的处理 whenComplete/exceptionally/handle

当CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action。主要是下面的方法:

  • public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
  • public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  • public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

参数类型为 BiConsumer<? super T, ? super Throwable> 会获取上一步计算的计算结果和异常信息。以Async结尾的方法可能会使用其它的线程去执行,如果使用相同的线程池,也可能会被同一个线程选中执行,以下皆相同。

   public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        ThreadUtil.sleep(100);
        return 20;
    }).whenCompleteAsync((v, e) -> {
        System.out.println(v);
        System.out.println(e);
    });
    System.out.println(future.get());
}
  • public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

    该方法是对异常情况的处理,当函数异常时应该的返回值

    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.sleep(100);
            return 10 / 0;
        }).whenCompleteAsync((v, e) -> {
            System.out.println(v);
            System.out.println(e);
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 30;
        });
        System.out.println(future.get());
    }

  • public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
  • public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
  • public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

handle 方法和whenComplete方法类似,只不过接收的是一个 BiFunction<? super T,Throwable,? extends U> fn 类型的参数,因此有 whenComplete 方法和 转换的功能 (thenApply)

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> 10 / 0)
                .handle((t, e) -> {
                    System.out.println(e.getMessage());
                    return 10;
                });

        System.out.println(future.get());
    }

四、结果处理转换 thenApply

CompletableFuture 由于有回调,可以不必等待一个计算完成而阻塞着调用县城,可以在一个结果计算完成之后紧接着执行某个Action。我们可以将这些操作串联起来。

  • public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  • public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  • public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
 public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> 1)
                .thenApply((a) -> {
                    System.out.println(a);//1
                    return a * 10;
                }).thenApply((a) -> {
                    System.out.println(a);//10
                    return a + 10;
                }).thenApply((a) -> {
                    System.out.println(a);//20
                    return a - 5;
                });
        System.out.println(future.get());//15
    }

这些方法不是马上执行的,也不会阻塞,而是前一个执行完成后继续执行下一个。

和 handle 方法的区别是,handle 会处理正常计算值和异常,不会抛出异常。而 thenApply 只会处理正常计算值,有异常则抛出。

五、纯消费 thenAccept/thenAcceptBoth/thenRun

单纯的去消费结果而不会返回新的值,因些计算结果为 Void;

  • public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
  • public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
  • public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        
        CompletableFuture<Void> future = CompletableFuture
                .supplyAsync(() -> 1)
                .thenAccept(System.out::println) //消费 上一级返回值 1
                .thenAcceptAsync(System.out::println); //上一级没有返回值 输出null
                
        System.out.println(future.get()); //消费函数没有返回值 输出null
    }

  • public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
  • public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
  • public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)

和 thenAccept 相比,参数类型多了一个 CompletionStage<? extends U> other,以上方法会接收上一个CompletionStage返回值,和当前的一个。

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture
                .supplyAsync(() -> 1)
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2), (a, b) -> {
                    System.out.println(a);
                    System.out.println(b);
                }).get();
    }
  • public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

    runAfterBoth 和以上方法不同,传一个 Runnable 类型的参数,不接收上一级的返回值


更彻底的:

  • public CompletableFuture<Void> thenRun(Runnable action)
  • public CompletableFuture<Void> thenRunAsync(Runnable action)
  • public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

以上是彻底的纯消费,完全忽略计算结果

六、组合 thenCompose/thenCombine

  • public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
  • public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
  • public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

以上接收类型为 Function<? super T,? extends CompletionStage<U>> fn ,fn 接收上一级返回的结果,并返回一个新的 CompletableFuture

   public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> 1)
                .thenApply((a) -> {
                    ThreadUtil.sleep(1000);
                    return a + 10;
                })
                .thenCompose((s) -> {
                    System.out.println(s); //11
                    return CompletableFuture.supplyAsync(() -> s * 5);
                });

        System.out.println(future.get());//55
    }

  • public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
  • public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
  • public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

两个CompletionStage是并行执行的,它们之间并没有先后依赖顺序,other并不会等待先前的CompletableFuture执行完毕后再执行。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        Random random = new Random();
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println("supplyAsync");
                    return 2;
                }).thenApply((a) -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println("thenApply");
                    return a * 3;
                })
                .thenCombine(CompletableFuture.supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println("thenCombineAsync");
                    return 10;
                }), (a, b) -> {
                    System.out.println(a);
                    System.out.println(b);
                    return a + b;
                });

        System.out.println(future.get());
    }

thenCombine 和 supplyAsync 不一定哪个先哪个后,是并行执行的。

七、acceptEither / applyToEither

  • public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
  • public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
  • public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

先看示例:

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Random random = new Random();
        CompletableFuture
                .supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    return "A";
                })
                .acceptEither(CompletableFuture.supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    return "B";
                }), System.out::println)
                .get();
    }

以上代码有时输出A,有时输出B,哪个Future先执行完就会根据它的结果计算。

acceptEither方法是当任意一个 CompletionStage 完成的时候,action 这个消费者就会被执行。这个方法返回 CompletableFuture<Void>


  • public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
  • public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
  • public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

applyToEither 方法是当任意一个 CompletionStage 完成的时候,fn会被执行,它的返回值会当作新的CompletableFuture<U>的计算结果。

acceptEither 没有返回值,applyToEither 有返回值

八、allOf / anyOf

  • public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

这个方法的意思是把有方法都执行完才往下执行,没有返回值

   public static void main(String[] args) throws ExecutionException, InterruptedException {

        Random random = new Random();
        CompletableFuture.allOf(
                CompletableFuture.runAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println(1);
                }),
                CompletableFuture.runAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println(2);
                }))
                .get();

    }

有时输出1 2 有时输出 2 1


  • public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

任务一个方法执行完都往下执行,返回一个Object类型的值

  public static void main(String[] args) throws ExecutionException, InterruptedException {
        Random random = new Random();

        Object obj = CompletableFuture.anyOf(
                CompletableFuture.supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    return 1;
                }),
                CompletableFuture.supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    return 2;
                })).get();

        System.out.println(obj);
    }

输出结果有时为1 有时间为 2

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容

  • 更多 Java 并发编程方面的文章,请参见文集《Java 并发编程》 所谓异步调用其实就是实现一个可无需等待被调用...
    专职跑龙套阅读 21,351评论 0 26
  • 本笔记来自 计算机程序的思维逻辑 系列文章 Lambda表达式 Lambda表达式 语法 匿名函数,由 -> 分隔...
    码匠阅读 470评论 0 6
  • Java 8 有大量的新特性和增强如 Lambda 表达式,Streams,CompletableFuture等。...
    YDDMAX_Y阅读 4,750评论 0 15
  • 在现代软件开发中,系统功能越来越复杂,管理复杂度的方法就是分而治之,系统的很多功能可能会被切分为小的服务,对外提供...
    天堂鸟6阅读 7,147评论 0 23
  • 201组别 301期 利他一组 【日精进打卡第210天 【知~学习】 诵读《六项精进大纲》3遍,累计272遍;诵...
    J0hn先生阅读 151评论 0 0