JAVA语言系列:组合式异步编程


1. 导论

同步API和异步API:同步/异步关注的是消息通知的机制。

  • 同步:调用了某个方法,调用方在被调用方运行的过程中会等待,被调用方运行结束返回,调用方取得被调用方的返回值并继续运行。
  • 异步:过程调用发出后,调用者在没有得到结果之前,就可以继续执行后续操作。返回的方式:
  1. 轮询:即监听被调用者的状态,调用者需要每隔一定时间检查一次,效率会很低。
  2. 回调:当被调用者执行完成后,会调用调用者提供的回调函数。

阻塞和非阻塞:阻塞和非阻塞关注的是程序在等待调用结果(消息,返回值)时的状态、

  • 阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
  • 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。

并行和并发

  • 并行是多个cpu,一个CPU执行一个线程时,另一个CPU可以执行另一个线程,两个线程互不抢占CPU资源。
  • 并发是个程序在同一CPU上运行,同一时刻只能有一个程序在CPU上运行。

异步和多线程:异步是目的,而多线程是实现异步的一个手段。多线程需要考虑线程上下文切换带来的负担,并需要考虑死锁的可能。


2. Future 接口:

目的:实现异步计算:把调用线程从潜在耗时的操作中解放出来,让它能继续执行其他工作,不再需要等待耗时操作完成。
原理:返回一个执行运算结果的引用,当运算结束后,这个引用被返回给调用方。
使用

  • 只需要将耗时的操作封装在一个Callable对象中,再将它提交给ExecutorService,让线程可以在ExecutorService以并发方式调用另一个线程执行耗时操作的同时,去执行一些其他的任务。
  • 如果你已经运行到没有异步操作的结果就无法继续任何有意义的工作时,可以调用它的get方法去获取操作的结果。如果操作已经完成,该方法会立刻返回操作的结果,否则它会阻塞你的线程,直到操作完成。
  ExecutorService executor = Executors.newCachedThreadPool();
  Future<Double> future = executor.submit(new Callable<Double>() {
        public Double call() {
        return doSomeLongComputation();
  }});
  doSomethingElse();
  try { 
    // 获取异步操作的结果,如果最终被阻塞,无法得到结果,那么在最多等待1秒钟之后退出
      Double result = future.get(1, TimeUnit.SECONDS);
  } catch (ExecutionException ee) {
    // 计算抛出一个异常
  } catch (InterruptedException ie) {
    // 当前线程在等待过程中被中断
  } catch (TimeoutException te) {
    // 在Future对象完成之前超过已过期
  }

局限性:很难表述Future结果之间的依赖性,如:

  • 将两个异步计算合并为一个——这两个异步计算之间相互独立,同时第二个又依赖于第一个的结果。
  • 等待Future集合中的所有任务都完成。
  • 仅等待Future集合中最快结束的任务完成。
  • 通过编程方式完成一个Future任务的执行(即以手工设定异步操作结果的方式,手工结束)。
  • 应对Future的完成事件(即当Future的完成事件发生时会收到通知,并能使用Future计算的结果进行下一步的操作,不只是简单地阻塞等待操作的结果)。

3. CompletableFuture:Java 8 提供的Future 实现

使用CompletableFuture实现异步方法: 将一个耗时的产品价格查询异步化

public Future<Double> getPriceAsync(String product) {
  // 创建CompletableFuture对象,它会包含计算的结果
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  new Thread( () -> {
    // 假设calculatePrice是一个耗时任务
    double price = calculatePrice(product);
    // 需长时间计算的任务结束并得出结果时,设置Future的返回值
    futurePrice.complete(price);
  }).start();
  return futurePrice;
}

上述代码问题:异常被限制,如果异步执行计算过程中产生了错误,用于提示错误的异常会被限制在试图计算商品价格的当前线程的范围内,最终会杀死该线程,而这会导致等待get方法返回结果的客户端永久地被阻塞。

