填坑
先来填坑,不知大家还记得我在第五章《串行与并行》中留的坑吗?下面我们就来继续挖它,通过剖析源码,一层一层拨开它的心。
万流之眼 StreamSupport辅助类
为什么只是将parallel标志位设为false或true就可以关闭或开启并行,真正的实现原理是什么呢?我们先来看看集合类的stream方法与parallelStream方法有什么不同,结果可能令人哑然失笑。
//Collection接口中的默认实现
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
default Stream<E> parallelStream() {
return StreamSupport.stream(spliterator(), true);
}
这两个方法都是像我们之前使用Spliterator的时候那样,直接调用了StreamSupport的stream方法,区别只在于最后的并行标志位。类似的代码也出现在了Stream类的of、iterate、generate等方法中,由此可以推断,StreamSupport的stream方法就是是系统类库中产生流的唯一方法,当然这里说的流不包括基本类型流。
追本溯源 Pipeline世家
StreamSupport的stream方法返回了一个ReferencePipeline.Head对象。ReferencePipeline是个抽象类,继承了AbstractPipeline,并实现了其中大多数的方法,而Head是ReferencePipeline的内部类,继承了ReferencePipeline并实现了剩下的两个方法。总结一下就是Head内部类继承了ReferencePipeline抽象类继承了AbstractPipeline抽象类实现了Stream接口,所以Head就是我们最后得到的Stream。
最终实现 Head内部类
Head中对ReferencePipeline留下的两个没有实现的方法的实现看起来很古怪,都是直接抛出了<em style="color:#FF0000;">UnsupportedOperationException</em>,表示不支持该操作。我们也不用关注这些,而是把注意力放在它重写的另外两个方法上,名字大家都很熟悉,forEach和forEachOrdered。这两个方法的重写也很简单,以forEach为例:
if (!isParallel()) {
sourceStageSpliterator().forEachRemaining(action);
}
else {
super.forEach(action);
}
该方法仅仅只是判断一下isParallel,如果是并行的就还是调父类的方法,否则就调用Spliterator的forEachRemaining方法,该方法在Spliterator接口中有默认的实现,只有一行代码:
do { } while (tryAdvance(action));
是不是很绝,什么也不做,等到tryAdvance返回false就结束。tryAdvance我们前面已经讲到,是用来判断分割式迭代是否结束的。也就是说,如果流是串行的,就不会进行分割,这时候Spliterator就变成了普通的迭代器,从头到尾逐个进行操作。
命令执行 TerminalOp众子
对于并行流,Head则会调用父类ReferencePipeline中的同名方法,该方法中又会调用evaluate方法,根据不同的行为如forEach、reduce来选择执行不同的操作。所有的操作类都要实现TerminalOp接口,如forEach操作就对应着ForEachOp类,reduce操作对应着ReduceOp类,这些操作会作为参数传给evaluate方法。evaluate方法最后还是会根据isParallel来选择执行并行操作还是串行操作,两类操作分别对应着需要重写的evaluateParallel方法与evaluateSequentia方法(evaluateParallel有默认实现),两个方法均由操作类来实现,并且都需要传递一个Spliterator参数。
幕后核心 Spliterator接口
不管并行流还是串行流,函数调用的过程中,都无一例外的需要传递一个Spliterator参数。第一次看到这里的时候我觉得很玄,很多时候系统给你提供个类,你不去用,到头来看源码却发现发现原来它才是万物起源。这后面的事情,百川归海,我就不一一细说了,有兴趣的读者可以自行探索。
先遛了
学个屁Java8,哪有多少人用,还总结,总结个屁。我先跑去学Kotlin了,这玩意可比Java8爽多了……