Java8特性之Stream的原理解析和日常使用

本文是向大家介绍:Java8特性之Stream流的原理和日常使用,可以直观的理解Stream流操作和玩转集合



1、什么是 Stream

1.1、 简介

        java8新添加了一个特性:流Stream。Stream和I/O流不同,它更像具有Iterable的集合类,但行为和集合类又有所不同,它是对集合对象功能的增强,让开发者能够以一种声明的方式处理数据源(集合、数组等),它专注于对数据源进行各种高效的聚合操作(aggregate operation)和大批量数据操作 (bulk data operation)。

      举个例子,只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream 会隐式地在内部进行遍历,做出相应的数据转换。

        Stream是一种对 Java 集合运算和表达的高阶抽象。Stream API将处理的数据源看做一种Stream(流),Stream(流)在Pipeline(管道)中传输和运算,支持的运算包含筛选、排序、聚合等,当到达终点后便得到最终的处理结果。

几个关键概念:

元素 Stream是一个来自数据源的元素队列,Stream本身并不存储元素。

数据源(即Stream的来源)包含集合、数组、I/O channel、generator(发生器)等。

聚合操作 类似SQL中的filter、map、find、match、sorted等操作

管道运算 Stream在Pipeline中运算后返回Stream对象本身,这样多个操作串联成一个Pipeline,并形成fluent风格的代码。这种方式可以优化操作,如延迟执行(laziness)和短路( short-circuiting)。

内部迭代 不同于java8以前对集合的遍历方式(外部迭代),Stream API采用访问者模式(Visitor)实现了内部迭代。

并行运算 Stream API支持串行(stream() )或并行(parallelStream() )的两种操作方式。

1.2、 Stream API的特点以及类型:

  (1)Stream API的使用和同样是java8新特性的lambda表达式密不可分,可以大大提高编码效率和代码可读性。

  (2)Stream API提供串行和并行两种操作,其中并行操作能发挥多核处理器的优势,使用fork/join的方式进行并行操作以提高运行速度。

  (3)Stream API进行并行操作无需编写多线程代码即可写出高效的并发程序,且通常可避免多线程代码出错的问题。

和以前的Collection操作不同, Stream操作有两个基础的特征:

      Pipelining: 中间操作都会返回流对象本身。 这样多个操作可以串联成一个管道, 如同流式风格(fluent style)。 这样做可以对操作进行优化, 比如延迟执行(laziness)和短路( short-circuiting)。

      内部迭代: 以前对集合遍历都是通过Iterator或者For-Each的方式, 显式的在集合外部进行迭代, 这叫做外部迭代。 Stream提供了内部迭代的方式, 通过访问者模式(Visitor)实现。

流的操作可以分为:

    中间操作(Intermediate Operations)

无状态(Stateless)操作:每个数据的处理是独立的,不会影响或依赖之前的数据。如filter()、flatMap()、flatMapToDouble()、flatMapToInt()、flatMapToLong()、map()、mapToDouble()、mapToInt()、mapToLong()、peek()、unordered() 等;

有状态(Stateful)操作:处理时会记录状态,比如处理了几个。后面元素的处理会依赖前面记录的状态,或者拿到所有元素才能继续下去。如distinct()、sorted()、sorted(comparator)、limit()、skip() 等;

      终止操作(Terminal Operations)

非短路操作:处理完所有数据才能得到结果。如collect()、count()、forEach()、forEachOrdered()、max()、min()、reduce()、toArray()等;

短路(short-circuiting)操作:拿到符合预期的结果就会停下来,不一定会处理完所有数据。如anyMatch()、allMatch()、noneMatch()、findFirst()、findAny() 等。

2、源码分析

2.1、结构图如下:


2.2、源码分析如下:

其中,Stream是一个接口,没有操作的默认实现方式。最主要的实现类是ReferencePipeline,它继承自AbstractPipline,而AbstractPipline实现了BaseStream接口。ReferencePipeline内部定义了三个静态内部类,包括:输入流的Head、无状态中间操作StablessOp、有状态StatfulOp,但之后Head不是抽象类。