解决

  • 使用重载版本的get方法,它使用一个超时参数来避免发生这样的情况。
  • 使用CompletableFuture的completeExceptionally方法将导致CompletableFuture内发生问题的异常抛出。
public Future<Double> getPriceAsync(String product) {
  CompletableFuture<Double> futurePrice = new CompletableFuture<>();
  new Thread( () -> {
  try {
    double price = calculatePrice(product);
    futurePrice.complete(price);
  } catch (Exception ex) {
  futurePrice.completeExceptionally(ex);
  }
  }).start();
  return futurePrice;
}

更加简洁地使用:使用工厂方法supplyAsync创建CompletableFuture

public Future<Double> getPriceAsync(String product) {
  return CompletableFuture.supplyAsync(() -> calculatePrice(product));
}
  • supplyAsync方法接受一个生产者(Supplier)作为参数,返回一个CompletableFuture对象,该对象完成异步执行后会读取调用生产者方法的返回值。
  • 生产者方法会交由ForkJoinPool池中的某个执行线程(Executor)运行,但是你也可以使用supplyAsync方法的重载版本,传递第二个参数指定不同的执行线程执行生产者方法。
  • 自动提供了错误管理机制。

4. 多个异步任务

需求:来自4个商店的指定产品的异步查询,返回4个商店的产品价格List<String> findPrices(String product); CPU为4个线程

实现一:使用顺序流,查询耗时大概为4*delay

public List<String> findPrices(String product) {
  return shops.stream()
    .map(shop -> String.format("%s price is %.2f", shop.getName(), shop.getPrice(product)))
    .collect(toList());
}

实现二:使用并行流,查询耗时为 delay

public List<String> findPrices(String product) {
  return shops.parallelStream()
    .map(shop -> String.format("%s price is %.2f",shop.getName(), shop.getPrice(product)))
    .collect(toList());
}

实现三:使用CompletableFuture。查询耗时为 2* delay。join方法类似于get,但不会抛出检测到的异常。注意这里这里使用了两个不同的Stream流水线,而不是在同一个处理流的流水线上一个接一个地放置两个map操作——这其实是有缘由的。考虑流操作之间的延迟特性,如果你在单一流水线中处理流,发向不同商家的请求只能以同步、顺序执行的方式才会成功。因此,每个创建CompletableFuture对象只能在前一个操作结束之后执行查询指定商家的动作、通知join方法返回计算结果。先将CompletableFutures对象聚集到一个列表中,让对象们可以在等待其他对象完成操作之前就能启动。

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures =
    shops.stream()
    .map(shop -> CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product)))
    .collect(Collectors.toList());
  return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}

总结

  • 顺序流随着商店的增加,耗时线性增长。
  • 并行流能够并行执行4个任务,充分利用了cpu,但是再增加一个任务时,由于cpu全部被占用,最后一个任务只能等到前面某一个操作完成释放出空闲线程才能继续,因此需要 2 *delay。Stream底层依赖的是线程数量固定的通用线程池,因此在数目增多时扩展性不好。
  • CompletableFuture有一个线程用于主线程,三个用于异步执行,因此需要2 *delay
  • 如果是9个商店,则并行流耗时3 * delay,CompletableFuture也是耗时 3*delay
  • 并行流和CompletableFuture采用的是同样的通用线程池,默认都使用固定数目的线程,具体线程数取决于Runtime.getRuntime().availableProcessors()的返回值。
  • CompletableFuture具有一定的优势,允许你对执行器(Executor)进行配置,尤其是线程池的大小。

改进CompletableFuture:使用定制的执行器。

《JAVA并发编程实战》建议:
线程池大小与处理器的利用率之比可以使用下面的公式进行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
❑NCPU是处理器的核的数目,可以通过Runtime.getRuntime().availableProcessors()得到
❑UCPU是期望的CPU利用率(该值应该介于0和1之间)
❑W/C是等待时间与计算时间的比率

