源起
在importnew上面看到了这样一篇文章http://www.importnew.com/17262.html,文章中通过测试发现流处理性能不佳,文章下面评论均表示并非如此,由于在项目中首次尝试引入流处理,因此对性能也格外关注。而后看到这篇更为可靠的性能测试对比文章https://segmentfault.com/a/1190000004171551,该篇文章表明串行流在消除自动装箱的干扰后略逊与被编译器优化了多年的foreach,而在多核条件下并行流则强于前两者。
自己动手丰衣足食
纸上得来终觉浅,自己尝试测试了一把。测试代码如下:
将500000个数并求和,并且进行100次蒙特卡洛实验,pc是4核,保证测试期间gc不影响结果
public static void compareForAndStream() {
//p1表示for性能,p2表示流处理性能
long p1 = 0, p2 = 0;
int n = 500000;
ArrayList<Integer> arr = Lists.newArrayList();
for(int j = 0; j < 100; j ++) {
for(int i = 0; i < n; i ++) {
arr.add(i);
}
Integer sum = 0;
long t1 = System.nanoTime();
for(Integer v : arr) {
sum = sum + v;
}
long t2 = System.nanoTime();
arr.stream().reduce(0, (a, b) -> a + b);
//arr.stream().parallel().reduce(0, (a, b) -> a + b);
long t3 = System.nanoTime();
p1 += (t2 - t1);
p2 += (t3 - t2);
arr.clear();
}
System.out.println(p1 / 100 / 1000);
System.out.println(p2 / 100 / 1000);
}
测试结果如下,符合预期,流处理性能并不差。并且java性能权威指南上关于流处理更多逻辑的循环时性能还要优于for。使用意见基本可以参照上面第二篇文章。
for: 7-9ms
串行流:7-9ms
并行流:5-6ms
关于并行流
网上很多文章说并行流存在自己的陷阱,归纳起来就是使用公共的forkjoinpool线程池,一旦存在某些耗时的任务,后续任务将会阻塞。可以这么理解,对于一个四核的cpu,forkjoinpool会启用四个线程执行任务,同时为四个线程分别创建任务队列,如果四个线程均在处理耗时的任务,那么任务队列里面的任务将会绵绵不绝的阻塞。这个问题如何解决呢?
两种方法
一是将并行流的源数据拆成多个,形成多个并行流在单独的线程中执行,当线程池中达到四个线程执行任务时,新启动的并行流就只能在其本身的线程X中执行,成了串行流,不过执行顺序被打乱。此时X也拥有可被窃取的任务队列,当线程池中有线程空闲时将会窃取X中的任务执行。
二是创建新的forkjoinpool线程池Y,将任务拆分到Y中执行,这种方式可能不大方便。
通过上面的分析发现,在一个大型项目中如果广泛使用并行流,某些情况下可能达不到并行的效果,当线程池线程均处于运行状态时,其他线程中执行的并行流可能变为无序的串行流。
最后观察并行流线程的变化以及任务窃取的发生是通过如下代码完成,通过断点并且查看栈信息的变化(idea真好用啊)可以发现下述结果:先启动的任务使用了所有的forkjoinpool默认线程池的线程,第二个任务刚开始在自己线程中执行,当第一个任务中有线程执行完自身任务队列中所有任务后将窃取第二个任务的任务队列中的任务。
public static void testParallelStream() {
int[] arr = IntStream.range(1, 5).toArray();
new Thread(() -> {
Arrays.stream(arr).parallel().forEach((v) -> {
try {
System.out.println("first:" + v);
int sum = 0;
for(long i = 0; i < (1<<28); i ++) {
sum += i % 2;
}
System.out.println("first:" + v + ":" + sum);
} catch (Exception e) {
e.printStackTrace();
}
});
}).start();
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(() -> {
Arrays.stream(arr).parallel().forEach((v) -> {
try {
System.out.println("first1:" + v);
int sum = 0;
for(long i = 0; i < (1<<30); i ++) {
sum += i % 2;
}
System.out.println("first1:"+ v + ":" + sum);
} catch (Exception e) {
e.printStackTrace();
}
});
}).start();
try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
Arrays.stream(arr).forEach((v)-> {
//此处有断点
System.out.println("second"+v);});
}
其他的,对于流处理reduce的理解
/**
* reduce的使用,第一个参数是种子,如果不传递将以流的第一个数据为种子
* 第二个参数是累积器accumulator,迭代对流的数据操作,第三个参数combiner不填默认同第二个参数相同,
* 第三个参数只在并行流下有意义,当且仅当
* combiner.apply(u, accumulator.apply(identity, t)) == accumulator.apply(u, t)
* 时,计算结果与串行流相同
*/
public static void testReduce() {
IntStream stream = IntStream.range(1, 100);
System.out.println(stream.sum());
stream = IntStream.range(1, 100);
//并行计算1-99的和,种子值为1,由于存在并行计算,因此结果将会比预期值大
System.out.println(stream.parallel().reduce(1, (a, b) -> a + b));
stream = IntStream.range(1, 100);
//种子值为0,与预期相符
System.out.println(stream.parallel().reduce(0, (a, b) -> a + b));
//串行计算
stream = IntStream.range(1, 100);
System.out.println(stream.reduce(0, (a, b) -> a + b));
//串行计算平方和
stream = IntStream.range(1, 5);
System.out.println(stream.reduce(0, (a, b) -> a + b * b ));
//并行计算平方和,combiner这样写得不到想要的结果
stream = IntStream.range(1, 5);
System.out.println(stream.parallel().reduce(0, (a, b) -> a + b * b));
//适合计算平方和的combiner
System.out.println(
Stream.of(1,2,3,4).parallel()
.reduce(0, (a, b) -> a + b * b, (a, b) -> a + b));
}