第四十八条:谨慎使用Stream并行流【Lambda和Stream end】

在主流的编程语言中,Java一直走在简化并发编程任务的最前沿。1996年Java发布时,就通过同步和wait/notify内置了对线程的支持。Java5引入了java.util.concurrent类库,提供了并行集合(concurrent collection)和执行者框架(executor framework)。Java7引入fork-join包,这是一个处理并行分解的高性能框架。Java8引入Stream,只需要调用一次parallel方法就可以实现并行处理。在Java中编写并发程序变得越来越容易,但是要编写出正确又快速的并发程序,则一向没那么简单。安全性和活性失败是并发编程中需要面对的问题,Stream pipeline并行也不例外。

请看摘自第45条的这段程序:

// Stream-based program to generate the first 20 Mersenne primes
public static void main(String[] args) {
  primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE)) 
    .filter(mersenne -> mersenne.isProbablePrime(50))
    .limit(20)
    .forEach(System.out::println);
}
static Stream<BigInteger> primes() {
  return Stream.iterate(TWO, BigInteger::nextProbablePrime); 
}

在我的机器上,这段程序会立即开始打印素数,完成运行花了12.5秒。假设我天真的想通过在Stream pipeline上添加一个parallel()调用来提速。你认为这样会对其性能产生什么样的影响?运行速度会稍微块一点吗?还是会慢一点?遗憾的是,其结果是根本不打印任何内容,CPU的使用率却定在90%一动不动了(活性失败)。程序最后可能会终止,但是我们不想一探究竟,半个小时后就强行把它终止了。

这是怎么回事呢?简单的说,Stream类库不知道如何并行这个pipeline,以及如何探索失败。即便在最佳环境下,如果源头是来自Stream.iterate,或者使用了中间操作的limit,那么并行pipeline也不可能提升性能。这个pipeline必须同时满足这两个条件。更糟糕的是,默认的并行策略在处理limit的不可预知性时,是假设额外多处理几个元素,并放弃任何不需要的结果,这些都不会影响性能。在这种情况下,他查找每个梅森素数时,所花费的时间大概是查找之前元素的两倍。因而,额外多计算一个元素的成本,大概相当于计算所有之前元素总和的时间,这个貌似无伤大雅的pipeline,却使得自动并行算法濒临崩溃。这个故事的寓意很简单:千万不要任意的并行Stream pipeline。它造成的性能后果有可能是灾难性的。

总之,在Stream上通过并行获得性能,最好是通过ArrayList、HashMap、HashSet和ConcurrentHashMap实例,数组,int范围和long范围等。这些数据结构的共性是,都可以被精确、轻松的分成任意大小的子范围,使并行线程的分工变得更加轻松。Stream类库用来执行这个任务的抽象是分割迭代器,它是由Stream和Iterable中的spliterator方法返回的。

这些数据结构共有的另一项重要特性是,在进行顺序处理时,它们提供了优异的引用局部性:序列化的元素引用一起保存在内存中。被那些引用访问到的对象在内存中可能不是一个紧挨着一个,这降低了引用的局部性。事实证明,引用局部性对于并发批处理来说至关重要:没有它,线程就会出现闲置,需要等待数据从内存转移到处理器的缓存。具有最佳引用局部性的数据结构是基本类型数组,因为数据本身是相邻的保存在内存中的。

Stream pipeline的终止操作本质上也影响了并发执行的效率。如果大量的工作在终止操作中完成,而不是全部工作在pipeline中完成,并且这个操作是固有的顺序,那么并行pipeline的效率就会受到限制。并行的最佳终止操作是做减法(reduction),用一个Stream的reduce方法,将所有从pipeline产生的元素都合并在一起,或者预先打包像min、max、count和sum这类方法。短路操作anyMatch、allMatch和noneMatch也都可以并行。由Stream的collect方法执行的操作,都是可变的减法,不是并行的最好选择,因为合并集合的成本非常高。

如果是自己编写Stream、Iterable或者Collection实现,并且想要得到适当的并行性能,就必须覆盖spliterator方法,并广泛的测试结果Stream的并行性能。编写高质量的分割迭代器很困难,并且超出了本书的讨论范畴。

