函数式编程(四)stream源码基础-Spliterator、Fork/Join机制

函数式编程(一) lambda、FunctionalInterface、Method Reference
函数式编程(二) Stream
函数式编程(三) 类型擦除与堆污染、Collector接口与Collectors剖析

Spliterator

Spliterator接口是Java1.8新增接口,不同Iterator的顺序迭代,Spliterator是可分割的迭代器,可以利用多线程对数据进行并行处理提高效率。Spliterator接口有三个重要接口,tryAdvance、forEachRemaining和trySplit。JavaDoc里关于三个接口的简单说明:A Spliterator may traverse elements individually (tryAdvance()) or sequentially in bulk (forEachRemaining()). A Spliterator may also partition off some of its elements (using trySplit) as another Spliterator, to be used in possibly-parallel operations.
Spliterator 声明了 关于它的结构、源、元素的特征(characteristics),包含以下几种:

特征值 释义
ORDERED 表示元素定义的顺序,比如链表List是有序的,而HashSet是无序的
DISTINCT 表示元素都是唯一的,对于任意的两个不同元素x,y都满足x!=y
SORTED 表示集合里的元素是按照大小排序的,注意与ORDERED的区别比如SortedSet是SORTED
SIZED 表示在遍历或分割之前从estimateSize()返回的值表示有限大小,在没有结构源修改的情况下,表示完整遍历所遇到的元素数量的精确计数
NONNULL 表示集合里的元素是非空的
IMMUTABLE 表示在遍历的过程中不能添加、替换、删除元素
CONCURRENT 表示元素可以被多个线程安全并发得修改而不需要外部的同步
SUBSIZED 表示trySplit()返回的结果都是SIZED和SUBSIZED,即该分割器的直接或者非直接的子孙迭代器都是SIZED

下面在阐述Spliterator接口时,会结合具体实现类ArraySpliterator进行阐述。

 class ArraySpliterator<T> implements Spliterator<T> {
        private final Object[] array;
        private int index;        // current index, modified on advance/split
        private final int fence;  // one past last index
        private final int characteristics;

        //用于root Spliterator的构建
        public ArraySpliterator(Object[] array, int additionalCharacteristics) {
            this(array, 0, array.length, additionalCharacteristics);
        }
        //用于trySplit中构建子分割器
        public ArraySpliterator(Object[] array, int origin, int fence, int additionalCharacteristics) {
            this.array = array;
            this.index = origin;
            this.fence = fence;
            this.characteristics = additionalCharacteristics | Spliterator.SIZED | Spliterator.SUBSIZED;
        }

-tryAdvance

boolean tryAdvance(Consumer<? super T> action);

如果剩余元素存在,则执行给定的操作,并返回true; 否则返回false。 如果此Spliterator是有序的(ORDERED),则按照遇到的顺序对下一个元素执行操作。 异常被转发给调用者。
ArraySpliterator的tryAdvance()如下:(对当前节点执行action操作,然后游标向后移动)

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            if (action == null)
                throw new NullPointerException();
            if (index >= 0 && index < fence) {
                @SuppressWarnings("unchecked") T e = (T) array[index++];
                action.accept(e);
                return true;
            }
            return false;
        }

-forEachRemaining

void forEachRemaining(Consumer<? super T> action);

在当前线程中串行对剩余元素执行action操作,直到所有元素都被处理或抛出异常。 如果Spliterator是ORDERED,则按相关顺序执行操作。 异常被转发给调用者。

    //默认实现,不断调用tryAdvance
    default void forEachRemaining(Consumer<? super T> action) {
        do { } while (tryAdvance(action));
    }

ArraySpliterator的forEachRemaining()如下:

        @Override
        public void forEachRemaining(Consumer<? super T> action) {
            Object[] a; int i, hi; // hoist accesses and checks from loop
            if (action == null)
                throw new NullPointerException();
            if ((a = array).length >= (hi = fence) &&
                (i = index) >= 0 && i < (index = hi)) {
                do { action.accept((T)a[i]); } while (++i < hi);
            }
        }

-estimateSize

long estimateSize();

