Java8 Stream流操作总结

Java List操作1(分片 partition)
Java List操作2(分组group by)
Java List操作3(获取list中bean对象中的某一列值—map)
Java List操作4(where,filter——过滤)
Java List操作5(sort—排序) Java List操作6(distinct—去重)

以前总结过使用java8 stream流操作处理List的方法,现在从stream流角度重新梳理一下stream流特性。

1、介绍 从Java1.8开始提出了Stream流的概念,侧重对于源数据计算能力的封装。

Stream 流操作可以分为 3 种类型:

  • 创建 Stream

  • Stream 中间处理

  • 终止 Steam

中间处理只是一种标记,只有终止操作才会触发实际计算。 中间操作又可以分为无状态的(Stateless)和有状态的(Stateful),无状态中间操作是指元素的处理不受前面元素的影响,而有状态的中间操作必须等到所有元素处理之后才知道最终结果,比如排序是有状态操作,在读取所有元素之前并不能确定排序结果; 结束操作又可以分为短路操作和非短路操作,短路操作是指不用处理全部元素就可以返回结果,比如 找到第一个满足条件的元素。之所以要进行如此精细的划分,是因为底层对每一种情况的处理方式不同。

image.png

2、创建 Stream

  • stream() : 创建一个新的stream串行流对象

  • parallelStream():创建一个可并行执行的stream流对象

  • Stream.of()/Stream.iterate()/Stream.generate():使用Stream的静态方法创建一个新的stream串行流对象


//集合创建串行流
Stream<String> stream = Arrays.asList("a", "b", "c").stream();

//创建并行流
Stream<String> parallelStream = Arrays.asList("a", "b", "c").parallelStream();

//使用Stream的静态方法
Stream<Integer> stream3 = Stream.of(1, 2, 3, 4, 5, 6);
Stream<Integer> stream4 = Stream.iterate(0, (x) -> x + 3).limit(4);
Stream<Double> stream5 = Stream.generate(Math::random).limit(3);
image

2.1 并行流

使用并行流可以有效利用计算机的多 CPU 硬件,提升逻辑的执行速度。并行流通过将一整个 stream 划分为多个片段,然后对各个分片流并行执行处理逻辑,最后将各个分片流的执行结果汇总为一个整体流。

图片

并行流类似于多线程在并行处理,所以与多线程场景相关的一些问题同样会存在,比如死锁等问题,所以在并行流终止执行的函数逻辑,必须要保证线程安全。

3、Stream 中间处理

负责对 Stream 进行处理操作,并返回一个新的 Stream 对象,中间管道操作可以进行叠加。

image.png

4、终止 Steam

通过终止Stream操作之后,Stream 流将会结束,最后可能会执行某些逻辑处理,或者是按照要求返回某些执行后的结果数据。

一旦一个 Stream 被执行了终止操作之后,后续便不可以再读这个流执行其他的操作了,否则会报错。

image.png

5、Stream收集器Collectors

Stream 主要用于对集合数据的处理场景,所以除了上面几种获取简单结果的终止方法之外,更多的场景是获取一个集合类的结果对象,比如 List、Set 或者 HashMap 等。

这里就需要 collect 方法出场了,collect是Stream流的一个终止方法,会使用传入的收集器(入参)对结果执行相关的操作,这个收集器必须是Collector接口的某个具体实现类,通常我们使用Collectors,Collectors是一个工具类,提供了很多的静态工厂方法,提供了很多Collector接口的具体实现类,是为了方便程序员使用而预置的一些较为通用的收集器(如果不使用Collectors类,而是自己去实现Collector接口,也可以)。

Stream结果收集操作的本质,其实就是将Stream中的元素通过收集器定义的函数处理逻辑进行加工,然后输出加工后的结果

Collectors里常用搜集器如下:

[图片上传失败...(image-f61f89-1675853638951)]

[图片上传失败...(image-d0773e-1675853638951)]

初始案例:

