ITEM 48: USE CAUTION WHEN MAKING STREAMS PARALLEL
在主流语言中,Java 一直处于提供简化并发编程任务的工具的前沿。当 Java 在1996年发布时,它内置了对线程的支持,包括 synchronization 和 wait/notify。Java 5 引入了 java.util.concurrent 库,具有并发集合和执行程序框架。Java 7 引入了 fork-join 包,这是一个用于并行分解的高性能框架。Java 8 引入了流,它可以通过对并行方法的单个调用来并行化。 用 Java 编写并发程序变得越来越容易,但是编写正确且快速的并发程序和以前一样困难。安全性和活性违规在并发编程中是不可避免的,并行流管道也不例外。
考虑项目45中的这个项目:
// Stream-based program to generate the first 20 Mersenne primes
public static void main(String[] args) {
primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(20)
.forEach(System.out::println);
}
static Stream<BigInteger> primes() {
return Stream.iterate(TWO, BigInteger::nextProbablePrime);
}
在我的机器上,这个程序立即开始打印素数,运行 12.5 秒才能完成。假设我天真地试图通过向流管道添加一个对 parallel() 的调用来加速它。你认为它的表现会怎样?会快几个百分点吗?慢几个百分点?遗憾的是,它不会打印任何东西,但是 CPU 使用率会飙升到 90% 并无限期地停留在那里(活性失败)。程序最终可能会终止,但我不愿意去等待;半小时后我强行把它停了下来。
这是怎么回事?简单地说,streams 库不知道如何并行化这个管道,因此试探法失败了。即使在最好的情况下,如果源来自 Stream.iterate,或者使用 limit 操作限制,那么并行化管道也不太可能提高其性能。这个管道必须同时处理这两个问题。更糟糕的是,默认的并行化策略通过假设处理一些额外的元素和丢弃任何不需要的结果没有害处来处理不可预测性的限制。在这种情况下,找到每一个梅森素数所需的时间大约是找到前一个梅森素数所需时间的两倍。因此,计算单个额外元素的成本大约等于计算前面所有元素的总和,而这个看起来没什么问题的管道使自动并行化算法陷入了瘫痪。这个故事的寓意很简单:不要不加选择地并行化流管道。性能结果可能是灾难性的。
通常,通过 ArrayList、HashMap、HashSet 和 ConcurrentHashMap 实例、数组,int区间,和long区间 获得的并行性性能最好。这些数据结构的共同之处在于,它们都可以精确而廉价地划分成任意大小的子例程,这使得在并行线程之间划分工作变得很容易。streams 库用于执行此任务的抽象是 spliterator,它由 spliterator 方法在 Stream 和 Iterable 上返回。
所有这些数据结构的另一个重要共同点是,当按顺序处理时,它们提供了从优秀的引用位置:顺序元素引用一起存储在内存中。这些引用引用的对象在内存中可能彼此不太接近,从而降低了引用的位置。对于并行化大量操作来说,引用的位置是非常重要的:如果没有引用,线程将花费大量时间在空闲上,等待数据从内存传输到处理器的缓存中。具有最佳引用位置的数据结构是基本数组,因为数据本身在内存中是连续存储的。
流管道终端操作的性质也会影响并行执行的效率。如果在终端操作中所做的大量工作与管道的总体工作相比较,并且该操作天生是顺序的,那么并行化管道的有效性将是有限的。并行性的最佳终端操作是 reductions,即使用 Stream 的一种 reduce 方法或预先包装的 reductions 方法(如min、max、count和sum) 组合管道中出现的所有元素。
短路操作 anyMatch、allMatch 和 noneMatch 也支持并行性。流的 collect method 执行的操作(称为可变约减) 不适合并行性,因为组合集合的开销非常大。
如果您编写自己的流、可迭代的或集合实现,并且希望获得良好的并行性能,则必须覆盖spliterator 方法,并广泛地测试结果流的并行性能。编写高质量的 spliterators 是困难的,并且超出了本书的范围。
并行化流不仅可能会导致性能低下,包括活性失效;它还会导致不正确的结果和不可预测的行为(安全故障)。安全故障可能是由于使用映射器、过滤器和其他程序员提供的函数对象的管道并行化而导致的,这些对象没有遵守规范。
流规范对这些功能对象有严格的要求。例如,传递给 Stream 的 reduce 操作的累加器和组合器函数必须是关联的、非干扰的和无状态的。如果您违反了这些要求(item 46 条中讨论了其中的一些要求),但按顺序运行管道,则可能会得到正确的结果;如果你并行化它,它很可能会失败,也许是灾难性的。
值得注意的是,即使并行化的 Mersenne 素数程序运行到完成,它也不会以正确的(升序)顺序打印素数。为了保留顺序版本所显示的顺序,您必须将 forEach 终端操作替换为 forEachOrdered,该操作保证按偶遇顺序遍历并行流。即使假设您正在使用一个有效的可分割的源流、一个可并行化的或廉价的终端操作,以及不干扰的函数对象,您也无法从并行化中获得良好的加速效果,除非管道做了足够多的实际工作来抵消与并行化相关的成本。作为一个非常粗略的估计,流中的元素数乘以每个元素执行的代码行数至少应该是十万级别 [Lea14]。
重要的是要记住,并行化流是严格意义上的性能优化。与任何优化一样,您必须在更改之前和之后测试性能,以确保这是值得做的(item 67)。理想情况下,您应该在实际的系统设置中执行测试。通常,程序中的所有并行流管道都在公共 fork-join 池中运行。一个行为不端的管道可能会损害系统中其他不相关部分的性能。
使用并行化流管道时听起来似乎有很多困难,那是因为它们确实存在。一位维护大量使用流的数百万在线代码库的程序员发现,只有少数地方并行流是有效的。这并不意味着您应该避免并行化流。在适当的情况下,只要向流管道中添加一个并行调用,就可以实现处理器核心数量的近似线性加速。某些领域,如机器学习和数据处理,特别适合这些加速。
一个流管道并行性是有效的例子,考虑这个函数计算π(n),质数数目小于或等于n:
// Prime-counting stream pipeline - benefits from parallelization
static long pi(long n) {
return LongStream.rangeClosed(2, n)
.mapToObj(BigInteger::valueOf)
.filter(i -> i.isProbablePrime(50))
.count();
}
在我的机器上,使用这个方法需要31秒计算π(108)。简单地添加一个 parallel() 调用可以将时间缩短到9.2秒:
// Prime-counting stream pipeline - parallel version
static long pi(long n) {
return LongStream.rangeClosed(2, n)
.parallel()
.mapToObj(BigInteger::valueOf)
.filter(i -> i.isProbablePrime(50))
.count();
}
换句话说,在我的四核计算机上,并行化计算速度提高了3.7倍。值得注意的是,这不是你如何计算π(n)为大n的值。还有更有效的算法,尤其是 Lehmer 公式。如果要并行化一个随机数流,请从 SplittableRandom 实例开始,而不是 ThreadLocalRandom (或实际上已经过时的random)。SplittableRandom 正是为这种用途而设计的,并且具有线性加速的潜力。ThreadLocalRandom 是为单线程使用而设计的,它将自身调整为并行流源,但速度不会像SplittableRandom 那么快。随机在每个操作上同步,因此它将导致过度的并行杀死争用。
总之,不要轻易尝试并行化流管道,除非您有充分的理由相信它可以保持计算的正确性并提高速度。不适当地并行化流的代价可能是程序失败或性能灾难。如果您认为并行性是合理的,那么确保您的代码在并行运行时保持正确,并在实际条件下进行仔细的性能测量。如果您的代码保持正确,并且这些实验证实了您对性能提高的怀疑,那么,也只有在这种情况下,才能将生产代码中的流并行化。