在主流的编程语言中,Java一直走在简化并发编程任务的最前沿。1996年Java发布时,就通过同步和wait/notify内置了对线程的支持。Java5引入了java.util.concurrent类库,提供了并行集合(concurrent collection)和执行者框架(executor framework)。Java7引入fork-join包,这是一个处理并行分解的高性能框架。Java8引入Stream,只需要调用一次parallel方法就可以实现并行处理。在Java中编写并发程序变得越来越容易,但是要编写出正确又快速的并发程序,则一向没那么简单。安全性和活性失败是并发编程中需要面对的问题,Stream pipeline并行也不例外。
请看摘自第45条的这段程序:
// Stream-based program to generate the first 20 Mersenne primes
public static void main(String[] args) {
primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(20)
.forEach(System.out::println);
}
static Stream<BigInteger> primes() {
return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}
在我的机器上,这段程序会立即开始打印素数,完成运行花了12.5秒。假设我天真的想通过在Stream pipeline上添加一个parallel()调用来提速。你认为这样会对其性能产生什么样的影响?运行速度会稍微块一点吗?还是会慢一点?遗憾的是,其结果是根本不打印任何内容,CPU的使用率却定在90%一动不动了(活性失败)。程序最后可能会终止,但是我们不想一探究竟,半个小时后就强行把它终止了。
这是怎么回事呢?简单的说,Stream类库不知道如何并行这个pipeline,以及如何探索失败。即便在最佳环境下,如果源头是来自Stream.iterate,或者使用了中间操作的limit,那么并行pipeline也不可能提升性能
。这个pipeline必须同时满足这两个条件。更糟糕的是,默认的并行策略在处理limit的不可预知性时,是假设额外多处理几个元素,并放弃任何不需要的结果,这些都不会影响性能。在这种情况下,他查找每个梅森素数时,所花费的时间大概是查找之前元素的两倍。因而,额外多计算一个元素的成本,大概相当于计算所有之前元素总和的时间,这个貌似无伤大雅的pipeline,却使得自动并行算法濒临崩溃。这个故事的寓意很简单:千万不要任意的并行Stream pipeline
。它造成的性能后果有可能是灾难性的。
总之,在Stream上通过并行获得性能,最好是通过ArrayList、HashMap、HashSet和ConcurrentHashMap实例,数组,int范围和long范围等
。这些数据结构的共性是,都可以被精确、轻松的分成任意大小的子范围,使并行线程的分工变得更加轻松。Stream类库用来执行这个任务的抽象是分割迭代器,它是由Stream和Iterable中的spliterator方法返回的。
这些数据结构共有的另一项重要特性是,在进行顺序处理时,它们提供了优异的引用局部性:序列化的元素引用一起保存在内存中。被那些引用访问到的对象在内存中可能不是一个紧挨着一个,这降低了引用的局部性。事实证明,引用局部性对于并发批处理来说至关重要:没有它,线程就会出现闲置,需要等待数据从内存转移到处理器的缓存。具有最佳引用局部性的数据结构是基本类型数组,因为数据本身是相邻的保存在内存中的。
Stream pipeline的终止操作本质上也影响了并发执行的效率。如果大量的工作在终止操作中完成,而不是全部工作在pipeline中完成,并且这个操作是固有的顺序,那么并行pipeline的效率就会受到限制。并行的最佳终止操作是做减法(reduction),用一个Stream的reduce方法,将所有从pipeline产生的元素都合并在一起,或者预先打包像min、max、count和sum这类方法。短路操作anyMatch、allMatch和noneMatch也都可以并行。由Stream的collect方法执行的操作,都是可变的减法,不是并行的最好选择,因为合并集合的成本非常高。
如果是自己编写Stream、Iterable或者Collection实现,并且想要得到适当的并行性能,就必须覆盖spliterator方法,并广泛的测试结果Stream的并行性能。编写高质量的分割迭代器很困难,并且超出了本书的讨论范畴。
并行Stream不仅可能降低性能,包括活性失败,还可以导致结果出错,以及难以预计的行为
(如安全性失败)。安全性失败可能是因为并行的pipeline使用了映射、过滤器或者程序员自己编写的其他函数对象,并且没有遵守它们的规范。Stream规范对于这些函数对象有着严格的要求条件。例如,传到Stream的reduce操作的收集器函数和组合器函数,必须是有关联、互不干扰,并且是无状态的。如果不满足这些条件(在第46条提到了一些),但是按序列运行pipeline,可能会得到正确的结果;如果并发运行,则可能会突发性失败。
以上值得注意的是,并行的梅森素数程序虽然运行完成了,但是并没有按正确的顺序(升序)打印出素数。为了保存序列化版本程序显示的顺序,必须用forEachOrdered代替终止操作的forEach,它可以确保按enconuter顺序遍历并行的Stream。
假如在使用的是一个可以有效分割的源Stream,一个可行的或者简单的终止操作,以及互不干扰的函数对象,那么将无法获得通过并行实现的提速,除非pipeline完成了足够的实际工作,抵消了与并行相关的成本。据不完全估计,Stream中的元素数量,是每个元素所执行的代码行数的很多倍,至少是十万倍。
切记:并行Stream是一项严格的性能优化。对于任何优化都必须在改变前后对性能进行测试,以确保值得这么做(详见第67条)。最理想的是在实现的系统设置中进行测试。一般来说,程序中所有的并行Stream pipeline都是在一个通用的fork-join池中运行的。只要有一个pipeline运行异常,都会损害到系统中其他不相关部分的性能。
听起来貌似在并行Stream pipeline时怪事连连,其实正是如此。我有一个朋友,他发现在大量使用Stream的几百万行代码中,只有少数几个并行Stream是有效的。这并不意味着应该避免使用并行Stream。在适当的条件下,给Stream pipeline添加parallel调用,确实可以在多处理器核的情况下实现近乎线性的倍增
。某些域如机器学习和数据处理,尤其适用于这样的提速。
简单举一个并行Stream pipeline有效的例子。假设下面这个函数是用来计算π(n),素数的数量少于或者等于n:
// Prime-counting stream pipeline - benefits from parallelization
static long pi(long n) {
return LongStream.rangeClosed(2, n)
.mapToObj(BigInteger::valueOf)
.filter(i -> i.isProbablePrime(50))
.count();
}
在我的机器上,这个函数花31秒完成了计算π(108)。只要添加一个parallel()调用,就把调用时间减到了9.2秒:
// Prime-counting stream pipeline - parallel version
static long pi(long n) {
return LongStream.rangeClosed(2, n)
.parallel()
.mapToObj(BigInteger::valueOf)
.filter(i -> i.isProbablePrime(50))
.count();
}
换句话说,并行计算在我的四核机器上添加了parallel()调用后,速度加快了3.7倍。值得注意的是,这并不是在实践计算n值很大时的π(n)的方法。还有更加高效的算法,如著名的Lehmer公式。
如果要并行一个随机数的Stream,应该从SplittableRandom实例开始,而不是从ThreadLocalRandom(或实际上已经过时的Random)开始。SplittableRandom正是专门为此设计的,还有线性提速的可能。ThreadLocalRandom则只用于单线程,它将自身当作一个并行的Stream源运用到函数中,但是没有SplittableRandom那么快。Random在每个操作上都进行同步,因此会导致滥用,扼杀了并行的优势。
总而言之,尽量不要并行Stream pipelien,除非有足够的理由相信它能保证计算的正确性,并且能加快程序的运行速度。如果对Stream进行不恰当的并行操作,可能导致程序运行失败,或者造成性能灾难。如果确信并行是可行的,并行运行时一定要确保代码正确,并在真实环境下认真的进行性能测量。如果代码正确,这些实验也证明它有助于提升性能,只有这时候,才可以在编写代码时并行Stream。