jdk8特性-CompletableFuture

  • CompletionStage接口翻译成中文是“完工阶段“,是java8新增的一个工具。定义的一系列方法,接收的参数有有三类CompletionStage,Runnable,Consumer和Function方法。面向函数式编程,因为入参抽象的是方法,并且将结果传入下一个被调用的方法。该接口表示异步计算的某种状态。到了这个状态会出发对应的方法。

  • CompletableFuture分别实现了Fulture和CompletionStage接口。也就是说拥有异步计算的能力,通过回调来处理计算的结果。当然包括了多个CompletableFuture之间的组合。增加了多线程之间的协调性。可以是完成了也可以是未完成的某个阶段。这样的组合提供了很多编程的可能性。

  • 话不多说,我们直接上代码,用法都总结和归类到写在注释上

package com.example.demo;

import org.junit.Test;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.*;

/**
 * Project <demo-project>
 * Created by jorgezhong on 2018/9/8 11:45.
 */
public class CompletableFutureDemo {


    /**
     * 创建CompletableFuture
     * - runAsync
     * - supplyAsync
     * - completedFuture
     * <p>
     * 异步计算启用的线程池是守护线程
     */
    @Test
    public void test1() {

        //1、异步计算:无返回值

        //默认线程池为:ForkJoinPool.commonPool()
        CompletableFuture.runAsync(() -> {
            // TODO: 2018/9/8 无返回异步计算
            System.out.println(Thread.currentThread().isDaemon());
        });

        //指定线程池,(到了jdk9CompletableFuture还拓展了延迟的线程池)
        CompletableFuture.runAsync(() -> {
            // TODO: 2018/9/8 无返回异步计算
        }, Executors.newFixedThreadPool(2));


        //2、异步计算:有返回值

        // 使用默认线程池
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "result1");
        //getNow指定异步计算抛出异常或结果返回null时替代的的值
        String result1 = future1.getNow(null);


