Stream

学科归属&背景

  • Java8新特性,能够避免多重循环和反复遍历,导致有许多中间变量,增加内存的消耗。适用于统计/变换等场景。

侧重点/目标

1.中间的操作结果存在哪里?内存的消耗情况怎么样?

  • stream的实现不是对所有的数据依次进行单一操作,而是对一个元素遍历所有操作。内存消耗相比于迭代,大大降低。

2.中间操作的状态&短路是指什么

  • 有状态,依赖上一个操作的结果,不能并行?如,sorted
@Override
 public void end() {
     list.sort(comparator);
     downstream.begin(list.size());
     if (!cancellationWasRequested) {
         list.forEach(downstream::accept);
     }
     else {
         for (T t : list) {
             if (downstream.cancellationRequested()) break;
             downstream.accept(t);//等中间缓存的排序结果都ok end了,对排序后的结果,重新发起后续的操作
         }
     }
     downstream.end();
     list = null;
 }
  • 无状态,操作之间无关联。
  • 短路:执行到这个操作,这个元素有可能就被剔除,不能进行后续的操作了,类似continue。如,filter。=》合理布置stream操作顺序,可以减少计算量。

3.传入的抽象方法实现,是存在哪里?是由哪个类怎么执行的?如何短路执行(中间取消)?

  • 惰性执行(调用终止方法时,才真正执行整个流操作)怎么实现的?Spliterator接口负责将数据转成流水线的输入,Sink接口是消费者,遍历消费操作。
  • 定义数据源,开启流水线:Collection的stream(),实现了Spliterator接口,返回构造出ReferencePipeline.Head实例
    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
  • 传入的抽象方法实现,会根据有无状态,构造出StatelessOp或 StatefulOp。一步步构成操作的双向链表,同时并没有实际执行操作。实际操作中,AbstractPipeline双头链表保存前后操作和头结点,方便同一元素进行操作。ChainedReference执行操作流的执行/停止。
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
   @Override
   public final Stream<P_OUT> distinct() {
       return DistinctOps.makeRef(this);
   }

static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
       return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                     StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
……
  • 终止方法,构造出TerminalOp,同时发起ReferencePipeline.evaluate,开始惰性执行
    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
// **并行与否**
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))//ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential类似的动作
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
       @Override
       public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                          Spliterator<S> spliterator) {
           return helper.wrapAndCopyInto(this, spliterator).get();
       }
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
// 从最后操作往前,包成新的Sink,复合combinedFlags
   final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
       Objects.requireNonNull(sink);

       for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
           sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
       }
       return (Sink<P_IN>) sink;
   }


// 第二个操作的节点,判断combinedFlags,如果短路了,即停止,然后重新反向执行?
// 未短路,执行此次的新Sink反向流的begin,依次执行操作的,递归发起下一个元素的
   @Override
   final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
       Objects.requireNonNull(wrappedSink);

       if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
           wrappedSink.begin(spliterator.getExactSizeIfKnown());// 流水ok,你准备一下
           spliterator.forEachRemaining(wrappedSink);// Collection实现spliterator,开始执行遍历元素,递归执行
           wrappedSink.end();
       }
       else {
           copyIntoWithCancel(wrappedSink, spliterator);
       }
   }

// ArrayList实现
public void forEachRemaining(Consumer<? super E> action) {
           int i, hi, mc; // hoist accesses and checks from loop
           ArrayList<E> lst; Object[] a;
           if (action == null)
               throw new NullPointerException();
           if ((lst = list) != null && (a = lst.elementData) != null) {
               if ((hi = fence) < 0) {
                   mc = lst.modCount;
                   hi = lst.size;
               }
               else
                   mc = expectedModCount;
               if ((i = index) >= 0 && (index = hi) <= a.length) {
                   for (; i < hi; ++i) {
                       @SuppressWarnings("unchecked") E e = (E) a[i];
                       action.accept(e);//各个操作通过Sink接口accept方法依次向下传递执行。
                   }
                   if (lst.modCount == mc)
                       return;
               }
           }
           throw new ConcurrentModificationException();
       }

4.泛型的使用

  • 自己套自己public interface OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>            extends Spliterator<T> {
    
interface Builder<T> extends Sink<T> {

        /**
         * Builds the node.  Should be called after all elements have been
         * pushed and signalled with an invocation of {@link Sink#end()}.
         *
         * @return the resulting {@code Node}
         */
        Node<T> build();

        /**
         * Specialized @{code Node.Builder} for int elements
         */
        interface OfInt extends Node.Builder<Integer>, Sink.OfInt {
            @Override
            Node.OfInt build();
        }
Lists.<Person>newArrayList().stream()
        .collect(Collectors.groupingBy(Person::getType, HashMap::new, Collectors.toList()));

5.操作码是如何标识的。位的应用。sourceOrOpFlags

6.设计模式

责任链模式,一个接一个处理事件。

7.典型使用
collector&Collectors

 Map<Integer, Optional<String>> collect = ct.stream()
                    .filter(i -> !Strings.isNullOrEmpty(i.getContent()))
                    .collect(Collectors.groupingBy(ReportRstInfo::getFeatureClassIdx,
                            Collectors.mapping(ReportRstInfo::getContent, Collectors.reducing((o, o2) -> o))));

                            
 Map<String, List<DiagnosisReportDB>> monthResults = diagnosisReportDBS.stream().collect(Collectors.groupingBy(
                function, toSortedList(Comparator.comparing((DiagnosisReportDB::getRecordTime)).reversed())));
                
    private static <T> Collector<T, ?, List<T>> toSortedList(Comparator<? super T> c) {
        return Collectors.collectingAndThen(Collectors.toCollection(() -> new TreeSet<>(c)), ArrayList::new);
    }
    
monthResults = monthResults.entrySet().stream().sorted(Collections.reverseOrder((o1, o2) ->
                            (int) (TimeUtils.parseTime(o1.getKey(), YYYYMM_WITH_SPLIT_CHINESE) / 1000 -
                                    TimeUtils.parseTime(o2.getKey(), YYYYMM_WITH_SPLIT_CHINESE) / 1000)))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                            (s, s2) -> s, LinkedHashMap::new));

知识迁移

1.并行的ForkJoinPool与多线程的关系,分治的思想如何有序拼接数据,如并行排序?

REF


[原来你是这样的 Stream —— 浅析 Java Stream 实现原理](https://zhuanlan.zhihu.com/p/47478339
Stream的reduce及Collect使用、Collector4接口解释
Collector_复杂多示例
student示例、Collectors静态方法
java stream groupingby分组后排序重构value - 简书
java中对Map进行排序的方法

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,826评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,968评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,234评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,562评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,611评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,482评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,271评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,166评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,608评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,814评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,926评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,644评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,249评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,866评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,991评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,063评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,871评论 2 354

推荐阅读更多精彩内容