JAVA 多线程与高并发学习笔记(十八)——CompletableFuture

本部分介绍Java 8 中提供的具备异步回调能力的工具类——CompletableFuture,该类实现了Future接口,还具备函数式编程能力。

CompletableFuture详解

CompletableFuture类实现了Future和CompletionStage两个接口,该类的实例作为一个异步任务。

CompletionStage接口

CompletionStage代表某个同步或者异步计算的一个阶段,或者一些列异步任务中的一个子任务。

每个CompletionStage子任务所包装的可以是一个Function、Consumer或者Runnable函数式接口实例。这三个函数式接口特点如下:

  1. Function接口的特点是有输入、有输出。
  2. Runnable接口的特点是无输入、无输出。
  3. Consumer接口的特点是有输入、无输出。

多个CompletionStage构成了一条任务流水线,多个子任务之间可以使用链式调用,下面是个简单的例子:

oneStage.thenApply(x -> square(x))
            .thenAccept(y -> System.out.println(y))
            .thenRun(() -> System.out.println())

上例子说明如下:

  1. oneStage是一个CompletionStage子任务。
  2. x -> square(x) 是一个Function类型的Lambda表达式,被thenApply方法包装成了CompletionStage子任务,它又包含输入和输出。
  3. y -> System.out.println(y)是一个Consumer类型的Lambda表达式,被thenAccept包装成了一个CompletionStage子任务,它只有输入(即上个任务的输出)。
  4. () -> System.out.println()是一个Runnable类型的Lambda表达式,被thenRun方法包装成了一个CompletionStage子任务,它没有输入输出。

使用runAsync和supplyAsync创建子任务

CompletableFuture定义了一组用于创建CompletionStage子任务的方法。

// 子任务包装一个Runnable实例,并调用ForkJoinPool。commonPool线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable)

// 子任务包装一个Runnable实例,并调用指定的executor线程池来执行
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

// 子任务包装一个Supplier实例,并调用ForkJoinPool。commonPool线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

// 子任务包装一个Supplier实例,并调用指定的executor线程池来执行
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

执行子任务时,如果没有执行Executor线程池,默认情况下会使用公共的ForkJoinPool线程池。

设置子任务回调钩子

可以为CompletionStage子任务设置特定的回调钩子,当计算结果完成或者抛出异常的时候,执行这些特定的钩子。

设置子任务回调钩子的主要函数如下:

// 设置子任务完成时的回调钩子
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throeable> action)

// 设置子任务完成时的回调钩子,可能不在同一线程执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)

// 设置子任务完成时的回调钩子,提交给线程池executor执行
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

// 设置异常处理的回调钩子
public CompletableFuture<T> exceptionally(Fuction<Throwable, ? extends T> fn)

下面看个简单例子:


public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("抛出异常");
            throw new RuntimeException("发生异常");
        });

        // 设置异步任务执行完成后的回调钩子
        future.whenComplete(new BiConsumer<Void, Throwable>() {
            @Override
            public void accept(Void aVoid, Throwable throwable) {
                System.out.println("执行完成");
            }
        });

        future.exceptionally(new Function<Throwable, Void>() {
            @Override
            public Void apply(Throwable t) {
                System.out.println("执行失败:" + t.getMessage());
                return null;
            }
        });

        future.get();
    }
}

运行结果:

抛出异常
执行完成
执行失败:java.lang.RuntimeException: 发生异常

调用cancel方法取消CompletableFuture时,任务被视为异常完成,completeExceptionally方法所设置的异常回调钩子也会被执行。

如果没有设置异常回调钩子,发生内部异常时会有两种情况发生:

  1. 在调用get方法启动任务时,如果遇到内部异常,get方法就会抛出ExecutionException。
  2. 在调用join和getNow启动任务时(大多数情况下都是如此),如果遇到内部异常,会抛出CompletionException。

调用handle方法统一处理异常和结果

除了分别通过whenComplete、exceptionally设置完成钩子、异常钩子之外,还可以调用handle方法统一处理结果和异常。

handle方法有3个重载版本:

// 在执行任务的同一个线程中处理异常和结果
public<U> CompletionStage<U> handle (BiFunction<? super T, Throwable, ? extends U> fn);

// 可能不再执行任务的同一个线程中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);

// 在指定线程池executor中处理异常和结果
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor);

将前面的例子改成handle版本。


public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("抛出异常");
            throw new RuntimeException("发生异常");
        });

        future.handle(new BiFunction<Void, Throwable, Void>() {
            @Override
            public Void apply(Void input, Throwable throwable) {
                if(throwable == null) {
                    System.out.println("没有发生异常");
                } else {
                    System.out.println("发生了异常");
                }
                return null;
            }
        });

        future.get();
    }
}

线程池的使用

默认情况下,通过静态方法runAsync和supplyAsync创建的CompletableFuture任务会使用公共的ForkJoinPool线程池。默认的线程数是CPU的核数。

