组合式异步编程

背景

如果你想要在同一个CPU上执行几个松耦合的任务,同时防止因某个任务等待过长而阻塞线程的执行,那么你需要做的是充分利用CPU的核,让其足够忙碌,最大化程序的吞吐量从而实现并发。

并行与并发的区别:

并行:在同一个核上同时执行多个任务,任务不互相阻塞

并发:多个任务分发给多个核去执行

在java5中,已经引入了Future接口方便开发人员进行异步编程。由于其使用繁琐,代码复杂,不足以让我们编写简介并发代码,因此java8引入了CompletableFuture接口。

使用CompletableFuture构建异步应用

查询商品价格的例子,假设获取价格是一个远程服务,我们使用sleep 1秒来模拟此行为。

public class Shop {
    private String name;
    public Double getPrice(String product){
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("MyFavoriteShop2"),
                new Shop("MyFavoriteShop3"),
                new Shop("MyFavoriteShop4"),
                new Shop("MyFavoriteShop5"),
                new Shop("MyFavoriteShop6"),
                new Shop("MyFavoriteShop7"),
                new Shop("BuyItAll"));
  • 情景一

试想此场景,我们需要根据某个商品名称去查询商品的价格,发货地等等一系列操作,我们可能会写出如下伪代码:

操作A
shop.getPrice(product)
操作B
操作C
...

这些操作直接没有什么关联性,上述代码中靠后的操作需要等待前面的操作执行完之后才能执行,造成了阻塞。

那么我们其实可以使用CompletableFuture来实现异步执行,下面的代码中每个操作都不需要等待前面的操作便能执行。

操作A
CompletableFuture<Double> completableFuture 
                            = CompletableFuture.supplyAsync(() -> shop.getPrice(product));
操作B
操作C
...
Double d = completableFuture.get();
  • 情景二

    如果给定一个product和一个List<Shop>,想要获取所有shop中对此product的定价,该如何实现?

    我们已经知道流的使用,按照常规思路,写出下列代码应该不难

    List<Double> list = 
          shops.stream()
          .map(
              (Shop s) -> s.getPrice(product)
          )
          .collect(Collectors.toList());
    

    但是我们可不可以把map中获取价格的代码实现异步执行呢?答案当然是可以的。

    其中一种操作是将流转为并行流,这里我们使用另一种方式:

    List<CompletableFuture<Double>> list 
              = shops.stream()
                              .map(
                              (Shop s) -> CompletableFuture.supplyAsync(
                                                  () -> s.getPrice(product)
                                                  )
                              )
                              .collect(Collectors.toList());
    List<Double> list2    
          =list.stream()
                      .map(CompletableFuture::join).collect(Collectors.toList());
    
  • 情景三

    如果你试过情景三种的实现方式后,你会发现其执行速度并没有多少提升。那么有没有方法能够让他更快点呢?我们可以通过调整线程池的大小,确保整体的计算不会因为线程都在等待I/O而发生阻塞。

    List中有10个shop,我们可以调整线程池大小为10个。

    final Executor executor =
                    Executors.newFixedThreadPool(11,
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    Thread t = new Thread(r);
                                    t.setDaemon(true);
                                    return t;
                                }
                            });
    // CompletableFuture.supplyAsync()方法可以设置第二个参数            
    CompletableFuture.supplyAsync(
                                                  () -> s.getPrice(product),executor
                                                  )             
    
    • 情景四

      对两个异步操作进行流水线,第一个操作完成时,将其 结果作为参数传递给第二个操作。使用thenCompose连接。

      List<CompletableFuture<String>> list 
              = shops.stream()
              .map(
                          (Shop s) -> CompletableFuture.supplyAsync(
                                                                          () -> s.getPrice(product), executor
                                                                          )
              )
              .map(c -> c.thenCompose(
                      (Double d) -> CompletableFuture.supplyAsync(
                                                      () ->d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10)))
              ))
              .collect(Collectors.toList());
      
    • 情景五

      将两个完全不相干的CompletableFuture对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务 。使用thenCombine连接

      List<CompletableFuture<String>> list = shops.stream()
              .map(s ->
                      CompletableFuture.supplyAsync(() -> s.getPrice(product), executor)
                              .thenCombine(
                                      CompletableFuture.supplyAsync(
                                              () -> new Random().nextInt(10)
                                      ), (d, c) -> d + "-----" + c
      
                              ))
              .collect(Collectors.toList());
      
  • 情景六

    响应CompletableFuture的completion事件

    一旦CompletableFuture计算得到结果,就得到一个相应。那么可以使用thenAccept

    Stream<CompletableFuture<String>> list = shops.stream()
                    .map((Shop s) -> CompletableFuture.supplyAsync(() -> s.getPrice(product), executor))
                    .map(c -> c.thenCompose(
                            (Double d) -> CompletableFuture.supplyAsync(() -> d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10)),executor)
                    ));
    
            list.map(c->c.thenAccept(System.out::println));
    
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容