        //  指定线程池
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "result2", Executors.newFixedThreadPool(2));
        //getNow指定异步计算抛出异常或结果返回null时替代的的值
        String result2 = future2.getNow(null);


        //3、初始化一个有结果无计算的CompletableFuture
        CompletableFuture<String> future = CompletableFuture.completedFuture("result");
        String now = future.getNow(null);
        System.out.println("now = " + now);


    }


    /**
     * 计算完成时需要对异常进行处理或者对结果进行处理
     * - whenComplete:同步处理包括异常
     * - thenApply:同步处理正常结果(前提是没有异常)
     * <p>
     * - whenCompleteAsync:异步处理包括异常
     * - thenApplyAsync:异步处理正常结果(前提是没有异常)
     * <p>
     * - exceptionally : 处理异常
     */
    @Test
    public void test2() {


        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "result");

        //whenComplete方法收future的结果和异常,可灵活进行处理
        //1、同步处理

        //  无返回值:可处理异常
        future.whenComplete((result, throwable) -> System.out.println("result = " + result));

        //  有返回值:没有异常处理(前提)
        CompletableFuture<String> resultFuture1 = future.thenApply(result -> "result");
        String result1 = resultFuture1.getNow(null);


        //2、异步处理:

        //  无返回值: 默认线程池
        future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + result));
        //  无返回值:指定线程池
        future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + result), Executors.newFixedThreadPool(2));

        //  有返回值:默认线程池
        CompletableFuture<String> resultFuture2 = future.thenApplyAsync(result -> "result");
        String result2 = resultFuture2.getNow(null);

        //  有返回值:指定线程池
        CompletableFuture<String> resultFuture3 = future.thenApplyAsync(result -> "result", Executors.newFixedThreadPool(2));
        String result3 = resultFuture3.getNow(null);


        //3、处理异常,处理完之后返回一个结果
        CompletableFuture<String> exceptionallyFuture = future.whenCompleteAsync((result, throwable) -> System.out.println("result = " + 1 / 0))
                .exceptionally(throwable -> "发生异常了:" + throwable.getMessage());
        System.out.println(exceptionallyFuture.getNow(null));


    }


    /**
     * 异常处理还可以使用以下两个方法
     * - handle
     * - handleAsync
     * <p>
     * 备注:exceptionally同步和异步计算一起用如果出现异常会把异常抛出。用以上的方法可以拦截处理
     */
    @Test
    public void test3() {


        CompletableFuture<String> exceptionoHandle = CompletableFuture.completedFuture("produce msg")
                .thenApplyAsync(s -> "result" + 1 / 0);

        String handleResult1 = exceptionoHandle.handle((s, throwable) -> {
            if (throwable != null) {
                return throwable.getMessage();
            }
            return s;
        }).getNow(null);

        //指定线程池
        String handleResult2 = exceptionoHandle.handleAsync((s, throwable) -> {
            if (throwable != null) {
                return throwable.getMessage();
            }
            return s;
        }, Executors.newFixedThreadPool(2)).getNow(null);

    }

    /**
     * 生产--消费
     * - thenAccept:同步的
     * - thenAcceptAsync:异步的
     * <p>
     * 接受上一个处理结果,并实现一个Consumer,消费结果
     */
    @Test
    public void test4() {

        //同步的
        CompletableFuture.completedFuture("produce msg")
                .thenAccept(s -> System.out.println("sync consumed msg : " + s));

        //异步的
        //默认线程池
        CompletableFuture.completedFuture("produce msg")
                .thenAcceptAsync(s -> System.out.println("async consumed msg : " + s));
        //指定线程池
        CompletableFuture.completedFuture("produce msg")
                .thenAcceptAsync(s -> System.out.println("async consumed msg : " + s), Executors.newFixedThreadPool(2));
    }


    /**
     * 取消任务
     * - cancel
     */
    @Test
    public void test5() throws InterruptedException {

        CompletableFuture<String> message = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {

            try {
                Thread.sleep(800);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s + "result";

        });
        String now = message.getNow(null);
        System.out.println("now = " + now);

        //取消
        boolean cancel = message.cancel(true);
        System.out.println("cancel = " + cancel);


        //如果这里再去获取,会抛出异常,说明已经取消了
        //String now1 = message.getNow(null);

        Thread.sleep(1000);

    }


    /**
     * 两个异步计算
     * - applyToEither:有返回值,同步
     * - acceptEither:无返回值,同步
     * - applyToEitherAsync:有返回值,异步
     * -
     */
    @Test
    public void test6() {


        CompletableFuture<String> task1 = CompletableFuture.completedFuture("task1")
                .thenApply(s -> "task1的计算结果:s1 = " + s);

        //同步,有返回值
        //applyToEither第二个参数接收的值是task1计算的返回值
        CompletableFuture<String> result1 = task1.applyToEither(CompletableFuture.completedFuture("task2")
                .thenApply(s -> "task2的计算结果:s2 = " + s), s -> s);
        System.out.println("task2:" + result1.getNow(null));


        //同步,无返回值
        task1.acceptEither(CompletableFuture.completedFuture("task3")
                .thenApply(s -> "task3的计算结果:s3 = " + s), s -> System.out.println("task3:" + s));


        //异步有返回值,默认线程池,也可以指定
        CompletableFuture<String> result2 = task1.applyToEitherAsync(CompletableFuture.completedFuture("task4")
                .thenApply(s -> "task4的计算结果:s4 = " + s), s -> s);
        //由于是异步的,主线程跑的快一点,因此join()之后才能看到跑完的结果
        System.out.println("task4:" + result2.join());


        //异步无返回值,指定线程池,也可以使用默认线程池
        CompletableFuture<Void> task5 = task1.acceptEitherAsync(CompletableFuture.completedFuture("task5")
                .thenApply(s -> "task5的计算结果:s5 = " + s), s -> System.out.println("task5:" + s), Executors.newFixedThreadPool(2));
        task5.join();


    }

    /**
     * 组合计算结果
     * - runAfterBoth:都计算完之后执行一段代码
     * - thenAcceptBoth:都计算完之后把结果传入,并执行一段代码
     * <p>
     * - thenCombine:组合两个结果
     * - thenCompose:组合两个结果
     */
    @Test
    public void test7() {

        //runAfterBoth方式
        StringBuilder msg = new StringBuilder("jorgeZhong");
        CompletableFuture.completedFuture(msg)
                .thenApply(s -> s.append(" task1,"))
                .runAfterBoth(CompletableFuture.completedFuture(msg)
                        .thenApply(s -> s.append(" task2")), () -> System.out.println(msg));


        //thenAcceptBoth方式
        CompletableFuture.completedFuture("jorgeZhong")
                .thenApplyAsync(String::toLowerCase)
                .thenAcceptBoth(CompletableFuture.completedFuture("jorgeZhong")
                        .thenApplyAsync(String::toUpperCase), (s, s2) -> System.out
                        .println("s1:" + s + ", s2:" + s2));


        //thenCombine方式
        CompletableFuture<String> result1 = CompletableFuture.completedFuture("jorgeZhong")
                .thenApply(String::toLowerCase)
                .thenCombine(CompletableFuture.completedFuture("jorgeZhong")
                        .thenApply(String::toUpperCase), (s, s2) -> "s1:" + s + ", s2:" + s2);

        System.out.println("result1:" + result1.getNow(null));

        //异步
        CompletableFuture<String> result11 = CompletableFuture.completedFuture("jorgeZhong")
                .thenApply(String::toLowerCase)
                .thenCombineAsync(CompletableFuture.completedFuture("jorgeZhong")
                        .thenApplyAsync(String::toUpperCase), (s, s2) -> "s1:" + s + ", s2:" + s2);
        System.out.println("result11:" + result11.join());


        //thenCompose方式
        CompletableFuture<String> result2 = CompletableFuture.completedFuture("jorgeZhong")
                .thenApply(String::toLowerCase)
                .thenCompose(s -> CompletableFuture.completedFuture("jorgeZhong")
                        .thenApply(String::toUpperCase)
                        .thenApply(s1 -> "s:" + s + ", s1:" + s1));
        System.out.println("result2:" + result2.getNow(null));

        //异步
        CompletableFuture<String> result22 = CompletableFuture.completedFuture("jorgeZhong")
                .thenApply(String::toLowerCase)
                .thenComposeAsync(s -> CompletableFuture.completedFuture("jorgeZhong")
                        .thenApplyAsync(String::toUpperCase)
                        .thenApplyAsync(s1 -> "s:" + s + ", s1:" + s1));

        System.out.println("result22:" + result22.join());
    }


    /**
     * 多个CompletableFuture策略
     * - anyOf:接受一个CompletableFuture数组,任意一个任务执行完返回。都会触发该CompletableFuture
     * - whenComplete:计算执行完之后执行实现的一段代码,将上一个结果和异常作为参数传入
     */
    @Test
    public void test8() throws InterruptedException {

        List<String> messages = Arrays.asList("a", "b", "c");
        CompletableFuture.anyOf(messages.stream()
                .map(o -> CompletableFuture.completedFuture(o).thenApplyAsync(s -> {

                    try {
                        Thread.sleep(new Random().ints(99, 300).findFirst().getAsInt());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return s.toUpperCase();
                }))
                .toArray(CompletableFuture[]::new))
                .whenComplete((res, throwable) -> {
                    if (throwable == null) {
                        System.out.println(res.toString());
                    }
                });


        Thread.sleep(1000);


    }


    /**
     * 多个CompletableFuture策略
     * - allOf:接受一个CompletableFuture数组,所有任务返回后,创建一个CompletableFuture
     */
    @Test
    public void test9() {

        List<String> messages = Arrays.asList("a", "b", "c");
        CompletableFuture[] cfs = messages.stream()
                .map(s -> CompletableFuture.completedFuture(s).thenApplyAsync(String::toUpperCase))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(cfs)
                .whenCompleteAsync((aVoid, throwable) -> Arrays.stream(cfs).forEach(completableFuture -> System.out
                        .println(completableFuture.getNow(null))));


    }


}


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,133评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,682评论 3 390
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,784评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,508评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,603评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,607评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,604评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,359评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,805评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,121评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,280评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,959评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,588评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,206评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,442评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,193评论 2 367
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,144评论 2 352

推荐阅读更多精彩内容