Java:CompletableFuture一文搞定

Prefix

CompletableFuture本身是为了解决任务的执行和衔接。在后面我们可以看到,CompletableFuture的功能就是分派任务到线程,并以近乎于pipeline的概念将各任务进行编排,下游任务可以获取上游任务的结果。从方法名上我们可见pipeline

complete -> whenComplete -> then* 

后面我们会从例子探索pipeline的概念。

注意以Async结尾的方法:

不以Async结尾的方法由原来的线程计算;
以Async结尾的方法由默认的线程池ForkJoinPool.commonPool()或者指定的线程池executor运行。

About CompletableFuture

同步式
尽管Future可以代表在另外的线程中执行的一段异步代码,但是你还是可以在本身线程(例如main)中执行:

CompletableFuture<Integer> future = new CompletableFuture<>();

上面的代码中future没有关联任何的Callback、线程池、异步任务等,如果客户端调用future.get就会一直傻等下去。

future.get(); 
//一直等下去。源码可以看到get调用了waitingGet,是循环获取结果。
//因为没有调用complete这种方法生成结果,它就要一直循环

你可以通过下面的代码完成一个计算,触发客户端的等待:

future.complete(100);

CompletableFuture.complete()只能调用一次,后续调用将被忽略。但也有一个后门叫做CompletableFuture.obtrudeValue(…)覆盖一个新Future之前的价值,请小心使用。
Java 8:CompletableFuture终极指南

异步式
以下四个静态方法用来为一段异步执行的代码创建CompletableFuture对象:

public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

当运行完成时的回调操作

  • 当运行时出现了异常,可以通过exceptionally进行补偿(消化)
  • whenComplete,当计算完成,或者抛出异常的时候,可以对其结果或异常进行消费
  • handle,当计算完成,或者抛出异常的时候,可以对其结果或异常进行转换

handle使用场景:
exceptionally消化任务抛出的异常后,我们会有机会将此异常转换为和Future类型的兼容的一些值来进行恢复。safe进一步的转换将不再产生一个异常而是从提供功能的函数返回一个String值。
Java 8:CompletableFuture终极指南

这里有个关于任务的概念需要了解:

whenCompletehandleexceptionally里面的运算代码算不算任务?可能在某些业务语境中是的,但至少在CompletableFuture的语义中我觉得不算。它们仅仅是任务的附属,而不能独立成为任务。从下面的例子就能反映:

    @Test
    public void testNoExceptionally() throws Exception {
        CompletableFuture<String> future = new CompletableFuture();
        // task1
        future.completeExceptionally(new Exception("测试抛异常"));
        future.whenComplete((s, t) -> {
            // 1
            log.info("1:{}", s);
            if(t != null)
                log.error(t.getMessage());})
        .whenComplete((s, t) -> {
            // 2
            log.info("2:{}", s);
            if(t != null)
                log.error(t.getMessage());})
        .thenApply(s -> {  // task2
            // 3
            log.info("3:{}", s);
            return s;})
        .exceptionally(e -> {
            // 4       异常中断了task2,在这里被消化
            log.error("4:{}", e.getMessage());
            return e.getMessage();
        })
        .join();
    }

task1抛出异常,2个whenCompleteexceptionally都执行了,但是task2并没有被执行。说明了2个问题:

  • whenComplete实际上是归于task1
  • 只有exceptionally可以进行任务中异常的消化,否则异常会传递下去,中断后面的任务被执行,直到有exceptionally消化异常

任务之间的承接

注意:下游任务并不承接上游任务的异常。如果上游任务抛出异常而没有exceptionally消化,则下游任务不会被执行,所以这里的方法并没有像上面whenCompletehandle有异常参数:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)

A->B

  • thenApply,针对上一步(A)结果进行转换(B)。
  • thenAccept,针对上一步结果进行消费
  • thenRun,对上一步的计算结果不关心

