Stream

学科归属&背景

  • Java8新特性,能够避免多重循环和反复遍历,导致有许多中间变量,增加内存的消耗。适用于统计/变换等场景。

侧重点/目标

1.中间的操作结果存在哪里?内存的消耗情况怎么样?

  • stream的实现不是对所有的数据依次进行单一操作,而是对一个元素遍历所有操作。内存消耗相比于迭代,大大降低。

2.中间操作的状态&短路是指什么

  • 有状态,依赖上一个操作的结果,不能并行?如,sorted
@Override
 public void end() {
     list.sort(comparator);
     downstream.begin(list.size());
     if (!cancellationWasRequested) {
         list.forEach(downstream::accept);
     }
     else {
         for (T t : list) {
             if (downstream.cancellationRequested()) break;
             downstream.accept(t);//等中间缓存的排序结果都ok end了,对排序后的结果,重新发起后续的操作
         }
     }
     downstream.end();
     list = null;
 }
  • 无状态,操作之间无关联。
  • 短路:执行到这个操作,这个元素有可能就被剔除,不能进行后续的操作了,类似continue。如,filter。=》合理布置stream操作顺序,可以减少计算量。

3.传入的抽象方法实现,是存在哪里?是由哪个类怎么执行的?如何短路执行(中间取消)?

  • 惰性执行(调用终止方法时,才真正执行整个流操作)怎么实现的?Spliterator接口负责将数据转成流水线的输入,Sink接口是消费者,遍历消费操作。
  • 定义数据源,开启流水线:Collection的stream(),实现了Spliterator接口,返回构造出ReferencePipeline.Head实例
    default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }

    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }
  • 传入的抽象方法实现,会根据有无状态,构造出StatelessOp或 StatefulOp。一步步构成操作的双向链表,同时并没有实际执行操作。实际操作中,AbstractPipeline双头链表保存前后操作和头结点,方便同一元素进行操作。ChainedReference执行操作流的执行/停止。
    @Override
    public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }
   @Override
   public final Stream<P_OUT> distinct() {
       return DistinctOps.makeRef(this);
   }

static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
       return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
                                                     StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
……
  • 终止方法,构造出TerminalOp,同时发起ReferencePipeline.evaluate,开始惰性执行
    @Override
    public void forEach(Consumer<? super P_OUT> action) {
        evaluate(ForEachOps.makeRef(action, false));
    }

    public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action,
                                                  boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
    }
    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;
// **并行与否**
        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))//ForkJoin框架,将原始数据不断拆分为更小的单元,对每一个单元做上述evaluateSequential类似的动作
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }
       @Override
       public <S> Void evaluateSequential(PipelineHelper<T> helper,
                                          Spliterator<S> spliterator) {
           return helper.wrapAndCopyInto(this, spliterator).get();
       }
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
// 从最后操作往前,包成新的Sink,复合combinedFlags
   final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
       Objects.requireNonNull(sink);

       for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
           sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
       }
       return (Sink<P_IN>) sink;
   }


// 第二个操作的节点,判断combinedFlags,如果短路了,即停止,然后重新反向执行?
// 未短路,执行此次的新Sink反向流的begin,依次执行操作的,递归发起下一个元素的
   @Override
   final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
       Objects.requireNonNull(wrappedSink);

       if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
           wrappedSink.begin(spliterator.getExactSizeIfKnown());// 流水ok,你准备一下
           spliterator.forEachRemaining(wrappedSink);// Collection实现spliterator,开始执行遍历元素,递归执行
           wrappedSink.end();
       }
       else {
           copyIntoWithCancel(wrappedSink, spliterator);
       }
   }

// ArrayList实现
public void forEachRemaining(Consumer<? super E> action) {
           int i, hi, mc; // hoist accesses and checks from loop
           ArrayList<E> lst; Object[] a;
           if (action == null)
               throw new NullPointerException();
           if ((lst = list) != null && (a = lst.elementData) != null) {
               if ((hi = fence) < 0) {
                   mc = lst.modCount;
                   hi = lst.size;
               }
               else
                   mc = expectedModCount;
               if ((i = index) >= 0 && (index = hi) <= a.length) {
                   for (; i < hi; ++i) {
                       @SuppressWarnings("unchecked") E e = (E) a[i];
                       action.accept(e);//各个操作通过Sink接口accept方法依次向下传递执行。
                   }
                   if (lst.modCount == mc)
                       return;
               }
           }
           throw new ConcurrentModificationException();
       }

4.泛型的使用

  • 自己套自己public interface OfPrimitive<T, T_CONS, T_SPLITR extends Spliterator.OfPrimitive<T, T_CONS, T_SPLITR>>            extends Spliterator<T> {
    
interface Builder<T> extends Sink<T> {

        /**
         * Builds the node.  Should be called after all elements have been
         * pushed and signalled with an invocation of {@link Sink#end()}.
         *
         * @return the resulting {@code Node}
         */
        Node<T> build();

        /**
         * Specialized @{code Node.Builder} for int elements
         */
        interface OfInt extends Node.Builder<Integer>, Sink.OfInt {
            @Override
            Node.OfInt build();
        }
Lists.<Person>newArrayList().stream()
        .collect(Collectors.groupingBy(Person::getType, HashMap::new, Collectors.toList()));

5.操作码是如何标识的。位的应用。sourceOrOpFlags

6.设计模式

责任链模式,一个接一个处理事件。

7.典型使用
collector&Collectors

 Map<Integer, Optional<String>> collect = ct.stream()
                    .filter(i -> !Strings.isNullOrEmpty(i.getContent()))
                    .collect(Collectors.groupingBy(ReportRstInfo::getFeatureClassIdx,
                            Collectors.mapping(ReportRstInfo::getContent, Collectors.reducing((o, o2) -> o))));

                            
 Map<String, List<DiagnosisReportDB>> monthResults = diagnosisReportDBS.stream().collect(Collectors.groupingBy(
                function, toSortedList(Comparator.comparing((DiagnosisReportDB::getRecordTime)).reversed())));
                
    private static <T> Collector<T, ?, List<T>> toSortedList(Comparator<? super T> c) {
        return Collectors.collectingAndThen(Collectors.toCollection(() -> new TreeSet<>(c)), ArrayList::new);
    }
    
monthResults = monthResults.entrySet().stream().sorted(Collections.reverseOrder((o1, o2) ->
                            (int) (TimeUtils.parseTime(o1.getKey(), YYYYMM_WITH_SPLIT_CHINESE) / 1000 -
                                    TimeUtils.parseTime(o2.getKey(), YYYYMM_WITH_SPLIT_CHINESE) / 1000)))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                            (s, s2) -> s, LinkedHashMap::new));

知识迁移

1.并行的ForkJoinPool与多线程的关系,分治的思想如何有序拼接数据,如并行排序?

REF


[原来你是这样的 Stream —— 浅析 Java Stream 实现原理](https://zhuanlan.zhihu.com/p/47478339
Stream的reduce及Collect使用、Collector4接口解释
Collector_复杂多示例
student示例、Collectors静态方法
java stream groupingby分组后排序重构value - 简书
java中对Map进行排序的方法

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容