如果所有CompletableFuture任务共享一个线程池,那么一旦有任务执行一些很慢的IO操作,会导致所有线程阻塞,造成线程饥饿。所以建议大家根据不同的业务类型创建不同的线程池。

异步任务的串行执行

如果两个异步任务需要串行,可以通过CompletionStage接口的thenApply、thenAccept、thenRun和thenCompose方法实现。

theApply方法

theApply方法有三个重载版本,声明如下:

// 后一个任务与前一个任务在同一个线程中执行
public <U> CompletableFuture<U> thenApply(Function<? super T, ? extends U> fn)

// 后一个任务与前一个任务不在同一个线程中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn)

// 后一个任务在指定的executor线程池中执行
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T, ? extends U> fn, Executor executor)

参数fn表示要串行执行的第二个异步任务,泛型参数T是上一个任务所返回结果的类型,泛型参数U是当前任务的返回值类型。

看个简单例子。

public class ThenApplyDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
            @Override
            public Long get() {
                long firstStep = 10L + 10L;
                System.out.println("first step outcome is " + firstStep);

                return firstStep;
            }
        }).thenApplyAsync(new Function<Long, Long>() {
            @Override
            public Long apply(Long aLong) { // 参数是第一步的结果
                long secondStep = aLong * 2;
                System.out.println("Second outcome is " + secondStep);
                return secondStep;
            }
        });

        long result = future.get();
        System.out.println("outcome is " + result);
    }
}

thenRun方法

thenRun方法不关心任务的处理结果,只需要前一个任务执行完成,就开始执行后一个串行任务。

thenApply方法也有三个重载版本,声明如下:

// 后一个任务与前一个任务在同一个线程中执行
public CompletionStage<Void> thenRun(Runnable action);

// 后一个任务与前一个任务不再同一个线程中执行
public CompletionStage<Void> thenRunAsync(Runnable action);

// 后一个任务在executor线程池中执行
public CompletionStage<Void> thenRunAsync(Runnable action, Executor executor);

thenAccept方法

thenAccept方法接收前一个任务的处理结果,但是没有输出。

thenAccept方法有三个重载版本,声明如下:

// 后一个任务与前一个任务在同一个线程中执行
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

// 后一个任务与前一个任务不再同一个线程中执行
public CompletionStage<Void> thenRunAsync(Consumer<? super T> action);

// 后一个任务在executor线程池中执行
public CompletionStage<Void> thenRunAsync(Consumer<? super T> action, Executor executor);

thenCompose方法

thenCompose方法在第一个任务操作完成时,将它的结果作为参数传递给第二个任务。

thenCompose方法有3个重载版本,声明如下:

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U> > fn);

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U> > fn);

public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U> > fn, Executor executor);

thenCompose方法要求第二个任务的返回值是一个CompletionStage异步实例。

将前面的例子改成theCompose版本:


public class ThenComposeDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Long> future = CompletableFuture.supplyAsync(new Supplier<Long>() {
            @Override
            public Long get() {
                long firstStep = 10L + 10L;
                System.out.println("first step outcome is " + firstStep);

                return firstStep;
            }
        }).thenCompose(new Function<Long, CompletionStage<Long>>() {
            @Override
            public CompletionStage<Long> apply(Long firstStepOutcome) {
                return CompletableFuture.supplyAsync(new Supplier<Long>() {
                    @Override
                    public Long get() {
                        long secondStep = firstStepOutcome * 2;
                        System.out.println("Second outcome is " + secondStep);
                        return secondStep;
                    }
                });
            }
        });
        long result = future.get();
        System.out.println("outcome is " + result);

    }
}

异步任务的合并执行

对两个异步任务合并可以通过CompletionStage接口的thenCombine、runAfterBoth和thenAcceptBoth三个方法来实现。

thenCombine方法

thenCombine会在两个任务都执行完成后,把两个任务的结果一起交给thenCombine来处理。

public <U, V> CompletionStage<V> thenCombine(
    CompletionStage<? extends U> other, // 待合并实例
    BiFunction<? super T, ? super U, ? extends V> fn); 

public <U, V> CompletionStage<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T, ? super U, ? extends V> fn
);

public <U, V> CompletionStage<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T, ? super U, ? extends V> fn,
    Executor executor
);

下面看一个使用thenCombine分三步计算(10+10)*(10+10)的例子:


public class ThenCombineDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer firstStep = 10 + 10;
                        System.out.println("firstStep outcome is " + firstStep);
                        return firstStep;
                    }
                });
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer secondStep = 10 + 10;
                        System.out.println("secondStep outcome is " + secondStep);
                        return secondStep;
                    }
                });
        CompletableFuture<Integer> future3 = future1.thenCombine(future2,
                new BiFunction<Integer, Integer, Integer>() {
                    @Override
                    public Integer apply(Integer integer, Integer integer2) {
                        return integer * integer2;
                    }
                });
        Integer result = future3.get();
        System.out.println(" outcome is " + result);
    }
}