2.2.1ReferencePipeline的理解

Stream只是一个接口,并没有操作的缺省实现。最主要的实现是ReferencePipeline和AbstractPipeline完成的。

定义:

abstract class ReferencePipeline<P_IN, P_OUT>

extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>

implements Stream<P_OUT>  {}

ReferencePipeline类几乎实现了所有的Stream中间操作和最终操作,这里挑选一些典型的代码进行分析。

先看看其中的三个重要的内部类。控制数据流入的 Head ,中间操作 StatelessOp,StatefulOp。

Head是ReferencePipeline数据源,其实内部就是一个集合的并行迭代器。

    static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {

        // 构造器,数据源类型需要进程自Spliterator

        Head(Supplier<? extends Spliterator<?>> source,

            int sourceFlags, boolean parallel) {

            super(source, sourceFlags, parallel);

        }

        // 构造器,数据源类型就是Spliterator

        Head(Spliterator<?> source,

            int sourceFlags, boolean parallel) {

            super(source, sourceFlags, parallel);

        }

        @Override

        final boolean opIsStateful() {

            throw new UnsupportedOperationException();

        }

        @Override

        final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {

            throw new UnsupportedOperationException();

        }

        // Optimized sequential terminal operations for the head of the pipeline

        @Override

        public void forEach(Consumer<? super E_OUT> action) {

            if (!isParallel()) {

                sourceStageSpliterator().forEachRemaining(action);

            }

            else {

                super.forEach(action);

            }

        }

        @Override

        public void forEachOrdered(Consumer<? super E_OUT> action) {

            if (!isParallel()) {

                sourceStageSpliterator().forEachRemaining(action);

            }

            else {

                super.forEachOrdered(action);

            }

        }

    }

无状态的链式加工,会返回一个StatelessOp对象,有状态的加工操作会返回一个StatefulOp对象。

    abstract static class StatelessOp<E_IN, E_OUT>

            extends ReferencePipeline<E_IN, E_OUT> {

      // 构造器,就是将当前的中间操作和旧的Stream组合成一个新的Stream,返回新的Stream,实现链式调用

        StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,

                    StreamShape inputShape,

                    int opFlags) {

            super(upstream, opFlags);

            assert upstream.getOutputShape() == inputShape;

        }

        @Override

        final boolean opIsStateful() {

            return false;

        }

    }

    abstract static class StatefulOp<E_IN, E_OUT>

            extends ReferencePipeline<E_IN, E_OUT> {

// 构造器,和上面一样

        StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,

                  StreamShape inputShape,

                  int opFlags) {

            super(upstream, opFlags);

            assert upstream.getOutputShape() == inputShape;

        }

        @Override

        final boolean opIsStateful() {

            return true;

        }

        @Override

        abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,

                                                      Spliterator<P_IN> spliterator,

                                                      IntFunction<E_OUT[]> generator);

    }

2.2.2.无状态的中间操作

    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {

        Objects.requireNonNull(predicate);

        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,

                                    StreamOpFlag.NOT_SIZED) {

            @Override

            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {

                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {

                    @Override

                    public void begin(long size) {

                        downstream.begin(-1);

                    }

                    @Override

                    public void accept(P_OUT u) {

                        if (predicate.test(u))

                            downstream.accept(u);

                    }

                };

            }

        };

    }

可以看到这个操作只是返回一个StatelessOp对象(此类依然继承于ReferencePipeline),它的一个回调函数opWrapSink会返回一个Sink对象链表。

Sink代表管道操作的每一个阶段, 比如本例的filter阶段。 在调用accept之前,先调用begin通知数据来了,数据发送后调用end。

    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {

        Objects.requireNonNull(mapper);

        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,

                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {

            @Override

            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {

                return new Sink.ChainedReference<P_OUT, R>(sink) {

                    @Override

                    public void accept(P_OUT u) {

                        downstream.accept(mapper.apply(u));

                    }

                };

            }

        };

    }