A,B -> C

  • thenCombine,结合上一步(A)结果和本次(B)的结果,然后进行转换
  • thenAcceptBoth,结合上一步结果和本次的结果,然后进行消耗。消耗计算是在本次线程(本次线程就是上文提到的原来的线程)上执行;而Async是在线程池上执行。下面以此类推
  • runAfterBoth,不关心上一步和本次的结果,只关心它们运算完毕,然后进行下步操作
  • acceptEither,上一步和本次哪个快就用哪个的结果,然后消费

辅助方法 allOf 和 anyOf

  • allOf方法是当所有的CompletableFuture都执行完后执行计算
  • anyOf方法是当任意一个CompletableFuture执行完后就会执行计算,计算的结果相同。

探索一下pipeline的概念

下面的代码中,我先后执行2个任务:task1task2

task1中抛出异常
1.task1如果抛异常,则打印异常信息;
2.等task1完成后,task后处理打印task1的输出字符串或异常信息;
3.task后处理如果抛异常,则打印异常信息;
4.task2task后处理完成后,打印task1的输出字符串

    @Test
    public void testPipeline() throws Exception {
        CompletableFuture<String> future = new CompletableFuture();
        // task1
        future.completeExceptionally(new Exception("测试抛异常"));
        future.exceptionally(e -> {
            // 1
            log.error("1:{}", e.getMessage());
            return e.getMessage();
        }).whenComplete((s, t) -> { 
            // 2
            log.info("2:{}", s);
            if(t != null)   // 注意,t如果为null,会导致抛异常
                log.error(t.getMessage());
        }).exceptionally(e -> {
            // 3    1将异常消化了,所以不会触发3
            log.error("3:{}", e.getMessage());
            return e.getMessage();
        }).thenAccept(s -> {  // task2
            // 4
            log.info("4:{}", s+" world");})
         .join();     //main线程等任务执行完再结束
    }

结果是1、2、4 会被触发。
下面我们做个修改:task1不抛出异常。

    @Test
    public void testPipeline2() throws Exception {
        CompletableFuture<String> future = new CompletableFuture();
        // task1
        future.complete("正常");
        future.exceptionally(e -> {
            // 1
            log.error("1:{}", e.getMessage());
            return e.getMessage();
        }).whenComplete((s, t) -> {   
            // 2
            log.info("2:{}", s);
            if(t != null) 
                log.error(t.getMessage());
        }).exceptionally(e -> {
            // 3
            log.error("3:{}", e.getMessage());
            return e.getMessage();
        }).thenAccept(s -> {  // task2
            // 4
            log.info("4:{}", s+" world");})
                .join(); 
    }

那么只有2、4被触发。

About thread

allOf

Q:allOfthenApply中的任务哪个线程执行?

可以清楚的是,thenApply中的任务是在allOf返回的CompletableFuture的线程上执行。那么allOf返回的CompletableFuture的线程是什么?allOf返回的CompletableFuture的线程一般是allOf中最后一个完成的任务的线程。

不过凡事总有例外。实际上无论是new CompletableFuture()还是CompletableFuture.supplyAsync都是立刻执行任务。如果main执行到allOf时,像下面的例子里allOf内的任务都已经结束了,即使future1是异步的,thenApply仍然是同步的(此处为main线程)。
比如:

    @Test
    public void testCmopletableFuture() throws Exception {

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(this::getThreadName);
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(this::getThreadName);

        sleep(3000);
        CompletableFuture.allOf(future1, future2)  //  执行到allof,future1和 future2已经结束
                        .thenApply(t ->{
                            log.info(Thread.currentThread().getName());
                            return getThreadName(future1, future2);})
                        .exceptionally(e->{
                            log.info(Thread.currentThread().getName());
                            return Lists.newArrayList();})
                        .join();
    }
    private String getThreadName(){
        sleep(1000);
        String res = Thread.currentThread().getName();
        return res;
    }

    private List<String> getThreadName(CompletableFuture<String> future1, CompletableFuture<String> future2){
        List<String> res = Lists.newLinkedList();
        try{
            res.add(future1.get());
            res.add(future2.get());
        }catch(Exception e){

        }
        log.info("{}", res);
        return res;
    }

    private void sleep(long millis){
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

Ref

简明扼要:CompletableFuture 详解 - 简书
全面:Java CompletableFuture 详解 | 鸟窝

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354