List<String> servers = new ArrayList<>();
servers.add("Felordcn");
servers.add("Tomcat");
servers.add("Jetty");
servers.add("Undertow");
servers.add("Resin");

5.1 类型归纳

这是一个系列,作用是将元素分别归纳进可变容器 ListMapSetCollection 或者ConcurrentMap

//Collectors.toList();
 //Collectors.toMap();
 //Collectors.toSet();
 //Collectors.toCollection();
 //Collectors.toConcurrentMap();

List<String> list = servers.stream().collect( Collectors.toList());</pre>

5.2 joining连接

将元素以某种规则连接起来。该方法有三种重载 joining(CharSequence delimiter)joining(CharSequence delimiter,CharSequence prefix,CharSequence suffix)

 //   输出 FelordcnTomcatJettyUndertowResin
 servers.stream().collect(Collectors.joining());

 //   输出 Felordcn,Tomcat,Jetty,Undertow,Resin
 servers.stream().collect(Collectors.joining("," ));

 //   输出 [Felordcn,Tomcat,Jetty,Undertow,Resin]
 servers.stream().collect(Collectors.joining(",", "[", "]")); </pre>

用的比较多的是读取 HttpServletRequest 中的 body

HttpServletRequest.getReader().lines().collect(Collectors.joining());

5.3 groupingBy 聚合

按照条件对元素进行分组,和 SQL 中的 group by 用法有异曲同工之妙,通常也建议使用 Java 进行分组处理以减轻数据库压力。groupingBy 也有三个重载方法 我们将 servers 按照长度进行分组:

// 按照字符串长度进行分组,符合条件的元素将组成一个 List 映射到以条件长度为key 的 Map<Integer, List<String>> 中
 Map<Integer, List<String>> listMap = servers.stream().collect(Collectors.groupingBy(String::length));
// 生成Map<Integer, Set<String>> 
 Map<Integer, Set<String>> setMap = servers.stream().collect(Collectors.groupingBy(String::length, Collectors.toSet()));

我要考虑同步安全问题怎么办? 当然使用线程安全的同步容器啊,那前两种都用不成了吧! 看源码,其实第二种等同于下面的写法:

Supplier<Map<Integer,Set<String>>> mapSupplier = HashMap::new;
 Map<Integer,Set<String>> collect = servers.stream.collect(Collectors.groupingBy(String::length, mapSupplier, Collectors.toSet()));

这就非常好办了,我们提供一个同步 Map 不就行了,于是问题解决了:

 Supplier<Map<Integer, Set<String>>> mapSupplier = () -> Collections.synchronizedMap(new HashMap<>());
 Map<Integer, Set<String>> collect = servers.stream.collect(Collectors.groupingBy(String::length, mapSupplier, Collectors.toSet()));

其实同步安全问题 Collectors 的另一个方法 groupingByConcurrent 给我们提供了解决方案。用法和 groupingBy 差不多。

5.4 collectingAndThen

该方法先执行了一个归纳操作,然后再对归纳的结果进行 Function 函数处理输出一个新的结果。

 // 比如我们将servers joining 然后转成大写,结果为: FELORDCN,TOMCAT,JETTY,UNDERTOW,RESIN 
 servers.stream.collect(Collectors.collectingAndThen(Collectors.joining(","), String::toUpperCase));

5.5 partitioningBy 分区

分区是分组的特殊情况,由一个谓词(返回一个布尔值的函数)作为分类函数.所以返回的Map集合只有两个key,一个true,一个false.

// 长度大于5  {false=[Jetty, Resin], true=[Felordcn, Tomcat, Undertow]
Map<Boolean, List<String>> map = servers.stream().collect(partitioningBy(e -> e.length() > 5));

// 长度大于5  且  按照长度分组 ,即先分区再分组
//{false={5=[Jetty, Resin]}, true={6=[Tomcat], 8=[Felordcn, Undertow]}}
Map<Boolean, Map<Integer, List<String>>> map2 = servers.stream().collect(partitioningBy(e -> e.length() > 5, groupingBy(String::length)));

