CompletableFuture流水线工作

CompletableFuture流水线工作

将CompletableFuture与其他CompletableFuture组合,以及与其他函数组合,能够为多项任务构建类似流水线的方案,这样能够控制同步和异步执行以及它们之间的依赖。

现在我们来实现下面需求来说明CompletableFuture流水线式的处理:一个shop里面有ID:1-10的production,现在想获取这些production的价格再乘以10倍输出。

用多线程去做的话就需要10个线程去运行任务,还要等待所有结果完成返回到一个list中去。

public class CompletableFutureInAction3 {

    private static Random RAMDOM = new Random(System.currentTimeMillis());

    /**
     * 这是一个高并发的执行过程
     */
    public static void main(String[] args) {

        ExecutorService executor = Executors.newFixedThreadPool(2,r->{
            Thread t = new Thread();
            t.setDaemon(false);
            return t;
        });

        List<Integer> productionIDs = Arrays.asList(1,2,3,4,5,6,7,8,9,10);

        // 开启一个CompletableFuture去进行商品价格查询
        Stream<CompletableFuture<Double>> cfStream = productionIDs.stream().map(i -> CompletableFuture.supplyAsync(() -> queryProduction(i), executor));
        // 实现价格翻倍
        Stream<CompletableFuture<Double>> multiplyFuture = cfStream.map(future -> future.thenApply(CompletableFutureInAction3::multiply));
        // 整合结果 join()
        List<Double> result = multiplyFuture.map(CompletableFuture::join).collect(Collectors.toList());
        System.out.print(result);
    }

    private static double get(){
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return RAMDOM.nextDouble();
    }

    private static double multiply(double value){
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return value*10;
    }

    // 参数i没有用,只不过模拟ID去查询商品
    private static double queryProduction(int i){
        return get();
    }
}

我们实现了多个任务去执行最终汇总到一个结果中去,而且我们不用考虑多线程的交互和控制。

CompletableFuture常用流水线工作API

1. 运行完成的代码

  • thenAccept(Consumer<? super T> block):接收上一阶段的输出作为本阶段的输入,无返回值。
  • thenRun(Runnable action):如果处理完成还想要做后续的操作可以使用thenRun。thenRun不关心前一阶段的输出和计算结果,因为它不需要输入参数,也无返回值。
  • runAfterBoth(CompletionStage<?> other,Runnable action):工作方式与thenRun()类似,但是是等待两个Future而不是一个。
// thenAccept
CompletableFuture.supplyAsync(()->1)
                .thenAccept(System.out::println);  // 输出1

StringBuilder result = new StringBuilder();
CompletableFuture.supplyAsync(()->"aaa")
                .thenAccept(s->result.append(s));  // 无任何输出

// thenRun
CompletableFuture.supplyAsync(()->1)
                .thenRun(System.out::println);  // 无任何输出,因为thenRun不接受参数

 /**
   * 这里的两个Stage都是同步运行的,两个运行结束才会输出done
   * 
   * 输出结果:
   * ForkJoinPool-commomPool-worker1 is running
   * ForkJoinPool-commomPool-worker1 is running
   * done
 */
CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+" is running");
            return 1;
        }).runAfterBoth(CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getName()+" is running");
            return 2;
        }),()->System.out.println("done"));

2. 转换和作用于CompletableFuture

  • thenApply(Function<? super T,? extends U> fn):如果想持续的在多个回调之间传递值,可以用 thenApply 。 thenApply() 的参数 Function,接受一个值,并且返回一个值。其功能相当于将 CompletableFuture<T> 转换成CompletableFuture<U>。
  • handle(BiFunction<? super T,Throwable,? extends U> fn):handle()与thenApply()的区别在于handle()方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出,而thenApply()只能处理正常值,因此一旦有异常就会抛出。
  • thenCompose (Function<? super T, ? extends CompletionStage<U>> fn):允许你对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。和thenApply的不同类似于map和flatMap的不同:thenCompose在异步操作完成的时候对异步操作的结果进行一些操作,并且仍然返回CompletionStage类型。
// thenApply
CompletableFuture.supplyAsync(()->1)
       .thenApply(i->Integer.sum(i,10))
       .whenComplete((v,t)->System.out.println(v));  // 输出11

// handle
CompletableFuture.supplyAsync(()->1)
       .handle((v,t)->{
            return t!=null?"error":Integer.sum(v,10);
        })  // 多了对异常的考虑
       .whenComplete((v,t)->System.out.println(v)); // 输出11

