java8和streams:天作之合,但可能有点势不可挡。
Stream的添加是Java 8中的主要新功能之一。文章是对Stream支持的许多功能的介绍,重点是简单,实际的示例。
要理解这些需要对Java8(lambda表达式、方法引用)有基本的工作知识。
介绍
首先,不应将Java 8流与Java I / O流(例如:FileInputStream等)混淆;它们之间几乎没有关系。
简而言之,流是数据源周围的包装器,使我们能够使用该数据源进行操作,并使批量处理方便快捷。
流不存储数据,从这个意义上说,它不是数据结构。它也永远不会修改基础数据源。
此新功能java.util.stream-支持对元素流进行功能样式的操作,例如对集合进行map-reduce转换。
创建流
从现有数组中获取流:
private static Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
Stream.of(arrayOfEmps);
还可以从现有列表中获取流:
private static List<Employee> empList = Arrays.asList(arrayOfEmps);
empList.stream();
请注意,Java 8向Collection接口添加了新的stream()方法。
我们可以使用Stream.of()从单个对象创建一个流:
Stream.of(arrayOfEmps[0], arrayOfEmps[1], arrayOfEmps[2]);
或者简单地使用Stream.builder():
Stream.Builder<Employee> empStreamBuilder = Stream.builder();
empStreamBuilder.accept(arrayOfEmps[0]);
empStreamBuilder.accept(arrayOfEmps[1]);
empStreamBuilder.accept(arrayOfEmps[2]);
Stream<Employee> empStream = empStreamBuilder.build();
还有其他获取流的方法,在下面的部分中介绍其中的一些方法。
流操作
forEach()是最简单,最常见的操作;它在流元素上循环,在每个元素上调用提供的函数。
该方法非常普遍,已直接在Iterable,Map等中引入:
@Test
public void whenIncrementSalaryForEachEmployee_thenApplyNewSalary() {
empList.stream().forEach(e -> e.salaryIncrement(10.0));
assertThat(empList, contains(
hasProperty("salary", equalTo(110000.0)),
hasProperty("salary", equalTo(220000.0)),
hasProperty("salary", equalTo(330000.0))
));
}
这将有效地调用salaryIncrement()中的每个元素empList。
forEach()是终端操作,这意味着在执行该操作之后,流流被视为已消耗,并且无法再使用。在下一节中,我们将详细讨论终端操作。
在对原始流的每个元素应用函数之后,map()会产生一个新的流。新的流可以是不同的类型。
以下示例将Integer的流转换为Employee的流:
@Test
public void whenMapIdToEmployees_thenGetEmployeeStream() {
Integer[] empIds = { 1, 2, 3 };
List<Employee> employees = Stream.of(empIds)
.map(employeeRepository::findById)
.collect(Collectors.toList());
assertEquals(employees.size(), empIds.length);
}
在这里,我们得到一个整数雇员ID的从数组流。每个Integer都传递给函数employeeRepository :: findById(),该函数 返回相应的Employee对象。这有效地形成了员工流。
我们在前面的示例中了解了collect()的工作方式;一旦完成所有处理,它就是从流中获取内容的常用方法之一:
@Test
public void whenCollectStreamToList_thenGetList() {
List<Employee> employees = empList.stream().collect(Collectors.toList());
assertEquals(empList, employees);
}
collect()对Stream实例中保存的数据元素执行可变的折叠操作(将元素重新打包到某些数据结构并应用一些其他逻辑,将它们串联等)。
此操作的策略是通过Collector接口实现提供的。在上面的示例中,我们使用toList收集器将所有Stream元素收集到一个List实例中。
接下来,让我们看一下filter();这将产生一个新流,其中包含通过给定测试(由谓词指定)的原始流的元素。
让我们看一下它是如何工作的:
@Test
public void whenFilterEmployees_thenGetFilteredStream() {
Integer[] empIds = { 1, 2, 3, 4 };
List<Employee> employees = Stream.of(empIds)
.map(employeeRepository::findById)
.filter(e -> e != null)
.filter(e -> e.getSalary() > 200000)
.collect(Collectors.toList());
assertEquals(Arrays.asList(arrayOfEmps[2]), employees);
}
在上面的示例中,我们首先过滤掉无效的空员工ID的引用,然后再次应用过滤器以仅保留薪水超过特定阈值的员工。
@Test
public void whenFindFirst_thenGetFirstEmployeeInStream() {
Integer[] empIds = { 1, 2, 3, 4 };
Employee employee = Stream.of(empIds)
.map(employeeRepository::findById)
.filter(e -> e != null)
.filter(e -> e.getSalary() > 100000)
.findFirst()
.orElse(null);
assertEquals(employee.getSalary(), new Double(200000));
}
在此,将返回薪水大于100000的第一位员工。如果不存在这样的雇员,则返回null。
我们看到了如何使用collect()从流中获取数据。如果需要从流中获取数组,则可以简单地使用toArray():
@Test
public void whenStreamToArray_thenGetArray() {
Employee[] employees = empList.stream().toArray(Employee[]::new);
assertThat(empList.toArray(), equalTo(employees));
}
语法Employee [] :: new创建一个空的Employee数组 -然后用流中的元素填充它。
流可以保存诸如Stream <List <String >>的复杂数据结构。在这种情况下,flatMap()可帮助我们展平数据结构以简化进一步的操作:
@Test
public void whenFlatMapEmployeeNames_thenGetNameStream() {
List<List<String>> namesNested = Arrays.asList(
Arrays.asList("Jeff", "Bezos"),
Arrays.asList("Bill", "Gates"),
Arrays.asList("Mark", "Zuckerberg"));
List<String> namesFlatStream = namesNested.stream()
.flatMap(Collection::stream)
.collect(Collectors.toList());
assertEquals(namesFlatStream.size(), namesNested.size() * 2);
}
在本节前面的部分中,我们看到了forEach(),这是一个终端操作。但是,有时我们需要在应用任何终端操作之前对流的每个元素执行多项操作。
peek()在这种情况下很有用。简而言之,它对流的每个元素执行指定的操作,并返回一个可以进一步使用的新流。peek()是一个中间操作:
@Test
public void whenIncrementSalaryUsingPeek_thenApplyNewSalary() {
Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
List<Employee> empList = Arrays.asList(arrayOfEmps);
empList.stream()
.peek(e -> e.salaryIncrement(10.0))
.peek(System.out::println)
.collect(Collectors.toList());
assertThat(empList, contains(
hasProperty("salary", equalTo(110000.0)),
hasProperty("salary", equalTo(220000.0)),
hasProperty("salary", equalTo(330000.0))
));
}
方法类型和管道
正如我们一直在讨论的,流操作分为中间操作和终端操作。
诸如filter()之类的中间操作返回一个新的流,可以在该流上执行进一步的处理。终端操作(例如forEach())将流标记为已使用,此后将无法再使用该流。
流管道包括一个流源,后跟零个或多个中间操作,以及一个终端操作。
@Test
public void whenStreamCount_thenGetElementCount() {
Long empCount = empList.stream()
.filter(e -> e.getSalary() > 200000)
.count();
assertEquals(empCount, new Long(1));
}
一些操作被认为是短路操作。短路操作允许对无限流的计算在有限时间内完成:
@Test
public void whenLimitInfiniteStream_thenGetFiniteElements() {
Stream<Integer> infiniteStream = Stream.iterate(2, i -> i * 2);
List<Integer> collect = infiniteStream
.skip(3)
.limit(5)
.collect(Collectors.toList());
assertEquals(collect, Arrays.asList(16, 32, 64, 128, 256));
}
懒惰评估
流的最重要特征之一是它们允许通过惰性评估进行重大优化。
仅在启动终端操作时才执行对源数据的计算,并且仅在需要时才使用源元素。所有中间操作都是惰性的,因此直到实际需要处理结果时才执行它们。
例如,考虑我们之前看到的findFirst()示例。此处执行map()操作多少次?4次,因为输入数组包含4个元素?
@Test
public void whenFindFirst_thenGetFirstEmployeeInStream() {
Integer[] empIds = { 1, 2, 3, 4 };
Employee employee = Stream.of(empIds)
.map(employeeRepository::findById)
.filter(e -> e != null)
.filter(e -> e.getSalary() > 100000)
.findFirst()
.orElse(null);
assertEquals(employee.getSalary(), new Double(200000));
}
Stream执行映射和两次过滤操作,一次执行一个元素。
它首先对id 1执行所有操作。由于id 1的薪水不大于100000,因此处理移至下一个元素。
Id 2满足两个过滤器谓词,因此流评估终端操作findFirst()并返回结果。
无法对ID 3和ID 4执行任何操作。
不必要地处理流可以避免在不需要时检查所有数据。当输入流是无限且不仅很大时,此行为甚至变得更加重要。
基于比较的流操作
让我们从sorted()操作开始-这根据传递给我们的比较器对流元素进行排序。
例如,我们可以根据Employee的名称对其进行排序:
@Test
public void whenSortStream_thenGetSortedStream() {
List<Employee> employees = empList.stream()
.sorted((e1, e2) -> e1.getName().compareTo(e2.getName()))
.collect(Collectors.toList());
assertEquals(employees.get(0).getName(), "Bill Gates");
assertEquals(employees.get(1).getName(), "Jeff Bezos");
assertEquals(employees.get(2).getName(), "Mark Zuckerberg");
}
请注意,短路不适用于sorted()。
@Test
public void whenFindMin_thenGetMinElementFromStream() {
Employee firstEmp = empList.stream()
.min((e1, e2) -> e1.getId() - e2.getId())
.orElseThrow(NoSuchElementException::new);
assertEquals(firstEmp.getId(), new Integer(1));
}
还可以避免使用Comparator.comparing()定义比较逻辑:
@Test
public void whenFindMax_thenGetMaxElementFromStream() {
Employee maxSalEmp = empList.stream()
.max(Comparator.comparing(Employee::getSalary))
.orElseThrow(NoSuchElementException::new);
assertEquals(maxSalEmp.getSalary(), new Double(300000.0));
}
exclude()不接受任何参数,并返回流中的distinct元素,从而消除重复项。它使用元素的equals()方法来确定两个元素是否相等:
@Test
public void whenApplyDistinct_thenRemoveDuplicatesFromStream() {
List<Integer> intList = Arrays.asList(2, 5, 3, 2, 4, 3);
List<Integer> distinctIntList = intList.stream().distinct().collect(Collectors.toList());
assertEquals(distinctIntList, Arrays.asList(2, 5, 3, 4));
}
allMatch,anyMatch和noneMatch
这些操作都带有一个谓词,并返回一个布尔值。确定答案后,就会发生短路并停止处理:
@Test
public void whenApplyMatch_thenReturnBoolean() {
List<Integer> intList = Arrays.asList(2, 4, 5, 6, 8);
boolean allEven = intList.stream().allMatch(i -> i % 2 == 0);
boolean oneEven = intList.stream().anyMatch(i -> i % 2 == 0);
boolean noneMultipleOfThree = intList.stream().noneMatch(i -> i % 3 == 0);
assertEquals(allEven, false);
assertEquals(oneEven, true);
assertEquals(noneMultipleOfThree, false);
}
allMatch()检查流中所有元素的谓词是否为true。在这里,一旦遇到5 ,它就返回false,而5不能被2整除。
anyMatch()检查流中任何一个元素的谓词是否为true。在此,再次施加短路,并且在第一个元素之后立即返回true。
noneMatch()检查是否没有与谓词匹配的元素。在这里,一旦遇到6 ,它就简单地返回false,该值可以被3整除。
专用Stream
根据到目前为止的讨论,Stream是对象引用的流。但是,还有IntStream,LongStream和DoubleStream,它们分别是int,long和double的原始特化。这些在处理许多数字基元时非常方便。
创建
创建IntStream的最常见方法是在现有流上调用mapToInt():
@Test
public void whenFindMaxOnIntStream_thenGetMaxInteger() {
Integer latestEmpId = empList.stream()
.mapToInt(Employee::getId)
.max()
.orElseThrow(NoSuchElementException::new);
assertEquals(latestEmpId, new Integer(3));
}
从一个流<Employee>开始,通过向mapToInt提供Employee::getId来获得一个IntStream。最后,我们调用max(),它返回最高的整数。
也可以使用IntStream.of()用于创建IntStream:
IntStream.of(1, 2, 3);
或IntStream.range():
IntStream.range(10, 20)
这将创建数字10到19的IntStream。
在继续讨论下一个主题之前,需要注意的一个重要区别是:
Stream.of(1, 2, 3)
这将返回Stream <Integer>而不是IntStream。
同样,使用map()而不是mapToInt()返回Stream <Integer>而不是IntStream。
empList.stream().map(Employee::getId);
专用的操作
与标准流相比,专用流提供了额外的操作-在处理数字时非常方便。
例如sum(),average(),range()等:
@Test
public void whenApplySumOnIntStream_thenGetSum() {
Double avgSal = empList.stream()
.mapToDouble(Employee::getSalary)
.average()
.orElseThrow(NoSuchElementException::new);
assertEquals(avgSal, new Double(200000));
}
减少操作
归约运算(也称为折叠)采用一系列输入元素,并通过重复应用组合操作将它们组合为单个汇总结果。我们已经看到了很少的归约运算,例如findFirst(),min()和max()。
让我们来看一下通用的reduce()操作。
reduce()的最常见形式是:
T reduce(T identity, BinaryOperator<T> accumulator)
在这里,标识是起始值,累加器是我们重复应用的二进制运算。
@Test
public void whenApplyReduceOnStream_thenGetValue() {
Double sumSal = empList.stream()
.map(Employee::getSalary)
.reduce(0.0, Double::sum);
assertEquals(sumSal, new Double(600000));
}
在这里,我们从初始值0开始,然后在流的元素上重复应用Double :: sum()。通过在Stream上应用reduce(),我们有效地实现了DoubleStream.sum()。
进阶收集
我们已经了解了如何使用Collectors.toList()从流中获取列表。现在让我们看看从流中收集元素的更多方法。
@Test
public void whenCollectByJoining_thenGetJoinedString() {
String empNames = empList.stream()
.map(Employee::getName)
.collect(Collectors.joining(", "))
.toString();
assertEquals(empNames, "Jeff Bezos, Bill Gates, Mark Zuckerberg");
}
还可以使用toSet()从流元素中获取一个集合:
@Test
public void whenCollectBySet_thenGetSet() {
Set<String> empNames = empList.stream()
.map(Employee::getName)
.collect(Collectors.toSet());
assertEquals(empNames.size(), 3);
}
toCollection
@Test
public void whenToVectorCollection_thenGetVector() {
Vector<String> empNames = empList.stream()
.map(Employee::getName)
.collect(Collectors.toCollection(Vector::new));
assertEquals(empNames.size(), 3);
}
在这里,在内部创建一个空集合,并在流的每个元素上调用其add()方法。
summarizingDouble
summarizingDouble()是另一个有趣的收集器-对每个输入元素应用双重生成的映射函数,并返回一个特殊类,该类包含有关所得值的统计信息:
@Test
public void whenApplySummarizing_thenGetBasicStats() {
DoubleSummaryStatistics stats = empList.stream()
.collect(Collectors.summarizingDouble(Employee::getSalary));
assertEquals(stats.getCount(), 3);
assertEquals(stats.getSum(), 600000.0, 0);
assertEquals(stats.getMin(), 100000.0, 0);
assertEquals(stats.getMax(), 300000.0, 0);
assertEquals(stats.getAverage(), 200000.0, 0);
}
请注意,我们如何分析每个员工的薪水并获取有关该数据的统计信息,例如最小值,最大值,平均值等。
当我们使用一种特殊的流时,summaryStatistics()可用于生成类似的结果:
@Test
public void whenApplySummaryStatistics_thenGetBasicStats() {
DoubleSummaryStatistics stats = empList.stream()
.mapToDouble(Employee::getSalary)
.summaryStatistics();
assertEquals(stats.getCount(), 3);
assertEquals(stats.getSum(), 600000.0, 0);
assertEquals(stats.getMin(), 100000.0, 0);
assertEquals(stats.getMax(), 300000.0, 0);
assertEquals(stats.getAverage(), 200000.0, 0);
}
partitioningBy
我们可以根据元素是否满足特定条件将流分成两部分。
让我们将数据列表分为偶数和奇数:
@Test
public void whenStreamPartition_thenGetMap() {
List<Integer> intList = Arrays.asList(2, 4, 5, 6, 8);
Map<Boolean, List<Integer>> isEven = intList.stream().collect(
Collectors.partitioningBy(i -> i % 2 == 0));
assertEquals(isEven.get(true).size(), 4);
assertEquals(isEven.get(false).size(), 1);
}
在这里,流被划分为一个Map,偶数和奇数存储为真和假键。
分组
groupingBy()提供了高级分区功能-在这里我们可以将流划分为两个以上的组。
它以分类函数为参数。此分类功能应用于流的每个元素。
该函数返回的值用作从groupingBy收集器获取的映射的键:
@Test
public void whenStreamGroupingBy_thenGetMap() {
Map<Character, List<Employee>> groupByAlphabet = empList.stream().collect(
Collectors.groupingBy(e -> new Character(e.getName().charAt(0))));
assertEquals(groupByAlphabet.get('B').get(0).getName(), "Bill Gates");
assertEquals(groupByAlphabet.get('J').get(0).getName(), "Jeff Bezos");
assertEquals(groupByAlphabet.get('M').get(0).getName(), "Mark Zuckerberg");
}
在此快速示例中,我们根据员工名字的首字母对员工进行分组。
在上一节中讨论过的groupingBy()使用Map来对流的元素进行分组。
但是,有时我们可能需要将数据分组为除元素类型以外的其他类型。
这就是我们可以做到的;我们可以使用mapping(),它实际上可以使收集器适应其他类型-使用映射函数:
@Test
public void whenStreamMapping_thenGetMap() {
Map<Character, List<Integer>> idGroupedByAlphabet = empList.stream().collect(
Collectors.groupingBy(e -> new Character(e.getName().charAt(0)),
Collectors.mapping(Employee::getId, Collectors.toList())));
assertEquals(idGroupedByAlphabet.get('B').get(0), new Integer(2));
assertEquals(idGroupedByAlphabet.get('J').get(0), new Integer(1));
assertEquals(idGroupedByAlphabet.get('M').get(0), new Integer(3));
}
在这里,mapping()使用getId()映射函数将流元素Employee仅映射到雇员ID(它是Integer )。这些ID仍根据员工名字的首字母进行分组。
减少()是类似于减少() - 这是我们之前探讨。它只是返回一个收集器,该收集器将减少其输入元素:
@Test
public void whenStreamReducing_thenGetValue() {
Double percentage = 10.0;
Double salIncrOverhead = empList.stream().collect(Collectors.reducing(
0.0, e -> e.getSalary() * percentage / 100, (s1, s2) -> s1 + s2));
assertEquals(salIncrOverhead, 60000.0, 0);
}
在这里reduce()获取每个雇员的薪水增量并返回总和。
当在groupingBy()或partitioningBy()的下游进行多级归约时,reducing()最有用。要对流执行简单的还原,请使用reduce()。
例如,让我们看看如何将reduce()与groupingBy()结合使用:
@Test
public void whenStreamGroupingAndReducing_thenGetMap() {
Comparator<Employee> byNameLength = Comparator.comparing(Employee::getName);
Map<Character, Optional<Employee>> longestNameByAlphabet = empList.stream().collect(
Collectors.groupingBy(e -> new Character(e.getName().charAt(0)),
Collectors.reducing(BinaryOperator.maxBy(byNameLength))));
assertEquals(longestNameByAlphabet.get('B').get().getName(), "Bill Gates");
assertEquals(longestNameByAlphabet.get('J').get().getName(), "Jeff Bezos");
assertEquals(longestNameByAlphabet.get('M').get().getName(), "Mark Zuckerberg");
}
在这里,我们根据员工名字的首字母对员工进行分组。在每个组中,我们找到姓名最长的员工。
并行流
使用对并行流的支持,我们可以并行执行流操作,而无需编写任何样板代码;我们只需要将流指定为并行:
@Test
public void whenParallelStream_thenPerformOperationsInParallel() {
Employee[] arrayOfEmps = {
new Employee(1, "Jeff Bezos", 100000.0),
new Employee(2, "Bill Gates", 200000.0),
new Employee(3, "Mark Zuckerberg", 300000.0)
};
List<Employee> empList = Arrays.asList(arrayOfEmps);
empList.stream().parallel().forEach(e -> e.salaryIncrement(10.0));
assertThat(empList, contains(
hasProperty("salary", equalTo(110000.0)),
hasProperty("salary", equalTo(220000.0)),
hasProperty("salary", equalTo(330000.0))
));
}
这里salaryIncrement()将在该料流的多个元素,平行通过简单地将得到执行并行()的语法。
当然,如果您需要对操作的性能特征进行更多控制,则可以进一步调整和配置此功能。
与编写多线程代码一样,在使用并行流时,我们需要注意一些事项:
- 我们需要确保代码是线程安全的。如果并行执行的操作修改了共享数据,则需要格外小心。
- 如果执行操作的顺序或输出流中返回的顺序很重要,则不应使用并行流。例如,在并行流的情况下,诸如findFirst()之类的操作可能会生成不同的结果。
- 另外,我们应该确保值得让代码并行执行。当然,特别要了解操作的性能特征,而且要了解整个系统的性能特征,这一点自然很重要。
无限流
有时,我们可能想在元素仍在生成时执行操作。我们可能事先不知道我们需要多少个元素。与使用list或map填充所有元素不同,我们可以使用无限流,也称为无界流。
有两种生成无限流的方法:
我们提供了一个供应商来产生()每当新流元素需要生成其中被调用:
@Test
public void whenGenerateStream_thenGetInfiniteStream() {
Stream.generate(Math::random)
.limit(5)
.forEach(System.out::println);
}
在这里,我们将Math:: random()作为Supplier传递,它返回下一个随机数。
对于无限流,我们需要提供一个条件以最终终止处理。一种常见的实现方法是使用limit()。在上面的示例中,我们将流限制为5个随机数,并在生成它们时打印它们。
请注意,传递给generate()的Supplier可能是有状态的,并且在并行使用时,此类流可能不会产生相同的结果。
iterate()具有两个参数:一个初始值(称为种子元素)和一个使用前一个值生成下一个元素的函数。根据设计,iterate()是有状态的,因此在并行流中可能没有用:
@Test
public void whenIterateStream_thenGetInfiniteStream() {
Stream<Integer> evenNumStream = Stream.iterate(2, i -> i * 2);
List<Integer> collect = evenNumStream
.limit(5)
.collect(Collectors.toList());
assertEquals(collect, Arrays.asList(2, 4, 8, 16, 32));
}
在这里,我们传递2作为种子值,它成为流的第一个元素。此值作为输入传递给lambda,然后返回4。此值又作为输入传递给下一个迭代。
这一直持续到我们生成由limit()指定的作为终止条件的元素数量为止。
文件操作
让我们看看如何在文件操作中使用流。
文件写入操作
@Test
public void whenStreamToFile_thenGetFile() throws IOException {
String[] words = {
"hello",
"refer",
"world",
"level"
};
try (PrintWriter pw = new PrintWriter(
Files.newBufferedWriter(Paths.get(fileName)))) {
Stream.of(words).forEach(pw::println);
}
}
在这里,我们使用forEach()通过调用PrintWriter.println()将流的每个元素写入文件。
文件读取操作
private List<String> getPalindrome(Stream<String> stream, int length) {
return stream.filter(s -> s.length() == length)
.filter(s -> s.compareToIgnoreCase(
new StringBuilder(s).reverse().toString()) == 0)
.collect(Collectors.toList());
}
@Test
public void whenFileToStream_thenGetStream() throws IOException {
List<String> str = getPalindrome(Files.lines(Paths.get(fileName)), 5);
assertThat(str, contains("refer", "level"));
}
getPalindrome()在流上工作,完全不知道流是如何生成的。这也提高了代码的可重用性并简化了单元测试。
结论
在本文中,专注于Java 8中新的Stream功能的细节。我们看到了所支持的各种操作以及如何使用lambda和管道来编写简洁的代码。
还看到了流的一些特性,例如惰性求值,并行流和无限流。