CompletableFuture (an asynchronous power tool)

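CompletableFuture is a more complete, functional-style asynchronous tool introduced in Java 8. It implements both the Future and CompletionStage interfaces.
CompletableFuture gathers the features of ListenableFuture and SettableFuture from Guava. In addition, built-in lambda support brings it closer to Scala/Akka futures. This may sound too good to be true, but read on. CompletableFuture has two main advantages over the plain old Future: support for asynchronous callbacks/transformations, and the ability to set the value of a CompletableFuture from any thread at any point in time.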

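The CompletableFuture class implements the CompletionStage interface, so we first need to understand that interface's contract. It represents one stage of a computation, which may be completed either synchronously or asynchronously. You can think of it as a single unit on a computation pipeline that eventually produces a final result. This means several CompletionStages can be chained together: one completed stage triggers the execution of the next stage, which in turn triggers the next, and so on.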

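Besides implementing CompletionStage, CompletableFuture also implements the Future interface, which represents an asynchronous event that has not yet completed. CompletableFuture provides methods to complete this future explicitly, hence the name CompletableFuture.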

Extract/modify wrapped value

Typically a future represents a piece of code run by another thread. But that's not always the case. Sometimes you want to create a Future representing some event that you know will occur, e.g. the arrival of a JMS message. So you have a Future<Message>, but there is no asynchronous job underlying this future. You simply want to complete (resolve) that future when the JMS message arrives, and this is driven by an event. In this case you can simply create a CompletableFuture, return it to your client, and whenever you think your results are available, simply complete() the future and unblock all clients waiting on it.

For starters you can simply create new CompletableFuture out of thin air and give it to your client:

public CompletableFuture<String> ask() {
    final CompletableFuture<String> future = new CompletableFuture<>();
    //...
    return future;
}

Notice that this future is not associated with any Callable<String>, thread pool, or asynchronous job. If the client code now calls ask().get(), it will block forever. If it registers completion callbacks, they will never fire. So what's the point? Now you can say:

future.complete("42")

...and at this very moment all clients blocked on Future.get() will get the result string. Also completion callbacks will fire immediately. This comes quite handy when you want to represent a task in the future, but not necessarily computational task running on some thread of execution. CompletableFuture.complete() can only be called once, subsequent invocations are ignored. But there is a back-door called CompletableFuture.obtrudeValue(...) which overrides previous value of the Future with new one. Use with caution.
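The manual-completion flow described above can be sketched as follows. This is only a minimal illustration: the class name ManualCompletionSketch and the demo() method are made up for this example, and a plain Thread stands in for whatever event source (such as a JMS listener) would complete the future in a real system.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompletionSketch {

    // Hypothetical helper: completes a hand-made future from another thread
    // and reports what the waiting side observes.
    static String[] demo() {
        CompletableFuture<String> future = new CompletableFuture<>();

        // Stand-in for an event arriving on some other thread.
        Thread producer = new Thread(() -> future.complete("42"));
        producer.start();

        // Blocks until the producer thread completes the future.
        String first = future.join();

        // The future is already completed, so this call is ignored and returns false.
        boolean acceptedAgain = future.complete("43");

        return new String[] { first, String.valueOf(acceptedAgain), future.join() };
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

The return value of complete() tells the caller whether its value was the one that actually completed the future, which can be handy when several producers race.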

Is this equivalent to a broadcast on a message bus?!

Sometimes you want to signal failure. As you know, Future objects can hold either a wrapped result or an exception. If you want to pass some exception further, there is CompletableFuture.completeExceptionally(ex) (and its evil brother obtrudeException(ex), which overrides the previous exception). completeExceptionally() also unblocks all waiting clients, but this time by throwing an exception from get(). Speaking of get(), there is also the CompletableFuture.join() method, which differs subtly in error handling: it throws an unchecked CompletionException, whereas get() throws the checked ExecutionException and InterruptedException. Otherwise they behave the same. And finally there is the CompletableFuture.getNow(valueIfAbsent) method, which doesn't block; if the Future is not completed yet, it returns the default value. This is useful when building robust systems where we don't want to wait too long.

Last static utility method is completedFuture(value) that returns already completed Future object. Might be useful for testing or when writing some adapter layer.
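To make the differences between get(), join(), getNow() and completedFuture() concrete, here is a small sketch. The class FailureSignalSketch and its helper methods viaGet() and viaJoin() are invented for illustration; only the CompletableFuture calls themselves come from the JDK.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class FailureSignalSketch {

    // get() reports failure through the checked ExecutionException.
    static String viaGet(CompletableFuture<String> f) {
        try {
            return f.get();
        } catch (ExecutionException e) {
            return "get: ExecutionException caused by " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join() reports failure through the unchecked CompletionException.
    static String viaJoin(CompletableFuture<String> f) {
        try {
            return f.join();
        } catch (CompletionException e) {
            return "join: CompletionException caused by " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(viaGet(failed));
        System.out.println(viaJoin(failed));

        // Not completed yet, so getNow() returns the supplied default without blocking.
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(pending.getNow("fallback"));

        // Already completed, so getNow() returns the real value.
        System.out.println(CompletableFuture.completedFuture("ready").getNow("fallback"));
    }
}
```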

Creating and obtaining CompletableFuture

OK, so is creating CompletableFuture manually our only option? Not quite. Just as with normal Futures we can wrap existing task with CompletableFuture using the following family of factory methods:

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
static CompletableFuture<Void> runAsync(Runnable runnable);
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

Methods that do not take an Executor as an argument but end with ...Async will use ForkJoinPool.commonPool() (a global, general-purpose pool introduced in JDK 8). This applies to most methods in the CompletableFuture class. runAsync() is simple to understand: notice that it takes a Runnable, and therefore returns CompletableFuture<Void>, as a Runnable doesn't return anything. If you need to process something asynchronously and return a result, use Supplier<U>:

final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    //...long running...
    return "42";
}, executor);
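The snippet above leaves executor undefined, so here is a self-contained sketch of the same idea. The class FactorySketch, the method threadNameOf() and the thread name "sketch-worker" are assumptions made for this example; the point is simply to show that the supplier runs on a thread of the executor you pass in.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FactorySketch {

    // Runs a supplier on the given executor and returns the name of the thread that ran it.
    static String threadNameOf(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        // Every thread of this pool gets the same illustrative name.
        ExecutorService executor =
                Executors.newFixedThreadPool(2, r -> new Thread(r, "sketch-worker"));
        try {
            System.out.println(threadNameOf(executor));

            // runAsync() takes a Runnable, so the resulting future carries no value.
            CompletableFuture<Void> done =
                    CompletableFuture.runAsync(() -> System.out.println("side effect"), executor);
            done.join();
        } finally {
            // Without this, the non-daemon pool threads would keep the JVM alive.
            executor.shutdown();
        }
    }
}
```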

Transforming and acting on one CompletableFuture (thenApply)

So I said that CompletableFuture is superior to Future, but you haven't yet seen why. Simply put, it's because CompletableFuture is a monad and a functor. Not helping, I guess? Both Scala and JavaScript allow registering asynchronous callbacks that run when a future is completed. We don't have to wait and block until it's ready. We can simply say: run this function on the result when it arrives. Moreover, we can stack such functions, combine multiple futures together, etc. For example, if we have a function from String to Integer, we can turn a CompletableFuture<String> into a CompletableFuture<Integer> without unwrapping it. This is achieved with the thenApply() family of methods:

<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn);
<U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);

As stated before, ...Async versions are provided for most operations on CompletableFuture, so I will skip them in subsequent sections. Just remember that the first method applies the function in the thread that completes the future (or in the calling thread if the future is already complete), while the remaining two apply it asynchronously: in ForkJoinPool.commonPool() or in the executor you supply.

Let's see how thenApply() works:

CompletableFuture<String> f1 = //...
CompletableFuture<Double> f3 = 
    f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);

You see a sequence of transformations here. From String to Integer and then to Double. But what's most important, these transformations are neither executed immediately nor blocking. They are simply remembered and when original f1 completes they are executed for you. If some of the transformations are time-consuming, you can supply your own Executor to run them asynchronously. Notice that this operation is equivalent to monadic map in Scala.
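A runnable version of the chain above might look like the following sketch. The class ThenApplySketch and the method circleArea() are hypothetical names; the source future is completed by hand so that the deferred execution of the transformations is visible.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplySketch {

    // Registers two transformations: String -> Integer -> Double.
    static CompletableFuture<Double> circleArea(CompletableFuture<String> radiusText) {
        return radiusText
                .thenApply(Integer::parseInt)
                .thenApply(r -> r * r * Math.PI);
    }

    public static void main(String[] args) {
        CompletableFuture<String> f1 = new CompletableFuture<>();
        CompletableFuture<Double> f3 = circleArea(f1);

        // Nothing has run yet: the transformations are only remembered.
        System.out.println(f3.isDone()); // prints false

        // Completing the source triggers both transformations.
        f1.complete("2");
        System.out.println(f3.join());
    }
}
```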

Running code on completion (thenAccept/thenRun)

CompletableFuture<Void> thenAccept(Consumer<? super T> block);
CompletableFuture<Void> thenRun(Runnable action);

...Async variants are available as well for both methods, with implicit and explicit executor. I can't emphasize this enough: thenAccept()/thenRun() methods do not block (even without an explicit executor). Treat them like an event listener/handler that you attach to a future and that will execute some time in the future. A statement placed right after the thenAccept() call, such as one logging a "Continuing" message, runs immediately, even if the future is not even close to completion.
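The non-blocking behavior can be observed with a sketch like the one below. ThenAcceptSketch and run() are illustrative names, and the events list merely records the order in which things happen; it is not part of any API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ThenAcceptSketch {

    // Records the order of events around a thenAccept() registration.
    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();

        // Registering the callback returns at once; the future is still pending.
        CompletableFuture<Void> done = future.thenAccept(s -> events.add("Result: " + s));
        events.add("Continuing");

        // Completing the future fires the callback.
        future.complete("42");
        done.join();
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Here "Continuing" is recorded before "Result: 42" because the callback cannot run until the future is completed.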

Error handling of single CompletableFuture

So far we only talked about result of computation. But what about exceptions? Can we handle them asynchronously as well? Sure!

CompletableFuture<String> safe = 
    future.exceptionally(ex -> "We have a problem: " + ex.getMessage());

exceptionally() takes a function that will be invoked when original future throws an exception. We then have an opportunity to recover by transforming this exception into some value compatible with Future's type. Further transformations of safe will no longer yield an exception but instead a String returned from supplied function.

A more flexible approach is handle() that takes a function receiving either correct result or exception:

CompletableFuture<Integer> safe = future.handle((ok, ex) -> {
    if (ex == null) {
        // No exception: ok holds the result
        return Integer.parseInt(ok);
    } else {
        log.warn("Problem", ex);
        return -1;
    }
});

handle() is always called: on failure the exception argument is non-null, and on success it is null while the result argument carries the value. This is a one-stop catch-all strategy.
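Both recovery styles can be tried out with a small sketch. RecoverySketch, parseOrDefault() and describe() are names invented here, and parsing a string with Integer.parseInt() is just a convenient way to provoke a failure.

```java
import java.util.concurrent.CompletableFuture;

class RecoverySketch {

    // exceptionally(): replaces a failure with a fallback value of the same type.
    static int parseOrDefault(String text) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text))
                .exceptionally(ex -> -1)
                .join();
    }

    // handle(): sees both outcomes and may change the result type.
    static String describe(String text) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text))
                .handle((value, ex) -> ex == null ? "parsed " + value : "failed")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42"));
        System.out.println(parseOrDefault("not a number"));
        System.out.println(describe("7"));
        System.out.println(describe("not a number"));
    }
}
```

One detail worth knowing: when the failure comes from an earlier stage, the exception handed to these callbacks is typically a CompletionException wrapping the original one, so getCause() is often what you want to log.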

Combining two CompletableFuture together

Asynchronous processing of one CompletableFuture is nice but it really shows its power when multiple such futures are combined together in various ways.

Combining (chaining) two futures (thenCompose())

Sometimes you want to run some function on future's value (when it's ready). But this function returns future as well. CompletableFuture should be smart enough to understand that the result of our function should now be used as top-level future, as opposed to CompletableFuture<CompletableFuture<T>>. Method thenCompose() is thus equivalent to flatMap in Scala:

<U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn);

...Async variations are available as well. Example below, look carefully at the types and the difference between thenApply() (map) and thenCompose() (flatMap) when applying a calculateRelevance() function returning CompletableFuture<Double>:

CompletableFuture<Document> docFuture = //...
 
CompletableFuture<CompletableFuture<Double>> f =
    docFuture.thenApply(this::calculateRelevance);
 
CompletableFuture<Double> relevanceFuture =
    docFuture.thenCompose(this::calculateRelevance);
 
//...
 
private CompletableFuture<Double> calculateRelevance(Document doc) //...

thenCompose() is an essential method that allows building robust, asynchronous pipelines, without blocking or waiting for intermediate steps.
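Since Document and calculateRelevance() are not defined in this article, the sketch below uses a stand-in: a hypothetical lengthOf() function that asynchronously computes the length of a String. It shows the same type difference between thenApply() and thenCompose().

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {

    // Hypothetical asynchronous function: its result is itself a future.
    static CompletableFuture<Integer> lengthOf(String s) {
        return CompletableFuture.supplyAsync(s::length);
    }

    // thenApply() (map) keeps the inner future wrapped.
    static CompletableFuture<CompletableFuture<Integer>> nested(CompletableFuture<String> source) {
        return source.thenApply(ComposeSketch::lengthOf);
    }

    // thenCompose() (flatMap) flattens it into a single future.
    static CompletableFuture<Integer> flat(CompletableFuture<String> source) {
        return source.thenCompose(ComposeSketch::lengthOf);
    }

    public static void main(String[] args) {
        CompletableFuture<String> source = CompletableFuture.completedFuture("hello");
        System.out.println(nested(source).join().join()); // two levels to unwrap
        System.out.println(flat(source).join());          // one level to unwrap
    }
}
```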

Transforming values of two futures (thenCombine())

While thenCompose() is used to chain one future dependent on the other, thenCombine combines two independent futures when they are both done:

<U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)

...Async variations are available as well. Imagine you have two CompletableFutures, one that loads a Customer and another that loads the nearest Shop. They are completely independent of each other, but when both of them are completed, you want to use their values to calculate a Route. Here is a stripped-down example:

CompletableFuture<Customer> customerFuture = loadCustomerDetails(123);
CompletableFuture<Shop> shopFuture = closestShop();
CompletableFuture<Route> routeFuture = 
    customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop));
//...
private Route findRoute(Customer customer, Shop shop) //...

Notice that in Java 8 you can replace (cust, shop) -> findRoute(cust, shop) with simple this::findRoute method reference:

customerFuture.thenCombine(shopFuture, this::findRoute);

So you get the idea. We have customerFuture and shopFuture. Then routeFuture wraps them and "waits" for both to complete. When both of them are ready, it runs our supplied function that combines results (findRoute()). Thus routeFuture will complete when two underlying futures are resolved and findRoute() is done.
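Customer, Shop and Route are not defined here, so the following sketch uses plain Strings in their place. CombineSketch and describeRoute() are hypothetical names; both input futures are completed by hand to show that the combined future waits for the second one.

```java
import java.util.concurrent.CompletableFuture;

class CombineSketch {

    // Combines two independent futures once both have completed.
    static CompletableFuture<String> describeRoute(CompletableFuture<String> customer,
                                                   CompletableFuture<String> shop) {
        return customer.thenCombine(shop, (c, s) -> c + " -> " + s);
    }

    public static void main(String[] args) {
        CompletableFuture<String> customer = new CompletableFuture<>();
        CompletableFuture<String> shop = new CompletableFuture<>();
        CompletableFuture<String> route = describeRoute(customer, shop);

        customer.complete("Alice");
        System.out.println(route.isDone()); // prints false: the shop is still pending

        shop.complete("Corner shop");
        System.out.println(route.join());
    }
}
```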

Handy! How would you implement this yourself? What is the underlying principle?

Waiting for both CompletableFutures to complete

If instead of producing new CompletableFuture combining both results we simply want to be notified when they finish, we can use thenAcceptBoth()/runAfterBoth() family of methods (...Async variations are available as well). They work similarly to thenAccept() and thenRun() but wait for two futures instead of one:

<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action);

CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

Imagine that in the example above, instead of producing a new CompletableFuture<Route>, you simply want to send some event or refresh the GUI immediately. This can be easily achieved with thenAcceptBoth():

customerFuture.thenAcceptBoth(shopFuture, (cust, shop) -> {
    final Route route = findRoute(cust, shop);
    //refresh GUI with route
});

I hope I'm wrong, but maybe some of you are asking yourselves: why can't I simply block on these two futures? Like here:

Future<Customer> customerFuture = loadCustomerDetails(123);
Future<Shop> shopFuture = closestShop();
findRoute(customerFuture.get(), shopFuture.get());

Well, of course you can. But the whole point of CompletableFuture is to allow an asynchronous, event-driven programming model instead of blocking and eagerly waiting for the result. So functionally the two code snippets above are equivalent, but the latter unnecessarily occupies one thread of execution.

Waiting for first CompletableFuture to complete

Another interesting part of the CompletableFuture API is the ability to wait for first (as opposed to all) completed future. This can come handy when you have two tasks yielding result of the same type and you only care about response time, not which task resulted first. API methods (...Async variations are available as well):

CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);

CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action)

As an example say you have two systems you integrate with. One has smaller average response times but high standard deviation. Other one is slower in general, but more predictable. In order to take best of both worlds (performance and predictability) you call both systems at the same time and wait for the first one to complete. Normally it will be the first one, but in case it became slow, second one finishes in an acceptable time:

CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
fast.acceptEither(predictable, s -> {
    System.out.println("Result: " + s);
});

s represents String reply either from fetchFast() or from fetchPredictably(). We neither know nor care.

Transforming first completed

applyToEither() is an older brother of acceptEither(). While the latter simply calls some piece of code when the faster of the two futures completes, applyToEither() will return a new future. This future will complete when the first of the two underlying futures completes. The API is a bit similar (...Async variations are available as well):

<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)

The extra fn function is invoked on the result of first future that completed. I am not really sure what's the purpose of such a specialized method, after all one could simply use: fast.applyToEither(predictable).thenApply(fn). Since we are stuck with this API but we don't really need extra function application, I will simply use Function.identity() placeholder:

CompletableFuture<String> fast = fetchFast();
CompletableFuture<String> predictable = fetchPredictably();
CompletableFuture<String> firstDone = 
    fast.applyToEither(predictable, Function.<String>identity());

firstDone future can then be passed around. Notice that from the client perspective the fact that two futures are actually behind firstDone is hidden. Client simply waits for future to complete and applyToEither() takes care of notifying the client when any of the two finish first.
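fetchFast() and fetchPredictably() are not shown in this article, so the sketch below replaces them with two hand-completed futures. EitherSketch and firstOf() are illustrative names only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class EitherSketch {

    // Returns a future that takes the value of whichever input completes first.
    static CompletableFuture<String> firstOf(CompletableFuture<String> a,
                                             CompletableFuture<String> b) {
        return a.applyToEither(b, Function.<String>identity());
    }

    public static void main(String[] args) {
        CompletableFuture<String> fast = new CompletableFuture<>();
        CompletableFuture<String> predictable = new CompletableFuture<>();
        CompletableFuture<String> firstDone = firstOf(fast, predictable);

        // This time the "predictable" source wins the race.
        predictable.complete("predictable");
        System.out.println(firstDone.join());

        // A later completion of the other source no longer changes the outcome.
        fast.complete("fast");
        System.out.println(firstDone.join());
    }
}
```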

Combining multiple CompletableFuture together

So we now know how to wait for two futures to complete (using thenCombine()) and for the first one to complete (applyToEither()). But can it scale to arbitrary number of futures? Sure, using static helper methods:

static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

allOf() takes an array of futures and returns a future that completes when all of the underlying futures are completed (barrier waiting for all). anyOf() on the other hand will wait only for the fastest of the underlying futures. Please look at the generic type of returned futures. Not quite what you would expect? We will take care of this issue in the next article.
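One common way to work around the CompletableFuture<Void> returned by allOf() is a small helper that collects the individual results once everything is done. The sequence() method below is not part of the JDK and is not necessarily the approach the original author had in mind; it is just a sketch under the assumption that all futures share the same result type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    // Hypothetical helper: turns a list of futures into a future of a list.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        // join() does not block here: every future is already complete
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"),
                CompletableFuture.supplyAsync(() -> "c"));

        System.out.println(sequence(futures).join());

        // anyOf() yields Object, so the caller has to cast the result.
        Object first = CompletableFuture.anyOf(
                CompletableFuture.completedFuture("x"),
                new CompletableFuture<String>()).join();
        System.out.println((String) first);
    }
}
```

If any of the input futures fails, the future returned by allOf() fails as well, so the thenApply() step is skipped and the failure propagates to the caller of sequence().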

Study examples

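import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class TCompletableFuture {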

    static void completedFutureExample() throws InterruptedException {
        CompletableFuture cf = CompletableFuture.completedFuture("message");
        System.out.println(cf.isDone());
        // getNow(null) returns the result if the future is already completed (as in this example);
        // otherwise it returns null (the argument passed in).
        System.out.println(cf.getNow(null));

        // Apply a function to the result of the previous stage
        CompletableFuture cfApply = CompletableFuture.completedFuture("message").thenApply(s -> {
            return s.toUpperCase();
        });
        System.out.println("MESSAGE:" + cfApply.getNow(null));

        // Apply a function to the result of the previous stage asynchronously
        CompletableFuture cfApplyAsync = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return s.toUpperCase();
        });
        System.out.println(cfApplyAsync.getNow(null));
        System.out.println("MESSAGE:" + cfApplyAsync.join());


    }

    static void runAsyncExample() throws InterruptedException {
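        // CompletableFuture methods whose names end in Async run asynchronously.
        // When no executor is specified, they run on the ForkJoinPool, which uses daemon threads to execute tasks.
        // Note that this is specific to CompletableFuture; other CompletionStage implementations may override this default behavior.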
        CompletableFuture cf = CompletableFuture.runAsync(() -> {
            System.out.println(Thread.currentThread().isDaemon());
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println(cf.isDone());
        TimeUnit.SECONDS.sleep(3);
        System.out.println(cf.isDone());
    }

    static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
        int count = 1;

        @Override
        public Thread newThread(Runnable runnable) {
            return new Thread(runnable, "custom-executor-" + count++);
        }
    });

    static void thenApplyAsyncWithExecutorExample() {

        // Apply a function to the previous stage asynchronously, using a custom Executor
        CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
            System.out.println(Thread.currentThread().getName().startsWith("custom-executor-"));
            System.out.println(Thread.currentThread().isDaemon());
            return delayStringUp(s);
        }, executor);

        System.out.println(cf.getNow(null));
        System.out.println("Result: " + cf.join());
    }

    static void thenAcceptExample() {
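        // If the next stage receives the result of the current stage but does not need to return a value
        // (its return type is void), it can take a Consumer instead of a Function; the method to call is then thenAccept():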
        StringBuilder result = new StringBuilder();
        // The consumer runs synchronously
        CompletableFuture.completedFuture("thenAccept message")
                .thenAccept(s -> result.append(s));
        System.out.println("Result :" + result);

        // The consumer runs asynchronously
        CompletableFuture cfAsync = CompletableFuture.completedFuture("thenAccept message")
                .thenAcceptAsync(s -> result.append(s));
        // join() waits synchronously for completion
        cfAsync.join();
        System.out.println("Result :" + result);
    }

    static void completeExceptionallyExample() {
        // Complete a computation exceptionally
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> delayStringUp(s));

        // Create a separate handler stage, exceptionHandler, which handles the exception and returns "message upon cancel" in the exceptional case.
        CompletableFuture exceptionHandler = cf.handle((s, throwExcept) -> {
            return (throwExcept != null) ? "message upon cancel" : "";
        });

        /**
         * If not already completed, causes invocations of {@link #get()}
         * and related methods to throw the given exception.
         */
        cf.completeExceptionally(new RuntimeException("completed exceptionally"));

        System.out.println("Completed exceptionally: " + cf.isCompletedExceptionally());

        try {
            // The stage has been explicitly completed with an exception, so join() throws a CompletionException
            // instead of returning the uppercase string (normally join() would wait about 2 seconds for the conversion;
            // here the stage fails before that happens). The failure also triggers the handler stage.
            cf.join();
            System.out.println("Should have thrown an exception");
        } catch (CompletionException ex) { // just for testing
            System.out.println("Cause: " + ex.getCause().getMessage());
        }

        System.out.println("Handler result: " + exceptionHandler.join());
    }

    private static String delayStringUp(String s) {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return s.toUpperCase();
    }

    static void cancelExample() {
        // Cancel a computation

        // For CompletableFuture the boolean argument of cancel() is not used, because interrupts are not used to cancel the operation;
        // instead, cancel() is equivalent to completeExceptionally(new CancellationException()).
        CompletableFuture<String> cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> delayStringUp(s));
        CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
        System.out.println("Canceled: " + cf.cancel(true));
        System.out.println("Completed exceptionally: " + cf.isCompletedExceptionally());
        System.out.println("Fallback: " + cf2.join());
    }

    static void applyToEitherExample() {
        // Apply a function to the result of whichever of two stages completes first

        String original = "Message";
        CompletableFuture cf1 = CompletableFuture.completedFuture(original)
                .thenApplyAsync(s -> delayStringUp(s));
        CompletableFuture cf2 = cf1.applyToEither(
                CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayStringUp(s)),
                s -> s + " from applyToEither");
        System.out.println((String) cf2.join());
    }

    static void acceptEitherExample() {
        // acceptEither: invoke a consumer with the result of whichever of two stages completes first
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture cf = CompletableFuture.completedFuture(original)
                .thenApplyAsync(s -> delayStringUp(s))
                .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayStringUp(s)),
                        s -> result.append(s).append("acceptEither"));
        cf.join();
        System.out.println("Ends with acceptEither: " + result.toString().endsWith("acceptEither"));
    }

    static void runAfterBothExample() {
        // runAfterBoth: run a Runnable after both stages have completed
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth(
                CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
                () -> result.append("done"));
        System.out.println("Result :" + result);
    }

    static void thenAcceptBothExample() {
        // thenAcceptBoth: process the results of two stages with a BiConsumer
        String original = "Message";
        StringBuilder result = new StringBuilder();
        CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth(
                CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
                (s1, s2) -> result.append(s1 + s2));
        System.out.println("Result: " + result);
    }

    static void thenCombineExample() {
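        // When a CompletableFuture depends on the results of two earlier stages and combines them into a single result,
        // we can use thenCombine().
        // The whole pipeline is synchronous, so getNow() returns the final result.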
        String original = "Message";
        CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayStringUp(s))
                .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayStringUp(s)),
                        (s1, s2) -> s1 + s2);
        System.out.println("Combined: " + cf.getNow(null));
    }

    static void thenCombineAsyncExample() {
        // Process the results of two stages asynchronously with a BiFunction

        // Actions supplied for dependent completions of non-async methods may be performed by the thread
        // that completes the current CompletableFuture, or by any other caller of a completion method

        // So we need join() to wait for the result to complete.
        String original = "Message";
        CompletableFuture cf = CompletableFuture.completedFuture(original)
                .thenApplyAsync(s -> delayStringUp(s))
                .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayStringUp(s)),
                        (s1, s2) -> s1 + s2);
        System.out.println("Combined: " + cf.join());
    }

    static void thenComposeExample() {
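        // thenCompose() waits for the first stage (the uppercase conversion) to complete,
        // passes its result to the given function, which returns a CompletableFuture,
        // and the result of that returned CompletableFuture becomes the result of the composed stage.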
        String original = "Message";
        CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayStringUp(s))
                .thenCompose(
                        upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayStringUp(s)).thenApply(s -> upper + s)
                );
        System.out.println("Composed: " + cf.join());
    }

    static void anyOfExample() {
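        // Create a stage that completes when any one of several stages completes

        // In this example all stages run synchronously (thenApply), so the CompletableFuture created by anyOf() completes immediately,
        // since every stage is already complete by then. We use whenComplete(BiConsumer<? super Object, ? super Throwable> action) to process the result.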
        StringBuilder result = new StringBuilder();
        List<String> messages = Arrays.asList("a", "b", "c");
        List<CompletableFuture> futures = messages.stream()
                .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayStringUp(s)))
                .collect(Collectors.toList());

        CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> {
            if (th == null) {
                System.out.println("res = " + res);
                result.append(res);
            }
        });
        System.out.println("result = " + result);
    }

    static void allOfExample() {
        // Synchronously create a stage that completes once all stages have completed
        StringBuilder result = new StringBuilder();
        List<String> messages = Arrays.asList("a", "b", "c");
        List<CompletableFuture> futures = messages.stream()
                .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayStringUp(s)))
                .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((v, th) -> {
            futures.forEach(cf -> {
                System.out.println(cf.getNow(null));
                result.append(cf.getNow(null));
            });
            result.append("done");
        });
        System.out.println("result = " + result);
    }

    static void allOfAsyncExample() {
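        // With thenApplyAsync() replacing thenApply() on the individual CompletableFutures,
        // the stage returned by allOf() is completed asynchronously by a thread from the common pool, so we need to call join() to wait for it.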
        StringBuilder result = new StringBuilder();
        List<String> messages = Arrays.asList("a", "b", "c");
        List<CompletableFuture> futures = messages.stream()
                .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayStringUp(s)))
                .collect(Collectors.toList());
        CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
                .whenComplete((v, th) -> {
                    futures.forEach(cf -> {
                        System.out.println(cf.getNow(null));
                        result.append(cf.getNow(null));
                    });
                    result.append("done");
                });
        allOf.join();
        System.out.println("result = " + result);

    }

    /**
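     * 1. First, call cars() asynchronously to get a list of Car objects; it returns a CompletionStage. cars() consumes a remote REST API.
     * 2. Then compose another CompletionStage that fills in each car's rating: rating(manufacturerId) returns a CompletionStage
     *    that fetches the car's rating asynchronously (possibly via another REST API call).
     * 3. Once every car has its rating filled in, the list is finished, so we call allOf() to get the final stage, which completes only after all the preceding stages have completed.
     * 4. Call whenComplete() on the final stage to print each car with its rating.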
     */

    static void simpleExample(){
        // cars().thenCompose(cars -> {
        //     List<CompletionStage> updatedCars = cars.stream()
        //             .map(car -> rating(car.manufacturerId).thenApply(r -> {
        //                 car.setRating(r);
        //                 return car;
        //             })).collect(Collectors.toList());
        //
        //     CompletableFuture done = CompletableFuture
        //             .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
        //
        //     return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture)
        //             .map(CompletableFuture::join).collect(Collectors.toList()));
        //
        // }).whenComplete((cars, th) -> {
        //     if (th == null) {
        //         cars.forEach(System.out::println);
        //     } else {
        //         throw new RuntimeException(th);
        //     }
        // }).toCompletableFuture().join();
    }
}

Ref:
https://www.nurkiewicz.com/2013/05/java-8-definitive-guide-to.html
http://www.importnew.com/10815.html
https://mahmoudanouti.wordpress.com/2018/01/26/20-examples-of-using-javas-completablefuture/
http://www.importnew.com/28319.html