stream.filter(....).map(...)怎么形成一个链的?

filter返回一个StatelessOp,我们记为StatelessOp1, 而map返回另外一个StatelessOp,我们记为StatelessOp2。在调用StatelessOp1.map时, StatelessOp2是这样生成的:return new StatelessOp<P_OUT, R>(StatelessOp1,......);,管道中每个阶段的Stream保留前一个流的引用。

2.2.3有状态的中间操作

    @Override

    public final Stream<P_OUT> distinct() {

        return DistinctOps.makeRef(this);

    }

    @Override

    public final Stream<P_OUT> sorted() {

        return SortedOps.makeRef(this);

    }

不管无状态还是有状态的中间操作都为返回一个StatelessOp或者StatefulOp传递给下一个操作,有点像设计模式中的职责链模式。

2.2.4.最终操作

public final long count() {

        return mapToLong(e -> 1L).sum();

    }

    @Override

    public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {

        Objects.requireNonNull(mapper);

        return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,

                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {

            @Override

            Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {

                return new Sink.ChainedReference<P_OUT, Long>(sink) {

                    @Override

                    public void accept(P_OUT u) {

                        downstream.accept(mapper.applyAsLong(u));

                    }

                };

            }

        };

    }

Stream中的最终操作都是惰性的,是如何实现的呢。

首先找到最后一个操作,也就是最终操作, 执行它的opWrapSink,事实上得到一个链表,最终返回第一个Sink, 执行第一个Sink的accept将触发链式操作, 将管道中的操作在一个迭代中执行一次。

事实上Java是将所有的操作形成一个类似链接的结构(通过Sink的downstream,upstream),在遇到最终操作时触发链式反应, 通过各种数据类型特定的spliterator的一次迭代最终得到结果。

并行操作是通过ForkJoinTask框架实现。

2.3、源码总结

其实还是可以把Stream的源码过程当成流水线来理解。

        (1)流水线的入口,也就是数据源,每个Stream具有一个Head内部对象,而Head中就是一个集合spliterator,通过迭代依次输出一个个数据。常用的集合都实现了 Spliterator 接口以支持 Stream。可以这样理解,Spliterator 定义了数据集合流入流水线的方式。

      (2) 流水线的中间操作组装,不管是有状态的还是无状态的,都会返回一个包含了上一个节点引用的中间节点,就这样把一个个中间操作拼接到了控制数据流入口的Head后面,但是并没有开始所任何数据处理。

      (3) 启动流水线,在最后一个操作的时候回溯链表, 并调用Spliterator的forEachRemaining方法进行一次遍历, 每访问一个数组的元素就会从头开始调用链表的每个节点。

3、Stream流结合Lambda表达式的使用

3.1、Stream创建流

(1)、调用.stream()方法直接返回流:

  public static void test() {

        List<Student> students = StudentData.getStudents();

        Stream<Student> stream = students.stream();//第一种:返回一个顺序流

        Stream<Student> stream1 = students.parallelStream();//第二种:返回一个并行流

    }

(2)Arrays.stream() 方法创建

//通过一个数组创建stream

public static void test(){

    //获取一个整型stream

    int []arr={1,34,2,54,56,34};

    IntStream stream = Arrays.stream(arr );

}

(3)stream.of方式创建

  public static void test(){

        Stream<String> stringStream =Stream .of("1","4","34","23");

        Stream<Student>studentStream =Stream .of(new Student(1,"book",23,89.5),

                new Student(2,"cack",22,90.9));

    }

3.2、Stream流使用之中间操作

  (1)、filter:接收 Lambda , 从流中排除某些元素。

Arrays.asList(1, 2, 3, 4, 5).stream().filter(x->x%2==0).collect(Collectors.toList());

(2)、distinct:筛选,通过流所生成元素的 hashCode() 和 equals() 去除重复元素。

Arrays.asList(1,2,3,4,5,6,7,7,7,7).stream().distinct().collect(Collectors.toList());

