Future系列(CompletableFuture的使用)

1.创建CompletableFuture对象

CompletableFuture提供了四个静态方法用来创建CompletableFuture对象:

public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

Asynsc表示异步,而supplyAsync与runAsync不同在与前者异步返回一个结果,后者是void.第二个函数第二个参数表示是用我们自己创建的线程池,否则采用默认的ForkJoinPool.commonPool()作为它的线程池

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return Thread.currentThread().getName();
                });
        System.out.println(future.get());//输出ForkJoinPool.commonPool-worker-1

2.计算结果

public T    get()
public T    get(long timeout, TimeUnit unit)
public T    getNow(T valueIfAbsent)
public T    join()

getNow有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueIfAbsent值。
join()与get()区别在于join()返回计算的结果或者抛出一个unchecked异常(CompletionException),而get()返回一个具体的异常.

3.主动触发计算

public boolean complete(T  value)
public boolean completeExceptionally(Throwable ex)

上面方法表示当调用CompletableFuture.get()被阻塞的时候,那么这个方法就是结束阻塞,并且get()获取设置的value.

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return Thread.currentThread().getName();
                });
        
        new Thread(new Runnable() {
            
            @Override
            public void run() {
                try {
                    Thread.sleep(1500);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                future.complete("hello");
            }
        }).start();
        
        System.out.println(future.get());

上面的例子,如果sleep小于1000,输出hello,大于1000ms,则输出ForkJoinPool.commonPool-worker-1

4.值转化

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

可以通过Function转化数据类型。直接上示例代码

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("supplyAsync" + Thread.currentThread().getName());
                    return "step1";
                }).thenApply(new Function<String, String>() {
                    @Override
                    public String apply(String t) {
                        System.out.println("thenApply" + Thread.currentThread().getName());
                        return "step2";
                    }
                    
                }).thenApplyAsync(new Function<String, String>() {
                    @Override
                    public String apply(String t) {
                        System.out.println("thenApplyAsync" + Thread.currentThread().getName());
                        return "step3";
                    }
                    
                });
        System.out.println(future.get());

输出

supplyAsyncForkJoinPool.commonPool-worker-1
thenApplymain
thenApplyAsyncForkJoinPool.commonPool-worker-1
step3

thenApply会自动从默认的子线程中切换成主线程

5.消费处理结果

public CompletionStage<Void> thenAccept(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);

注意没有返回值,一般作为最后一步来做。示例代码

        CompletableFuture<Void> future = CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("supplyAsync" + Thread.currentThread().getName());
                    return "step1";
                }).thenApply(new Function<String, String>() {
                    @Override
                    public String apply(String t) {
                        System.out.println("thenApply" + Thread.currentThread().getName());
                        return "step2";
                    }
                    
                }).thenAccept(new Consumer<String>() {
                    @Override
                    public void accept(String t) {
                        System.out.println("accept" + Thread.currentThread().getName());
                    }
                });
        System.out.println(future.get());

输出

supplyAsyncForkJoinPool.commonPool-worker-1
thenApplymain
acceptmain
null

如果换成 thenAcceptAsync,那么就会打印 ForkJoinPool.commonPool-worker-1

6.thenRun 方法

public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action);
public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);

它只是接着上面继续往下运行

        CompletableFuture<Void> future = CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("supplyAsync" + Thread.currentThread().getName());
                    return "step1";
                }).thenApply(new Function<String, String>() {
                    @Override
                    public String apply(String t) {
                        System.out.println("thenApply" + Thread.currentThread().getName());
                        return "step2";
                    }
                    
                }).thenRunAsync(new Runnable() {
                    
                    @Override
                    public void run() {
                        System.out.println("thenRunAsync" + Thread.currentThread().getName());
                    }
                });
        System.out.println(future.get());

输出

supplyAsyncForkJoinPool.commonPool-worker-1
thenApplymain
thenRunAsyncForkJoinPool.commonPool-worker-1
null

