这篇博客,我们将介绍Java中的并行流(Parallel Stream)。
Java 8
引入了并行流的概念来进行并行处理。 随着技术的发展,我们所使用的计算机硬件成本越来越廉价,性能却遵循者摩尔定律越来越快。无论是大型服务器还是个人计算机,多核CPU已不再是触不可及的稀缺资源,因此可以使用并行处理来充分榨取CPU资源获取更好的性能。
下面让我们通过2个例子来做个对比以更好的了解并行技术
public class Java8ParallelStreamMain {
public static void main(String[] args) {
System.out.println("=================================");
System.out.println("Using Sequential Stream");
System.out.println("=================================");
int[] array= {1,2,3,4,5,6,7,8,9,10};
IntStream intArrStream=Arrays.stream(array);
intArrStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
}
);
System.out.println("=================================");
System.out.println("Using Parallel Stream");
System.out.println("=================================");
IntStream intParallelStream=Arrays.stream(array).parallel();
intParallelStream.forEach(s->
{
System.out.println(s+" "+Thread.currentThread().getName());
}
);
}
}
在自己的电脑上执行上述代码,你会看到控制台的输出如下:
=================================
Using Sequential Stream
=================================
1 main
2 main
3 main
4 main
5 main
6 main
7 main
8 main
9 main
10 main
=================================
Using Parallel Stream
=================================
7 main
6 main
8 ForkJoinPool.commonPool-worker-3
2 main
1 ForkJoinPool.commonPool-worker-3
5 ForkJoinPool.commonPool-worker-3
4 ForkJoinPool.commonPool-worker-5
9 ForkJoinPool.commonPool-worker-2
3 ForkJoinPool.commonPool-worker-1
10 ForkJoinPool.commonPool-worker-4
我们可以看到控制台的输出,在顺序执行的情况下主线程(main thread)会完成所有工作。所有的工作串行执行,直到等到当前迭代执行完成,才会进行下一个迭代。
同样,我们注意到在并行流的情况下,会同时生成6个线程,并在内部使用Fork和Join池创建和管理线程。并行流通过静态ForkJoinPool.commonPool() 方法创建ForkJoinPool
实例。
并行流(Parallel Stream)利用所有可用CPU内核的优势,并并行处理任务。 如果任务数超过内核数,则其余任务将等待当前正在运行的任务完成。
你可以通过Runtime.getRuntime().availableProcessors()
来获取当前计算机的CPU 内核数量。
并行流优势如此大,那么我们应该始终使用它吗?
答案是否定的,
通过上述代码我们可以看到仅通过添加parallel()
即可轻松将顺序流转换为并行流,但是这并不意味着我们应该始终使用它。
在使用并行流时我们需要考虑很多因素,否则我们将会遭受并行流所带来的负面影响。
并行流比顺序流具有更高的开销,并且在线程之间的上下文进行协调切换需要花费大量时间。
仅在以下情况下,才需要考虑是否使用并行流:
- 需要处理大量数据集。
- 我们知道 Java 使用
ForkJoinPool
实现并行性,ForkJoinPool
派生源流并提交执行,因此源数据流应该是可拆分的。例如:ArrayList
非常容易拆分,因为我们可以通过索引找到中间元素并将其拆分,但是LinkedList
很难拆分,并且在大多数情况下表现不佳。在这种情况下就不适合用并行流。 - 我们在处理问题的时候确实遇到流性能问题,否则请不要为了并行而并行。
- 我们需要确保线程之间的所有共享资源都是正确同步,否则可能会产生数据不一致问题。
衡量并行度的最简单公式是Brian Goetz在其演讲中提到的NQ模型。
N x Q >10000
N = 数据集中的项目数量
Q = 每个项目的工作量
这意味着如果您有大量数据集并且每个项目的工作量比较少(例如:求和),那么并行性可能会帮助您提升程序性能,反之亦然。 因此,如果您有较少的数据集和每个项目更多的工作(做一些计算工作),那么并行性也可以帮助您提升程序性能。
下面,让我们看一个相对复杂一点的用例。
即将展示的这个示例中,我们将非常明显的看到在并行流和顺序流的情况下,程序执行的时长和CPU的行为。
public class PerformanceComparisonMain {
public static void main(String[] args) {
long currentTime=System.currentTimeMillis();
List<Integer> data=new ArrayList<Integer>();
for (int i = 0; i < 100000; i++) {
data.add(i);
}
long sum=data.stream()
.map(i ->(int)Math.sqrt(i))
.map(number->performComputation(number))
.reduce(0,Integer::sum);
System.out.println(sum);
long endTime=System.currentTimeMillis();
System.out.println("Time taken to complete:"+(endTime-currentTime)/(1000*60)+" minutes");
}
public static int performComputation(int number)
{
int sum=0;
for (int i = 1; i < 1000000; i++) {
int div=(number/i);
sum+=div;
}
return sum;
}
}
当执行上述代码,控制台输出如下:
117612733
Time taken to complete:5 minutes
上述代码执行时长与我们电脑的CPU密切相关,我们来看看在执行代码过程中CPU的行为
如您所见,在顺序流的情况下,CPU没有得到充分利用。
让我们更改一下上面的代码,在8行加上.parallel()
并行处理。
long sum=data.stream()
.parallel()
.map(i ->(int)Math.sqrt(i))
.map(number->performComputation(number))
.reduce(0,Integer::sum);
再次执行上述代码,您将会看到如下控制台输出
117612733
Time taken to complete:1 minutes
让我们再次来看看在并行执行的情况下CPU的运行情况
综上对比我们发现在并行执行的情况下程序的执行性能提升了将近5倍。前面我们已经提到,并行流底层实现采用的ForkJoin
机制,关于Fork-Join
的更多细节,下一篇博文呈现给大家。
祝您编码愉快~