该接口是返回forEachRemaining遍历所遇到的元素数量的估计值,如果为无穷大,未知数或计算成本太高,则返回Long.MAX_VALUE。
在两种场景下estimateSize必须返回精确计数:

  1. Spliterator是SIZED,并且还没有被部分地遍历或拆分(结合ArraySpliterator的tryAdvance、trySplit、estimateSize理解后半句的意义,因为遍历或者拆分后游标发生变化)
  2. Spliterator是SUBSIZED,并且还没有被部分地遍历

其余情况下该接口返回的值不一定是准确计数,但即便是不精确的估计也通常是有用的。以近似平衡的二叉树为例:

  1. 子分割器estimateSize()可以估计为其父代的数量的一半的值
  2. 如果root分割器没有准确的计数,则可以估计为对应于其最大深度的两倍。
    ArraySpliterator的estimateSize()如下:
        @Override
         public long estimateSize() { return (long)(fence - index); }

-trySplit

Spliterator<T> trySplit();

该接口为Spliterator最核心的接口,其作用是对当前分割器Spliterator(为描述清晰,假设命名为A)进行分割,返回值为其子分割器Spliterator(假设命名为B),此时A,B的关系如下图:可以认为一个分割器Spliterator调用一次trySplit()就分裂为两个分割器Spliterator,这两个分割器Spliterator可以对源集合进行并行处理,当然该两个分割器Spliterator仍然可以调用trySplit()进行继续分割,当分割器Spliterator不能继续分割,则返回null。(除非源Spliterator包含无穷个元素,否则trySplit()的重复调用必须最终返回null。一个理想的trySplit将集合平分为两块以便平衡的并行处理,trySplit分割的较大偏差通常会导致较差的并行性能)

image.png

  1. A分割器的estimateSize()的值必须大于或等于A'或B分割器的estimateSize的值。
  2. 如果A‘和B是SUBSIZED,则A的estimateSize()值等于A‘和B的estimateSize()之和。
    ArraySpliterator的trySplit()如下:(二分法trySplit,调用trySplit后,源Spliterator会产生一个新的Spliterator,并且源Spliterator的区间也会发生变化)
        @Override
        public Spliterator<T> trySplit() {
            int lo = index, mid = (lo + fence) >>> 1;
            return (lo >= mid)
                   ? null
                   : new ArraySpliterator<>(array, lo, index = mid, characteristics);
        }  

Fork/Join框架

-并行与并发

在介绍Fork/Join框架前,首先介绍一下并发与并行的区别:
Concurrency is when two tasks can start, run, and complete in overlapping time periods. Parallelism is when tasks literally run at the same time, eg. on a multi-core processor.
Concurrency is the composition of independently executing processes, while parallelism is the simultaneous execution of (possibly related) computations.
Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.

并发的关键是有处理多个任务的能力,不一定要同时(时间上交错执行)。 并行的关键是利用多核技术有同时处理多个任务的能力。

-分治问题

解决经典的"分治"问题,任务通常被分解为多个任务块,而后每一任务块被\color{red}{独立}并行计算,一旦计算任务完成,结果会被合并。(这个过程往往不需要非常强的同步机制)

"分治"问题可以使用ExecutorService和Callable解决,但Callable实例在本质上是阻塞的。一旦一个Callable实例开始执行,其他所有Callable都会被阻塞。而Fork/Join框架被引入来解决这一并行问题,而Executor解决的是并发问题。

Fork/Join框架是用于并行执行任务的框架,其将一个大任务进行拆分(fork) 成若干个子任务(拆到不能再拆),再将一个个小任务的结果进行join汇总。

-Fork/Join引入背景

Fork/Join是获得良好的并行性能的最简单高效的设计技术。是分治算法的的并行实现,它的典型应用形式:

Result solve(Problem problem) {
  if (problem is small)
    directly solve problem
  else {
    split problem into independent parts
    fork new subtasks to solve each part
    join all subtasks
    compose result from subresults
    }
}

Doug Lea关于ForkJoin的论文: http://gee.cs.oswego.edu/dl/papers/fj.pdf
在这篇文章中,Doug Lea阐述了设计的考虑,java.lang.Thread类(也包括POSIX pthread)对Fork/Join程序来说并不是最优的选择,主要有以下两方面的原因:

  1. Fork/Join任务对同步的要求较简单,对常规的线程来说,Fork/Join任务可以使用更加灵活的调度策略。例如,Fork/Join任务除了等待子任务外,其他情况下是不需要阻塞的。因此传统的用于跟踪记录阻塞线程的代价是一种浪费。
  2. 构建和管理一个线程的代价有时甚至比任务执行本身所花费的代价更大。

