为什么需要Stream
Java 8 中的Stream是对集合 (Collection) 对象功能的增强, 他专注于对集合对象进行各种非常便利,高效的聚合操作(aggregate operation), 或者大批量数据操作 (bulk data operation).
Stream API借助于同样新出现的Lambda 表达式, 极大的提高编程效率和程序可读性. 同时他提供穿行和并行两种模式进行汇聚操作, 并发模式能够成分利用多核处理器的优势, 使用fork/join 并行法师来拆分任务和加速处理过程.
通常编写并行代码很难而且容易出错, 但使用Steam API无需编写一行多线程的代码, 就可以很方便地写出高性能的并发代码。
几个例子:
1. Java 8的排序,取值实现
List<Integer> transactionsIds =
transactions.stream().filter(t -> t.getType() == Transaction.Type.GEOCERY)
.sorted(Comparator.comparing(Transaction::getValue).reversed())//排序
.map(Transaction::getId)//取出id组装新stream
.collect(Collectors.toList());
System.out.println(transactionsIds);//[6, 5, 3, 1]
2. 流转换为其他数据结构(collect)
一个Stream只可以使用一次,否则会报错
Stream stream = Stream.of("a1", "b1", "c1");
// 1. Array
String[] strArray1 = (String[]) stream.toArray(String[]::new);
for (String s : strArray1) { System.out.print(s); } //a1b1c1
// 2.Collection list
stream = Stream.of("a1", "b1", "c1");// stream has already been operated upon or closed
List<String> list1 = (List<String>) stream.collect(Collectors.toList());
for (String s : list1) { System.out.print(s); }//a1b1c1
// 2.Collection list
stream = Stream.of("a1", "b1", "c1");// stream has already been operated upon or closed
List<String> list2 = (List<String>) stream.collect(Collectors.toCollection(ArrayList::new));
for (String s : list2) { System.out.print(s); } //a1b1c1
// 2.Collection set
stream = Stream.of("a1", "b1", "c1");// stream has already been operated upon or closed
Set<String> set = (Set<String>) stream.collect(Collectors.toSet());
for (String s : set) { System.out.print(s); } //a1c1b1
// 2.Collection stack
stream = Stream.of("a1", "b1", "c1");// stream has already been operated upon or closed
Stack<String> stack = (Stack<String>) stream.collect(Collectors.toCollection(Stack::new));
for (String s : stack) { System.out.print(s); } //a1b1c1
// 3. String
stream = Stream.of("a1", "b1", "c1");// stream has already been operated upon or closed
String str = stream.collect(Collectors.joining()).toString();
System.out.print(str); // a1b1c1
String str = stream.collect(Collectors.joining("&")).toString();
System.out.print(str); // a1&b1&c1
String str = stream.collect(Collectors.joining("&","prefix","suffix")).toString();
System.out.print(str); // prefixa1&b1&c1suffix
//4. stream元素数量
Object str = stream.collect(Collectors.counting());
System.out.print(str); // 3
//5. groupby
Object str = stream.collect(Collectors.groupingBy(o -> o));
System.out.print(str); // {a1=[a1], c1=[c1], b1=[b1]}
Stream stream = Stream.of("a1", "a1", "b1", "c1");// 分组求和
Object str = stream.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
System.out.print(str); // {a1=2, c1=1, b1=1}
//自定义对象分组求和
Map<String, Integer> sumMap = fruitList.stream().collect.
(Collectors.groupingBy(Fruit::getName, Collectors.summingInt(Fruit::getPrice)));
3. 打印姓名 (forEach 和pre-java8的对比)
forEach 不能修改自己包含的本地变量值,也不能用break/return 之类的关键字提前结束循环,只能进行全部值迭代后处理
// JDK 8
roster.stream().filter(p -> p.gender == Person.Sex.MALE)
.forEach(p -> System.out.println(p.name));
// JDK 7
for (Person p : roster) {
if(p.gender == Person.Sex.MALE){
System.out.println(p.name);
}
}
4. peek 对每个元素执行操作并且返回一个新的Stream
【peek : 偷窥】注意执行顺序
Stream.of("one", "two", "three", "four")
.filter(p -> p.length() > 3)
.peek(v -> System.out.println("Filtered Value:" + v))
.map(String::toUpperCase)
.peek(v -> System.out.println("Mapped Value:" + v))
.collect(Collectors.toList());
// 1. Filtered Value:three
// 2. Mapped Value:THREE
// 3. Filtered Value:four
// 4. Mapped Value:FOUR
5.reduce的用例
使用方式说明
- 方式一
一个参数,reduce(BinaryOperator<T> accumulator)
方法需要一个函数式接口参数,该函数式接口需要两个参数,返回一个结果(reduce中返回的结果会作为下次累加器计算的第一个参数),也就是累加器。 - 方式二
2个参数,T reduce(T identity, BinaryOperator<T> accumulator)
提供一个跟Stream中数据同类型的初始值identity,通过累加器accumulator迭代计算Stream中的数据,得到一个跟Stream中数据相同类型的最终结果。 - 方式三
3个参数,<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
reduce的第三个参数是在使用parallelStream的reduce操作时,合并各个流结果的,如果使用的是stream,所以第三个参数是不起作用的。
一或者二参数的使用示例
// 1. 求和 SUM 10
Integer sum = Stream.of(1, 2, 3, 4).reduce(0, (a, b) -> a + b);
sum = Stream.of(1, 2, 3, 4).reduce(0, Integer::sum); //有起始值
sum = Stream.of(1, 2, 3, 4).reduce(Integer::sum).get(); //无起始值
// 2. 最小值 minValue = -3.0
double minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MAX_VALUE, Double::min);
minValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double::min).get();
// 2. 最大数值 maxValue = 1.0
double maxValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double.MIN_VALUE, Double::max);
maxValue = Stream.of(-1.5, 1.0, -3.0, -2.0).reduce(Double::max).get();
// 3. 字符串连接 Concat "ABCD"
String concat = Stream.of("A", "B", "C", "D").reduce("", String::concat);
// 4. 过滤和字符串连接 Filter & Concat = "ace"
concat = Stream.of("a", "B", "c", "D", "e", "F")
.filter(x -> x.compareTo("Z") > 0)
.reduce("", String::concat);
//5. Stringbuffer连接
StringBuffer reduce = target.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).
reduce(new StringBuffer(),
(stringBuffer, stringEntry) -> stringBuffer.append(stringEntry.getKey()).append("=").append(stringEntry.getValue().getClass()).append("&"),
StringBuffer::append);
三个参数进行并行计算reduce高级用法
需要排序的功能一定不要用并行计算,因为并行计算为多线程,返回结果进行combine后是乱序的。
基本用法示例
当Stream是并行时,第三个参数就有意义了,它会将不同线程计算的结果调用combiner做汇总后返回。
注意由于采用了并行计算,前两个参数与非并行时也有了差异!第一个参数如果是并行运算,那么每个线程都是从这个初始值开始运算,如果初始值不是0或者“”那么与单线程的初始值就有差异了。
举个简单点的例子,计算4+1+2+3的结果,其中4是初始值:
Integer reduce = Stream.of(1, 2, 3).parallel().reduce(4, (integer, integer2) -> integer + integer2
, new BinaryOperator<Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) {
return integer + integer2;
}
});
System.out.println("reduce = " + reduce);
并行时的计算结果是18,而非并行时的计算结果是10!
为什么会这样?
先分析下非并行时的计算过程;第一步计算4 + 1 = 5,第二步是5 + 2 = 7,第三步是7 + 3 = 10。
并行计算时,线程之间没有影响,因此每个线程在调用第二个参数BiFunction进行计算时,直接都是使用result值当其第一个参数,因此计算过程现在是这样的:线程1:1 + 4 = 5;线程2:2 + 4 = 6;线程3:3 + 4 = 7;Combiner函数: 5 + 6 + 7 = 18!
如果初始值是0,那么并行计算和非并行是结果一样的。
Integer reduce = Stream.of(1, 2, 3).parallel().reduce(0, (integer, integer2) -> integer + integer2
, new BinaryOperator<Integer>() {
@Override
public Integer apply(Integer integer, Integer integer2) {
return integer + integer2;
}
});
System.out.println("reduce = " + reduce);
输出reduce = 6
特殊操作示例
- 转换成List,当然最简单的是
collect(Collectors.toList());
说明:如果第一个参数的类型是ArrayList等对象而非基本数据类型的包装类或者String,第三个函数的处理上可能容易引起误解。下面第三个参数
(l, s) -> l
的入参l和s一样都是第一个参数list,所以这里只返回了l。
按照第三个参数作用是:它会将不同线程计算的结果调用combiner做汇总后返回。那就会这样写l.addAll(s);
这个时候出来的结果与预期的结果就完全不一样了,要多了很多元素!
//parallelStream的并行流,是多线程的操作,结果不是按照流的数据顺序是乱序的。
ArrayList<String> reduce1 = target.entrySet().parallelStream().map(stringEntry -> stringEntry.getKey()).reduce(new ArrayList<String>(), (l, s) -> {
l.add(s);
return l;
}, (l, s) -> l);
System.out.println("reduce1 = " + reduce1);
输出reduce1 = [SELF-TASK, SELF, OTHER-TASK, OTHER]
- 第三个参数是把各个流结果合并为null,由于是parallelStream所以规则起作用,返回null。
//第三个参数是把各个流结果合并为null,由于是parallelStream所以规则起作用,返回null。
String reduce2 = target.entrySet().parallelStream().map(stringEntry -> stringEntry.getKey()).reduce("", (r, s) -> r.concat(s).concat(","), (r, s) -> null);
System.out.println("reduce2 = " + reduce2);
输出reduce2 = null
- 第三个参数是把各个流结果合并为null,由于是stream不是parallelStream所以规则不起作用。
//第三个参数是把各个流结果合并为null,由于是stream不是parallelStream所以规则不起作用。
val reduce5 = target.entrySet().stream().map(stringEntry -> stringEntry.getKey()).reduce("", (r, s) -> r.concat(s), (r, s) -> null);
System.out.println("reduce5 = " + reduce5);
输出reduce5 = OTHERSELF-TASKSELFOTHER-TASK
- 初始值为"",进行字符串连接,然后进行parallelStream()各个流的再连接。parallelStream所以出来的结果是乱序的。
//初始值为"",进行字符串连接,然后进行parallelStream()各个流的再连接
String reduce4 = target.entrySet().parallelStream().map(stringEntry -> stringEntry.getKey()).reduce("", (r, s) -> r.concat(s).concat(","), String::concat);
System.out.println("reduce4 = " + reduce4);
输出reduce4 = OTHER,SELF-TASK,SELF,OTHER-TASK,
- 简化版的字符串连接示例
//使用一个参数简化版的字符串连接。
val reduce3 = target.entrySet().parallelStream().map(stringEntry -> stringEntry.getKey()).reduce(String::concat);
System.out.println("reduce3 = " + reduce3);
输出reduce3 = Optional[OTHERSELF-TASKSELFOTHER-TASK]
- 迭代stream里面是否包含字符串“SELF”,主要演示在并行流情况下,reduce处理boolean型数据
//迭代stream里面是否包含字符串“SELF”
boolean flag = false;
Boolean f1 = target.entrySet().parallelStream().map(stringEntry -> stringEntry.getKey()).peek(s -> System.out.println("当前值:" + s + ",处理的线程:" + Thread.currentThread().getName())).reduce(flag, (f, s) -> {
f = Boolean.logicalOr(f, s.contains("SELF"));
System.out.println("当前boolean 值f = " + f + ",处理的线程:" + Thread.currentThread().getName());
return f;
}, Boolean::logicalOr);
System.out.println("f1 = " + f1);
- 第一次运行,输出
当前值:SELF-TASK,处理的线程:restartedMain
当前值:OTHER-TASK,处理的线程:ForkJoinPool.commonPool-worker-2
当前boolean 值f = true,处理的线程:restartedMain
当前值:OTHER,处理的线程:ForkJoinPool.commonPool-worker-1
当前值:SELF,处理的线程:restartedMain
当前boolean 值f = true,处理的线程:restartedMain
当前boolean 值f = true,处理的线程:ForkJoinPool.commonPool-worker-2
当前boolean 值f = true,处理的线程:ForkJoinPool.commonPool-worker-1
f1 = true
- 再一次运行,由于是多线程,输出不一样,但是最后的结果f1是一样的。
当前值:SELF-TASK,处理的线程:restartedMain
当前boolean 值f = true,处理的线程:restartedMain
当前值:SELF,处理的线程:restartedMain
当前值:OTHER,处理的线程:ForkJoinPool.commonPool-worker-2
当前boolean 值f = true,处理的线程:restartedMain
当前值:OTHER-TASK,处理的线程:ForkJoinPool.commonPool-worker-1
当前boolean 值f = false,处理的线程:ForkJoinPool.commonPool-worker-1
当前boolean 值f = false,处理的线程:ForkJoinPool.commonPool-worker-2
f1 = true
6. 我的一个实践
这个功能是使用在我微信小程序的后端接口,实现通过搜索名字来展示开发语言列表。
原代码
List<String> filterList = new ArrayList<String>();
languageList.forEach(c -> {
if (c.toLowerCase().contains(language_name.toLowerCase()))
filterList.add("guo"+c);
});
使用Stream后的代码
List<String> filterList = languageList.stream().
filter(s -> s.toLowerCase().contains(language_name.toLowerCase())).
map("guo"::concat).
collect(Collectors.toList());
一样的使用效果
访问:https://localhost:8888/v2/vue/api/programLanguage/getByName?language_name=c
返回:["C","C++"]
几个重要示例
List的排序、过滤、peek输出
@Autowried
List<? extends IBankRouterHandler> source;
List<? extends IBankRouterHandler> collect = source.stream().
filter(bankHandler -> StringUtils.isNotEmpty(bankHandler.getRouterChannel())).
sorted(Comparator.comparing(bankHandler -> bankHandler.getRouterChannel())).
peek(bankHandler -> System.out.println("class为 = " + bankHandler.getRouterChannel())).
collect(Collectors.toList());
List转Map(把对象的channel属性作为key,对象本身最为value)
Map<String, ? extends IBankRouterHandler> target = source.stream().
filter(bankHandler -> StringUtils.isNotEmpty(bankHandler.getRouterChannel())).
sorted(Comparator.comparing(bankHandler -> bankHandler.getRouterChannel())).
peek(bankHandler -> System.out.println("class为 = " + bankHandler.getRouterChannel())).
collect(Collectors.toMap(IBankRouterHandler::getRouterChannel, s -> s));//两个参数分别为Map的key和value。把对象的channel作为key,对象本身最为value。
Map排序
Map<String, ? extends IBankRouterHandler> target;
target.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).collect(Collectors.toMap(stringEntry -> stringEntry.getKey(), stringEntry -> stringEntry.getValue()));
Map转List
target.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).map(s -> s.getValue()).collect(Collectors.toList());
forEach
StringBuffer params = new StringBuffer();
target.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).forEach((s) -> params.append(s.getKey() + "=" + s.getValue().getClass() + "&"));
System.out.println("params = " + params.toString());
上面forEach的reduce(StringBuffer)等价实现
StringBuffer reduce = target.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).reduce(new StringBuffer(), (stringBuffer, stringEntry) -> stringBuffer.append(stringEntry.getKey()).append("=").append(stringEntry.getValue().getClass()).append("&"), StringBuffer::append);
System.out.println("paramsreduce = " + reduce.toString());
上面forEach的reduce(String)等价实现
String reduce1 = target.entrySet().stream().map(stringEntry -> stringEntry.getKey()).filter(key -> key.length() < 11).sorted(Comparator.comparing(s -> s)).reduce("", (s, s2) -> s.concat(s2).concat("&"), String::concat);
上面forEach的collector方法等价实现【更简单,推荐方法】
Map<String, String> params = new HashMap<String, String>();
params.put("age", "12");
params.put("location", "天津");
params.put("zoo", "cat");
params.put("name", "guo");
params.put("go", "school");
String collect1 = params.entrySet().stream().sorted(Comparator.comparing(Map.Entry::getKey)).map(entry -> entry.getKey() + "=" + entry.getValue()).collect(Collectors.joining("&"));
System.out.println("collect1 = " + collect1);
输出:collect1 = age=12&go=school&location=天津&name=guo&zoo=cat
自定义比较器在Stream的max、min方法
Optional<? extends IBankRouterHandler> max = target.entrySet().stream().map(stringEntry -> stringEntry.getValue()).max((o1, o2) -> o1.getRouterChannel().compareTo(o2.getRouterChannel()));