Functional Programming (Part 2): Stream

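Functional Programming (Part 1): lambda, FunctionalInterface, Method Reference
Stream is the biggest highlight of Java 8. It extends what collection objects can do and focuses on convenient, efficient bulk processing of the data they hold. Used together with lambda expressions, the Stream API greatly improves both productivity and readability. A stream can also run its aggregate operations in either sequential or parallel mode, and the parallel mode can take advantage of multi-core processors. The combination of lambda expressions and the Stream API therefore makes it much easier to write code that runs in parallel.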

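All examples below use the following Student class and sample list:

class Student {
    String name;
    int age;
    int score;

    public Student(String name, int age, int score) {
        this.name = name;
        this.age = age;
        this.score = score;
    }

    public String getName() { return name; }
    public int getAge() { return age; }
    public int getScore() { return score; }

    @Override
    public String toString() {
        return "Student [name=" + name + ", age=" + age + ", score=" + score + "]";
    }
}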


        List<Student> list = Arrays.asList(new Student("wang", 20, 90), 
                new Student("zhao", 30, 80), new Student("li", 25, 99),
                new Student("sun", 20, 80), new Student("zhou", 30, 70));


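Task: take the students who scored at least 80 and group their names by age. The traditional imperative version:

        Map<Integer, List<String>> rst = new HashMap<>();
        for (Student stu : list) {
            if (stu.getScore() >= 80) {
                List<String> names = rst.getOrDefault(stu.getAge(), new ArrayList<String>());
                names.add(stu.getName());
                rst.put(stu.getAge(), names);
            }
        }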


The same logic written with Stream:

Map<Integer, List<String>> map = list.stream()
        .filter(s -> s.getScore() >= 80)
        .collect(Collectors.groupingBy(Student::getAge,
                Collectors.mapping(Student::getName, Collectors.toList())));

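If you compare the two snippets above, the Stream version is clearly much more readable. This readability is what the functional style introduced with Java 8 Stream brings.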
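To confirm that the two versions really compute the same result, here is a self-contained sketch that runs both on the sample data. The Student class is trimmed to what the example needs, and the names GroupingDemo, imperative and declarative are hypothetical. The loop uses Map.computeIfAbsent, which is a slightly tidier alternative to the getOrDefault/put pair.

```java
import java.util.*;
import java.util.stream.Collectors;

// Trimmed-down Student: fields plus the getters used below
class Student {
    private final String name;
    private final int age;
    private final int score;

    Student(String name, int age, int score) {
        this.name = name;
        this.age = age;
        this.score = score;
    }

    String getName() { return name; }
    int getAge() { return age; }
    int getScore() { return score; }
}

class GroupingDemo {
    static List<Student> sample() {
        return Arrays.asList(new Student("wang", 20, 90),
                new Student("zhao", 30, 80), new Student("li", 25, 99),
                new Student("sun", 20, 80), new Student("zhou", 30, 70));
    }

    // Imperative version: loop, test, and fill the map by hand
    static Map<Integer, List<String>> imperative(List<Student> list) {
        Map<Integer, List<String>> rst = new HashMap<>();
        for (Student stu : list) {
            if (stu.getScore() >= 80) {
                rst.computeIfAbsent(stu.getAge(), k -> new ArrayList<>()).add(stu.getName());
            }
        }
        return rst;
    }

    // Stream version: filter, then group the names by age
    static Map<Integer, List<String>> declarative(List<Student> list) {
        return list.stream()
                .filter(s -> s.getScore() >= 80)
                .collect(Collectors.groupingBy(Student::getAge,
                        Collectors.mapping(Student::getName, Collectors.toList())));
    }

    public static void main(String[] args) {
        System.out.println(imperative(sample()));
        System.out.println(declarative(sample()));
    }
}
```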


The first sentence of the Stream JavaDoc is: A sequence of elements supporting sequential and parallel aggregate operations. In other words, a stream is a sequence of elements on which aggregate operations can be run either sequentially or in parallel.
A stream pipeline consists of a source (which might be an array, a collection, a generator function, an I/O channel, etc), zero or more intermediate operations (which transform a stream into another stream), and a terminal operation (which produces a result or side-effect). Streams are lazy; computation on the source data is only performed when the terminal operation is initiated, and source elements are consumed only as needed.



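  • Head (source): where the stream comes from. It can be an array, a collection, a generator function, or an I/O channel
  • Intermediate operations: zero or more. They are lazy, and each one transforms a stream into another stream
  • TerminalOp (terminal operation): starts the computation and produces a result or side effect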
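The laziness described above is easy to observe. This sketch uses the hypothetical names LazinessDemo and run. It builds a pipeline whose filter records every call in a list, and the filter does not run until a terminal operation such as collect is invoked. The side effect inside filter is there only to make evaluation visible. It is not a pattern to use in real code.

```java
import java.util.*;
import java.util.stream.*;

class LazinessDemo {
    // Builds Stream.of -> filter, and runs collect only if asked; the filter logs each call
    static List<String> run(boolean withTerminalOp) {
        List<String> log = new ArrayList<>();
        Stream<String> pipeline = Stream.of("a", "bb", "ccc")
                .filter(s -> {
                    log.add("filter " + s);   // side effect only to make evaluation visible
                    return s.length() > 1;
                });
        if (withTerminalOp) {
            List<String> result = pipeline.collect(Collectors.toList());
            log.add("result " + result);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // []
        System.out.println(run(true));  // [filter a, filter bb, filter ccc, result [bb, ccc]]
    }
}
```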






1.Head

Stream<Student> stream = list.stream();
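The Head can be any of the source kinds listed earlier. Below is a minimal sketch with one source of each kind. The class and method names are hypothetical, and a StringReader stands in for a real file so that the example has no I/O dependency.

```java
import java.io.*;
import java.util.*;
import java.util.stream.*;

class SourceDemo {
    // Array source: Arrays.stream over an int[] gives an IntStream
    static List<Integer> fromArray() {
        int[] scores = {90, 80, 99};
        return Arrays.stream(scores).boxed().collect(Collectors.toList());
    }

    // Collection source: every Collection has stream()
    static List<String> fromCollection() {
        List<String> names = Arrays.asList("wang", "zhao", "li");
        return names.stream().collect(Collectors.toList());
    }

    // Generator function: Stream.iterate is infinite, so limit() is needed before collecting
    static List<Integer> fromGenerator() {
        return Stream.iterate(1, x -> x * 2).limit(5).collect(Collectors.toList());
    }

    // I/O channel: BufferedReader.lines(); a StringReader stands in for a real file here
    static List<String> fromReader() {
        try (BufferedReader reader = new BufferedReader(new StringReader("line1\nline2"))) {
            return reader.lines().collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(fromArray());      // [90, 80, 99]
        System.out.println(fromCollection()); // [wang, zhao, li]
        System.out.println(fromGenerator());  // [1, 2, 4, 8, 16]
        System.out.println(fromReader());     // [line1, line2]
    }
}
```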

2.Intermediate Op

| Operation | Type | Description |
|---|---|---|
| filter | StatelessOp | Returns a stream consisting of the elements of this stream that match the given predicate. |
| map | StatelessOp | Returns a stream consisting of the results of applying the given function to the elements of this stream. |
| flatMap | StatelessOp | Returns a stream consisting of the results of replacing each element of this stream with the contents of a mapped stream produced by applying the provided mapping function to each element. Each mapped stream is closed after its contents have been placed into this stream. (If a mapped stream is null an empty stream is used, instead.) |
| distinct | StatefulOp | Returns a stream consisting of the distinct elements of this stream. |
| sorted | StatefulOp | Returns a stream consisting of the elements of this stream, sorted according to the provided Comparator or natural order. |
| peek | StatelessOp | Returns a stream consisting of the elements of this stream, additionally performing the provided action on each element as elements are consumed from the resulting stream. |
| limit | StatefulOp | Returns a stream consisting of the elements of this stream, truncated to be no longer than maxSize in length. |
| skip | StatefulOp | Returns a stream consisting of the remaining elements of this stream after discarding the first n elements of the stream. If this stream contains fewer than n elements then an empty stream will be returned. |

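Intermediate operations are either stateless (StatelessOp) or stateful (StatefulOp). In a stateless operation, each element is processed independently of the others. For example, filter and map handle each element on its own. In a stateful operation, handling an element may depend on elements seen earlier. distinct has to remember what it has already emitted, and sorted cannot emit anything until it has seen every element. Intermediate operations are lazy, so a stream without a terminal operation is never evaluated.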
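The difference between the two kinds shows up in the order in which elements move through a sequential pipeline. In this sketch (hypothetical names), peek records each element before and after the middle stage. When all stages are stateless, each element passes all the way through before the next one starts. sorted() is different: it has to buffer everything first.

```java
import java.util.*;
import java.util.stream.*;

class StatefulDemo {
    // Only stateless stages: each element flows through the whole pipeline before the next starts
    static List<String> statelessLog() {
        List<String> log = new ArrayList<>();
        Stream.of("b", "a", "c")
                .peek(s -> log.add("before " + s))
                .map(String::toUpperCase)
                .peek(s -> log.add("after " + s))
                .collect(Collectors.toList());
        return log;
    }

    // sorted() is stateful: it buffers every element, then passes them on in sorted order
    static List<String> statefulLog() {
        List<String> log = new ArrayList<>();
        Stream.of("b", "a", "c")
                .peek(s -> log.add("before " + s))
                .sorted()
                .peek(s -> log.add("after " + s))
                .collect(Collectors.toList());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(statelessLog()); // [before b, after B, before a, after A, before c, after C]
        System.out.println(statefulLog());  // [before b, before a, before c, after a, after b, after c]
    }
}
```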

- filter


Stream<T> filter(Predicate<? super T> predicate)
list.stream().filter(s->s.score>80).forEach(s -> System.out.println(s.name));

- map


<R> Stream<R> map(Function<? super T, ? extends R> mapper);
String rst = list.stream().map(Student::getName).collect(Collectors.joining(", "));

- flatMap

The flatMap operation has the effect of applying a one-to-many transformation to the elements of the stream, and then flattening the resulting elements into a new stream.

<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

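Note: the Function passed to flatMap takes an argument of type T, which is the element type of the input stream, and returns a Stream<? extends R>. In other words, the function itself produces a stream. The elements of all those streams become the elements of the output Stream<R>.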

long count = list.stream().flatMap(s->Arrays.stream(s.name.split(""))).distinct().count();
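Here is a sketch that contrasts map and flatMap on plain String inputs (names hypothetical). distinctLetters repeats the one-liner above on the sample names. It relies on String.split("") returning one string per character, which holds for non-empty strings on Java 8 and later.

```java
import java.util.*;
import java.util.stream.*;

class FlatMapDemo {
    // map(line -> line.split(" ")) would give a Stream<String[]>;
    // flatMap replaces each line with a stream of its words and concatenates them into one Stream<String>
    static List<String> words(List<String> lines) {
        return lines.stream()
                .flatMap(line -> Arrays.stream(line.split(" ")))
                .collect(Collectors.toList());
    }

    // The one-liner above: number of distinct letters across all names
    static long distinctLetters(List<String> names) {
        return names.stream()
                .flatMap(n -> Arrays.stream(n.split("")))
                .distinct()
                .count();
    }

    public static void main(String[] args) {
        System.out.println(words(Arrays.asList("a b", "c d e")));                                // [a, b, c, d, e]
        System.out.println(distinctLetters(Arrays.asList("wang", "zhao", "li", "sun", "zhou"))); // 11
    }
}
```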


- distinct


Stream<T> distinct();

- sorted

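Sorting. sorted() sorts in natural order. If the elements are not Comparable, a java.lang.ClassCastException may be thrown when the terminal operation is executed. sorted(Comparator<? super T> comparator) sorts using the given comparator.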

Stream<T> sorted();
Stream<T> sorted(Comparator<? super T> comparator);
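The sketch below shows both sorted overloads, using a trimmed-down nested Student class. The class, the methods and the sample data are hypothetical. Comparator.comparingInt, reversed and thenComparing are standard java.util.Comparator methods. The last method triggers the ClassCastException that natural-order sorting raises for elements that are not Comparable.

```java
import java.util.*;
import java.util.stream.*;

class SortedDemo {
    // Hypothetical, trimmed-down Student with just a name and a score
    static class Student {
        final String name;
        final int score;
        Student(String name, int score) { this.name = name; this.score = score; }
        String getName() { return name; }
        int getScore() { return score; }
    }

    static List<Student> sample() {
        return Arrays.asList(new Student("wang", 90), new Student("zhao", 80),
                new Student("li", 99), new Student("sun", 80));
    }

    // sorted(): natural order, since String implements Comparable
    static List<String> natural(List<String> names) {
        return names.stream().sorted().collect(Collectors.toList());
    }

    // sorted(Comparator): score descending, ties broken by name ascending
    static List<String> byScoreThenName(List<Student> students) {
        return students.stream()
                .sorted(Comparator.comparingInt(Student::getScore).reversed()
                        .thenComparing(Student::getName))
                .map(Student::getName)
                .collect(Collectors.toList());
    }

    // sorted() on elements that are not Comparable fails when the terminal operation runs
    static boolean naturalOrderFails() {
        try {
            Stream.of(new Object(), new Object()).sorted().collect(Collectors.toList());
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(natural(Arrays.asList("zhao", "li", "wang"))); // [li, wang, zhao]
        System.out.println(byScoreThenName(sample()));                    // [li, wang, sun, zhao]
        System.out.println(naturalOrderFails());                          // true
    }
}
```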

- peek


Stream<T> peek(Consumer<? super T> action);

- limit

Stream<T> limit(long maxSize);

- skip

Stream<T> skip(long n);
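skip and limit combine naturally into pagination. The page helper in this sketch is hypothetical and assumes 1-based page numbers.

```java
import java.util.*;
import java.util.stream.*;

class PagingDemo {
    // Hypothetical helper: returns page pageNo (1-based) of size pageSize
    static <T> List<T> page(List<T> items, int pageNo, int pageSize) {
        return items.stream()
                .skip((long) (pageNo - 1) * pageSize)  // drop the earlier pages
                .limit(pageSize)                       // keep at most one page
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> items = IntStream.rangeClosed(1, 7).boxed().collect(Collectors.toList());
        System.out.println(page(items, 2, 3)); // [4, 5, 6]
        System.out.println(page(items, 3, 3)); // [7]
        System.out.println(page(items, 4, 3)); // []
    }
}
```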

3.Terminal Op

| Operation | Type | Description |
|---|---|---|
| forEach | ForEachOps | Performs an action for each element of this stream. |
| forEachOrdered | ForEachOps | Performs an action for each element of this stream, in the encounter order of the stream if the stream has a defined encounter order. |
| reduce | ReduceOps | Performs a reduction on the elements of this stream. |
| collect | see Collector | Performs a mutable reduction operation on the elements of this stream. (Note the difference from reduce: the reduction is mutable.) |
| min | ReduceOps | Returns the minimum element of this stream according to the provided Comparator. This is a special case of a reduction. |
| max | ReduceOps | Returns the maximum element of this stream according to the provided Comparator. This is a special case of a reduction. |
| count | ReduceOps | Returns the count of elements in this stream. |
| anyMatch | MatchOps (short-circuiting) | Returns whether any elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then false is returned and the predicate is not evaluated. |
| allMatch | MatchOps (short-circuiting) | Returns whether all elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then true is returned and the predicate is not evaluated. |
| noneMatch | MatchOps (short-circuiting) | Returns whether no elements of this stream match the provided predicate. May not evaluate the predicate on all elements if not necessary for determining the result. If the stream is empty then true is returned and the predicate is not evaluated. |
| findFirst | FindOps (short-circuiting) | Returns an Optional describing the first element of this stream, or an empty Optional if the stream is empty. If the stream has no encounter order, then any element may be returned. |
| findAny | FindOps (short-circuiting) | Returns an Optional describing some element of the stream, or an empty Optional if the stream is empty. |

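Terminal operations are either short-circuiting or non-short-circuiting. A short-circuiting operation can return a result without processing every element. For example, findFirst (in FindOps) returns as soon as the first element reaches it, such as the first element that passes an upstream filter, and later elements are never processed. Similarly, allMatch (in MatchOps) returns false as soon as one element fails the predicate and does not check the remaining elements. A non-short-circuiting operation has to process every element.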
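The sketch below makes short-circuiting visible by counting how many elements the predicate actually inspects. The names are hypothetical, and sequential streams are assumed.

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

class ShortCircuitDemo {
    // allMatch stops at the first element that fails the predicate
    static int inspectedByAllMatch(List<Integer> scores) {
        AtomicInteger inspected = new AtomicInteger();
        scores.stream().allMatch(s -> {
            inspected.incrementAndGet();
            return s >= 80;
        });
        return inspected.get();
    }

    // findFirst stops pulling elements once one has passed the upstream filter
    static int inspectedByFindFirst(List<Integer> scores) {
        AtomicInteger inspected = new AtomicInteger();
        scores.stream().filter(s -> {
            inspected.incrementAndGet();
            return s < 80;
        }).findFirst();
        return inspected.get();
    }

    // count is not short-circuiting: every element goes through the filter
    static int inspectedByCount(List<Integer> scores) {
        AtomicInteger inspected = new AtomicInteger();
        scores.stream().filter(s -> {
            inspected.incrementAndGet();
            return s < 80;
        }).count();
        return inspected.get();
    }

    public static void main(String[] args) {
        List<Integer> scores = Arrays.asList(90, 70, 99, 80);
        System.out.println(inspectedByAllMatch(scores));  // 2
        System.out.println(inspectedByFindFirst(scores)); // 2
        System.out.println(inspectedByCount(scores));     // 4
    }
}
```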



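- forEach

void forEach(Consumer<? super T> action);
list.parallelStream().map(s -> s.name + ": " + (s.score >= 80 ? "A" : "B"))
        .forEach(System.out::println); // on a parallel stream the print order is not guaranteed

Collections also inherit a default forEach from java.lang.Iterable (as in list.forEach(...)). It is a plain sequential loop:

    default void forEach(Consumer<? super T> action) {
        Objects.requireNonNull(action);
        for (T t : this) {
            action.accept(t);
        }
    }

- forEachOrdered

void forEachOrdered(Consumer<? super T> action);
list.parallelStream().map(s -> s.name + ": " + (s.score >= 80 ? "A" : "B"))
        .forEachOrdered(System.out::println); // prints in the list's encounter order, even in parallel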
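Here is a sketch that contrasts the two on a parallel stream (names hypothetical). forEachOrdered always appends 1..1000 in order. With forEach, only the contents are guaranteed, not the order. The synchronized list is needed because the action may run on several threads at once.

```java
import java.util.*;
import java.util.stream.*;

class ForEachOrderedDemo {
    static List<Integer> expected() {
        return IntStream.rangeClosed(1, 1000).boxed().collect(Collectors.toList());
    }

    // forEachOrdered: the action sees elements in encounter order, even on a parallel stream
    static List<Integer> ordered() {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, 1000).boxed().parallel().forEachOrdered(out::add);
        return out;
    }

    // forEach: on a parallel stream the action may run on several threads in any order,
    // so only the contents are guaranteed
    static List<Integer> unordered() {
        List<Integer> out = Collections.synchronizedList(new ArrayList<>());
        IntStream.rangeClosed(1, 1000).boxed().parallel().forEach(out::add);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(ordered().equals(expected()));
        System.out.println(unordered().size());
    }
}
```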


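- reduce

Reduction code is a little more involved to write, but it provides strong support for parallel processing. To use it, we first need to understand what an associative accumulation function is. The java.util.stream package documentation defines it as follows: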
An operator or function op is associative if the following holds:

     (a op b) op c == a op (b op c)

The importance of this to parallel evaluation can be seen if we expand this to four terms:

     a op b op c op d == (a op b) op (c op d)

So we can evaluate (a op b) in parallel with (c op d), and then invoke op on the results.
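The advantage of a reduction over a loop is exactly this: it parallelizes well. Associativity means that the way the computation is grouped, that is, which partial results are combined first, does not affect the final result. Associativity does not allow the elements themselves to be reordered. That would be commutativity.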
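The four-term expansion can be checked directly. In this sketch (hypothetical names), leftFold evaluates ((a op b) op c) op d, which is the order a sequential loop would use. splitFold evaluates (a op b) op (c op d), which is how a parallel split might group the work. Addition gives the same answer both ways, but subtraction, which is not associative, does not.

```java
import java.util.function.IntBinaryOperator;
import java.util.stream.IntStream;

class AssociativityDemo {
    // ((a op b) op c) op d: the order a sequential loop would use
    static int leftFold(IntBinaryOperator op, int a, int b, int c, int d) {
        return op.applyAsInt(op.applyAsInt(op.applyAsInt(a, b), c), d);
    }

    // (a op b) op (c op d): the two halves could be computed in parallel
    static int splitFold(IntBinaryOperator op, int a, int b, int c, int d) {
        return op.applyAsInt(op.applyAsInt(a, b), op.applyAsInt(c, d));
    }

    // With an associative op and a true identity, a parallel reduce gives the sequential answer
    static int parallelSum() {
        return IntStream.rangeClosed(1, 4).parallel().reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        IntBinaryOperator plus = (x, y) -> x + y;   // associative
        IntBinaryOperator minus = (x, y) -> x - y;  // not associative
        System.out.println(leftFold(plus, 1, 2, 3, 4) + " vs " + splitFold(plus, 1, 2, 3, 4));   // 10 vs 10
        System.out.println(leftFold(minus, 1, 2, 3, 4) + " vs " + splitFold(minus, 1, 2, 3, 4)); // -8 vs 0
        System.out.println(parallelSum());                                                       // 10
    }
}
```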

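  • T reduce(T identity, BinaryOperator<T> accumulator);
    JavaDoc:Performs a reduction on the elements of this stream, using the provided identity value and an associative accumulation function, and returns the reduced value. This is equivalent to:
          T result = identity;
          for (T element : this stream)
             result = accumulator.apply(result, element)
          return result;
    but is not constrained to execute sequentially. The identity value must be an identity for the accumulator function: for all t, accumulator.apply(identity, t) is equal to t.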

JavaDoc: While this may seem a more roundabout way to perform an aggregation compared to simply mutating a running total in a loop, reduction operations parallelize more gracefully, without needing additional synchronization and with greatly reduced risk of data races.

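String names = "names:" + list.stream().map(Student::getName).reduce("", String::concat);
int max = list.stream().mapToInt(Student::getScore).reduce(0, Integer::max);

The identity must really be an identity. "" is an identity for String::concat, but "names:" is not. On a parallel stream, reduce("names:", String::concat) would start every substream from "names:", so the prefix could appear several times. In the same way, 0 works as the identity for Integer::max here only because scores are never negative.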
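The identity requirement can be seen in a small sketch (hypothetical names). Only the sequential results and the results with a correct identity are deterministic. The parallel run with a wrong identity is printed but not asserted, because the number of repeated prefixes depends on how the stream happens to be split.

```java
import java.util.stream.Stream;

class IdentityDemo {
    // "" really is an identity for String::concat, so sequential and parallel agree
    static String goodIdentity(boolean parallel) {
        Stream<String> s = Stream.of("wang", "zhao", "li");
        return (parallel ? s.parallel() : s).reduce("", String::concat);
    }

    // "names:" is NOT an identity: each parallel substream starts from it,
    // so the prefix may show up more than once in a parallel run
    static String badIdentity(boolean parallel) {
        Stream<String> s = Stream.of("wang", "zhao", "li");
        return (parallel ? s.parallel() : s).reduce("names:", String::concat);
    }

    // A safe variant: reduce with the true identity, then add the prefix once
    static String prefixed() {
        return "names:" + Stream.of("wang", "zhao", "li").parallel().reduce("", String::concat);
    }

    public static void main(String[] args) {
        System.out.println(goodIdentity(true)); // wangzhaolі-free check below
        System.out.println(badIdentity(false));
        System.out.println(badIdentity(true));  // depends on how the stream is split
        System.out.println(prefixed());
    }
}
```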


  • Optional<T> reduce(BinaryOperator<T> accumulator);
    JavaDoc:Performs a reduction on the elements of this stream, using an associative accumulation function, and returns an Optional describing the reduced value, if any. This is equivalent to:
          boolean foundAny = false;
          T result = null;
          for (T element : this stream) {
              if (!foundAny) {
                  foundAny = true;
                  result = element;
              }
              else
                  result = accumulator.apply(result, element);
          }
          return foundAny ? Optional.of(result) : Optional.empty();

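    but is not constrained to execute sequentially.
The equivalent code shows what this form of reduce does. There is no identity value, so the first element seeds the result, and an empty stream produces an empty Optional. Optional was introduced in Java 8 to help avoid NullPointerException. It will be covered separately in a later post.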

 String name = list.stream().map(Student::getName)
     .reduce((o1,o2)->o1.compareToIgnoreCase(o2)>0 ? o1 :o2).orElse("");
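Here is a sketch that wraps the same case-insensitive maximum in a method (hypothetical name). It shows that an empty stream produces Optional.empty() rather than null.

```java
import java.util.*;

class OptionalReduceDemo {
    // Keeps whichever name is larger, ignoring case; there is no identity, so the result is an Optional
    static Optional<String> maxName(List<String> names) {
        return names.stream().reduce((o1, o2) -> o1.compareToIgnoreCase(o2) > 0 ? o1 : o2);
    }

    public static void main(String[] args) {
        System.out.println(maxName(Arrays.asList("wang", "zhao", "li", "sun", "zhou"))); // Optional[zhou]
        System.out.println(maxName(Collections.emptyList()).orElse("<none>"));           // <none>
    }
}
```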


  • <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);
    JavaDoc:Many reductions using this form can be represented more simply by an explicit combination of map and reduce operations. The accumulator function acts as a fused mapper and accumulator, which can sometimes be more efficient than separate mapping and reduction, such as when knowing the previously reduced value allows you to avoid some computation.
      This is equivalent to:
          U result = identity;
          for (T element : this stream)
              result = accumulator.apply(result, element)
          return result;
    but is not constrained to execute sequentially. The identity value must be an identity for the combiner function, and the combiner must be compatible with the accumulator: for all u and t, combiner.apply(u, accumulator.apply(identity, t)) must equal accumulator.apply(u, t).


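// Sequential stream: the current JDK implementation does not call the combiner here,
// so this combiner only throws to show that it is never used (not something to rely on in real code)
Integer scoreSequentialSum = list.stream()
        .reduce(0, (t, s) -> t + s.getScore(), (t1, t2) -> { throw new IllegalStateException("combiner called"); });
// Parallel stream: each substream computes a partial sum, and Integer::sum combines them
Integer scoreParallelSum = list.parallelStream()
        .reduce(0, (sum, student) -> sum + student.getScore(), Integer::sum);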
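As the API note says, many three-argument reductions can also be written as an explicit map followed by reduce. This sketch compares the two on the sample scores. The names are hypothetical, and a nested Student class is trimmed to name and score.

```java
import java.util.*;

class ThreeArgReduceDemo {
    // Hypothetical, trimmed-down Student
    static class Student {
        final String name;
        final int score;
        Student(String name, int score) { this.name = name; this.score = score; }
        int getScore() { return score; }
    }

    static List<Student> sample() {
        return Arrays.asList(new Student("wang", 90), new Student("zhao", 80),
                new Student("li", 99), new Student("sun", 80), new Student("zhou", 70));
    }

    // Fused form: the accumulator maps and adds in one step; the combiner adds partial sums
    static int fused(List<Student> list) {
        return list.parallelStream().reduce(0, (sum, s) -> sum + s.getScore(), Integer::sum);
    }

    // Explicit map, then a plain two-argument reduce
    static int mapThenReduce(List<Student> list) {
        return list.parallelStream().map(Student::getScore).reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(fused(sample()) + " " + mapThenReduce(sample())); // 419 419
    }
}
```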





- collect

  • <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
    JavaDoc:Performs a mutable reduction operation on the elements of this stream. A mutable reduction is one in which the reduced value is a mutable result container, such as an ArrayList, and elements are incorporated by updating the state of the result rather than by replacing the result. This produces a result equivalent to:
          R result = supplier.get();
          for (T element : this stream)
              accumulator.accept(result, element);
          return result;

Like reduce, collect operations can be parallelized without requiring additional synchronization.

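Set side by side with the three-argument reduce, the two signatures have the same shape:

<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);
<U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);

The difference is that reduce's accumulator and combiner return a new value. In collect, the supplier creates a fresh mutable container, and the accumulator and combiner are BiConsumers that update that container in place.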


List<String> names = list.stream().map(Student::getName).
                     collect(ArrayList<String>::new, List::add, List::addAll);
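The Stream.collect JavaDoc uses another mutable reduction as an example: concatenating strings into a StringBuilder. In this sketch (hypothetical names), the stream can be parallel, so the combiner really does merge partial builders. The result still keeps the encounter order.

```java
import java.util.*;

class MutableCollectDemo {
    // Mutable reduction into a StringBuilder: the supplier creates a container,
    // the accumulator appends one element, the combiner appends one partial builder to another
    static String concat(List<String> names, boolean parallel) {
        return (parallel ? names.parallelStream() : names.stream())
                .collect(StringBuilder::new, StringBuilder::append, StringBuilder::append)
                .toString();
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("wang", "zhao", "li", "sun", "zhou");
        System.out.println(concat(names, false)); // wangzhaolisunzhou
        System.out.println(concat(names, true));  // wangzhaolisunzhou
    }
}
```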
  • <R, A> R collect(Collector<? super T, A, R> collector);
    This overload is mainly used with the Collector implementations that Collectors provides, and these make multi-level aggregation easy. For example, nesting groupingBy calls produces multi-level grouping. Even with non-thread-safe types such as ArrayList, a parallel collect needs no extra synchronization. Collector and Collectors will be covered in detail separately. JavaDoc:Performs a mutable reduction operation on the elements of this stream using a Collector. A Collector encapsulates the functions used as arguments to collect(Supplier, BiConsumer, BiConsumer), allowing for reuse of collection strategies and composition of collect operations such as multiple-level grouping or partitioning. When executed in parallel, multiple intermediate results may be instantiated, populated, and merged so as to maintain isolation of mutable data structures. Therefore, even when executed in parallel with non-thread-safe data structures (such as ArrayList), no additional synchronization is needed for a parallel reduction.
List<String> names = list.stream().map(Student::getName).collect(Collectors.toList());
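Below is a sketch of multi-level aggregation with the Collector overload (hypothetical names). It groups by age, then partitions each age group by whether the score is at least 80, and keeps only the names. Collectors.partitioningBy always yields both the true and the false key. The parallel run is expected to match the sequential one without any extra synchronization.

```java
import java.util.*;
import java.util.stream.Collectors;

class MultiLevelDemo {
    // Hypothetical, trimmed-down Student
    static class Student {
        final String name;
        final int age;
        final int score;
        Student(String name, int age, int score) { this.name = name; this.age = age; this.score = score; }
        String getName() { return name; }
        int getAge() { return age; }
        int getScore() { return score; }
    }

    static List<Student> sample() {
        return Arrays.asList(new Student("wang", 20, 90),
                new Student("zhao", 30, 80), new Student("li", 25, 99),
                new Student("sun", 20, 80), new Student("zhou", 30, 70));
    }

    // Two levels: age -> (score >= 80?) -> names
    static Map<Integer, Map<Boolean, List<String>>> byAgeThenPass(List<Student> list, boolean parallel) {
        return (parallel ? list.parallelStream() : list.stream())
                .collect(Collectors.groupingBy(Student::getAge,
                        Collectors.partitioningBy((Student s) -> s.getScore() >= 80,
                                Collectors.mapping(Student::getName, Collectors.toList()))));
    }

    public static void main(String[] args) {
        System.out.println(byAgeThenPass(sample(), false));
    }
}
```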




- min

Optional<T> min(Comparator<? super T> comparator);
// ReferencePipeline's implementation: min is just a reduce with BinaryOperator.minBy
public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
    return reduce(BinaryOperator.minBy(comparator));
}



- max

Optional<T> max(Comparator<? super T> comparator);
// ReferencePipeline's implementation: max is just a reduce with BinaryOperator.maxBy
public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
    return reduce(BinaryOperator.maxBy(comparator));
}



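- count

long count();
// JDK 8 ReferencePipeline implementation: map every element to 1L and sum
// (from JDK 9 on, count() may skip running the pipeline if the size is known from the source)
public final long count() {
    return mapToLong(e -> 1L).sum();
}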
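This sketch applies min, max and count to the sample data (hypothetical names). It also checks that min gives the same answer as reduce with BinaryOperator.minBy.

```java
import java.util.*;
import java.util.function.BinaryOperator;

class MinMaxCountDemo {
    // Hypothetical, trimmed-down Student with just a name and a score
    static class Student {
        final String name;
        final int score;
        Student(String name, int score) { this.name = name; this.score = score; }
        String getName() { return name; }
        int getScore() { return score; }
    }

    static List<Student> sample() {
        return Arrays.asList(new Student("wang", 90), new Student("zhao", 80),
                new Student("li", 99), new Student("sun", 80), new Student("zhou", 70));
    }

    static final Comparator<Student> BY_SCORE = Comparator.comparingInt(Student::getScore);

    static String lowest(List<Student> list) {
        return list.stream().min(BY_SCORE).map(Student::getName).orElse("");
    }

    static String highest(List<Student> list) {
        return list.stream().max(BY_SCORE).map(Student::getName).orElse("");
    }

    // min spelled out as the reduce it delegates to
    static String lowestViaReduce(List<Student> list) {
        return list.stream().reduce(BinaryOperator.minBy(BY_SCORE)).map(Student::getName).orElse("");
    }

    static long passed(List<Student> list) {
        return list.stream().filter(s -> s.getScore() >= 80).count();
    }

    public static void main(String[] args) {
        System.out.println(lowest(sample()) + " " + highest(sample()) + " " + passed(sample())); // zhou li 4
    }
}
```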


- anyMatch

boolean anyMatch(Predicate<? super T> predicate);


boolean b = list.stream().anyMatch(s -> s.getAge() <= 20);


- allMatch

boolean allMatch(Predicate<? super T> predicate);


boolean b = list.stream().allMatch(s -> s.getScore() >= 80);


- noneMatch

boolean noneMatch(Predicate<? super T> predicate);


boolean b = list.stream().noneMatch(s -> s.getScore() >= 80);
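Here is a sketch of the three match operations (hypothetical names). The empty-stream case is worth remembering: anyMatch returns false, while allMatch and noneMatch return true without evaluating the predicate.

```java
import java.util.*;

class MatchDemo {
    // Returns [anyMatch, allMatch, noneMatch] for three sample predicates
    static List<Boolean> onScores(List<Integer> scores) {
        return Arrays.asList(
                scores.stream().anyMatch(s -> s >= 90),   // is anyone at 90 or above?
                scores.stream().allMatch(s -> s >= 80),   // did everyone reach 80?
                scores.stream().noneMatch(s -> s > 100)); // is nobody above 100?
    }

    public static void main(String[] args) {
        System.out.println(onScores(Arrays.asList(90, 80, 99, 80, 70))); // [true, false, true]
        System.out.println(onScores(Collections.emptyList()));           // [false, true, true]
    }
}
```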


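- findFirst

Optional<T> findFirst();

findFirst returns the first element of the stream. If the stream has no encounter order, any element may be returned. If the stream does have an encounter order, sequential and parallel streams return the same result. JavaDoc: If the stream has no encounter order, then any element may be returned.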



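- findAny

Optional<T> findAny();

findAny is explicitly nondeterministic: it is free to return any element of the stream. This allows maximal performance in parallel pipelines. The cost is that repeated calls on the same source may return different results, so use findFirst when a stable result is needed.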
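This sketch contrasts findFirst and findAny on a parallel stream (hypothetical names). findFirst always returns the first multiple of 7. findAny only guarantees that it returns some matching element.

```java
import java.util.*;
import java.util.stream.*;

class FindDemo {
    // Always 7: findFirst respects encounter order even on a parallel stream
    static Optional<Integer> first() {
        return IntStream.rangeClosed(1, 1000).boxed().parallel()
                .filter(i -> i % 7 == 0)
                .findFirst();
    }

    // Some multiple of 7, not necessarily 7
    static Optional<Integer> any() {
        return IntStream.rangeClosed(1, 1000).boxed().parallel()
                .filter(i -> i % 7 == 0)
                .findAny();
    }

    public static void main(String[] args) {
        System.out.println(first()); // Optional[7]
        System.out.println(any());
    }
}
```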