// thenCompose
CompletableFuture.supplyAsync(()->1)
       .thenCompose(i->CompletableFuture.supplyAsync(()->10*i)) //把i作为入参,交给另外一个CompletableFuture处理
       .thenAccept(System.out::println);  //输出10

3. 结合(链接)两个futures

  • thenCombine (CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn):适用于将两个完全不相干的 CompletableFuture 对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务的情况。
  • thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action):用于组合两个并发的任务,产生新的Future没有返回值。工作方式与thenAccept()类似,但是是等待两个Future而不是一个。
// thenCombine
CompletableFuture.supplyAsync(()->1)
     .thenCombine(CompletableFuture.supplyAsync(()->2.0d),(r1,r2)->r1+r2)
     .thenAccept(System.out::println);  //输出 3.0

// thenAcceptBoth
CompletableFuture.supplyAsync(()->1)
     .thenAcceptBoth(CompletableFuture.supplyAsync(()->2.0d),(r1,r2)->{
      // 不会把计算结果再传递下去,就是一个消费者
                    System.out.println(r1);
                    System.out.println(r2);
                    System.out.println(r1+r2);
      }); 

4. 将Function作用于两个已完成Stage的结果之一

CompletableFuture 还能等待第一个 Future 而不是所有 Future 计算完毕,当你有两个任务,两个都是同样类型的结果,你只关心响应时间,谁快用谁的结果,不关心具体哪个任务首先返回。比如有两个数据一样的数据库,你同时去查询,谁快用谁的数据。

  • applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn):取2个Future中最先返回的,Function作用于两个Future中完成最快的那一个。
  • acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action):和applyToEither类似,不同的是acceptEither参数为consumer,不会有返回值,也就是会消费掉。
  • runAfterEither(CompletionStage<?> other,Runnable action):两个Future,任何一个完成了都会执行下一步的操作(Runnable)。
/**
  * 输出:
  * I am future 2
  * I am future 1
  * 2
  * 20
  * 1
*/
CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 1");
            return 1;
        }).applyToEither(CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 2");
            return 2;
        }),v->v*10)
           .thenAccept(System.out::print);

/**
  * 输出:
  * I am future 2
  * I am future 1
  * 2
  * 1
 */
CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 1");
            return 1;
        }).acceptEither(CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 2");
            return 2;
        }),System.out::println);

/**
  * 输出:
  * I am future 2
  * done
  * I am future 1
*/
CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 1");
            return 1;
        }).runAfterEither(CompletableFuture.supplyAsync(()->{
            System.out.println("I am future 2");
            return 2;
        }),()->System.out.println("done"));

Thread.currentThread().join();  // 为了让多个任务都执行完,这里添加一个join方法

5. allOf与anyOf

  • allOf(CompletableFuture<?>... cfs):该方法接收一个由CompletableFuture 构成的数组,数组中的所有 Completable-Future 对象执行完成之后,它返回一个 CompletableFuture<Void> 对象。
  • anyOf():该方法接收一个 CompletableFuture 对象构成的数组,返回由第一个执行完毕的 CompletableFuture 对象的返回值构成的 CompletableFuture<Object> ,也就是只要 CompletableFuture 对象数组中有任何一个执行完毕就不再等待。
 /**
   * 输出:
   * 3.12
   * 3.12
   * 3.12
   * 3.12
   * done
   */
List<CompletableFuture<Double>> collect = Arrays.asList(1, 2, 3, 4)
            .stream()
            .map(i -> CompletableFuture.supplyAsync(() -> 3.12))
            .collect(Collectors.toList());
CompletableFuture.allOf(collect.toArray(new CompletableFuture[collect.size()]))
            .thenRun(()->System.out.println("done"));
public static void test4() throws Exception {

        CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(()->{
            //模拟执行耗时任务
            System.out.println("task1 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回结果
            return "result1";
        });

        CompletableFuture<String> cf2 = CompletableFuture.supplyAsync(()->{
            //模拟执行耗时任务
            System.out.println("task2 doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回结果
            return "result2";
        });

        CompletableFuture<Object> anyResult=CompletableFuture.anyOf(cf1,cf2);

        System.out.println("第一个完成的任务结果:"+anyResult.get());

        CompletableFuture<Void> allResult=CompletableFuture.allOf(cf1,cf2);

        // 阻塞等待所有任务执行完成
        // join等同于get,唯一区别是前者不会扔出任何检查意外exception
        allResult.join();
        System.out.println("所有任务执行完成");
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,670评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,928评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,926评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,238评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,112评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,138评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,545评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,232评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,496评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,596评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,369评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,226评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,600评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,906评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,185评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,516评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,721评论 2 335

推荐阅读更多精彩内容