runAfterBoth方法

runAfterBoth方法不关心没异步任务的输入参数和处理结果。

public CompletionStage<Void> runAfterBoth(
    CompletionStage<?> other, Runnable action
);

public CompletionStage<Void> runAfterBothAsync(
    CompletionStage<?> other, Runnable action
);

public CompletionStage<Void> runAfterBothAsync(
    CompletionStage<?> other, Runnable action, Executor executor
);

thenAcceptBoth方法

thenAcceptBoth方法可以接收前两个任务的处理结果,但是第三个任务却不返回结果。

public <U> CompletionStage<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConusmer<? super T, ? super U> action
);

public <U> CompletionStage<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConusmer<? super T, ? super U> action
);

public <U> CompletionStage<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConusmer<? super T, ? super U> action,
    Executor executor
);

allOf等待所有的任务结束

allOf会等待所有任务结束,以合并所有任务。

异步任务的选择执行

对有两个异步任务的选择可以通过CompletionStage接口的applyToEither、runAfterEither和acceptEither三个方法来实现。

applyToEither方法

两个CompletionStage谁返回结果的速度快,applyToEither方法就用这个结果进行下一步操作。

// 和other任务返回较快的结果用于执行fn回调函数
public <U> CompletionStage<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn
);

// 功能与上一个相同,但是不一定在同一个线程执行fn
public <U> CompletionStage<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn
);

// 功能与上一个相同,在指定线程执行fn
public <U> CompletionStage<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn, Executor executor
);

看个例子。


public class ApplyToEitherDemo {

    public static void main(String[] args) throws Exception{
        CompletableFuture<Integer> future1 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer firstStep = 10 + 10;
                        System.out.println("firstStep outcome is " + firstStep);
                        return firstStep;
                    }
                });
        CompletableFuture<Integer> future2 =
                CompletableFuture.supplyAsync(new Supplier<Integer>() {
                    @Override
                    public Integer get() {
                        Integer secondStep = 100 + 100;
                        System.out.println("secondStep outcome is " + secondStep);
                        return secondStep;
                    }
                });

        CompletableFuture<Integer> future3 =
                future1.applyToEither(future2,
                        new Function<Integer, Integer>() {
                            @Override
                            public Integer apply(Integer integer) {
                                return integer;
                            }
                        });
        Integer result = future3.get();
        System.out.println("outcome is " + result);
    }
}

runAfterEither方法

runAfterEither方法的功能为前面两个CompletionStage实例,任何一个执行完成都会执行第三部回调。

// 和other任务返回较快的结果用于执行fn回调函数
public CompletionStage<Void> runAfterEither(
    CompletionStage<?> other, Runnable fn
);

// 功能与上一个相同,但是不一定在同一个线程执行fn
public CompletionStage<Void> runAfterEitherAsync(
    CompletionStage<?> other, Runnable fn
);

// 功能与上一个相同,在指定线程执行fn
public CompletionStage<Void> runAfterEitherAsync(
    CompletionStage<?> other, Runnable fn, Executor executor
);

acceptEither方法

acceptEither用哪个最快的CompletionStage的结果作为下一步的输入,但是第三步没有输出。

// 和other任务返回较快的结果用于执行fn回调函数
public CompletionStage<Void> acceptEither(
    CompletionStage<?> other, Consumer<? super T> fn
);

public CompletionStage<Void> acceptEitherAsync(
    CompletionStage<?> other, Consumer<? super T> fn
);

public CompletionStage<Void> acceptEitherAsync(
    CompletionStage<?> other, Consumer<? super T> fn, Executor executor
);

CompletableFuture的综合案例

使用CompletableFuture实现喝茶案例。


public class CompletableFutureDemo2 {

    private static final int SLEEP_GAP = 3;

    public static void main(String[] args) {
        // 洗水壶->烧开水
        CompletableFuture<Boolean> hotJob =
                CompletableFuture.supplyAsync(() -> {
                    System.out.println("洗好水壶");
                    System.out.println("烧开水");
                    try {
                        Thread.sleep(SLEEP_GAP);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("水开了");
                    return true;
                });
        // 洗茶杯->那茶叶
        CompletableFuture<Boolean> washJob =
                CompletableFuture.supplyAsync(() -> {
                    System.out.println("洗茶杯");
                    try {
                        Thread.sleep(SLEEP_GAP);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("洗完了");

                    return true;
                });
        CompletableFuture<String> drinkJob =
                hotJob.thenCombine(washJob, (hotOK, washOk) -> {
                    if(hotOK && washOk) {
                        System.out.println("泡茶喝,茶喝完");
                        return "茶喝完了";
                    }
                    return "没有喝到差";
                });
        System.out.println(drinkJob.join());
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,386评论 6 479
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,939评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,851评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,953评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,971评论 5 369
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,784评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,126评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,765评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,148评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,744评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,858评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,479评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,080评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,053评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,278评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,245评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,590评论 2 343