Java8新特性-并行流和串行流_2020.05.20

学习基于记录,而不止于记录。

希望自己能坚持下去~

0.写在前面

java 版本:1.8.0_181
开发工具:IntelliJ IDEA 2018.3.2 (Ultimate Edition)

总结Java8 并行流和串行流的基本用法。

1.概念理解

     并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。Java 8 中将并行进行了优化,我们可以很容易的对数据进行并 行操作。Stream API 可以声明性地通过 parallel() 与 sequential() 在并行流与顺序流之间进行切换。

     Fork/Join 框架就是在必要的情况下,将一个大任务,进行拆分(fork)成若干个 小任务(拆到不可再拆时),再将一个个的小任务运算的结果进行 join 汇总.

    Fork/Join 框架与传统线程池的区别,采用 “工作窃取”模式(work-stealing):
当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线 程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的 处理方式上.在一般的线程池中,如果一个线程正在执行的任务由于某些原因 无法继续运行,那么该线程会处于等待状态.而在fork/join框架实现中,如果 某个子问题由于等待另外一个子问题的完成而无法继续运行.那么处理该子 问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了线程 的等待时间,提高了性能.

2.代码实例

ForkJoinDemo类

package fokjoin;

import java.util.concurrent.RecursiveTask;

/**
 * @Author: grj
 * @Date: 2020/5/20 21:38
 * 并行流和串行流,
 * FokJoin模式
 */
public class ForkJoinDemo extends RecursiveTask<Long> {

    private Long start;
    private Long end;

    public ForkJoinDemo(Long start, Long end) {
        this.start = start;
        this.end = end;
    }

    private static final long THRESHOLD = 10000;

    @Override
    protected Long compute() {
        long length = end - start;

        if(length<= THRESHOLD) {
            long sum = 0;
            for (long i = start; i <= end; i++) {
                 sum += i;
            }
            return sum;
        }
        long middle = (start+end) / 2;
        ForkJoinDemo fokJoin1 = new ForkJoinDemo(start,middle);
        fokJoin1.fork(); //拆分子任务,压入线程

        ForkJoinDemo fokJoin2 = new ForkJoinDemo(middle+1,end);
        fokJoin2.fork();

        return fokJoin1.join() + fokJoin2.join();
    }
}

测试类:

package fokjoin;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;

/**
 * @Author: grj
 * @Date: 2020/5/20 21:51
 */
public class TestForkJoin {

    public static void main(String[] args) {
        test1();
        test2();
        test3();
        /*
        处理数据量越大,forkjoin优势越明显
        sum:-5340232216128654848
        forkjoin耗时:283000000
        sum:-5340232216128654848
        常规耗时:507000000
        sum:-5340232216128654848
        常规耗时:170000000
         */
    }

    public static void test1() {
        Instant start1 = Instant.now();

        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Long> forkJoinTask = new ForkJoinDemo(0L,10000000000L);
        Long sum = forkJoinPool.invoke(forkJoinTask);
        System.out.println("sum:"+sum);

        Instant end1 = Instant.now();
        System.out.println("forkjoin耗时:"+Duration.between(start1,end1).getNano());
    }

    //常规对比组
    public static void test2() {
        Instant start2 = Instant.now();

        long sum = 0L;
        for (long i = 0; i <= 10000000000L; i++) {
            sum += i;
        }
        System.out.println("sum:"+sum);

        Instant end2 = Instant.now();
        System.out.println("常规耗时:"+Duration.between(start2,end2).getNano());
    }
  
    //Java8 并行流
    public static void test3() {
        Instant start3 = Instant.now();

        long reduce = LongStream.rangeClosed(0, 10000000000L)
                .parallel() //切换成并行流
                .reduce(0, Long::sum);
        System.out.println("sum:"+reduce);

        Instant end3 = Instant.now();
        System.out.println("常规耗时:"+Duration.between(start3,end3).getNano());

    }
}

3.总结

    从上面的代码实例可以看出Java8并行流处理数据尤其是大量数据时,并行处理速度显著提高而且比传统的ForkJoin模式实现更简单。但是必须要提出的一点是在开发过程中,可能会产生并发操作数据异常。

可以看看这篇博客的内容:Java8 parallelStream浅析
摘抄:显而易见,stream.parallel.forEach()中执行的操作并非线程安全。如果需要线程安全,可以把集合转换为同步集合,即:Collections.synchronizedList(new ArrayList<>())。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容