组合式异步编程

背景

如果你想要在同一个CPU上执行几个松耦合的任务,同时防止因某个任务等待过长而阻塞线程的执行,那么你需要做的是充分利用CPU的核,让其足够忙碌,最大化程序的吞吐量从而实现并发。

并行与并发的区别:

并行:在同一个核上同时执行多个任务,任务不互相阻塞

并发:多个任务分发给多个核去执行

在java5中,已经引入了Future接口方便开发人员进行异步编程。由于其使用繁琐,代码复杂,不足以让我们编写简介并发代码,因此java8引入了CompletableFuture接口。

使用CompletableFuture构建异步应用

查询商品价格的例子,假设获取价格是一个远程服务,我们使用sleep 1秒来模拟此行为。

public class Shop {
    private String name;
    public Double getPrice(String product){
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return new Random().nextDouble() * product.charAt(0) + product.charAt(1);
    }
}

List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
                new Shop("LetsSaveBig"),
                new Shop("MyFavoriteShop"),
                new Shop("MyFavoriteShop2"),
                new Shop("MyFavoriteShop3"),
                new Shop("MyFavoriteShop4"),
                new Shop("MyFavoriteShop5"),
                new Shop("MyFavoriteShop6"),
                new Shop("MyFavoriteShop7"),
                new Shop("BuyItAll"));
  • 情景一

试想此场景,我们需要根据某个商品名称去查询商品的价格,发货地等等一系列操作,我们可能会写出如下伪代码:

操作A
shop.getPrice(product)
操作B
操作C
...

这些操作直接没有什么关联性,上述代码中靠后的操作需要等待前面的操作执行完之后才能执行,造成了阻塞。

那么我们其实可以使用CompletableFuture来实现异步执行,下面的代码中每个操作都不需要等待前面的操作便能执行。

操作A
CompletableFuture<Double> completableFuture 
                            = CompletableFuture.supplyAsync(() -> shop.getPrice(product));
操作B
操作C
...
Double d = completableFuture.get();
  • 情景二

    如果给定一个product和一个List<Shop>,想要获取所有shop中对此product的定价,该如何实现?

    我们已经知道流的使用,按照常规思路,写出下列代码应该不难

    List<Double> list = 
          shops.stream()
          .map(
              (Shop s) -> s.getPrice(product)
          )
          .collect(Collectors.toList());
    

    但是我们可不可以把map中获取价格的代码实现异步执行呢?答案当然是可以的。

    其中一种操作是将流转为并行流,这里我们使用另一种方式:

    List<CompletableFuture<Double>> list 
              = shops.stream()
                              .map(
                              (Shop s) -> CompletableFuture.supplyAsync(
                                                  () -> s.getPrice(product)
                                                  )
                              )
                              .collect(Collectors.toList());
    List<Double> list2    
          =list.stream()
                      .map(CompletableFuture::join).collect(Collectors.toList());
    
  • 情景三

    如果你试过情景三种的实现方式后,你会发现其执行速度并没有多少提升。那么有没有方法能够让他更快点呢?我们可以通过调整线程池的大小,确保整体的计算不会因为线程都在等待I/O而发生阻塞。

    List中有10个shop,我们可以调整线程池大小为10个。

    final Executor executor =
                    Executors.newFixedThreadPool(11,
                            new ThreadFactory() {
                                @Override
                                public Thread newThread(Runnable r) {
                                    Thread t = new Thread(r);
                                    t.setDaemon(true);
                                    return t;
                                }
                            });
    // CompletableFuture.supplyAsync()方法可以设置第二个参数            
    CompletableFuture.supplyAsync(
                                                  () -> s.getPrice(product),executor
                                                  )             
    
    • 情景四

      对两个异步操作进行流水线,第一个操作完成时,将其 结果作为参数传递给第二个操作。使用thenCompose连接。

      List<CompletableFuture<String>> list 
              = shops.stream()
              .map(
                          (Shop s) -> CompletableFuture.supplyAsync(
                                                                          () -> s.getPrice(product), executor
                                                                          )
              )
              .map(c -> c.thenCompose(
                      (Double d) -> CompletableFuture.supplyAsync(
                                                      () ->d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10)))
              ))
              .collect(Collectors.toList());
      
    • 情景五

      将两个完全不相干的CompletableFuture对象的结果整合起来,而且你也不希望等到第一个任务完全结束才开始第二项任务 。使用thenCombine连接

      List<CompletableFuture<String>> list = shops.stream()
              .map(s ->
                      CompletableFuture.supplyAsync(() -> s.getPrice(product), executor)
                              .thenCombine(
                                      CompletableFuture.supplyAsync(
                                              () -> new Random().nextInt(10)
                                      ), (d, c) -> d + "-----" + c
      
                              ))
              .collect(Collectors.toList());
      
  • 情景六

    响应CompletableFuture的completion事件

    一旦CompletableFuture计算得到结果,就得到一个相应。那么可以使用thenAccept

    Stream<CompletableFuture<String>> list = shops.stream()
                    .map((Shop s) -> CompletableFuture.supplyAsync(() -> s.getPrice(product), executor))
                    .map(c -> c.thenCompose(
                            (Double d) -> CompletableFuture.supplyAsync(() -> d.doubleValue() + "---" + (d.doubleValue() - new Random().nextInt(10)),executor)
                    ));
    
            list.map(c->c.thenAccept(System.out::println));
    
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,951评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,606评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,601评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,478评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,565评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,587评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,590评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,337评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,785评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,096评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,273评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,935评论 5 339
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,578评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,199评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,440评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,163评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,133评论 2 352

推荐阅读更多精彩内容