Java8 ParallelStream流

前言

并行编程势不可挡,Java从1.7开始就提供了Fork/Join 支持并行处理。java1.8 进一步加强。

并行处理就是将任务拆分子任务,分发给多个处理器同时处理,之后合并。

一、ForkJoinPool

ForkJoinPool是JDK7引入的线程池,核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理汇总到一个结果上(即join),非常像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。

1.1 work-stealing(工作窃取算法)

work-stealing(工作窃取):ForkJoinPool提供了一个更有效的利用线程的机制,当ThreadPoolExecutor还在用单个队列存放任务时,ForkJoinPool已经分配了与线程数相等的队列,当有任务加入线程池时,会被平均分配到对应的队列上,各线程进行正常工作,当有线程提前完成时,会从队列的末端“窃取”其他线程未执行完的任务,当任务量特别大时,CPU多的计算机会表现出更好的性能。

1.2 常用方法

ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask:用于有返回结果的任务。

ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

线程池监控

在线程池使用监控方面,主要通过如下方法:

  • isTerminated:判断线程池对应的workQueue中是否有待执行任务未执行完;
  • awaitTermination:判断线程池是否在约定时间内完成,并返回完成状态;
  • getQueuedSubmissionCount:获取所有待执行的任务数;
  • getRunningThreadCount:获取正在运行的任务数。

1.3 例子

提交有返回值的任务

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

/**
 * @Description 提交有返回值的任务
 */
 
public class ForkJoinRecursiveTask {

    /**
     * 最大计算数
     */
    private static final int MAX_THRESHOLD = 100;

    public static void main(String[] args) {
        //创建ForkJoinPool
        ForkJoinPool pool = new ForkJoinPool();
        //异步提交RecursiveTask任务
        ForkJoinTask<Integer> forkJoinTask = pool.submit(new CalculatedRecursiveTask(0, 1000));
        try {
            //根据返回类型获取返回值
            Integer result = forkJoinTask.get();
            System.out.println("结果为:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }

    private static class CalculatedRecursiveTask extends RecursiveTask<Integer> {
        private final int start;
        private final int end;

        public CalculatedRecursiveTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            //判断计算范围,如果小于等于5,那么一个线程计算就够了,否则进行分割
            if ((end - start) <= MAX_THRESHOLD) {
                //返回[start,end]的总和
                return IntStream.rangeClosed(start, end).sum();
            } else {
                //任务分割
                int middle = (end + start) / 2;
                CalculatedRecursiveTask task1 = new CalculatedRecursiveTask(start, middle);
                CalculatedRecursiveTask task2 = new CalculatedRecursiveTask(middle + 1, end);
                //执行
                task1.fork();
                task2.fork();
                //等待返回结果
                return task1.join() + task2.join();
            }
        }
    }
}

提交无返回值的任务

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/**
 * @Description 提交无返回值的任务
 */

public class ForkJoinRecursiveAction {

    /**
     * 最大计算数
     */
    private static final int MAX_THRESHOLD = 100;
    private static final AtomicInteger SUM = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
        //创建ForkJoinPool
        ForkJoinPool pool = new ForkJoinPool();
        //异步提交RecursiveAction任务
        pool.submit(new CalculatedRecursiveTask(0, 1000));
        //等待3秒后输出结果,因为计算需要时间
        pool.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("结果为:" + SUM);
        pool.shutdown();
    }

    private static class CalculatedRecursiveTask extends RecursiveAction {
        private final int start;
        private final int end;

        public CalculatedRecursiveTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected void compute() {
            //判断计算范围,如果小于等于5,那么一个线程计算就够了,否则进行分割
            if ((end - start) <= MAX_THRESHOLD) {
                //因为没有返回值,所有这里如果我们要获取结果,需要存入公共的变量中
                SUM.addAndGet(IntStream.rangeClosed(start, end).sum());
            } else {
                //任务分割
                int middle = (end + start) / 2;
                CalculatedRecursiveTask task1 = new CalculatedRecursiveTask(start, middle);
                CalculatedRecursiveTask task2 = new CalculatedRecursiveTask(middle + 1, end);
                //执行
                task1.fork();
                task2.fork();
            }
        }
    }
}

虽然ForkJoin实际的代码非常复杂,但是通过这个例子应该了解到ForkJoinPool底层的分治算法和工作窃取原理。ForkJoin不仅在java8之后的stream中广泛使用。golang等其他语言的协程机制,也是采用类似的原理来实现的。

二、Stream API

Java 8 引入了许多特性,Stream API是其中重要的一部分。区别 InputStream OutputStream,Stream API 是处理对象流而不是字节流。

执行原理如下,流分串行和并行两种执行方式


// 串行执行流
stream().filter(e -> e > 10).count();

// 并行执行流
.parallelStream().filter(e -> e > 10).count()

三、ParallelStreams

Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream().forEach(out::println);

3.1 ParallelStreams 执行原理

并行流就是一个把内容拆分成多个数据块,用不同线程分别处理每个数据块的流。对收集源调用parallelStream方法就能将集合转换为并行流。

并行执行时,java将流划分为多个子流,分散在不同CPU并行处理,然后进行合并。

并行比串行更快,取决于两方面条件:

  • 处理器核心数量,并行处理核心数越多自然处理效率会更高。
  • 处理的数据量越大,优势越强。

3.2 ParallelStreams注意事项

3.2.1 因为是并行流,所以所涉及到的数据结构,需要使用线程安全的,比如

listByPage.parallelStream().forEach(str-> {
           //使用线程安全的数据结构
           //ConcurrentHashMap
           //CopyOnWriteArrayList
           //等等进行操作
        });

3.2.2 默认优先用在CPU密集型计算中