如果改成thenRun,就是main

7.合并任务系列

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn);
public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);

public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action);
public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action,     Executor executor);

当两个CompletionStage都执行完成后,把结果一块处理。thenAcceptBoth 系列没有返回值。还有后面的 runAfterBoth 系列的也是同样的功能

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    System.out.println("supplyAsync" + Thread.currentThread().getName());
                    return "step1";
                }).thenApply(new Function<String, String>() {
                    @Override
                    public String apply(String t) {
                        System.out.println("thenApply" + Thread.currentThread().getName());
                        return "step2";
                    }
                    
                });
        
        CompletableFuture<String> future2 = CompletableFuture
                .supplyAsync(() -> {
                    return "future2-step1";
                }).thenApplyAsync(new Function<String, String>() {
                    @Override
                    public String apply(String t) {
                        return "future2-step2";
                    }
                    
                });
        
        future.thenCombine(future2, new BiFunction<String, String, String>() {

            @Override
            public String apply(String t, String u) {
                System.out.println("thenCombine" + Thread.currentThread().getName());
                return "thenCombine";
            }
            
        });
        
        System.out.println(future.get());

输出

supplyAsyncForkJoinPool.commonPool-worker-1
thenApplymain
thenCombinemain
step2

8.either系列

两个CompletionStage,谁执行返回的结果快,我就用那个CompletionStage的结果进行下一步的转化操作。

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn);
public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);

例子我就不写了

9.thenCompose

thenCompose 方法允许你对两个 CompletionStage 进行流水线操作,第一个操作完成时,将其结果作为参数传递给第二个操作。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

直接上代码

        CompletableFuture<String> future = CompletableFuture.supplyAsync(
                () -> {
                    System.out.println("supplyAsync" + Thread.currentThread().getName());
                    return "step1";
                }).thenCompose(new Function<String, CompletionStage<String>>() {

            @Override
            public CompletionStage<String> apply(String t) {

                return CompletableFuture.supplyAsync(() -> {
                    return "future2-step1";
                });
            }
        });

        System.out.println(future.get());

输出

supplyAsyncForkJoinPool.commonPool-worker-1
future2-step1

感觉和前面的没啥区别呀

10.whenComplete

计算结果完成时的处理

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

直接上代码

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    double a = 1/0;
                    return "step1";
                }).whenComplete(new BiConsumer<String, Throwable>() {

                    @Override
                    public void accept(String t, Throwable throwable) {
                        System.out.println("t = " + t + ", throwable = " + throwable);
                    }
                    
                });

        System.out.println(future.get());

输出

t = null, throwable = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
    at java.util.concurrent.CompletableFuture.reportGet(Unknown Source)
    at java.util.concurrent.CompletableFuture.get(Unknown Source)
    at completableFuture.Main.main(Main.java:28)
Caused by: java.lang.ArithmeticException: / by zero
    at completableFuture.Main.lambda$0(Main.java:17)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.util.concurrent.CompletableFuture$AsyncSupply.exec(Unknown Source)
    at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
    at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
    at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

可以看到其实就算accept,整个程序依然会崩溃

11.handle

功能和上面一点一样,计算结果完成时的处理

        CompletableFuture<String> future = CompletableFuture
                .supplyAsync(() -> {
                    double a = 1/0;
                    return "step1";
                }).handle(new BiFunction<String, Throwable, String>() {

                    @Override
                    public String apply(String t, Throwable throwable) {
                        System.out.println("t = " + t + ", throwable = " + throwable);
                        return "handle return";
                    }
                });

        System.out.println(future.get());

输出

t = null, throwable = java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
handle return

明显看到程序并没有崩溃,也就是说handle里面把崩溃捕捉了

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,033评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,725评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,473评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,846评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,848评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,691评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,053评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,700评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,856评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,676评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,787评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,430评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,034评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,990评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,218评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,174评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,526评论 2 343

推荐阅读更多精彩内容