CompletableFuture入门

CompletableFuture介绍

Future 接口可以构建异步应用,但依然有其局限性。它很难直接表述多个Future 结果之间的依赖性。实际开发中,我们经常需要达成以下目的:

  • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
  • 仅等待 Future 集合中最快结束的任务完成(有可能因为它们试图通过不同的方式计算同一个值),并返回它的结果。
  • 等待 Future 集合中的所有任务都完成。
  • 当 Future 的完成事件发生时会收到通知,并能使用 Future 计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果。

等等之类的,新的CompletableFuture将使得这些成为可能。

CompletableFuture实现了Future<T>, CompletionStage<T>两个接口。所以还是可以像以前一样通过阻塞或轮询的方式获得结果,尽管这种方式不推荐使用。

虽说 CompletableFuture 实现了 Future 接口,但它多数方法源自于 CompletionStage。CompletionStage是一个接口,从命名上看得知是一个完成的阶段,它里面的方法也标明是在某个运行阶段得到了结果之后要做的事情。

如下我们使用new的方式创建一个CompletableFuture(这种方式不常用来创建一个CompletableFuture,此处仅为了说明情况),用阻塞的方式得到了结果:

public class CompletableFutureInAction {

    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        CompletableFuture<Double> completableFuture = new CompletableFuture();
        new Thread(()->{
            double value = get();
            completableFuture.complete(value);
        }).start();

        System.out.print("===========no==block");
        Optional.ofNullable(completableFuture.get()).ifPresent(System.out::println);
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}

运行之后会立刻输出:

===========no==block

过了n秒就会输出一个随机数:

0.39273128973

如果我们想实现方法完成时会收到通知,而不是一直阻塞在结果的获取上面,可以这样写:

public class CompletableFutureInAction {

    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        CompletableFuture<Double> completableFuture = new CompletableFuture();
        new Thread(()->{
            double value = get();
            completableFuture.complete(value);
        }).start();

        System.out.print("===========no==block");
        completableFuture.whenComplete((v,t)->{
            Optional.ofNullable(v).ifPresent(System.out::println);
            Optional.ofNullable(t).ifPresent(x->x.printStackTrace());
        });
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}

CompletableFuture的基本使用

【创建CompletableFuture对象】

CompletableFuture.completedFuture是一个静态辅助方法,用来返回一个已经计算好的CompletableFuture。

public static <U> CompletableFuture<U> completedFuture(U value)

而以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

以Async结尾并且没有指定Executor的方法会使用ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

  • runAsync方法:它以Runnabel函数式接口类型为参数,所以CompletableFuture的计算结果为空。
  • supplyAsync方法以Supplier<U>函数式接口类型为参数,CompletableFuture的计算结果类型为U。

这四个方法的参数类型都是函数式接口,所以可以使用lambda表达式实现异步任务。

【计算结果完成时的处理】

当CompletableFuture的计算结果完成,或者抛出异常的时候,可以执行特定的Action。主要是下面的方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

可以看到Action的类型是BiConsumer<? super T,? super Throwable>它可以处理正常的计算结果,或者异常情况。

方法不以Async结尾,意味着Action使用相同的线程执行,而如果以Async结尾且没有指定Executor,则从ForkJoinPool.commonPool()中获取线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)。

如果你用过Future,就会知道糟糕的时候有多糟糕。幸运的是,CompletableFuture有一个漂亮的对应手段,通过使用exceptionally。exceptionally给我们一个机会恢复,通过执行当异步执行的计算抛出exception时备选的方法(alternative method)。

代码示例:

public class CompletableFutureInAction2 {

    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get)
                         .whenComplete((v,t)->{
                             Optional.ofNullable(v).ifPresent(System.out::println);
                             Optional.ofNullable(t).ifPresent(x->x.printStackTrace());
                         });
        System.out.print("===========no==block");
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}

输出结果:

===========no==block

为什么没有结果输出呢?因为whenComplete里面的线程是守护线程,所有的“非后台线程”结束时,程序也就终止了,同时会杀死进程中所有后台线程:main就是一个非后台线程。所以运行之后会直接输出===========no==block,但方法没有执行完那当然没有任何结果输出。如果我们想得到结果可以在 System.out.print("===========no==block");后面加上一句:

Thread.currentThread().join();

这样main线程会等待CompletableFuture的线程结束之后才能继续运行。