  • 用在IO密集比如HTTP请求也可以,实际使用是情况而定。

由于默认并行流使用的是全局的线程池,线程数量是根据cpu核数设置的,所以如果某个操作占用了线程,将影响全局其他使用并行流的操作。默认情况,fork/join 池会为每个处理器分配一个线程。

所以折中的方案是自定义线程池来执行某个并行流操作

public static void main(String[] args) throws Exception {
    List<String> list = Arrays.asList("1", "2", "3", "4", "5");
    
    ForkJoinPool forkJoinPool = new ForkJoinPool(10);
    forkJoinPool.execute(() -> {
        list.parallelStream().forEach(number -> {
            try {
                System.out.println(Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
            
            }
        });
    });

    ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
    ForkJoinTask<?> forkJoinTask = forkJoinPool2.submit(() -> {
        list.parallelStream().forEach((number) -> {
            try {
                System.out.println(Thread.currentThread().getName());
                Thread.sleep(5000);
            } catch (InterruptedException e) {
            
            }
        });
    });
    
    // 阻塞,等线程执行完毕
    //forkJoinTask.get();

    // 阻止主线程关闭
    Thread.sleep(10000L);
}

3.2.3 使用并行流时,不要使用collectors.groupingBy、collectors.toMap

使用并行流时,不要使用collectors.groupingBy、collectors.toMap,替代为collectors.groupingByConcurrent、collectors.toConcurrentMap,或直接使用串行流。

原因,并行流执行时,通过操作Key来合并多个map的操作比较昂贵。详细大家可以查看官网介绍。


https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html#concurrent_reduction

Map<String, List<Person>> byGender = roster.stream()
    .collect(Collectors.groupingBy(Person::getGender));

ConcurrentMap<String, List<Person>> byGender = roster.parallelStream()
    .collect(Collectors.groupingByConcurrent(Person::getGender));

ParallelStreams 默认使用 ForkJoinPool.commonPool()线程池。

注意:默认情况下,你写的 ParallelStreams 都是通过该线程池调度执行,整个应用程序都共享这个线程池。

针对 Stream API 一些局限性,Github上有个开源库做了补充。
https://github.com/pivovarit/parallel-collectors

四、并行流和顺序流转换

parallel 和 sequential

Integer reduce = Stream.iterate(0, n -> n + 2).limit(10000).reduce(1, Integer::sum);
// 将顺序流转化为并行流
Integer reduce1 = Stream.iterate(0, n -> n + 2).limit(10000).parallel().reduce(1, Integer::sum);
// 将并行流转为顺序流
Integer reduce2 = Stream.iterate(0, n -> n + 2).limit(10000).parallel().map(integer -> integer + 2).sequential().reduce(1, Integer::sum);

最后一次parallel或sequential调用会影响整个流水线

配置并行流使用的线程池:

  • 1、并行流内部使用了默认的ForkJoinPool。它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。
  • 2、可以通过系统属性java.util.concurrent.ForkJoinPool.common.parallelism来修改线程池大小
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
System.out.println( System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
  • 3、这是一个全局设置,因此它会对代码中所有的并行流产生影响。反过来说,目前我们还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很充足的理由,否则强烈建议你不要修改它。

正确的姿势使用并行流

并行流并不总是比顺序流快。所以正确的姿势使用并行流是尤为重要的,不然适得其反。

决定某个特定情况下是否有必要使用并行流。可以参考一下几点建议

  • 1、如果有疑问,测量。并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,很重要的建议就是用适当的基准来检查其性能。

  • 2、留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、LongStream和DoubleStream)来避免这种操作,但凡有可能都应该用这些流

  • 3、有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成无序流。那么,如果你需要流中的N个元素而不是专门要前N个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是一个List)更高效。

  • 4、考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。

  • 5、对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。

  • 6、考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。可以参考一下表格:

数据源 性能
ArrayList 极佳
LinkedList
IntStrean.range 极佳
Strean.iterate
HashSet
TreeSet
  • 7、流自身的特点以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数无法预测,从而导致流本身的大小未知。

  • 8、还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

SpringBoot 配置ForkJoinPool并行度

/**
 * @author: huangyibo
 * @Date: 2022/3/21 10:39
 * @Description: 线程池配置
 */

@Configuration
@EnableAsync
public class TaskPoolConfig {


    /**
     * 异步执行线程池————ForkJoinPool
     * @return
     */
    @Bean("asyncForkJoinPoolExecutor")
    public ExecutorService asyncForkJoinPoolExecutor() {
        return new ForkJoinPool(16, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
    }


    /**
     * 配置默认 stream并行流线程池并行度
     * @return
     */
    @Bean
    public String parallelStreamConfig() {
        return System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "16");
    }
}

参考:
https://www.cnblogs.com/sky233/p/13052380.html

https://blog.csdn.net/niyuelin1990/article/details/78658251

https://blog.csdn.net/qq_34748010/article/details/124533694

https://www.cnblogs.com/GGuoLiang/p/13616999.html

https://blog.csdn.net/weixin_43144460/article/details/119250381

https://blog.csdn.net/myth_g/article/details/118787482

https://www.pudn.com/news/62adbd9bdfc5ee19687d169e.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,295评论 6 512
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,928评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,682评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,209评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,237评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,965评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,586评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,487评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,016评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,136评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,271评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,948评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,619评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,139评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,252评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,598评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,267评论 2 358