1. 导论
同步API和异步API:同步/异步关注的是消息通知的机制。
- 同步:调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。
- 异步:过程调用发出后,调用者在没有得到结果之前,就可以继续执行后续操作。返回的方式:
- 轮询:即监听被调用者的状态,调用者需要每隔一定时间检查一次,效率会很低。
- 回调:当被调用者执行完成后,会调用调用者提供的回调函数。
阻塞和非阻塞:阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态、
- 阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
- 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
并行和并发:
- 并行是多个cpu,一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源。
- 并发是个程序在同一CPU上运行,同一时刻只能有一个程序在CPU上运行。
异步和多线程:异步是目的,而多线程是实现异步的一个手段。多线程需要考虑线程上下文切换带来的负担,并需要考虑死锁的可能。
2. Future 接口:
目的:实现异步计算:把调用线程从潜在耗时的操作中解放出来,让它能继续执行其他工作,不再需要等待耗时操作完成。
原理:返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。
使用:
- 只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,让线程可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。
- 如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成。
ExecutorService executor = Executors.newCachedThreadPool();
Future<Double> future = executor.submit(new Callable<Double>() {
public Double call() {
return doSomeLongComputation();
}});
doSomethingElse();
try {
// 获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出
Double result = future.get(1, TimeUnit.SECONDS);
} catch (ExecutionException ee) {
// 计算抛出一个异常
} catch (InterruptedException ie) {
// 当前线程在等待过程中被中断
} catch (TimeoutException te) {
// 在Future对象完成之前超过已过期
}
局限性:很难表述Future结果之间的依赖性,如:
- 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
- 等待Future集合中的所有任务都完成。
- 仅等待Future集合中最快结束的任务完成。
- 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式,手工结束)。
- 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。
3. CompletableFuture:Java 8 提供的Future 实现
使用CompletableFuture实现异步方法: 将一个耗时的产品价格查询异步化
public Future<Double> getPriceAsync(String product) {
// 创建CompletableFuture对象,它会包含计算的结果
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
// 假设calculatePrice是一个耗时任务
double price = calculatePrice(product);
// 需长时间计算的任务结束并得出结果时,设置Future的返回值
futurePrice.complete(price);
}).start();
return futurePrice;
}
上述代码问题:异常被限制,如果异步执行计算过程中产生了错误,用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。
解决:
- 使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况。
- 使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。
public Future<Double> getPriceAsync(String product) {
CompletableFuture<Double> futurePrice = new CompletableFuture<>();
new Thread( () -> {
try {
double price = calculatePrice(product);
futurePrice.complete(price);
} catch (Exception ex) {
futurePrice.completeExceptionally(ex);
}
}).start();
return futurePrice;
}
更加简洁地使用:使用工厂方法supplyAsync创建CompletableFuture
public Future<Double> getPriceAsync(String product) {
return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
- supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。
- 生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。
- 自动提供了错误管理机制。
4. 多个异步任务
需求:来自4个商店的指定产品的异步查询,返回4个商店的产品价格List<String> findPrices(String product); CPU为4个线程
实现一:使用顺序流,查询耗时大概为4*delay
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
.collect(toList());
}
实现二:使用并行流,查询耗时为 delay
public List<String> findPrices(String product) {
return shops.parallelStream()
.map(shop -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)))
.collect(toList());
}
实现三:使用CompletableFuture。查询耗时为 2* delay。join方法类似于get,但不会抛出检测到的异常。注意:这里这里使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操作——这其实是有缘由的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作、通知join方法返回计算结果。先将CompletableFutures对象聚集到一个列表中,让对象们可以在等待其他对象完成操作之前就能启动。
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product)))
.collect(Collectors.toList());
return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}
总结:
- 顺序流随着商店的增加,耗时线性增长。
- 并行流能够并行执行4个任务,充分利用了cpu,但是再增加一个任务时,由于cpu全部被占用,最后一个任务只能等到前面某一个操作完成释放出空闲线程才能继续,因此需要 2 *delay。Stream底层依赖的是线程数量固定的通用线程池,因此在数目增多时扩展性不好。
- CompletableFuture有一个线程用于主线程,三个用于异步执行,因此需要2 *delay
- 如果是9个商店,则并行流耗时3 * delay,CompletableFuture也是耗时 3*delay
- 并行流和CompletableFuture采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。
- CompletableFuture具有一定的优势,允许你对执行器(Executor)进行配置,尤其是线程池的大小。
改进CompletableFuture:使用定制的执行器。
《JAVA并发编程实战》建议:
线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
❑NCPU是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到
❑UCPU是期望的CPU利用率(该值应该介于0和1之间)
❑W/C是等待时间与计算时间的比率
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
new ThreadFactory() {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);//使用守护线程——这种方式不会阻止程序的关停
return t;
}
});
// 传递执行器
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor);
并行流还是CompletableFuture:
- CompletableFuture提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。
- 如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
- 如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,可以依据等待/计算,或者W/C的比率设定需要使用的线程数。另一个原因是:处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。
5. 多个异步任务的流水线操作
两个相关的异步任务的组合
需求:假设需要在获取到产品价格后还需要进一步获取折扣价格,这是两个异步的远程任务,如何组合?
实现一:流水线方式,耗时随着商店增多线性增长。
public List<String> findPrices(String product) {
return shops.stream()
.map(shop -> shop.getPrice(product)) //延迟1秒
.map(Quote::parse)
.map(Discount::applyDiscount) //延迟1秒
.collect(toList());
}
实现二:同步组合和异步组合
public List<String> findPrices(String product) {
List<CompletableFuture<String>> priceFutures =
shops.stream()
.map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
.map(future -> future.thenApply(Quote::parse))
.map(future -> future.thenCompose(quote ->CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote), executor)))
.collect(toList());
return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}
thenApply方法:由于这里不涉及IO操作,因此采用同步执行。thenApply方法用于对第一步中CompletableFuture连接同步方法。这意味着CompletableFuture最终结束运行时,希望传递Lambda表达式给thenApply方法,将Stream中的每个CompletableFuture<String>对象转换为对应的CompletableFuture<Quote>对象。
thenCompose方法:这里涉及到远程操作,因此希望能够异步执行。thenCompose方法允许对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。意味着可以创建两个CompletableFutures对象,对第一个CompletableFuture 对象调用thenCompose , 并向其传递一个函数。当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出的第二个CompletableFuture对象。
thenComposeAsync方法:以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。不带Async的方法和它的前一个任务一样,在同一个线程中运行。
两个不相关的异步任务的组合
需求:不希望等到第一个任务完全结束才开始第二项任务。
实现:合并两个独立的CompletableFuture对象
Future<Double> futurePriceInUSD =
CompletableFuture.supplyAsync(() -> shop.getPrice(product)).thenCombine(
CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),
(price, rate) -> price * rate
);
thenCombine方法:接收名为BiFunction的第二参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。
thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。