本文是向大家介绍:Java8特性之Stream流的原理和日常使用,可以直观的理解Stream流操作和玩转集合
1、什么是 Stream
1.1、 简介
java8新添加了一个特性:流Stream。Stream和I/O流不同,它更像具有Iterable的集合类,但行为和集合类又有所不同,它是对集合对象功能的增强,让开发者能够以一种声明的方式处理数据源(集合、数组等),它专注于对数据源进行各种高效的聚合操作(aggregate operation)和大批量数据操作 (bulk data operation)。
举个例子,只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。
Stream是一种对 Java 集合运算和表达的高阶抽象。Stream API将处理的数据源看做一种Stream(流),Stream(流)在Pipeline(管道)中传输和运算,支持的运算包含筛选、排序、聚合等,当到达终点后便得到最终的处理结果。
几个关键概念:
元素 Stream是一个来自数据源的元素队列,Stream本身并不存储元素。
数据源(即Stream的来源)包含集合、数组、I/O channel、generator(发生器)等。
聚合操作 类似SQL中的filter、map、find、match、sorted等操作
管道运算 Stream在Pipeline中运算后返回Stream对象本身,这样多个操作串联成一个Pipeline,并形成fluent风格的代码。这种方式可以优化操作,如延迟执行(laziness)和短路( short-circuiting)。
内部迭代 不同于java8以前对集合的遍历方式(外部迭代),Stream API采用访问者模式(Visitor)实现了内部迭代。
并行运算 Stream API支持串行(stream() )或并行(parallelStream() )的两种操作方式。
1.2、 Stream API的特点以及类型:
(1)Stream API的使用和同样是java8新特性的lambda表达式密不可分,可以大大提高编码效率和代码可读性。
(2)Stream API提供串行和并行两种操作,其中并行操作能发挥多核处理器的优势,使用fork/join的方式进行并行操作以提高运行速度。
(3)Stream API进行并行操作无需编写多线程代码即可写出高效的并发程序,且通常可避免多线程代码出错的问题。
和以前的Collection操作不同, Stream操作有两个基础的特征:
Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。
内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。
流的操作可以分为:
中间操作(Intermediate Operations)
无状态(Stateless)操作:每个数据的处理是独立的,不会影响或依赖之前的数据。如filter()、flatMap()、flatMapToDouble()、flatMapToInt()、flatMapToLong()、map()、mapToDouble()、mapToInt()、mapToLong()、peek()、unordered() 等;
有状态(Stateful)操作:处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如distinct()、sorted()、sorted(comparator)、limit()、skip() 等;
终止操作(Terminal Operations)
非短路操作:处理完所有数据才能得到结果。如collect()、count()、forEach()、forEachOrdered()、max()、min()、reduce()、toArray()等;
短路(short-circuiting)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如anyMatch()、allMatch()、noneMatch()、findFirst()、findAny() 等。
2、源码分析
2.1、结构图如下:
2.2、源码分析如下:
其中,Stream是一个接口,没有操作的默认实现方式。最主要的实现类是ReferencePipeline,它继承自AbstractPipline,而AbstractPipline实现了BaseStream接口。ReferencePipeline内部定义了三个静态内部类,包括:输入流的Head、无状态中间操作StablessOp、有状态StatfulOp,但之后Head不是抽象类。
2.2.1ReferencePipeline的理解
Stream只是一个接口,并没有操作的缺省实现。最主要的实现是ReferencePipeline和AbstractPipeline完成的。
定义:
abstract class ReferencePipeline<P_IN, P_OUT>
extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
implements Stream<P_OUT> {}
ReferencePipeline类几乎实现了所有的Stream中间操作和最终操作,这里挑选一些典型的代码进行分析。
先看看其中的三个重要的内部类。控制数据流入的 Head ,中间操作 StatelessOp,StatefulOp。
Head是ReferencePipeline数据源,其实内部就是一个集合的并行迭代器。
static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
// 构造器,数据源类型需要进程自Spliterator
Head(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
// 构造器,数据源类型就是Spliterator
Head(Spliterator<?> source,
int sourceFlags, boolean parallel) {
super(source, sourceFlags, parallel);
}
@Override
final boolean opIsStateful() {
throw new UnsupportedOperationException();
}
@Override
final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
throw new UnsupportedOperationException();
}
// Optimized sequential terminal operations for the head of the pipeline
@Override
public void forEach(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEach(action);
}
}
@Override
public void forEachOrdered(Consumer<? super E_OUT> action) {
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEachOrdered(action);
}
}
}
无状态的链式加工,会返回一个StatelessOp对象,有状态的加工操作会返回一个StatefulOp对象。
abstract static class StatelessOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {
// 构造器,就是将当前的中间操作和旧的Stream组合成一个新的Stream,返回新的Stream,实现链式调用
StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return false;
}
}
abstract static class StatefulOp<E_IN, E_OUT>
extends ReferencePipeline<E_IN, E_OUT> {
// 构造器,和上面一样
StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
StreamShape inputShape,
int opFlags) {
super(upstream, opFlags);
assert upstream.getOutputShape() == inputShape;
}
@Override
final boolean opIsStateful() {
return true;
}
@Override
abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
Spliterator<P_IN> spliterator,
IntFunction<E_OUT[]> generator);
}
2.2.2.无状态的中间操作
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
可以看到这个操作只是返回一个StatelessOp对象(此类依然继承于ReferencePipeline),它的一个回调函数opWrapSink会返回一个Sink对象链表。
Sink代表管道操作的每一个阶段, 比如本例的filter阶段。 在调用accept之前,先调用begin通知数据来了,数据发送后调用end。
public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
Objects.requireNonNull(mapper);
return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
return new Sink.ChainedReference<P_OUT, R>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.apply(u));
}
};
}
};
}
stream.filter(....).map(...)怎么形成一个链的?
filter返回一个StatelessOp,我们记为StatelessOp1, 而map返回另外一个StatelessOp,我们记为StatelessOp2。在调用StatelessOp1.map时, StatelessOp2是这样生成的:return new StatelessOp<P_OUT, R>(StatelessOp1,......);,管道中每个阶段的Stream保留前一个流的引用。
2.2.3有状态的中间操作
@Override
public final Stream<P_OUT> distinct() {
return DistinctOps.makeRef(this);
}
@Override
public final Stream<P_OUT> sorted() {
return SortedOps.makeRef(this);
}
不管无状态还是有状态的中间操作都为返回一个StatelessOp或者StatefulOp传递给下一个操作,有点像设计模式中的职责链模式。
2.2.4.最终操作
public final long count() {
return mapToLong(e -> 1L).sum();
}
@Override
public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
Objects.requireNonNull(mapper);
return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
return new Sink.ChainedReference<P_OUT, Long>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.applyAsLong(u));
}
};
}
};
}
Stream中的最终操作都是惰性的,是如何实现的呢。
首先找到最后一个操作,也就是最终操作, 执行它的opWrapSink,事实上得到一个链表,最终返回第一个Sink, 执行第一个Sink的accept将触发链式操作, 将管道中的操作在一个迭代中执行一次。
事实上Java是将所有的操作形成一个类似链接的结构(通过Sink的downstream,upstream),在遇到最终操作时触发链式反应, 通过各种数据类型特定的spliterator的一次迭代最终得到结果。
并行操作是通过ForkJoinTask框架实现。
2.3、源码总结
其实还是可以把Stream的源码过程当成流水线来理解。
(1)流水线的入口,也就是数据源,每个Stream具有一个Head内部对象,而Head中就是一个集合spliterator,通过迭代依次输出一个个数据。常用的集合都实现了 Spliterator 接口以支持 Stream。可以这样理解,Spliterator 定义了数据集合流入流水线的方式。
(2) 流水线的中间操作组装,不管是有状态的还是无状态的,都会返回一个包含了上一个节点引用的中间节点,就这样把一个个中间操作拼接到了控制数据流入口的Head后面,但是并没有开始所任何数据处理。
(3) 启动流水线,在最后一个操作的时候回溯链表, 并调用Spliterator的forEachRemaining方法进行一次遍历, 每访问一个数组的元素就会从头开始调用链表的每个节点。
3、Stream流结合Lambda表达式的使用
3.1、Stream创建流
(1)、调用.stream()方法直接返回流:
public static void test() {
List<Student> students = StudentData.getStudents();
Stream<Student> stream = students.stream();//第一种:返回一个顺序流
Stream<Student> stream1 = students.parallelStream();//第二种:返回一个并行流
}
(2)Arrays.stream() 方法创建
//通过一个数组创建stream
public static void test(){
//获取一个整型stream
int []arr={1,34,2,54,56,34};
IntStream stream = Arrays.stream(arr );
}
(3)stream.of方式创建
public static void test(){
Stream<String> stringStream =Stream .of("1","4","34","23");
Stream<Student>studentStream =Stream .of(new Student(1,"book",23,89.5),
new Student(2,"cack",22,90.9));
}
3.2、Stream流使用之中间操作
(1)、filter:接收 Lambda , 从流中排除某些元素。
Arrays.asList(1, 2, 3, 4, 5).stream().filter(x->x%2==0).collect(Collectors.toList());
(2)、distinct:筛选,通过流所生成元素的 hashCode() 和 equals() 去除重复元素。
Arrays.asList(1,2,3,4,5,6,7,7,7,7).stream().distinct().collect(Collectors.toList());
(3)、sorted:集合中的元素排序并且是倒序
Arrays.asList(1,2,3,4,5,6).stream().sorted((a,b)->b-a).collect(Collectors.toList());
(4)、limit:截断流,使其元素不超过给定数量。3
Arrays.asList(1,2,3,4,5,6,7).stream().limit().collect(Collectors.toList());
(5)、skip:跳过元素,返回一个扔掉了前 n 个元素的流。若流中元素不足 n 个,则返回一个空流。与 limit(n) 互补
Arrays.asList(1,2,3,3,4,5).stream().skip(1).collect(Collectors.toList());
(6)、map:接收Lambda,将元素转换成其他形式或提取信息。接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。
Arrays.asList("1","2","3","5").stream().map(x->Integer.valueOf(x)).collect(Collectors.toList());
(7)、peek:类似于打印日志的功能在进行操作时查看当前值
Arrays.asList("1","2","3","5").stream().peek(System.out::println).collect(Collectors.toList());
3.3、Stream流使用之终止操作
(1)、allMatch–检查是否匹配所有元素
List<Student> list=StudentData.getStudents() ;
//判断所有的学生年龄是否都大于20岁
boolean allMatch=list.stream() .allMatch(student ->student .getAge() >20);
(2)、anyMatch–检查是否至少匹配一个元素
List<Student> list=StudentData.getStudents() ;
//判断是否存在学生的年龄大于20岁
boolean anyMatch=list .stream() .anyMatch(student ->student .getAge() >20);
(3)、noneMatch–检查是否没有匹配所有元素
List<Student> list=StudentData.getStudents() ;
//判断是否不存在学生叫小白
boolean noneMatch=list .stream() .noneMatch(student ->student .getName() .equals("小白") );
(4)、findFirst–返回第一个元素
List<Student> list=StudentData.getStudents() ;
//查找第一个学生
Optional<Student>first=list .stream().findFirst() ;
(5)、findAny–返回当前流中的任意元素
List<Student> list=StudentData.getStudents() ;
//查找当前流中的元素
Optional <Student>any=list .stream().findAny() ;
(6)、count–返回流中元素的总个数
List<Student> list=StudentData.getStudents() ;
//查找所有的学生数量
long count=list .stream() .filter(student->student .getAge() >20).count() ;
(7)、max–返回流中最大值
//查找学生的最高分数
Stream <Double >doubleStream =list .stream() .map(student ->student .getScore() );
Optional <Double > max=doubleStream .max(Double::compareTo );
System.out.println(max );
(8)、min–返回流中最小值
List<Student> list=StudentData.getStudents() ;
Stream <Double >doubleStream =list .stream() .map(student ->student .getScore() );
Optional <Double > max=doubleStream .min(Double::compareTo );
(9)、归约
reduce–归约操作可以将流中元素反复结合起来,得到一个值
public static void test(){
//计算数的总和
List<Integer >list =Arrays .asList(4,5,6,1,8,9,2,3,7) ;
Integer reduce=list .stream() .reduce(0,Integer::sum );
System.out.println(reduce );
//计算学生总分
List <Student >students =StudentData .getStudents() ;
Stream <Double >doubleStream =students .stream() .map(Student::getScore );
Optional <Double >reduce1=doubleStream .reduce(Double ::sum );
System.out.println(reduce1 .get() );
}
(10)、收集
collect:将流转换为其他形式,接收一个Collector接口实现,用于给Stream中汇总的方法
public static void test(){
//返回一个list
List <Student >students =StudentData .getStudents() ;
List <Student >list =students .stream() .filter(student ->student .getScore() >88).collect(Collectors.toList()) ;
//返回一个set
Set <Student >set =students .stream() .filter(s->s.getAge() >23).collect(Collectors.toSet()) ;
}