stream 流
概念
流是Java API的新成员,它允许你以声明性方式处理数据集合(通过查询语句来表达,而不 是临时编写一个实现)。就现在来说,你可以把它们看成遍历数据集的高级迭代器。此外,==流还可以透明地并行处理,你无需写任何多线程代码了!==
举例对比
定义菜及菜单
package com.wenx.unit2.chapter4;
public class Dish {
//名称
private final String name;
//是否素食
private final boolean vegetarian;
//热量
private final int calories;
//类型
private final Type type;
public Dish(String name, boolean vegetarian, int calories, Type type) {
this.name = name;
this.vegetarian = vegetarian;
this.calories = calories;
this.type = type;
}
public String getName() {
return name;
}
public boolean isVegetarian() {
return vegetarian;
}
public int getCalories() {
return calories;
}
public Type getType() {
return type;
}
@Override
public String toString() {
return name;
}
//肉、鱼、其它
public enum Type {
MEAT, FISH, OTHER
}
}
//菜单
List<Dish> menu = Arrays.asList(new Dish("pork", false, 800, Dish.Type.MEAT),
new Dish("beef", false, 700, Dish.Type.MEAT),
new Dish("chicken", false, 400, Dish.Type.MEAT),
new Dish("french fries", true, 530, Dish.Type.OTHER),
new Dish("rice", true, 350, Dish.Type.OTHER),
new Dish("season fruit", true, 120, Dish.Type.OTHER),
new Dish("pizza", true, 550, Dish.Type.OTHER),
new Dish("prawns", false, 300, Dish.Type.FISH),
new Dish("salmon", false, 450, Dish.Type.FISH));
从菜单中筛选低热量的菜肴名称,并按照卡路里排序
传统写法
List<Dish> lowCaloricDishes = new ArrayList<>();
//用累加器筛 选元素
for(Dish d: menu){
if(d.getCalories() < 400){
lowCaloricDishes.add(d);
}
}
//用匿名类对 菜肴排序
Collections.sort(lowCaloricDishes,new Comparator<Dish>(){
public int compare (Dish d1, Dish d2){
return Integer.compare(d1.getCalories(), d2.getCalories());
}
});
List<String> lowCaloricDishesName = new ArrayList<>();
//处理排序后的菜名列表
for(Dish d: lowCaloricDishes){
lowCaloricDishesName.add(d.getName());
}
java8
import static java.util.Comparator.comparing;
import static java.util.stream.Collectors.toList;
List<String> lowCaloricDishesName =
menu.stream()
.filter(d -> d.getCalories() < 400) //选出400卡路里 以下的菜肴
.sorted(comparing(Dish::getCalories)) //按照卡路 里排序
.map(Dish::getName) //提取菜肴的名称
.collect(toList()); //将所有名称保 存在List中
//多核并行执行
List<String> lowCaloricDishesName =
menu.parallelStream() //开启并行
.filter(d -> d.getCalories() < 400) //选出400卡路里 以下的菜肴
.sorted(comparing(Dish::getCalories)) //按照卡路 里排序
.map(Dish::getName) //提取菜肴的名称
.collect(toList()); //将所有名称保 存在List中
说明
- 代码是以声明性方式写的:说明想要完成什么(筛选热量低的菜肴)而不是说明如何实现一个操作(利用循环和if条件等控制流语句)
- 你可以把几个基础操作链接起来,来表达复杂的数据处理流水线(在filter后面接上 sorted、map和collect操作),同时保持代码清晰可读。
filter、sorted、map和collect等操作是与具体线程模型无关的高层次构件,所以 它们的内部实现可以是单线程的,也可能透明地充分利用你的多核架构!在实践中,这意味着你 用不着为了让某些数据处理任务并行而去操心线程和锁了,Stream API都替你做好了!
流的特点
- 声明性——更简洁,更易读
- 可复合——更灵活
- 可并行——性能更好
定义:从支持数据处理操作的源生成的元素序列
- 元素序列:就像集合一样,流也提供了一个接口,可以访问特定元素类型的一组有序值。因为集合是数据结构,所以它的主要目的是以特定的时间/空间复杂度存储和访问元素(如ArrayList 与 LinkedList)。==集合讲的是数据,流讲的是计算。==
- 源:流会使用一个提供数据的源,如集合、数组或输入/输出资源。 请注意,从有序集 合生成流时会保留原有的顺序。由列表生成的流,其元素顺序与列表一致。
- 数据处理操作——流的数据处理功能支持类似于数据库的操作,以及函数式编程语言中 的常用操作,如filter、map、reduce、find、match、sort等。流操作可以顺序执 行,也可并行执行。
- 流水线:很多流操作本身会返回一个流,这样多个操作就可以链接起来,形成一个大 的流水线。
- 内部迭代:与使用迭代器显式迭代的集合不同,流的迭代操作是在背后进行的。
代码
List<String> menuNameList =
menu.stream() //建立操作流水线
.filter(dish -> dish.getCalories() >300) //首先选出高热量的菜肴
.limit(3) //只选择头三个
.map(Dish::getName) //获取菜名
.collect(toList()); //将结果保存在另一个List中
先是对menu调用stream方法,由菜单得到一个流。数据源是菜肴列表(菜 单),它给流提供一个元素序列。接下来,对流应用一系列数据处理操作:filter、map、limit 和collect。除了collect之外,所有这些操作都会返回另一个流,这样它们就可以接成一条流 水线,于是就可以看作对源的一个查询。最后,collect操作开始处理流水线,并返回结果(它 和别的操作不一样,因为它返回的不是流,在这里是一个List)。在调用collect之前,没有任 何结果产生,实际上根本就没有从menu里选择元素。
- filter——接受Lambda,从流中排除某些元素。
- map——接受一个Lambda,将元素转换成其他形式或提取信息。在本例中,通过传递方 法引用Dish::getName,相当于Lambda d -> d.getName(),提取了每道菜的菜名。
- limit——截断流,使其元素不超过给定数量。
- collect——将流转换为其他形式。
==请注意,和迭代器类似,流只能遍历一次。遍历完之后,我们就说这个流已经被消费掉了。 你可以从原始数据源那里再获得一个新的流来重新遍历一遍,就像迭代器一样(这里假设它是集 合之类的可重复的源,如果是I/O通道就没戏了)。==
错误示例
List<String> title = Arrays.asList("Java8", "In", "Action");
Stream<String> s = title.stream();
s.forEach(System.out::println);
s.forEach(System.out::println); //java.lang.IllegalStateException:流已被操作或关闭
内部迭代与外部迭代
//外部迭代
1.
List<String> names = new ArrayList<>();
for(Dish d: menu){
names.add(d.getName());
}
2.
List<String> names = new ArrayList<>();
Iterator<String> iterator = menu.iterator();
while(iterator.hasNext()) {
Dish d = iterator.next();
names.add(d.getName());
}
//内部迭代 --并行
List<String> names = menu.stream().map(Dish::getName).collect(toList());
流操作
流分为中间操作和终端操作,
可以连接起来的流操作称为中间操作,关闭流的操作称为终端操作。
使用流的三件事:
- 一个数据源(如集合)来执行一个查询;
- 一个中间操作链,形成一条流的流水线;
- 一个终端操作,执行流水线,并能生成结果。
中间操作
操作 | 类型 | 返回类型 | 操作参数 | 函数描述符 |
---|---|---|---|---|
filter | 中间 | Stream<T> | Predicate<T> | T -> boolean |
map | 中间 | Stream<R> | Function<T, R> | T -> R |
limit | 中间 | Stream<T> | ||
sorted | 中间 | Stream<T> | Comparator<T> | (T, T) -> int |
distinct | 中间 | Stream<T> |
终端操作
操作 | 类型 | 目的 |
---|---|---|
forEach | 终端 | 消费流中的每个元素并对其应用 Lambda。这一操作返回 void |
count | 终端 | 返回流中元素的个数。这一操作返回 long |
collect | 终端 | 把流归约成一个集合,比如 List、Map 甚至是 Integer。 |
使用流
//过滤素菜
List<Dish> vegetarianMenu = menu.stream() .filter(Dish::isVegetarian).collect(toList());
//获取偶数并去重
List<Integer> numbers = Arrays.asList(1, 2, 1, 3, 3, 2, 4);
numbers.stream().filter(i -> i % 2 == 0).distinct().forEach(System.out::println);
//跳过前3个
List<String> menuNameList =
menu.stream()
.filter(dish -> dish.getCalories() >300)
.skip(3)
.collect(toList());
//单词列表,返回单词长度
List<String> words = Arrays.asList("Java 8", "Lambdas", "In", "Action");
List<Integer> wordLengths = words.stream()
.map(String::length)
.collect(toList());
//所有满足
boolean isHealthy = menu.stream().allMatch(d ->d.getCalories() < 1000);
//所有不满足
boolean isHealthy = menu.stream().noneMatch(d ->d.getCalories() < 1000);
//任意一个满足
boolean isHealthy = menu.stream().anyMatch(d ->d.getCalories() < 1000);
//查找任意一个
Optional<Dish> dish =menu.stream().filter(Dish::isVegetarian).findAny();
//查找第一个
Optional<Dish> dish =menu.stream().filter(Dish::isVegetarian).findFirst();
规约
//使用 for-each 循环来对数字列表中的元素求和
int sum = 0;
for (int x : numbers) {
sum += x;
}
reduce
int sum = numbers.stream().reduce(0, (a, b) -> a + b);
- 一个初始值,这里是0
- 一个 BinaryOperator<T> 来将两个元素结合起来产生一个新值,这里用的是
lambda (a, b) -> a + b
int product = numbers.stream().reduce(1, (a, b) -> a * b);
Optional<Integer> sum = numbers.stream().reduce((a, b) -> (a + b));
最大值最小值
Optional<Integer> max = numbers.stream().reduce(Integer::max);
Optional<Integer> min = numbers.stream().reduce(Integer::min);
获取数字的个数、最小值、最大值、总和以及平均值
List<Integer> primes = Arrays.asList(2, 3, 5, 7, 11, 13, 17, 19, 23, 29);
IntSummaryStatistics stats = primes.stream().mapToInt((x) -> x).summaryStatistics();
System.out.println("Highest prime number in List : " + stats.getMax());
System.out.println("Lowest prime number in List : " + stats.getMin());
System.out.println("Sum of all prime numbers : " + stats.getSum());
System.out.println("Average of all prime numbers : " + stats.getAverage());
一些示例
Optional<Dish> dishOptional = menu.stream().max(comparingInt(Dish::getCalories));
IntSummaryStatistics intSummaryStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));
Integer collect = menu.stream().collect(summingInt(Dish::getCalories));
String collect1 = menu.stream().map(Dish::getName).collect(joining(","));
System.out.println("collect1 = " + collect1);
Integer collect2 = menu.stream().collect(reducing(0, Dish::getCalories, (i, j) -> i + j));
Integer collect3 = menu.stream().mapToInt(Dish::getCalories).reduce(0, (i, j) -> i + j);
List<Dish> collect4 = menu.stream()
.sorted(comparing(dish -> dish.getName().length(),reverseOrder())).collect(toList());
collect4.forEach(dish -> System.out.println(dish.getName()));
数值范围流
//偶数流
IntStream evenNumbers = IntStream.rangeClosed(1, 100).filter(n -> n % 2 == 0);
System.out.println(evenNumbers.count());
请注意,比较一下,如果改用 IntStream.range(1, 100) ,则结果将会是 49 个偶数,因为 range 是不包含结束值的。
构建流
Stream<String> stream = Stream.of("Java 8 ", "Lambdas ", "In ", "Action");
stream.map(String::toUpperCase).forEach(System.out::println);
//空流
Stream<String> emptyStream = Stream.empty();
//数组创建流
int[] numbers = {2, 3, 5, 7, 11, 13};
int sum = Arrays.stream(numbers).sum();
//由文件生成流 文件中有多少各不相同的词
long uniqueWords = 0;
try (Stream<String> lines =
Files.lines(Paths.get("data.txt"), Charset.defaultCharset())) {
uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(" ")))
.distinct()
.count();
} catch (IOException e) {
}
//使用 Files.lines 得到一个流,其中的每个元素都是给定文件中的一行。
//然后,你可以对 line 调用 split 方法将行拆分成单词。
//应该注意的是,你该如何使用 flatMap产生一个扁平的单词流,而不是给每一行生成一个单词流。
//最后,把 distinct 和count方法链接起来,数数流中有多少各不相同的单词。
无限流
Stream API提供了两个静态方法来从函数生成流: Stream.iterate 和 Stream.generate
- 迭代:iterate 方法接受一个初始值(在这里是 0 ),还有一个依次应用在每个产生的新值上的
Lambda( UnaryOperator<t> 类型)。
Stream.iterate(0, n -> n + 2) .limit(10) .forEach(System.out::println);
- 生成:与 iterate 方法类似, generate 方法也可让你按需生成一个无限流。但 generate 不是依次对每个新生成的值应用函数的。它接受一个 Supplier<T> 类型的Lambda提供新的值。
Stream.generate(Math::random) .limit(5) .forEach(System.out::println);
IntStream ones = IntStream.generate(() -> 1);
收集数据
分组
对菜按照类别进行分组
通常写法
//结果组
Map<Dish.Type,List<Dish>> dishTypeMap = new HashMap<>();
//迭代
for(Dish dish : menu) {
Dish.Type dishType = dish.getType();
//查找当前类别是否存在
List<Dish> dishes = dishTypeMap.get(dishType);
if(dishes == null) {
//不存在新建分组
dishes = new ArrayList<>();
dishTypeMap.put(dishType,dishes);
continue;
}
//存在直接添加
dishes.add(dish);
}
java8
Map<Dish.Type, List<Dish>> typeListMap = menu.stream().collect(groupingBy(Dish::getType));
按照热量进行分组
Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(
groupingBy(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return
CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
} ));
多级分组
//二级分组
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel =
menu.stream().collect(
groupingBy(Dish::getType,
groupingBy(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
} )
)
);
分组统计
Map<Dish.Type, Long> typesCount = menu.stream().collect(
groupingBy(Dish::getType, counting()));
分组求最大
Map<Dish.Type, Optional<Dish>> mostCaloricByType =
menu.stream()
.collect(groupingBy(Dish::getType,
maxBy(comparingInt(Dish::getCalories))));
上一个的优化版:
Map<Dish.Type, Dish> mostCaloricByType =
menu.stream()
.collect(groupingBy(Dish::getType,
collectingAndThen(
maxBy(comparingInt(Dish::getCalories)),
Optional::get)));
//收集器返回的结果转换为另一种类型,你可以使用Collectors.collectingAndThen 工厂方法返回的收集器
分组求和
Map<Dish.Type, Integer> totalCaloriesByType =
menu.stream().collect(groupingBy(Dish::getType,
summingInt(Dish::getCalories)));
分区
分区是分组的特殊情况:由一个谓词(返回一个布尔值的函数)作为分类函数,它称分区函
数。分区函数返回一个布尔值,这意味着得到的分组 Map 的键类型是 Boolean ,于是它最多可以
分为两组—— true 是一组, false 是一组。
Map<Boolean, List<Dish>> partitionedMenu =
menu.stream().collect(partitioningBy(Dish::isVegetarian));
分区后分组
Map<Boolean, Map<Dish.Type, List<Dish>>> vegetarianDishesByType =
menu.stream().collect(
partitioningBy(Dish::isVegetarian,
groupingBy(Dish::getType)));
分区求最大
Map<Boolean, Dish> mostCaloricPartitionedByVegetarian =
menu.stream().collect(
partitioningBy(Dish::isVegetarian,
collectingAndThen(
maxBy(comparingInt(Dish::getCalories)),
Optional::get)));
二级分区
menu.stream().collect(partitioningBy(Dish::isVegetarian,
partitioningBy (d -> d.getCalories() > 500)));
分区统计
menu.stream().collect(partitioningBy(Dish::isVegetarian,counting()));
==Collectors 类的静态工厂方法==