对于线程池中的所有线程默认都为非后台线程,这样主线程退出时不会直接退出JVM,而是等待线程池结束。所以线程池的生命周期要比创建它们的程序生命周期要长,我们必须使用shutdown()方法手动退出。我们知道Executors可以用来创建一个线程池,如果我们想让Executors创建的线程池中的线程自动结束,可以使用如下方法:

 public static void main(String[] args) {
        // ExecutorService 默认不是守护线程 默认false setDaemon(false)
        ExecutorService executor = Executors.newFixedThreadPool(2,r->{
            // 对线程池里面的线程进行初始化设置
            Thread t = new Thread();
            t.setDaemon(true);  // 设置线程为守护线程
            return t;
        });
        
        executor.execute(()->System.out.print("test"));

        // 不用执行shutdown,线程池中的线程随着main()方法执行完也随之退出
    }

执行supplyAsync的线程是守护线程,所以main()函数执行完了这个线程也随之被杀死。而如果我们把线程池中的所有线程默认都转换为非后台线程,然后用这个线程池中的县城去执行supplyAsync,我们就不用依赖join(),这样主线程退出时不会直接退出JVM,我们就可以等到方法输出结果。

public class CompletableFutureInAction2 {
    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args)
            throws ExecutionException, InterruptedException {
        AtomicBoolean finished = new AtomicBoolean(false);

        ExecutorService executor = Executors.newFixedThreadPool(2,r->{
            Thread t = new Thread();
            t.setDaemon(false);
            return t;
        });
        
        // CompletableFuture.supplyAsync(executor)  whenComplete
        // 上面两步都是 [pool-1-thread-1]执行 即两个步骤是同一个线程执行
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executor)
                .whenComplete((v,t)->{
                    Optional.ofNullable(v).ifPresent(System.out::println);
                    finished.set(true);
                });
        System.out.print("===========no==block");

       /* while(!finished.get()){
            Thread.sleep(1);
        }*/
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}

做好了上面的准备,下面我们来看一下方法使用Async结尾和不使用有什么区别:

public class CompletableFutureInAction2 {
    private static Random RAMDOM = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        ExecutorService executorPool = Executors.newFixedThreadPool(2, run -> {
            Thread t = new Thread(run);
            t.setDaemon(false);
            return t;
        });

        //###1.0 CompletableFuture.supplyAsync(executorPool)  thenApply  whenComplete
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
                .thenApply(t->{
                    System.out.println("##1.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
                    return t*10;
                })
                .whenComplete((v,t)->{
                    System.out.println("##1.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
                    System.out.println(v);
                    t.printStackTrace();
                });
        //### 上面三步都是 [pool-1-thread-1]执行 即三个步骤是同一个线程执行

        //###2.0 CompletableFuture.supplyAsync(executorPool)  thenApplyAsync  whenCompleteAsync
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
                .thenApplyAsync(t->{
                    System.out.println("##2.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
                    return t*10;
                }).
                whenCompleteAsync((v,t)->{
                   System.out.println("##2.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
                   System.out.println(v);
                   t.printStackTrace();
                });
        //### 上面三步supplyAsync=[pool-1-thread-2]  后面两个都是[ForkJoinPool.commonPool-worker-1]
        //### 原因由于调用这个时thenApplyAsync,没有指定Executor executor,然后又是因为异步,默认采用ForkJoin的连接池
        //### 由于工作量不大,ForkJoinPool并没有分配两个线程,ForkJoinPool.commonPool-worker-1承担了两份工作

        //###3.0 CompletableFuture.supplyAsync(executorPool)  thenApplyAsync  whenCompleteAsync(executorpool)
        CompletableFuture.supplyAsync(CompletableFutureInAction2::get,executorPool)
                .thenApply(t->{
                    System.out.println("##3.0## [thenApplyThreadName]=["+Thread.currentThread().getName()+"]");
                    return t*10;
                }).
                whenCompleteAsync((v,t)->{
                    System.out.println("##3.0## [whenCompleteThreadName]=["+Thread.currentThread().getName()+"]");
                    System.out.println(v);
                    t.printStackTrace();},executorPool);
        //### 上面三步supplyAsync和thenApply 都是[pool-1-thread-3]  whenCompleteAsync[pool-1-thread-4]
        //### 原因由于调用这个时thenApply和前者同一线程  whenCompleteAsync指定了线程
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,185评论 6 503
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,652评论 3 393
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,524评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,339评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,387评论 6 391
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,287评论 1 301
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,130评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,985评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,420评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,617评论 3 334
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,779评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,477评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,088评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,716评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,857评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,876评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,700评论 2 354

推荐阅读更多精彩内容