5.6 counting 统计元素的的数量

该方法归纳元素的的数量,非常简单,不再举例说明。

// 5  
long size = servers.stream().collect(Collectors.counting ());

3.7 maxBy/minBy 查找大小元素

这两个方法分别提供了查找大小元素的操作,它们基于比较器接口 Comparator 来比较 ,返回的是一个 Optional 对象。 我们来获取 servers 中最小长度的元素:

// Jetty  
Optional<String> min = servers.stream().collect(Collectors.minBy(Comparator.comparingInt(String::length)));

这里其实 Resin 长度也是最小,这里遵循了 "先入为主" 的原则 。当然 Stream.min() 可以很方便的获取最小长度的元素。maxBy 同样的道理。

5.8 summingInt/Double/Long 累加计算

用来做累加计算。计算元素某个属性的总和,类似 Mysqlsum 函数,比如计算各个项目的盈利总和、计算本月的全部工资总和等等。我们这里就计算一下 servers 中字符串的长度之和 (为了举例不考虑其它写法)。

 // 总长度 32 
 servers.stream.collect(Collectors.summingInt(s -> s.length()));

5.9 summarizingInt/Double/Long 统计数据

如果我们对 5.6章节-5.8章节 的操作结果都要怎么办?难不成我们搞5个 Stream 流吗? 所以就有了 summarizingIntsummarizingDoublesummarizingLong 三个方法。 这三个方法通过对元素某个属性的提取,会返回对元素该属性的统计数据对象,分别对应 IntSummaryStatisticsDoubleSummaryStatisticsLongSummaryStatistics。我们对 servers 中元素的长度进行统计:

DoubleSummaryStatistics doubleSummaryStatistics = servers.stream.collect(Collectors.summarizingDouble(String::length));
 // {count=5, sum=32.000000, min=5.000000, average=6.400000, max=8.000000}
 System.out.println("doubleSummaryStatistics.toString() = " + doubleSummaryStatistics.toString());

结果 DoubleSummaryStatistics 中包含了 总数,总和,最小值,最大值,平均值 五个指标。

5.10 mapping

该方法是先对元素使用 Function 进行再加工操作,然后用另一个Collector 归纳。比如我们先去掉 servers 中元素的首字母,然后将它们装入 List

 // [elordcn, omcat, etty, ndertow, esin]
 servers.stream.collect(Collectors.mapping(s -> s.substring(1), Collectors.toList()));

有点类似 Stream 先进行了 map 操作再进行 collect

 servers.stream.map(s -> s.substring(1)).collect(Collectors.toList());

5.11 reducing

这个方法非常有用!但是如果要了解这个就必须了解其参数 BinaryOperator<T> 。 这是一个函数式接口,是给两个相同类型的量,返回一个跟这两个量相同类型的一个结果,伪表达式为 (T,T) -> T。默认给了两个实现 maxByminBy ,根据比较器来比较大小并分别返回最大值或者最小值。当然你可以灵活定制。然后 reducing 就很好理解了,元素两两之间进行比较根据策略淘汰一个,随着轮次的进行元素个数就是 reduce 的。那这个有什么用处呢? Java 官方给了一个例子:统计每个城市个子最高的人。

 Comparator<Person> byHeight = Comparator.comparing(Person::getHeight);
 Map<String, Optional<Person>> tallestByCity = people.stream()
 .collect(Collectors.groupingBy(Person::getCity, Collectors.reducing(BinaryOperator.maxBy(byHeight))));

结合最开始给的例子你可以使用 reducing 找出最长的字符串试试。