private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),
  new ThreadFactory() {
    public Thread newThread(Runnable r) {
      Thread t = new Thread(r);
      t.setDaemon(true);//使用守护线程——这种方式不会阻止程序的关停
      return t;
  }
});
// 传递执行器
CompletableFuture.supplyAsync(() -> shop.getName() + " price is " + shop.getPrice(product), executor);

并行流还是CompletableFuture

  • CompletableFuture提供了更多的灵活性,你可以调整线程池的大小,而这能帮助你确保整体的计算不会因为线程都在等待I/O而发生阻塞。
  • 如果进行的是计算密集型的操作,并且没有I/O,那么推荐使用Stream接口,因为实现简单,同时效率也可能是最高的(如果所有的线程都是计算密集型的,那就没有必要创建比处理器核数更多的线程)。
  • 如果并行的工作单元还涉及等待I/O的操作(包括网络连接等待),那么使用CompletableFuture灵活性更好,可以依据等待/计算,或者W/C的比率设定需要使用的线程数。另一个原因是:处理流的流水线中如果发生I/O等待,流的延迟特性会让我们很难判断到底什么时候触发了等待。

5. 多个异步任务的流水线操作

两个相关的异步任务的组合

需求:假设需要在获取到产品价格后还需要进一步获取折扣价格,这是两个异步的远程任务,如何组合?

实现一:流水线方式,耗时随着商店增多线性增长。

public List<String> findPrices(String product) {
  return shops.stream()
    .map(shop -> shop.getPrice(product))  //延迟1秒
    .map(Quote::parse)
    .map(Discount::applyDiscount)  //延迟1秒
    .collect(toList());
}

实现二:同步组合和异步组合

public List<String> findPrices(String product) {
  List<CompletableFuture<String>> priceFutures =
  shops.stream()
  .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPrice(product), executor))
  .map(future -> future.thenApply(Quote::parse))
  .map(future -> future.thenCompose(quote ->CompletableFuture.supplyAsync(() ->  Discount.applyDiscount(quote), executor)))
  .collect(toList());
  return priceFutures.stream().map(CompletableFuture::join).collect(toList());
}

thenApply方法:由于这里不涉及IO操作,因此采用同步执行。thenApply方法用于对第一步中CompletableFuture连接同步方法。这意味着CompletableFuture最终结束运行时,希望传递Lambda表达式给thenApply方法,将Stream中的每个CompletableFuture<String>对象转换为对应的CompletableFuture<Quote>对象。
thenCompose方法:这里涉及到远程操作,因此希望能够异步执行。thenCompose方法允许对两个异步操作进行流水线,第一个操作完成时,将其结果作为参数传递给第二个操作。意味着可以创建两个CompletableFutures对象,对第一个CompletableFuture 对象调用thenCompose , 并向其传递一个函数。当第一个CompletableFuture执行完毕后,它的结果将作为该函数的参数,这个函数的返回值是以第一个CompletableFuture的返回做输入计算出的第二个CompletableFuture对象。
thenComposeAsync方法:以Async结尾的方法会将后续的任务提交到一个线程池,所以每个任务是由不同的线程处理的。不带Async的方法和它的前一个任务一样,在同一个线程中运行。

两个不相关的异步任务的组合

需求:不希望等到第一个任务完全结束才开始第二项任务。
实现:合并两个独立的CompletableFuture对象

Future<Double> futurePriceInUSD =
  CompletableFuture.supplyAsync(() -> shop.getPrice(product)).thenCombine(
    CompletableFuture.supplyAsync(() -> exchangeService.getRate(Money.EUR, Money.USD)),
    (price, rate) -> price * rate
);

thenCombine方法:接收名为BiFunction的第二参数,这个参数定义了当两个CompletableFuture对象完成计算后,结果如何合并。
thenCombineAsync会导致BiFunction中定义的合并操作被提交到线程池中,由另一个任务以异步的方式执行。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,686评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,668评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,160评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,736评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,847评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,043评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,129评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,872评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,318评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,645评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,777评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,861评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,589评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,687评论 2 351

推荐阅读更多精彩内容