并行流
并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。
Java8中将并行进行了优化,我们可以很容易地对数据进行行操作。Stream API可以声明性地通过parallel()
与sequential()
在并行流与顺序流之间进行切换。
Fork/Join框架
【Fork/Join框架介绍】
Fork/Join框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个小任务(拆到不可再拆时),压如到线程队列,并行求值,得出结果之后,再将一个个的小任务运算的结果进行join汇总。
【Fork/Join 框架与传统线程池的区别】
Fork/Join框架采用“工作窃取”模式:当执行新的任务时,它可以将其拆分成更小的任务执行,并将小任务加到线程队列中。如果线程获取不到任务,它不会闲着,会从一个随机线程的队列中偷一个并把它放在自己的队列中,保证尽可能地利用CPU的资源。
相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上。在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法继续运行,那么处理该子问题的线程会主动寻找其他尚未运行的子问题来执行。这种方式减少了线程的等待时间,提高了性能 。
【RecursiveTask】
要求:求0-100000000000相加之和。
创建RecursiveTask的继承ForkJoinCalculate类
/**
* RecursiveTask代表有返回值的任务
* RecursiveAction代表没有返回值的任务
*/
public class ForkJoinCaculate extends RecursiveTask<Long> {
private static final long serialVersionUID = 1238793729l;
private long start;
private long end;
// 临界值,说明每个小任务最多累加10000个数
private static final long THRESHOLD = 10000;
public ForkJoinCaculate(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
long length = end - start;
// 如果任务足够小就计算任务
if(length <= THRESHOLD){
long sum = 0;
for(long i =start;i<=end;i++){ //到达临界值进行+操作
sum +=i;
}
return sum;
}else{
// 如果任务大于阈值,就分裂成两个子任务计算
long mid = (start+end)/2;
ForkJoinCaculate left = new ForkJoinCaculate(start,mid);
ForkJoinCaculate right = new ForkJoinCaculate(mid+1,end);
// 并行执行两个小任务
left.fork();
right.fork();
// 等待任务执行结束合并其结果
return left.join()+right.join();
}
}
}
public class TestForkJoin {
/**
* ForkJoin框架
*/
@Test
public void test1(){
Instant start = Instant.now();
ForkJoinPool pool = new ForkJoinPool();
// ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类
// 它有两个抽象子类:RecursiveAction和RecursiveTask
ForkJoinTask<Long> task = new ForkJoinCaculate(0,100000000000L);
Long sum = pool.invoke(task);
System.out.println(sum);
Instant end = Instant.now();
//如果数值过小,拆分需要时间,效率就会比for循环还要低
System.out.println("耗费时间为:"+Duration.between(start,end).toMillis()); //2016
}
/**
* 普通for循环
*/
@Test
public void test2(){
Instant start = Instant.now();
long sum = 0L;
for(long i=0;i<=100000000000L;i++){
sum+=i;
}
System.out.println(sum);
Instant end = Instant.now();
System.out.println("耗费时间为:"+Duration.between(start,end).toMillis()); //3135
}
}
Java8对并行流进行了一个提升,用起来不再这么麻烦:
public class TestForkJoin {
@Test
public void test1(){
Instant start = Instant.now();
//底层就是forkjoin
LongStream.rangeClosed(0,100000000000L)
.parallel() // 并行流,若顺序流则为sequential()
.reduce(0,Long::sum);
Instant end = Instant.now();
System.out.println("耗费时间为:"+Duration.between(start,end).toMillis());
}
}