(3)、sorted:集合中的元素排序并且是倒序

Arrays.asList(1,2,3,4,5,6).stream().sorted((a,b)->b-a).collect(Collectors.toList());

(4)、limit:截断流,使其元素不超过给定数量。3

Arrays.asList(1,2,3,4,5,6,7).stream().limit().collect(Collectors.toList());

(5)、skip:跳过元素,返回一个扔掉了前 n 个元素的流。若流中元素不足 n 个,则返回一个空流。与 limit(n) 互补

Arrays.asList(1,2,3,3,4,5).stream().skip(1).collect(Collectors.toList());

(6)、map:接收Lambda,将元素转换成其他形式或提取信息。接收一个函数作为参数,该函数会被应用到每个元素上,并将其映射成一个新的元素。

Arrays.asList("1","2","3","5").stream().map(x->Integer.valueOf(x)).collect(Collectors.toList());

(7)、peek:类似于打印日志的功能在进行操作时查看当前值

Arrays.asList("1","2","3","5").stream().peek(System.out::println).collect(Collectors.toList());

3.3、Stream流使用之终止操作

(1)、allMatch–检查是否匹配所有元素

List<Student> list=StudentData.getStudents() ;

//判断所有的学生年龄是否都大于20岁

boolean allMatch=list.stream() .allMatch(student ->student .getAge() >20);

(2)、anyMatch–检查是否至少匹配一个元素

List<Student> list=StudentData.getStudents() ;

//判断是否存在学生的年龄大于20岁

boolean anyMatch=list .stream() .anyMatch(student ->student .getAge() >20);

(3)、noneMatch–检查是否没有匹配所有元素

List<Student> list=StudentData.getStudents() ;

//判断是否不存在学生叫小白

boolean noneMatch=list .stream() .noneMatch(student ->student .getName() .equals("小白") );

(4)、findFirst–返回第一个元素

List<Student> list=StudentData.getStudents() ;

//查找第一个学生

Optional<Student>first=list .stream().findFirst() ;

(5)、findAny–返回当前流中的任意元素

List<Student> list=StudentData.getStudents() ;

//查找当前流中的元素

Optional <Student>any=list .stream().findAny() ;

(6)、count–返回流中元素的总个数

List<Student> list=StudentData.getStudents() ;

//查找所有的学生数量

long count=list .stream() .filter(student->student .getAge() >20).count() ;

(7)、max–返回流中最大值

//查找学生的最高分数

Stream <Double >doubleStream =list .stream() .map(student ->student .getScore() );

Optional <Double > max=doubleStream .max(Double::compareTo );

        System.out.println(max );

(8)、min–返回流中最小值

List<Student> list=StudentData.getStudents() ;

Stream <Double >doubleStream =list .stream() .map(student ->student .getScore() );

Optional <Double > max=doubleStream .min(Double::compareTo );

(9)、归约

reduce–归约操作可以将流中元素反复结合起来,得到一个值

public static void test(){

      //计算数的总和

      List<Integer >list =Arrays .asList(4,5,6,1,8,9,2,3,7) ;

      Integer reduce=list .stream() .reduce(0,Integer::sum );

      System.out.println(reduce );

      //计算学生总分

        List <Student >students =StudentData .getStudents() ;

        Stream <Double >doubleStream =students .stream() .map(Student::getScore );

        Optional <Double >reduce1=doubleStream .reduce(Double ::sum );

        System.out.println(reduce1 .get() );

    }

(10)、收集

collect:将流转换为其他形式,接收一个Collector接口实现,用于给Stream中汇总的方法

  public static void test(){

        //返回一个list

        List <Student >students =StudentData .getStudents() ;

        List <Student >list =students .stream() .filter(student ->student .getScore() >88).collect(Collectors.toList()) ;

        //返回一个set

        Set <Student >set =students .stream() .filter(s->s.getAge() >23).collect(Collectors.toSet()) ;

    }

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容