Java8异步编程-CompletableFuture

异步编程的难点

如何优雅地实现异步编程一直都是一个难题,异步编程的通常做法就是采用callback的方法,但是这种方法通常会把代码嵌套在正常流程的代码中,而且当有多层嵌套的时候代码更加难以维护。

另外还有一点,异步编程的异常处理也是难以未维护,特别是在Java中,异步编程通常由新的线程完成,而子线程的异常是无法在父线程捕获的,那么对于异步执行结果的获取就需要付出更大的代价,比如通过:轮询、事件驱动等来完成。

从Future说起

Java5之后就引入Future用于异步编程,通过get()方法来对异步执行结果的同步等待和结果获取:

Future<String> doSomething = Executors.newSingleThreadExecutor().submit(() -> {
    try {
        Thread.sleep(1000 * 3);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "success";
});

String result = doSomething.get();

System.out.println(result);

Future的Api比较简单,而已对异常处理不友好,如果有同时有多个异步操作需要同时进行是就不好处理了

假设有这么一个场景,用户登录拿到登录凭证(token),登录之后获取用户信息

ExecutorService executors = Executors.newFixedThreadPool(10);

Future<String> login = executors.submit(()->login());
String token = login.get();

Future<String> userInfo = executors.submit(() -> userInfo(token));
String userInfoResult = userInfo.get();

System.out.println(userInfoResult);

这种实现方法还是不能实现真正的异步编程或者说不是我们所期望的,我们期望的是登录后获取用户信息,但这两件事情完成后统一对结果进行处理,而这种方式是先等待登录之后再取用户信息,和同步调用类似,这就与我们的设想不符。

CompletableFuture

初识CompletableFuture

在Java8中引入了CompletableFuture类,同时实现了Future接口和CompletionStage接口,提供了一套用于异步编程的Api接口并且提供了异步处理

CompletableFuture提供了许多异步编程的操作,可以说是Java中的Promise了,下面通过CompletableFuture来实现上面提到的例子:

String userInfo = CompletableFuture.supplyAsync(() -> login())
    .thenApplyAsync(token -> userInfo(token))
    .get();

System.out.println(userInfo);

CompletableFuture API

CompletableFuture方法很多,功能也很丰富,这里不一一说明,主要可以分为这几类来使用:

1.把CompletableFuture当Future使用

CompletableFuture实现了Future接口,也就是Future能做的CompletableFuture也同样能使用,加上completecompleteExceptionally方法可以控制结果的结束:

CompletableFuture<String> f = new CompletableFuture<>();

Executors.newSingleThreadExecutor().submit(()->{
    f.complete("hello");
    //f.completeExceptionally(new RuntimeException("error"));
});

String result = f.get();

System.out.println(result);

可以通过CompletableFuture来控制多个异步操作同时执行:

CompletableFuture<String> f = new CompletableFuture<>();

new Thread(() -> {
    try {
        System.out.println("thread1:" + f.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}).start();

new Thread(() -> {
    try {
        System.out.println("thread2:" + f.get());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}).start();

f.complete("hello");

2.异步操作

创建异步操作的方法主要是:

public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)

使用如下:

CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep(3000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "hello";
});

String result = f.get();

System.out.println(result);

3.连续异步操作

public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) 
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) 
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) 
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) 
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor) 

使用如下:

CompletableFuture<Void> f = CompletableFuture
                .supplyAsync(() -> "hello")
                .thenApplyAsync(res -> res + " world!")
                .thenAcceptAsync(System.out::println);
// wait for job done
f.get();

4.等待操作完成

public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action) 
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action) 
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)

使用如下:

CompletableFuture<String> f = CompletableFuture
        .supplyAsync(() -> "hello")
        .thenApplyAsync(res -> res + " world!")
        .whenComplete((res, err) -> {
            if (err != null) {
                err.printStackTrace();
            } else {
                System.out.println(res);
            }
        });

// wait for job done
f.get();

5.组合

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) 
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) 
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) 
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) 
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn) 
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)

使用如下:

CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "Hello")
        .thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " World,"))
        .thenCombine(CompletableFuture.supplyAsync(() -> "CompletableFuture!"), (a, b) -> a + b);

String result = f.get();

System.out.println(result);//Hello World,CompletableFuture!

6.结果&异常处理

public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn) 
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) 
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) 
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) 

使用如下:

// 异常处理
CompletableFuture<Object> f = CompletableFuture.supplyAsync(() -> "Hello")
        .thenApplyAsync(res -> res + "World")
        .thenApplyAsync(res -> {
            throw new RuntimeException("error");
        })
        .exceptionally(e -> {
            //handle exception here
            e.printStackTrace();
            return null;
        });
f.get();

// 执行结果处理
CompletableFuture<Object> f2 = CompletableFuture.supplyAsync(() -> "Hello")
        .thenApplyAsync(res -> res + "World")
        .thenApplyAsync(res -> {
            throw new RuntimeException("error");
        })
        .handleAsync((res, err) -> {
            if (err != null) {
                //handle exception here
                return null;
            } else {
                return res;
            }
        });

Object result = f2.get();

System.out.println(result);

7.并行执行异步操作并统一处理结果

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

使用如下:

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "!");

// 使用allOf方法
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.get();

System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());

// 结合StreamAPI
List<String> result = Stream.of(f1, f2, f3)
        .map(CompletableFuture::join)
        .collect(Collectors.toList());

System.out.println(result);

总结

在Java7之前,Java中对于异步编程的实现都可能比较复杂或者实现得不够优雅,而CompletableFuture的出现则提供了异步编程的强大能力,虽然API有点多但是只要稍加理解和使用还是很好应用的,通过链式调用使原本通过回调的方式变得更加优雅,代码的可阅读性和可维护性也得到一定的提高。

参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容

  • 在现代软件开发中,系统功能越来越复杂,管理复杂度的方法就是分而治之,系统的很多功能可能会被切分为小的服务,对外提供...
    天堂鸟6阅读 7,141评论 0 23
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,637评论 18 139
  • 《见面》这个主题,真的难到我了,蒙蒙的不知该说些什么。如果让我选择与一个人见面,我会想见谁?在现实的生活中...
    静等花开_fee0阅读 284评论 0 0
  • Cleere阅读 131评论 0 0
  • 原文请戳:http://clarkesworldmagazine.com/swirsky_02_11/ 原作者:雷...
    有猫出没阅读 555评论 0 1