本文讨论:
- 串行和并行的执行效率区别
- 并行带来的一些问题
- 什么时候使用并行
ps:主要参考资料来自Effective Java 第三版
先来看一个关于性能提升的例子(来自effective java),是用来计算小于n的素数个数,机器是一个6核cpu(i5-8400)。
肉眼可见的性能得到了提升。
@Test
public void test() throws InterruptedException {
long start = System.currentTimeMillis();
pi(10000000);
long end = System.currentTimeMillis();
System.out.println("串行时间:" + (end - start) / 1000);
start = System.currentTimeMillis();
piParallel(10000000);
end = System.currentTimeMillis();
System.out.println("并行时间:" + (end - start) / 1000);
}
static long pi(long n) {
return LongStream.rangeClosed(2, n).mapToObj(BigInteger::valueOf).filter(i -> i.isProbablePrime(50)).count();
}
static long piParallel(long n) {
return LongStream.rangeClosed(2, n).parallel().mapToObj(BigInteger::valueOf).filter(i -> i.isProbablePrime(50)).count();
}
// console
串行时间:31
并行时间:6
并行处理看起来非常美好,但是,真的如此吗?先看一个官方给出的反面教材,下面的一段代码,每次执行都会得到不同的结果
https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
线程安全集合
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8};
List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray));
List<Integer> parallelStorage = Collections.synchronizedList(new ArrayList<>());
listOfIntegers.parallelStream()
// Don't do this! It uses a stateful lambda expression.
.map(e -> {
parallelStorage.add(e);
return e;
})
.forEachOrdered(e -> System.out.print(e + " "));
System.out.println();
parallelStorage.stream().forEachOrdered(e -> System.out.print(e + " "));
// console
1 2 3 4 5 6 7 8
4 2 8 3 7 1 5 6
// console
1 2 3 4 5 6 7 8
6 5 1 4 2 3 7 8
线程不安全集合
Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8};
List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray));
List<Integer> parallelStorage = Lists.newArrayList();
listOfIntegers.parallelStream()
// Don't do this! It uses a stateful lambda expression.
.map(e -> {
parallelStorage.add(e);
return e;
})
.forEachOrdered(e -> System.out.print(e + " "));
System.out.println();
parallelStorage.stream().forEachOrdered(e -> System.out.print(e + " "));
// console
1 2 3 4 5 6 7 8
4 8 7 1 5 2
// console
1 2 3 4 5 6 7 8
2 8 3 5 1 6 4 7
// console
1 2 3 4 5 6 7 8
null 4 6 1 5 8 7 2
从结果来看,有时候元素会变少,有时候甚至还出现了null。原因也非常简单:ArrayList不是线程安全的
使用并行流引出的第一个问题:
- 结果不确定性
官方文档有直接给出建议:
The lambda expression e -> { parallelStorage.add(e); return e; } is a stateful lambda expression. Its result can vary every time the code is run.
有状态的lambda表达式每次运行都会出现不同的结果。所以我们应该避免使用有状态的代码。
我们再看下面一个例子(也是来自effective java)
先来一个串行计算,和预计一样,很好的完了代码,并打印出了相应结果
@Test
public void test() {
primes().map(p -> BigInteger.valueOf(2).pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(10)
.forEach(System.out::println);
}
static Stream<BigInteger> primes() {
return Stream.iterate(BigInteger.valueOf(2), BigInteger::nextProbablePrime);
}
// console
3
7
31
127
8191
131071
524287
2147483647
2305843009213693951
618970019642690137449562111
再把他改成并行的试试,程序不动了
这里发生了什么?简而言之,流类库不知道如何并行化此管道并且启发式失败(heuristics fail)。 即使在最好的情况下,如果源来自 Stream.iterate 方法,或者使用中间操作 limit 方法,并行化管道也不太可能提高其性能
简而言之limit()方法是个stateful中间操作,并行处理的时候并不知道该如何并行
@Test
public void test() {
primes().map(p -> BigInteger.valueOf(2).pow(p.intValueExact()).subtract(ONE))
.filter(mersenne -> mersenne.isProbablePrime(50))
.limit(10)
.forEach(System.out::println);
}
再看一些其他的例子,对比一下串行,并行,和传统写法的性能差别
List<Long> list = Lists.newArrayList();
for (long i = 0; i < 1000000; i++) {
list.add(i);
}
long start = System.currentTimeMillis();
System.out.println("串行:" + list.stream().filter(n -> n % 3 == 0).mapToLong(n -> n).sum());
long end = System.currentTimeMillis();
System.out.println("串行时间:" + (end - start) + "ms");
start = System.currentTimeMillis();
System.out.println("并行:" + list.stream().parallel().filter(n -> n % 3 == 0).mapToLong(n -> n).sum());
end = System.currentTimeMillis();
System.out.println("并行时间:" + (end - start) + "ms");
start = System.currentTimeMillis();
long sum = 0;
for (long l : list) {
if (l % 3 == 0) {
sum += l;
}
}
System.out.println("传统:" + sum);
end = System.currentTimeMillis();
System.out.println("传统时间:" + (end - start) + "ms");
// console
串行:166666833333
串行时间:49ms
并行:166666833333
并行时间:23ms
传统:166666833333
传统时间:9ms
最后贴几个effective java里面关于并行流建议
- 通常,并行性带来的性能收益在 ArrayList、HashMap、HashSet 和 ConcurrentHashMap 实例、数组、int 类型范围和 long 类型的范围的流上最好
- 并行化一个流不仅会导致糟糕的性能,包括活性失败(liveness failures);它会导致不正确的结果和不可预知的行为 (安全故障)
- 在适当的情况下,只需向流管道添加一个 parallel 方法调用,就可以实现处理器内核数量的近似线性加速
最重要的一个建议就是
总之,甚至不要尝试并行化流管道,除非你有充分的理由相信它将保持计算的正确性并提高其速度。不恰当地并行化流的代价可能是程序失败或性能灾难。如果您认为并行性是合理的,那么请确保您的代码在并行运行时保持正确,并在实际情况下进行仔细的性能度量。如果您的代码是正确的,并且这些实验证实了您对性能提高的怀疑,那么并且只有这样才能在生产代码中并行化流。