并行Stream不仅可能降低性能,包括活性失败,还可以导致结果出错,以及难以预计的行为(如安全性失败)。安全性失败可能是因为并行的pipeline使用了映射、过滤器或者程序员自己编写的其他函数对象,并且没有遵守它们的规范。Stream规范对于这些函数对象有着严格的要求条件。例如,传到Stream的reduce操作的收集器函数和组合器函数,必须是有关联、互不干扰,并且是无状态的。如果不满足这些条件(在第46条提到了一些),但是按序列运行pipeline,可能会得到正确的结果;如果并发运行,则可能会突发性失败。

以上值得注意的是,并行的梅森素数程序虽然运行完成了,但是并没有按正确的顺序(升序)打印出素数。为了保存序列化版本程序显示的顺序,必须用forEachOrdered代替终止操作的forEach,它可以确保按enconuter顺序遍历并行的Stream。

假如在使用的是一个可以有效分割的源Stream,一个可行的或者简单的终止操作,以及互不干扰的函数对象,那么将无法获得通过并行实现的提速,除非pipeline完成了足够的实际工作,抵消了与并行相关的成本。据不完全估计,Stream中的元素数量,是每个元素所执行的代码行数的很多倍,至少是十万倍。

切记:并行Stream是一项严格的性能优化。对于任何优化都必须在改变前后对性能进行测试,以确保值得这么做(详见第67条)。最理想的是在实现的系统设置中进行测试。一般来说,程序中所有的并行Stream pipeline都是在一个通用的fork-join池中运行的。只要有一个pipeline运行异常,都会损害到系统中其他不相关部分的性能。

听起来貌似在并行Stream pipeline时怪事连连,其实正是如此。我有一个朋友,他发现在大量使用Stream的几百万行代码中,只有少数几个并行Stream是有效的。这并不意味着应该避免使用并行Stream。在适当的条件下,给Stream pipeline添加parallel调用,确实可以在多处理器核的情况下实现近乎线性的倍增。某些域如机器学习和数据处理,尤其适用于这样的提速。

简单举一个并行Stream pipeline有效的例子。假设下面这个函数是用来计算π(n),素数的数量少于或者等于n:

// Prime-counting stream pipeline - benefits from parallelization
static long pi(long n) {
  return LongStream.rangeClosed(2, n)
    .mapToObj(BigInteger::valueOf)
    .filter(i -> i.isProbablePrime(50))
    .count();
}

在我的机器上,这个函数花31秒完成了计算π(108)。只要添加一个parallel()调用,就把调用时间减到了9.2秒:

// Prime-counting stream pipeline - parallel version
static long pi(long n) {
  return LongStream.rangeClosed(2, n)
    .parallel()
    .mapToObj(BigInteger::valueOf) 
    .filter(i -> i.isProbablePrime(50)) 
    .count();
}

换句话说,并行计算在我的四核机器上添加了parallel()调用后,速度加快了3.7倍。值得注意的是,这并不是在实践计算n值很大时的π(n)的方法。还有更加高效的算法,如著名的Lehmer公式。

如果要并行一个随机数的Stream,应该从SplittableRandom实例开始,而不是从ThreadLocalRandom(或实际上已经过时的Random)开始。SplittableRandom正是专门为此设计的,还有线性提速的可能。ThreadLocalRandom则只用于单线程,它将自身当作一个并行的Stream源运用到函数中,但是没有SplittableRandom那么快。Random在每个操作上都进行同步,因此会导致滥用,扼杀了并行的优势。

总而言之,尽量不要并行Stream pipelien,除非有足够的理由相信它能保证计算的正确性,并且能加快程序的运行速度。如果对Stream进行不恰当的并行操作,可能导致程序运行失败,或者造成性能灾难。如果确信并行是可行的,并行运行时一定要确保代码正确,并在真实环境下认真的进行性能测量。如果代码正确,这些实验也证明它有助于提升性能,只有这时候,才可以在编写代码时并行Stream。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343