学习札记-Java8系列-11-并行流
并发和并行
●并发和并行
并发是多个任务共享时间段(由CPU切换执行,就像是在同时执行)
并行是多个任务发生在同一时刻(真真正正的同时执行)(必须在多核CPU下)
并行就像用更多的马车(CPU)来拉货(执行任务),货物总量(任务量)一定,那么花费的时间自然减少了。
所以并行可以缩短任务执行时间,提高多核CPU的利用率
●数据并行化
任务可以并行化执行,同样的数据也可以并行化处理!
数据并行化是指将数据分成块,为每块数据分配单独的处理单元。也就是并行流
如何使数据并行化
在Java8之前,当需要对存在于集合或数组中的若干元素进行并发操作时,简直就是噩梦!我们需要仔细考虑多线程环境下的原子性、竞争甚至锁问题。
使用Java5的java.util.concurrent.*并发库也还是要考虑诸多细节必须十分谨慎,
使用Java7的fork/join框架编码和调试对于一般程序员来说难度还是太大
而这一切对于Java8中的Stream,不过是小菜一碟!直接调用API方法就可以搞定!
(Stream API 可以声明性地通过parallel() 与sequential() 在并行流与顺序流之间进行切换)
转换为并行流
Stream 的父接口java.util.stream.BaseStream 中定义了一个parallel 方法:
只需要在流上调用一下无参数的parallel 方法,那么当前流即可变身成为支持并发操作的流,返回值仍然为
Stream 类型。例如:
Stream<Integer> stream = Stream.*of*(10, 20, 30, 40, 50).parallel();
直接获取并行流
在通过集合获取流时,也可以直接调用parallelStream 方法来直接获取支持并发操作的流。
代码为:
Stream<String> stream= **new** ArrayList<String>().parallelStream();
使用并行流
并行流后续操作的使用方式还是和以前一样,只是底层执行的时候会使用多CPU并行执行
比如多次执行下面这段代码,并行流的输出顺序在很大概率上是不一定的:
@Test
public void test1() throws Exception {
System.out.println("=======================顺序流=========================");
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).forEach(System.out::println);
System.out.println("=======================并行流=========================");
Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).parallel().forEach(System.out::println);
}
效率对比
需求:使用普通for和Stream和Parallel Stream 分别计算累加和并统计执行效率
public class ParallelStream {
// 需求:使用普通for和Stream和Parallel Stream 分别计算累加和并统计执行效率
@Test//35592ms
public void testFor() throws Exception {
long start = System.currentTimeMillis();
long sum = 0;
for (long i = 0; i < 100000000000L; i++) {
sum += i;
}
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println(end-start);
}
@Test//52974ms
public void testStream() throws Exception {
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0, 100000000000L).sum();
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println(end-start);
}
@Test//12808ms
public void testParallelStream() throws Exception {
long start = System.currentTimeMillis();
long sum = LongStream.rangeClosed(0, 100000000000L).parallel().sum();
System.out.println(sum);
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}
并行流的性能
影响并行流性能的主要因素
●数据大小
输入数据的大小会影响并行化处理对性能的提升。将问题分解之后并行化处理,再将结果合并会带来额外的开销。因此只有数据足够大、每个数据处理管道花费的时间足够多时,并行化处理才有意义。
●源数据结构
每个管道的操作都基于一些初始数据源,通常是集合。将不同的数据源分割相对容易,这里的开销影响了在管道中并行处理数据时到底能带来多少性能上的提升。
●装箱
处理基本类型比处理装箱类型要快。
●核的数量
极端情况下,只有一个核,因此完全没必要并行化。显然,拥有的核越多,获得潜在性能提升的幅度就越大。在实践中,核的数量不单指你的机器上有多少核,更是指运行时你的机器能使用多少核。这也就是说同时运行的其他进程,或者线程关联性(强制线程在某些核或CPU 上运行)会影响性能。
●单元处理开销
比如数据大小,这是一场并行执行花费时间和分解合并操作开销之间的战争。花在流中每个元素身上的时间越长,并行操作带来的性能提升越明显。
注意:
在底层,并行流还是沿用了fork/join 框架。fork 递归式地分解问题,然后每段并行执行,最终由join 合并结果,返回最后的值。
根据性能的好坏,将核心类库提供的通用数据结构分成以下3 组
●性能好
ArrayList、数组或IntStream.range,这些数据结构支持随机读取,也就是说它们能轻而易举地被任意分解。
●性能一般
HashSet、TreeSet,这些数据结构不易公平地被分解,但是大多数时候分解是可能的。
●性能差
有些数据结构难于分解,比如,可能要花O(N) 的时间复杂度来分解问题。其中包括LinkedList,对半分解太难了。还有Streams.iterate 和BufferedReader.lines,它们长度未知,因此很难预测该在哪里分解。