上面这一层是根据 Height 属性找最高的 Person ,而且如果这个属性没有初始化值或者没有数据,很有可能拿不到结果所以给出的是 Optional<Person>。 如果我们给出了 identity 作一个基准值,那么我们首先会跟这个基准值进行 BinaryOperator 操作。 比如我们给出高于 2 米 的人作为 identity。 我们就可以统计每个城市不低于 2 米 而且最高的那个人,当然如果该城市没有人高于 2 米则返回基准值identity

 Comparator<Person> byHeight = Comparator.comparing(Person::getHeight);
 Person identity= new Person();
 identity.setHeight(2.);
 identity.setName("identity");
 Map<String, Person> collect = persons.stream()
 .collect(Collectors.groupingBy(Person::getCity, Collectors.reducing(identity, BinaryOperator.maxBy(byHeight))));

这时候就确定一定会返回一个 Person 了,最起码会是基准值identity 不再是 Optional

还有些情况,我们想在 reducing 的时候把 Person 的身高先四舍五入一下。这就需要我们做一个映射处理。定义一个 Function<? super T, ? extends U> mapper 来干这个活。那么上面的逻辑就可以变更为:

 Comparator<Person> byHeight = Comparator.comparing(Person::getHeight);
 Person identity = new Person();
 identity.setHeight(2.);
 identity.setName("identity");
 // 定义映射 处理 四舍五入
 Function<Person, Person> mapper = ps -> {
 Double height = ps.getHeight();

 BigDecimal decimal = new BigDecimal(height);
 Double d = decimal.setScale(1, BigDecimal.ROUND_HALF_UP).doubleValue();
 ps.setHeight(d);
 return ps;
 };
 Map<String, Person> collect = persons.stream()
 .collect(Collectors.groupingBy(Person::getCity, Collectors.reducing(identity, mapper, BinaryOperator.maxBy(byHeight))));

6 parallelStream并发流线程安全问题

parallelStream是以多线程的方式,执行定义的代码块。parallelStream中使用的是ForkJobTask。Fork/Join的框架是通过把一个大任务不断fork成许多子任务,然后多线程执行这些子任务,最后再Join这些子任务得到最终结果。

因为是多线程,所以在代码块里操作线程不安全的Collection,就会引发Concurrency问题。比如

public class TestParallelStream {
    public static void main(String[] args) {
        List<String> alist = new ArrayList<String>(Arrays.asList("1","2","3","4","5","6","7"));
        for(int i=0;i<10000;i++) {
            Map<String, String> result = new HashMap<String, String>();
            alist.parallelStream().forEach(item->{
                result.put(item, item);
            });
            System.out.println("i="+i+",map大小:"+result.size());
        }
    }
}


结果如下
......
i=5678,map大小:7
i=5679,map大小:7
i=5680,map大小:6      //出现map只有6个元素的情况了
i=5681,map大小:7
......

从程序上看,就是先将alist集合fork成多段,然后多线程添加到HashMap中,而HashMap的add方法并不能保证原子性。

方法一:将parallelStream改成stream、foreach,串行执行

但是这样其实就失去了优化

方法二:给collection上锁

//对要存储元素的map要求线程安全
Map<String, String> result = new HashMap<String, String>();
修改为
Map<String, String> result = Collections.synchronizedMap(new HashMap());
Map<String, String> result = new ConcurrentHashMap<>(new HashMap());

方法三:使用java8中的收集器(使用parallelStream后使用collect )

我们使用外面的集合,无非是为了收集元素。Java8 Stream的collect方法,就是收集Stream里的元素,返回List,Set或Map等,并且它是线程安全的。

List<String> results = Collections.synchronizedList(new ArrayList<>());
sources.parallelStream().forEach(source -> {  
  results.add(sigmaString(source));  
});    

使用用collect改写上面的代码:

List<String> results = sources.parallelStream()  
  .flatMap(source -> Stream.of(sigmaString(source)))  
  .collect(Collectors.toList());  
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,864评论 6 494
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,175评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,401评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,170评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,276评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,364评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,401评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,179评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,604评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,902评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,070评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,751评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,380评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,077评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,312评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,924评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,957评论 2 351