新颖的、优雅的异步处理数据的方法
Java SE 8为Java平台带来了许多新东西,其中很多已经在生产环境当中得到了应用。但是在异步编程方法,却并不是每个程序员都能很好的使用,也并非所有应用程序都使用java.util.concurrent包,即使此包中对于编写正确的并发代码提供的原语非常有用。
java.util.concurrent包在Java 8中增加了几个非常好的补充接口和实现类。我们在本文中讨论的是CompletionStage接口和CompletableFuture实现类。 与Future接口一起,它们为构建异步系统提供了非常好的应用模式。
Problem Statement
让我们从下面的代码开始。 我们不用关心是哪个API提供的方法,也不关心使用的类,甚至不用关心是不是正确的Java代码。
queryEngine.select("select user from User user")
.forEach(user -> System.out.println(user));
我们这里有一个查询引擎,它从数据库上执行Java Persistence Query Language (JPQL)类型的请求。 查询的结果,然后打印结果。 查询数据库的速度可能很慢,因此我们希望在单独的线程中执行此代码,并在获取结果以后触发打印。 一旦我们启动了这项任务,我们真的不想再慢慢等待了。
我们在Java 7中有哪些API工具来实现此需求? 众所周知,Java 5中引入的Callable,我们可以将要执行的任务包装在Callable中,并将此对象提交给ExecutorService。 如下所示:
Callable<String> task = () -> "select user from User";
Future<String> future = executorService.submit(task);
从future对象获取结果的唯一方法是在提交Callable任务的线程中调用其get() 方法。 此方法是阻塞的,因此此调用将阻止线程,直到结果查询完毕。
这正是CompletionStage的使用场景。
第一个链模式
让我们使用CompletionStage模式重写任务提交。
原来:
executor.submit(() -> {
() -> "select user from User";
});
改变:
CompletableFuture<List<User>> completableFuture =
CompletableFuture.supplyAsync(() -> { () -> dbEngine.query("select user from User"); }, executor);
我们将参数Callable传递给CompletableFuture的静态supplyAsync()方法,而不是将它传递给ExecutorService的submit()方法。 此方法还可以将Executor作为第二个参数,使客户端可以自定义选择执行Callable的线程池。
它返回一个CompletableFuture实例,这是一个Java 8的新类。在这个对象上,我们可以做以下操作:
completableFuture.thenAccept(System.out::println);
thenAccept()方法接收的参数是一个Consumer,在结果可用时自动调用该方法,无需编写额外的等待代码。 避免像前一种情况那样造成线程阻塞。
CompletionStage是什么?
简而言之,CompletionStage是一个承载任务的模型。接下来我们将看到,任务可以是任意的Runnable,Consumer或Function的实例。 任务是链的一个要素。 CompletionStage以不同的方式链接在一起。 “上游”元素是之前执行的CompletionStage。 因此,“下游”元素是在之后执行的CompletionStage。
执行完成一个或多个上游CompletionStageS后,将触发当前的CompletionStage执行。 执行完CompletionStageS可能会有返回值,返回值可以传递给当前的CompletionStage。 当前CompletionStage执行完会触发其他下游的CompletionStageS,并且将生成返回值传递下去。
因此,CompletionStage是链的一个元素。
CompletionStage接口的实现类是java.util.concurrent.CompletableFuture的实现。 请注意,CompletableFuture也实现了Future接口。但是 CompletionStage没有继承Future。
一个任务一般有如下三种状态:
1、正在执行
2、执行正常完成,并产生正确的结果
3、执行异常完成,可能会产生异常
Future的方法介绍
Future定义了三种类型五个方法:
- cancel(), 试图取消正在执行的任务
- isCanceled() 和 isDone() 判断任务的状态(取消成功|完成)
- get(), 两个方法,一个不带参数的,和一个带超时参数的。
CompletableFuture增加了六种类似Future的新方法。
前两个方法是join()和getNow(value)。 第一个,join(),阻塞直到CompletableFuture完成,和Future的get()方法一样。 主要区别在于join() 方法不会抛出显式的异常(get()方法抛出InterruptedException, ExecutionException异常),从而更简单。getNow(value)类似,它会立即返回,如果完成,则返回执行结果,如果没有完成,则返回给定得默认值value。 请注意,此调用不会阻塞,不会等待CompletableFuture完成。
其余四种方法强制CompletableFuture结束,无论是使用默认值还是异常,如果CompletableFuture已经完成,它们可以覆盖此CompletableFuture得返回值。
- complete(value)方法完成CompletableFuture(如果CompletableFuture没有结束),并返回value。 如果CompletableFuture已完成,则complete(value)方法不会执行。 如果需要更改value,则需要调用obtrude(value) 方法。 此方法确实会更改CompletableFuture的值,即使它已经完成。 但是使用的时候要小心,因为complete已经触发了客户端,有可能导致客户端会得到不期望的结果。
- 另一对方法的工作方式相同,但它们强制CompletableFuture抛出异常并结束: completeExceptionally(throwable)和obtrudeExceptionally(throwable)。 如果CompletableFuture尚未完成,则第一个抛出RuntimeException,第二个强制CompletableFuture更改其状态,和*obtrude(value) *类似。
如何创建CompletableFuture
创建CompletableFutures有以下几种方式。
创建一个已完成的CompletableFuture
首先介绍的第一种方式,创建了一个已完成的CompletableFuture。 创建这样的Future可能看起来很奇怪,但它在测试环境中非常有用。
CompletableFuture<String> cf =
CompletableFuture.completedFuture("I'm done!");
cf.isDone(); // return true
cf.join(); // return "I'm done"
从任务创建一个CompletableFuture
CompletableFuture可以构建在两种任务上:一个不带任何参数且没有返回值的Runnable,另一个是不带参数,返回一个对象的Supplier。 在这两种情况下,都可以传递Executor来设置执行任务的线程池。如下所示:
CompletableFuture<Void> cf1 =
CompletableFuture.runAsync(Runnable runnable);
CompletableFuture<T> cf2 =
CompletableFuture.supplyAsync(Supplier<T> supplier);
如果未提供ExecutorService,则任务将在 ForkJoinPool.commonPool()
线程池
中执行,该池与Stream并行执行的线程池相同。
自定义线程池,如下所示:
Runnable runnable = () -> {
System.out.println("Executing in " +
Thread.currentThread().getName());
};
ExecutorService executor = Executors.newSingleThreadExecutor();
CompletableFuture<Void> cf =
CompletableFuture.runAsync(runnable, executor);
cf.thenRun(() -> System.out.println("I'm done"));
executor.shutdown();
执行上面的代码,输入如下:
Executing in pool-1-thread-1
I'm done
在这种情况下,Runnable在我们自定义的的SingleThreadExecutor线程池中执行。
构建CompletableFuture链
我们在本文的开始已经介绍过,CompletableFuture是链的一个元素。 上一节中看到了如何从任务(Runnable或Supplier)创建此链的第一个元素。 现在让我们看看如何将其他任务链接到这个任务。 事实上,根据前面的例子,我们已经猜到了该怎么做了。
链的任务
第一个任务是由Runnable或Supplier构建的,两个功能接口(你可以看成是functions),它们不带任何参数,但是可能会,也可能不会有返回值。
链的第二个元素和其他元素都可以获取前一个元素的结果(如果有的话)。 所以我们需要不同的functions来构建这些元素。 我们先尝试简单的理解一下。
链的前一个元素可能会,也可能不会有返回值。 所以链的functions的入参可以有一个对象,或者没有参数。 此链元素可能会有,可也能不会有返回值。 所以链的函数应该有一个返回值,或者没有返回值。 这有四种可能的情况。 在这四种可能的函数中,其中不带结果参数,并产生返回值的函数是链的起点,我们已在上一节中看到过。
CompletableFuture API中使用的四种可能的functions的名称
Takes a Parameters? | Returns Void | Returns R |
---|---|---|
Takes T | Consumer<T> | Function<T, R> |
Does not take anything | Runnable | Not an element of a chain |
链的类型
现在我们已经对API支持的任务有所了解,让我们来看看链的含义。 到目前为止,我们假设链是关于触发另一个任务的任务,将第一个的结果作为参数传递给第二个任务。 这是基本的一对一的链。
我们也可以组合元素而不是链接它们。 这仅适用于获取前一任务结果并包装成CompletableFuture对象提供给另一个任务。 这又是一对一的关系(不是链,因为这是组合)。
但我们也可以构建一个树状结构,其中由两个上游任务而不是一个上游任务触发的任务。 我们可以想象成两个提供组合结果,或者在第一个上游元素提供结果,并触发当前任务。 这两种情况都有意义,我们将会说到它们的例子。
选择一个执行器
最后,我们希望能够根据不同的情形来选择ExecutorService(即线程池)执行我们的任务。 这有很多种情况需要我们来判断:
- 我们的任务之一可能是更新图形用户界面。 在这种情况下,我们希望它在人机界面(HMI)线程中运行。 Swing,JavaFX和Android就属于这种情况。
- 我们有些I/O任务或计算任务需要在专门的线程池中执行。
- 我们的变量中可能存在可见性问题,需要在同一个线程中执行任务。
- 我们希望在默认的fork/join池中异步执行任务。
所有的这些情况下,我们必须增加ExecutorService参数用来定制执行器 。
Note: 调整线程池的大小
《Java并发编程实战》一书中,Brian Goetz和合著者们为线程池大小 的优化提供了不少中肯的建议。这非常重要,如果线程池中线程的数量过多,最终它们会竞争 稀缺的处理器和内存资源,浪费大量的时间在上下文切换上。反之,如果线程的数目过少,正 如你的应用所面临的情况,处理器的一些核可能就无法充分利用。Brian Goetz建议,线程池大 小与处理器的利用率之比可以使用下面的公式进行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
1、NCPU是处理器的核的数目,可以通过Runtime.getRuntime().availableProce- ssors()得到
2、UCPU是期望的CPU利用率(该值应该介于0和1之间)
3、W/C是等待时间与计算时间的比率
丰富的API实现
CompletableFuture类中有很多API方法! 三种类型的任务,四种类型的链接和组合,三种方式指定ExecutorService。36种链接任务的方法。大量可用的方法使这个类变得很复杂。
逐一的学习API方法将是非常繁琐的,所以让我们看看该如何正确的选择合适的API。
模式选择
以下是一些可用模式的描述。
一对一的模式
在这种情况下,从第一个CompletableFuture开始,当完成其任务执行时,我们创建的第二个CompletableFuture开始执行。如下所示:
CompletableFuture<String> cf1 =
CompletableFuture.supplyAsync(() -> "Hello world");
CompletableFuture<String> cf2 =
cf1.thenApply(s -> s + " from the Future!");
有三种 "then-apply"的方法。 它们都有一个Function的参数,T为上游元素的结果,并返回一个新的对象R。
我们再为流水线添加一个步骤。 这次,我们thenAccept()方法,参数为Consumer<String>,没有返回值(Void)。
CompletableFuture<Void> cf3 =
cf2.thenAccept(System.out::println);
让我们为这个流水线添加最后一步。 调用thenRun(),参数为Runnable(不带参数,并且没有返回值) .
CompletableFuture<Void> cf4 =
cf3.thenRun(() -> System.out.println("Done processing this chain"));
这些方法的命名都很清晰:以then开头,后面跟上函数接口的名称(run的参数是Runnable,accept参数为Consumer,apply参数为Function)。所有这些方法都在与上游任务具有相同的执行器(同一个线程池)。
然后,这些方法还可以进一步的采用相同的后缀:async。 异步方法在默认的fork/join池( ForkJoinPool.commonPool()
)中执行其任务,当然你也可以指定任务执行器Executor。
我们用异步的方式重写cf4,如下所示:
CompletableFuture<Void> cf4 =
cf3.thenRunAsync(() -> System.out.println("Done processing this chain"));
在这种情况下,Runnable任务将在默认的fork/join池中执行。
二对一的组合模式
组合模式是下一步任务接收两个上游任务的结果的模式。 在这种情况下可以使用两个函数:BiFunction和BiConsumer。 也可以在组合模式中执行Runnable。 如下所示:
Method | Description |
---|---|
<U,V> CompletableFuture<V> thenCombine(CompletionStage<U> other, BiFunction<T, U, R> action) |
当前和另一个给定的阶段都正常完成时,两个结果作为BiFunction函数的参数执行。 |
<U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<U> other, BiConsumer<T, U> action) |
当这个和另一个给定的阶段都正常完成时,两个结果作为提供的BiConsumer操作的参数被执行。 |
CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action) |
当这个和另一个给定的阶段都正常完成时,执行给定的Runnable动作。 |
这些方法也可以采用async后缀,与上一节方法集具有相同的语义。
二对一的选择模式
最后一类模式还是二对一模式。 但是这次,不是完成两个上游元素后再执行下游元素,而是,并且当两个上游元素中其中一个完成时,即可执行下游元素。这非常有用, 例如,当我们想要解析域名时, 我们可能会发现查询一组域名服务器的效率比只查询一个域名服务器更高。 我们不想从不同的服务器获得相同的结果,因此我们不只要其中一个服务器返回结果即可,所有其他查询可以安全的取消。
该模式只需要在上游元素的一个结果,这些方法的名称中都有关键字,因为只会选择其中一个,所以组合元素应该产生相同类型的结果。
Method | Description |
---|---|
<U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn) |
当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的Function函数的参数。。 |
CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action) |
当这个或另一个给定阶段正常完成时,执行相应的结果作为提供的Consumer操作的参数。 |
CompletableFuture<Void> runAfterEither(CompletionStage<?> other, Runnable action) |
当这个或另一个给定阶段正常完成时,执行给定的Runnable动作。 |
这些方法也可以采用async后缀,与上一节方法集具有相同的语义。
示例
我们先看看几个例子。
在Jersey中测试一个耗时的请求
下面是 Jersey documentation中的一段代码.
@Path("/resource")
public class AsyncResource {
@Inject
private Executor executor;
@GET
public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
executor.execute(() -> {
String result = longOperation();
asyncResponse.resume(result);
});
}
}
这是一段基本的REST服务代码,它调用耗时的操作。 这种情况下,典型的处理方法是在另一个线程中的异步调用耗时操作。 此方法没有返回值; 上面的代码时Jersey的实现方式。
我们在这里又遇到了这么一个问题:我们如何对该方法进行单元测试? 测试 longOperation()
不是问题:我们可以单独对该方法进行单元测试。 我们需要在这里测试的是如何将result
对象正确地传递给asyncResponse
对象的 resume()
方法。 这可以通过测试框架轻松完成,例如Mockito。 但是我们又面临的问题如下:
- 在 "main"主线程中执行executor.execute() 。
- 但是asyncResponse.resume()是在另一个线程中异步调用的, 同时我们无法获取到结果.
在测试中我们需要的是在asyncResponse.resume() 后执行的某种回调,以便我们可以模拟测试。如下所示:
Mockito.verify(mockAsyncResponse).resume(result);
我们运行这段简单的代码:
- 调用resume()方法
- 假设和执行resume()是同一个线程; 那么,确信我们的模拟测试中不会出现任何并发问题(特别是可见性)
这时候,CompletionStage框架终于排上用场了!我们依据Runnable创建一个CompletionStage对象,而不是将Runnable传递给executor.execute()方法。
原来:
executor.submit(() -> {
String result = longOperation();
asyncResponse.resume(result);
});
使用CompletionStage重写:
CompletableFuture<Void> completableFuture =
CompletableFuture.runAsync(() -> {
String result = longOperation();
asyncResponse.resume(result);
}, executor);
因为CompletionStage可以触发其他任务,我们使用下面的代码进行测试:
completableFuture
.thenRun(() -> {
Mockito.verify(mockAsyncResponse).resume(result);
}
);
这段代码完全符合我们的要求:
- 它由前一个CompletionStage的Runnable完成触发。
- 它在同一个线程中执行。
要实现此该方案,我们需要在类中创建一个公共方法,该类返回CompletableFuture。 如果我们修改了Jersey方法的返回类型,那么Jersey将尝试使用此返回类型构建响应,将其转换为XML或JSON。 对于CompletableFuture,可能会导致运行失败。
因此,完整的测试模式如下:
- 在mocks中创建模拟对象:
String result = Mockito.mock(String.class);
AsyncResponse response = Mockito.mock(AsyncResponse.class);
Runnable train = () -> {
Mockito.doReturn(result).when(response).longOperation();
}
Runnable verify = () -> Mockito.verify(response).resume(result);
2、创建调用和验证对象:
Runnable callAndVerify = () -> {
asyncResource.executeAsync(response).thenRun(verify); }
3、最后创建要测试的任务:
ExecutorService executor = Executors.newSingleThreadExecutor();
AsyncResource asyncResource = new AsyncResource();
asyncResource.setExecutorService(executor);
CompletableFuture
.runAsync(train, executor)
.thenRun(callAndVerify);
因为这是一个单元测试,如果在给定的时间后没有看到响应,我们可能希望失败。 我们可以使用CompletableFuture中对于Future接口的get()方法来实现。
异步分析网页的链接
让我们编码实现如下需求:在Swing面板中显示自动的分析网页的链接(异步方式)
我们需要如下几个步骤:
1、读取网页内容
2、获取网页链接
3、Swing面板中显示链接
当然,修改Swing组件应该从合适的线程完成,但是,我们不希望在此线程中运行长任务。
使用CompletableFuture,很简单就能实现了:
CompletableFuture.supplyAsync(
() -> readPage("http://whatever.com/")
)
.thenApply(page -> linkParser.getLinks(page))
.thenAcceptAsync(
links -> displayPanel.display(links),
executor
);
第一步是创建异步执行的Supplier。 比如它以String形式返回网页内容。
第二步是将获取到的页面内容传递给linkParser. 这是一个返回List<String> 的function函数. 这前两个任务在同一个线程中执行.
最后一步只是获取链接列表并显示。 这个任务需要访问Swing组件,所以它应该在Swing线程中执行。 我们通过传递正确的executor作为参数来做到这一点。
有一点比较好:Executor接口是一个functional interface。 我们可以用lambda实现它:
Executor executor = runnable -> SwingUtilities.invokeLater(runnable);
我们可以利用方法引用语法来编写此模式的最终版本:
CompletableFuture.supplyAsync(
() -> readPage("http://whatever.com/")
)
.thenApply(Parser::getLinks)
.thenAcceptAsync(
DisplayPanel::display,
SwingUtilities::invokeLater
);
CompletableFutures结合lambdas和方法引用可以编写非常优雅的代码。
异常处理
CompletionStage API还提供了异常处理模式。 让我们看一个例子。
假设我们有如图所示的处理链:
所有这些CompletableFutures都使用我们在上面说到的模式链接在一起。
现在假设CF21引发异常。 如果没有对此异常做处理,则所有下游的CompletableFutures都会出错。 这意味着两件事:
- CF21, CF31, 和 CF41的CompletableFutures调用isCompletedExceptionally()都返回 true .
- 这些对象调用get()方法都会抛出ExecutionException, 原因是因为CF21引发的根异常.
我们可以使用下图所示的模式处理CompletableFutures链中的异常。
cf30 = cf21.exceptionally();
此模式创建的CompletableFuture具有以下属性:
- 如果CF21正常完成,则CF30将透明地返回与CF21相同的值。
- 如果CF21发生异常,则CF30能够捕获它,并且可以将正常值传输到CF31。
有好几种方法可以做到这一点,用不同方法的接受异常。
exceptionally(Function<Throwable, T> function)是最简单的方法调用. 它返回一个CompletionStage,如果上游CompletionStage也正常完成,则返回的CompletionStage也会以相同的值正常完成。 否则,如果此上游CompletionStage发生异常,则将此异常传递给提供的函数。返回的CompletionStage正常完成,返回Function的结果。 此方法没有异步版本。
handle(BiFunction<T, Throwable, R> bifunction) 具有相同的语义. 它返回一个CompletionStage,当此阶段正常或异常完成时,将使用此阶段的结果和异常作为所提供函数的参数执行。 如果上游CompletionStage正常完成,则Throwable为null调用BiFunction,如果异常完成,则R为null调用BiFunction。 在这两种情况下,都被能正常返回的CompletionStage。该方法有两个姐妹方法handleAsync(BiFunction<? super T,Throwable,? extends U> fn)和handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor), 这两种方法的工作方式相同,都是异步的,只是执行器不同。执行器可以作为参数提供。如果没有提供,则使用公共的fork/join 线程池。
第三种处理异常的方式是whenComplete(BiConsumer<T, Throwable> biconsumer)。handle() 可以正常结束并返回CompletionStage,而whenComplete()则不尽然。 它遵循构建的CompletionStage的流水线行为。 因此,如果上游CompletionStage发生异常,则whenComplete()返回的CompletionStage也会异常完成(结合exceptionally()理解)。使用上游CompletionStage的返回值及其此阶段返回值调用BiConsumer. 与handle()情况一样,将使用结果(或 null如果没有))和此阶段的异常(或 null如果没有)调用BiConsumer;BiConsumer没有返回值。 所以它只是一个不会影响CompletionStages流水线处理的回调。 与handle()方法类似,该方法也有两个姐妹方法whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)和whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)。这两种方法的工作方式都是异步的。执行器可以作为参数提供。如果没有提供,则使用公共的fork/join 线程池。.
结论
CompletionStage 接口和CompletableFuture类带来了异步处理数据的新方式。这个API非常复杂,主要是由于这个接口和类暴露的方法数量较多,但是,丰富的API使得我们处理异步数据流水线方面有了更多的选择,以便更好的满足应用程序的需求。
这些API基于lambda表达式构建,从而创造非常干净且优雅的代码。 它可以很好地控制哪个线程执行每个任务。 它还允许以多种方式构建流水线和组合任务,并且在处理异常方面也提供对应的方式方法。