我们已经在前面的几篇文章中已经在介绍stream api 的一些简单的使用,大都如下面的示例
List<Integer> list = Lists.newArrayList(1,2,3,4,5,6,7,8,9);
list.stream().filter(x -> x % 2 ==0)
.filter(x -> x > 8)
.forEach(System.out::println);
同样的,我们也注意到另外一个函数,parallelStream, 看名字像是并行的Stream,将上面的代码改成parallelStream,
list.stream().filter(x -> x % 2 ==0)
.filter(x -> x > 8)
.forEach(System.out::println);
这样就会用不同的线程分别处理每一个数据块的流了,多核处理器就开始忙起来了,实际上,并行流内部使用到了ForkJoinPool,那我们如何比较两者的性能呢,而且是不是我们加了parallel之后就是并行的了?出来的结果是不是一定正确的?
答案是否定的,因为在并行流中,处理的数据要被多个线程共享,那么就关系到了线程安全的问题,如果你加了同步锁,那么并行流就失去了意义,所有说并行流不是万能的,不仅仅如此,你还要考虑流背后的数据结构是不是便于拆解,比如说ArrayList和LinkedList, 因为ArrayList 是不需要进行遍历就可以被拆分进行操作,而LinkedList 却要进行遍历。
那么并行流内部用到的ForkJoinPool到底是哪方神圣,接下来我们来使用ForkJoinPool手写一个对于List中元素求和的功能。
public class ForkJoinPoolExample extends RecursiveTask<Integer> {
private List<Integer> list;
private int start;
private int end;
private final static int threshold = 4;
public ForkJoinPoolExample(List<Integer> data) {
this(data, 0, data.size());
}
public ForkJoinPoolExample(List<Integer> data, int start, int end) {
this.list = data;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int len = end - start;
if (len <= threshold) {
return computerSequentially();
}
ForkJoinPoolExample left = new ForkJoinPoolExample(list, start, start + len / 2);
left.fork();
ForkJoinPoolExample right = new ForkJoinPoolExample(list, start + len / 2, end);
right.fork();
return left.join() + right.join();
}
public Integer computerSequentially() {
int sum = 0;
for (int i = start; i < end; i++) {
sum = sum + list.get(i);
}
return sum;
}
public static void main(String[] arg) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
List<Integer> list = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6);
ForkJoinPoolExample forkJoinPoolExample = new ForkJoinPoolExample(list);
Integer sum = forkJoinPool.invoke(forkJoinPoolExample);
System.out.println("The sum is " + sum);
}
}
关于ForkJoinPool,由于每一个核运行的效率不同,导致如果平均分配线程在核上运行的话,那样效率就低了,因此分支/合并 框架工程采用了工作窃取work stealing,采用了双向的链式队列,使得“有余力”可以在队列尾部获得新的任务运行.
那么高效的使用并行流有哪些需要注意的呢?
- 要注意,实现了并行流不一定比顺序流快,原因是,如果处理的对象存在互斥,那么相当于顺序流,关键还是要看基准测试。
- 注意拆箱和装箱的性能消耗,使用java8 中提供的原始类型流,IntStream,LongStrean,DoubleStream.
- 等等。