即:标准线程框架对于Fork/Join程序来说是一种资源浪费。

-Fork/Join设计

  1. 创建了一个worker线程的线程池。每个工作线程(“重”线程)处理队列中的任务,通常工作线程数和CPU核心一样多。
  2. Fork/Join任务都是轻量级可执行类,它们不是Thread实例。
  3. work−stealing机制
  4. 接口简单:ForkJoinPool

-work−stealing机制

The heart of a fork/join framework lies in its lightweight scheduling mechanics.

  1. Each worker thread maintains runnable tasks in its own scheduling queue.
  2. Queues are maintained as double−ended queues, supporting both LIFO push and pop operations, as well as a FIFO take operation.
  3. Subtasks generated in tasks run by a given worker thread are pushed onto that workers own deque.(对一个特定的worker线程,任务所产生的子任务将会被放入到该worker线程自己的双端队列中)
  4. Worker threads process their own deques in \color{red}{LIFO}(youngest−first) order, by \color{blue}{popping} tasks.(工作线程使用后进先出的顺序,pop方法)
  5. When a worker thread has no local tasks to run, it attempts to \color{blue}{take } ("steal") a task from another randomly chosen worker, using a \color{red}{FIFO}(oldest first) rule.(使用先进先出的顺序偷任务,take方法)
  6. When a worker thread encounters a join operation, it processes other tasks, if available, until the target task is noticed to have completed. All tasks otherwise run to completion without blocking.


    steal.png

    Fork/Join框架采用后进先出(LIFO)处理每个工作线程的自己任务,使用先进先出(FIFO) 窃取别的任务,这样窃取任务的线程从任务队列拥有者相反的方向来操作任务队列,可以减少线程之间的竞争。

-ForkJoinPool

ForkJoinPool是ExecutorService的实现类,本质是一种特殊的线程池。ForkJoinPool两个常用的构造器:

    /**
     * Creates a {@code ForkJoinPool} with parallelism equal to {@link
     * java.lang.Runtime#availableProcessors}, using the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
    /**
     * Creates a {@code ForkJoinPool} with the indicated parallelism
     * level, the {@linkplain
     * #defaultForkJoinWorkerThreadFactory default thread factory},
     * no UncaughtExceptionHandler, and non-async LIFO processing mode.
     *
     * @param parallelism the parallelism level
     * @throws IllegalArgumentException if parallelism less than or
     *         equal to zero, or greater than implementation limit
     * @throws SecurityException if a security manager exists and
     *         the caller is not permitted to modify threads
     *         because it does not hold {@link
     *         java.lang.RuntimePermission}{@code ("modifyThread")}
     */
    public ForkJoinPool(int parallelism) {
        this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
    }

ForkJoinPool有如下三个方法启动线程:

场景 客户端非fork/join调用 内部调用fork/join
异步执行 execute(ForkJoinTask) ForkJoinTask.fork
等待获取结果 invoke(ForkJoinTask) ForkJoinTask.invoke
执行,获取Future submit(ForkJoinTask) ForkJoinTask.fork

-ForkJoinTask

ForkJoinTask三个核心方法:
fork():在任务执行过程中将大任务划分为多个小的子任务,调用子任务的fork()方法可以将任务放到线程池中异步调度。
join():调用子任务的join()方法等待任务返回的结果。这个方法类似于Thread.join(),区别在于前者不受线程中断机制的影响。
invoke():在当前线程同步执行该任务。不受中断机制影响。

ForkJoinTask 是一个抽象类,它有两个抽象子类:RecurisiveTask和RecurisiveAction。
RecurisiveTask<T>代表有返回值的任务。T是返回值的类型。
RecurisiveAction代表没有返回值的任务。

并行流parallelStream就是通过使用ForkJoinPool可以提高多线程任务的处理速度。

在写的过程中发现内容越写越多,而下一篇文章就开始阐述Stream的源码,希望能坚持下去。

WalkeR_ZG

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容