背景
如果你想要在同一个CPU上执行几个松耦合的任务,同时防止因某个任务等待过长而阻塞线程的执行,那么你需要做的是充分利用CPU的核,让其足够忙碌,最大化程序的吞吐量从而实现并发。
并行与并发的区别:
并行:在同一个核上同时执行多个任务,任务不互相阻塞
并发:多个任务分发给多个核去执行
在java5中,已经引入了Future接口方便开发人员进行异步编程。由于其使用繁琐,代码复杂,不足以让我们编写简介并发代码,因此java8引入了CompletableFuture接口。
使用CompletableFuture构建异步应用
查询商品价格的例子,假设获取价格是一个远程服务,我们使用sleep 1秒来模拟此行为。
public class Shop {
private String name;
public Double getPrice(String product){
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
}
}
List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
new Shop("LetsSaveBig"),
new Shop("MyFavoriteShop"),
new Shop("MyFavoriteShop2"),
new Shop("MyFavoriteShop3"),
new Shop("MyFavoriteShop4"),
new Shop("MyFavoriteShop5"),
new Shop("MyFavoriteShop6"),
new Shop("MyFavoriteShop7"),
new Shop("BuyItAll"));
-
情景一
试想此场景,我们需要根据某个商品名称去查询商品的价格,发货地等等一系列操作,我们可能会写出如下伪代码:
操作A
shop.getPrice(product)
操作B
操作C
...
这些操作直接没有什么关联性,上述代码中靠后的操作需要等待前面的操作执行完之后才能执行,造成了阻塞。
那么我们其实可以使用CompletableFuture来实现异步执行,下面的代码中每个操作都不需要等待前面的操作便能执行。
操作A
CompletableFuture<Double> completableFuture
= CompletableFuture.supplyAsync(() -> shop.getPrice(product));
操作B
操作C
...
Double d = completableFuture.get();
-
情景二
如果给定一个product和一个List<Shop>,想要获取所有shop中对此product的定价,该如何实现?
我们已经知道流的使用,按照常规思路,写出下列代码应该不难
List<Double> list = shops.stream() .map( (Shop s) -> s.getPrice(product) ) .collect(Collectors.toList());
但是我们可不可以把map中获取价格的代码实现异步执行呢?答案当然是可以的。
其中一种操作是将流转为并行流,这里我们使用另一种方式:
List<CompletableFuture<Double>> list = shops.stream() .map( (Shop s) -> CompletableFuture.supplyAsync( () -> s.getPrice(product) ) ) .collect(Collectors.toList()); List<Double> list2 =list.stream() .map(CompletableFuture::join).collect(Collectors.toList());
-
情景三
如果你试过情景三种的实现方式后,你会发现其执行速度并没有多少提升。那么有没有方法能够让他更快点呢?我们可以通过调整线程池的大小,确保整体的计算不会因为线程都在等待I/O而发生阻塞。
List中有10个shop,我们可以调整线程池大小为10个。
final Executor executor = Executors.newFixedThreadPool(11, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } }); // CompletableFuture.supplyAsync()方法可以设置第二个参数 CompletableFuture.supplyAsync( () -> s.getPrice(product),executor )
-
情景四
对两个异步操作进行流水线,第一个操作完成时,将其 结果作为参数传递给第二个操作。使用thenCompose连接。
List<CompletableFuture<String>> list = shops.stream() .map( (Shop s) -> CompletableFuture.supplyAsync( () -> s.getPrice(product), executor ) ) .map(c -> c.thenCompose( (Double d) -> CompletableFuture.supplyAsync( () ->d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10))) )) .collect(Collectors.toList());
-
情景五
将两个完全不相干的CompletableFuture对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务 。使用thenCombine连接
List<CompletableFuture<String>> list = shops.stream() .map(s -> CompletableFuture.supplyAsync(() -> s.getPrice(product), executor) .thenCombine( CompletableFuture.supplyAsync( () -> new Random().nextInt(10) ), (d, c) -> d + "-----" + c )) .collect(Collectors.toList());
-
-
情景六
响应CompletableFuture的completion事件
一旦CompletableFuture计算得到结果,就得到一个相应。那么可以使用thenAccept
Stream<CompletableFuture<String>> list = shops.stream() .map((Shop s) -> CompletableFuture.supplyAsync(() -> s.getPrice(product), executor)) .map(c -> c.thenCompose( (Double d) -> CompletableFuture.supplyAsync(() -> d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10)),executor) )); list.map(c->c.thenAccept(System.out::println));