学习基于记录,而不止于记录。
希望自己能坚持下去~
0.写在前面
java 版本:1.8.0_181
开发工具:IntelliJ IDEA 2018.3.2 (Ultimate Edition)
总结Java8 并行流和串行流的基本用法。
1.概念理解
并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。Java 8 中将并行进行了优化,我们可以很容易的对数据进行并 行操作。Stream API 可以声明性地通过 parallel() 与 sequential() 在并行流与顺序流之间进行切换。
Fork/Join 框架就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个 小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总.
Fork/Join 框架与传统线程池的区别,采用 “工作窃取”模式(work-stealing):
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线 程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的 处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因 无法继续运行,那么该线程会处于等待状态.而在fork/join框架实现中,如果 某个子问题由于等待另外一个子问题的完成而无法继续运行.那么处理该子 问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程 的等待时间,提高了性能.
2.代码实例
ForkJoinDemo类
package fokjoin;
import java.util.concurrent.RecursiveTask;
/**
* @Author: grj
* @Date: 2020/5/20 21:38
* 并行流和串行流,
* FokJoin模式
*/
public class ForkJoinDemo extends RecursiveTask<Long> {
private Long start;
private Long end;
public ForkJoinDemo(Long start, Long end) {
this.start = start;
this.end = end;
}
private static final long THRESHOLD = 10000;
@Override
protected Long compute() {
long length = end - start;
if(length<= THRESHOLD) {
long sum = 0;
for (long i = start; i <= end; i++) {
sum += i;
}
return sum;
}
long middle = (start+end) / 2;
ForkJoinDemo fokJoin1 = new ForkJoinDemo(start,middle);
fokJoin1.fork(); //拆分子任务,压入线程
ForkJoinDemo fokJoin2 = new ForkJoinDemo(middle+1,end);
fokJoin2.fork();
return fokJoin1.join() + fokJoin2.join();
}
}
测试类:
package fokjoin;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
/**
* @Author: grj
* @Date: 2020/5/20 21:51
*/
public class TestForkJoin {
public static void main(String[] args) {
test1();
test2();
test3();
/*
处理数据量越大,forkjoin优势越明显
sum:-5340232216128654848
forkjoin耗时:283000000
sum:-5340232216128654848
常规耗时:507000000
sum:-5340232216128654848
常规耗时:170000000
*/
}
public static void test1() {
Instant start1 = Instant.now();
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Long> forkJoinTask = new ForkJoinDemo(0L,10000000000L);
Long sum = forkJoinPool.invoke(forkJoinTask);
System.out.println("sum:"+sum);
Instant end1 = Instant.now();
System.out.println("forkjoin耗时:"+Duration.between(start1,end1).getNano());
}
//常规对比组
public static void test2() {
Instant start2 = Instant.now();
long sum = 0L;
for (long i = 0; i <= 10000000000L; i++) {
sum += i;
}
System.out.println("sum:"+sum);
Instant end2 = Instant.now();
System.out.println("常规耗时:"+Duration.between(start2,end2).getNano());
}
//Java8 并行流
public static void test3() {
Instant start3 = Instant.now();
long reduce = LongStream.rangeClosed(0, 10000000000L)
.parallel() //切换成并行流
.reduce(0, Long::sum);
System.out.println("sum:"+reduce);
Instant end3 = Instant.now();
System.out.println("常规耗时:"+Duration.between(start3,end3).getNano());
}
}
3.总结
从上面的代码实例可以看出Java8并行流处理数据尤其是大量数据时,并行处理速度显著提高而且比传统的ForkJoin模式实现更简单。但是必须要提出的一点是在开发过程中,可能会产生并发操作数据异常。
可以看看这篇博客的内容:Java8 parallelStream浅析
摘抄:显而易见,stream.parallel.forEach()中执行的操作并非线程安全。如果需要线程安全,可以把集合转换为同步集合,即:Collections.synchronizedList(new ArrayList<>())。