Java8学习笔记之并行数据处理

在Java 7之前,并行处理数据集合非常麻烦。第一,你得明确地把包含数据的数据结构分成若干子部分。第二,你要给每个子部分分配一个独立的线程。第三,你需要在恰当的时候对它们进行同步来避免不希望出现的竞争条件,等待所有线程完成,最后把这些部分结果合并起来。Java 7引入了一个叫作分支/合并的框架,让这些操作更稳定、更不易出错。在Java 8中可以使用Stream接口更容易的对数据集执行并行操作。

1、并行流

通过对收集源调用parallelStream方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。

示例:返回从1到给定参数的所有数字的和

public static long getSum(long n) {

    return Stream.iterate(1L, i -> i + 1) //生成自然数无限流

        .limit(n) //限制到前n个数

        .reduce(0L, Long::sum); //对所有数字求和来归纳流

}

上面代码等价于:

public static long getSum(long n) { 

    long result = 0; 

    for (long i = 1L; i <= n; i++) { result += i; } 

    return result;

}

将顺序流转换为并行流

对顺序流调用parallel方法,转换为并行流:

public static long parallelGetSum(long n) {

    return Stream.iterate(1L, i -> i + 1)

        .limit(n)

        .parallel() //将流转换为并行流

        .reduce(0L, Long::sum);

}

并行归纳操作

注意:对顺序流调用parallel方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。你只需要对并行流调用sequential方法就可以把它变成顺序流。你可能以为把这两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。例如,你可以这样做:

stream.parallel()

    .filter(...)

    .sequential()

    .map(...)

    .parallel()

    .reduce();

实际上最后一次parallel或sequential的调用会影响整个流水线。

并行流内部使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。你可以通过系统属性java.util.concurrent.ForkJoinPool.common.parallelism来改变线程池大小,如下所示:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");

这是一个全局设置,它将影响代码中所有的并行流,目前还无法专为某个并行流指定这个值。一般情况下建议你不要修改此值,除非有充足的理由。

测量流性能

测量对前n个自然数求和的函数的性能:

public long measureSumPerf(Function<Long, Long> adder, long n) {

    long fastest = Long.MAX_VALUE;

    for (int i = 0; i < 10; i++) {

        long start = System.nanoTime();

        long sum = adder.apply(n);

        long duration = (System.nanoTime() - start) / 1_000_000;

        System.out.println("Result: " + sum);

        if (duration < fastest) fastest = duration;

    }

    return fastest;

}

System.out.println("Sequential sum done in:" +  measureSumPerf(ParallelStreams::getSum, 10_000_000) + " msecs"); //测试顺序加法器函数对前一千万个自然数求和耗时

System.out.println("Parallel sum done in: " + measureSumPerf(ParallelStreams::parallelGetSum, 10_000_000) + " msecs" );//测试并行加法器函数对前一千万个自然数求和耗时

注意:这个运行结果会存在差异性,因为影响执行时间的因素有很多,比如你的电脑支持多少个内核。用传统for循环的迭代版本执行起来应该会快很多,因为它更为底层,更重要的是不需要对原始类型做任何装箱或拆箱操作。

实际执行结果是并行版本比顺序版本要慢很多,原因是:iterate生成的是装箱的对象,必须拆箱成数字才能求和;iterate很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次应用的结果。把流标记成并行,其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。

如何利用多核处理器,用流来高效地执行并行?

可以使用LongStream.rangeClosed的方法,相比iterate于其优点是:直接产生原始类型的long数字,没有装箱拆箱的开销。会生成数字范围,很容易拆分为独立的小块。

测试代码:

public static long rangedSum(long n) {

    return LongStream.rangeClosed(1, n).reduce(0L, Long::sum);

}

public static long parallelRangedSum(long n) {

    return LongStream.rangeClosed(1, n).parallel().reduce(0L, Long::sum);

}

测试方法:

System.out.println("Parallel range sum done in:" +    measureSumPerf(ParallelStreams::parallelRangedSum, 10_000_000) + " msecs");

结果是得到了一个比顺序执行更快的并行归纳。表明使用正确的数据结构然后使其并行工作能够保证最佳的性能。

注意:并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。总而言之,很多情况下不可能或不方便并行化。在使用并行Stream加速代码之前,你必须确保用得对;

流的数据源和可分解性

2、分支/合并框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。

1)使用RecursiveTask

要把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型(它可能会更新其他非局部机构)。要定义RecursiveTask,只需实现它唯一的抽象方法compute:

protected abstract R compute();

这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。

递归的任务拆分过程

运行ForkJoinSumCalculator

当把ForkJoinSumCalculator任务传给ForkJoinPool时,这个任务就由池中的一个线程执行,这个线程会调用任务的compute方法。该方法会检查任务是否小到足以顺序执行,如果不够小则会把要求和的数组分成两半,分给两个新的ForkJoinSumCalculator,而它们也由ForkJoinPool安排执行。这一过程可以递归重复,把原任务分为更小的任务,直到满足不方便或不可能再进一步拆分的条件。这时会顺序计算每个任务的结果,然后由分支过程创建的(隐含的)任务二叉树遍历回到它的根。接下来会合并每个子任务的部分结果,从而得到总任务的结果。

分支/合并算法

2)使用分支/合并框架的最佳做法

-对一个任务调用join方法会阻塞调用方,直到该任务做出结果。因此,有必要在两个子任务的计算都开始之后再调用它。否则,你得到的版本会比原始的顺序算法更慢更复杂,因为每个子任务都必须等待另一个子任务完成才能启动。

-不应该在RecursiveTask内部使用ForkJoinPool的invoke方法。相反,你应该始终直接调用compute或fork方法,只有顺序代码才应该用invoke来启动并行计算。

-对子任务调用fork方法可以把它排进ForkJoinPool。同时对左边和右边的子任务调用它似乎很自然,但这样做的效率要比直接对其中一个调用compute低。这样做你可以为其中一个子任务重用同一线程,从而避免在线程池中多分配一个任务造成的开销。

-调试使用分支/合并框架的并行计算可能有点棘手。特别是你平常都在IDE里面看栈跟踪(stack trace)来找问题,但放在分支-合并计算上就不行了,因为调用compute的线程并不是概念上的调用方,后者是调用fork的那个。

-和并行流一样,不应想当然地认为在多核处理器上使用分支/合并框架就比顺序计算快。一个任务可以分解成多个独立的子任务,才能让性能在并行化时有所提升。所有这些子任务的运行时间都应该比分出新任务所花的时间长;一个惯用方法是把输入/输出放在一个子任务里,计算放在另一个里,这样计算就可以和输入/输出同时进行。此外,在比较同一算法的顺序和并行版本的性能时还有别的因素要考虑。就像任何其他Java代码一样,分支/合并框架需要“预热”或者说要执行几遍才会被JIT编译器优化。这就是为什么在测量性能之前跑几遍程序很重要。另外,编译器内置的优化可能会为顺序版本带来一些优势。

-你必须选择一个标准,来决定任务是要进一步拆分还是已小到可以顺序求值。

3)工作窃取

一般来说分出大量的小任务都是一个好选择。在理想情况下,划分并行任务时,应该让每个任务都用完全相同的时间完成,让所有的CPU内核都同样繁忙。但实际中,每个子任务所花的时间可能天差地别,要么是因为划分策略效率低,要么是有不可预知的原因,比如磁盘访问慢,或是需要和外部服务协调执行。

分支/合并框架用一种称为工作窃取(work stealing)的技术来解决这个问题。在实际应用中,这些任务差不多被平均分配到ForkJoinPool中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。某个线程可能早早完成了分配给它的所有任务,它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程之间平衡负载。 这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。

分支/合并框架使用的工作窃取算法

3、可分迭代器Spliterator

Spliterator是Java 8中加入的另一个新接口,含义是可分迭代器。和Iterator一样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。Java 8已经为集合框架中包含的所有数据结构提供了一个 默认的Spliterator实现。集合实现了Spliterator接口,接口提供了一个spliterator方法。

Spliterator接口定义:

public interface Spliterator<T> {

    boolean tryAdvance(Consumer<? super T> action);

    Spliterator<T> trySplit();

    long estimateSize();

    int characteristics();

}

T是Spliterator遍历的元素的类型。

tryAdvance方法的行为类似于普通的Iterator,它按顺序一个个使用Spliterator中的元素,如果还有其他元素要遍历就返回true。

trySplit是专为Spliterator接口设计的,它可以把一些元素划分给第二个Spliterator(由该方法返回),让它们两个并行处理。

estimateSize方法可以估计还剩下多少元素要遍历,即使不那么确切,能快速算出来是一个值 也有助于让拆分均匀一点。

1)拆分过程

将Stream拆分成多个部分的算法是一个递归过程。

递归拆分过程

第一步:对第一个Spliterator调用trySplit,生成第二个Spliterator。

第二步:对这两个Spliterator调用trysplit,这样就有了四个Spliterator。

第三步:不断对Spliterator调用trySplit直到返回null,表明它处理的数据结构不能再分割。

最后:这个递归拆分过程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null。

Spliterator的特性

Spliterator接口声明的最后一个抽象方法是characteristics,它将返回一个int,代表Spliterator本身特性集的编码。

Spliterator的特性

2)实现自己的Spliterator

开发一个简单的方法来计数一个String中的单词数。

public int countWordsIteratively(String s) {

    int counter = 0;

    boolean lastSpace = true;

    for (char c : s.toCharArray()) {

        if (Character.isWhitespace(c)) {

            lastSpace = true;

        } else {

            if (lastSpace) counter++; //上一个字符是空格,而当前遍历的字符不是空格时,将单词计数器加一

            lastSpace = false;

        }

    }

    return counter;

}

以函数式风格重写单词计数器:

首先需要把String转换成一个流。但是原始类型的流仅限于int、long和double, 所以只能用Stream<Character>:

Stream<Character> stream = IntStream.range(0, SENTENCE.length())     .mapToObj(SENTENCE::charAt);

你可以对这个流做归约来计算字数。在归约流时,需要保留由两个变量组成的状态:一个int

用来计算到目前为止数过的字数,还有一个boolean用来记得上一个遇到的Character是不是空 格。你需要创建一个新类WordCounter来把这个状态封装起来。

用来在遍历Character流时计数的类:

class WordCounter {

    private final int counter;

    private final boolean lastSpace;

    public WordCounter(int counter, boolean lastSpace) {

        this.counter = counter;

        this.lastSpace = lastSpace;

    }

      public WordCounter accumulate(Character c) {//和迭代算法一样,accumulate方法一个个遍历Character

        if (Character.isWhitespace(c)) {

            return lastSpace ? this : new WordCounter(counter, true);

        } else {

            return lastSpace ? new WordCounter(counter + 1, false) : this; //上一个字符是空格,而当前遍历的字符不是空格时,将单词计数器加一

        }

    }

    //合并两个WordCounter,把其计数器加起来

    public WordCounter combine(WordCounter wordCounter) { 

        return new WordCounter(counter + wordCounter.counter, wordCounter.lastSpace);//仅需要计数器的总和,无需关心lastSpace

    }

    public int getCounter() {return counter;}

}

accumulate方法定义了如何更改WordCounter的状态,确切地说是用哪个状态来建立新的WordCounter,因为这个类是不可变的。每次遍历到Stream中的一个新的Character时,就会调用accumulate方法。就如countWords- Iteratively方法一样,当上一个字符是空格,新字符不是空格时,计数器就加一。

调用第二个方法combine时,会对作用于Character流的两个不同子部分的两个WordCounter的部分结果进行汇总,也就是把两个WordCounter内部的计数器加起来。 

遍历到新的Character c时WordCounter的状态转换

归约Character流:

private int countWords(Stream<Character> stream) {

    WordCounter wordCounter = stream.reduce(

        new WordCounter(0, true), WordCounter::accumulate, WordCounter::combine);

    return wordCounter.getCounter();

}

调用方法:

Stream<Character> stream = IntStream.range(0, SENTENCE.length())

    .mapToObj(SENTENCE::charAt);

System.out.println("Found " + countWords(stream) + " words");

让WordCounter并行工作:

在并行执行时必须要确保String不是在随机位置拆开的,而只能在词尾拆开。需要为Character实现一个Spliterator,它只能在两个词之间拆开String,然后由此创建并行流。

class WordCounterSpliterator implements Spliterator<Character> {

    private final String string;

    private int currentChar = 0;

    public WordCounterSpliterator(String string) {this.string = string;}

    //把String中当前位置的Character传给了Consumer,并让位置加一,如果新的指针位置小于String的总长,且还有要遍历的Character,则 tryAdvance返回true。

    @Override   

    public boolean tryAdvance(Consumer<? super Character> action) {

        action.accept(string.charAt(currentChar++));//处理当前字符

        return currentChar < string.length(); //如果还有字符要处理,则返回true

    }

    //定义拆分要遍历的数据结构的逻辑

    @Override

    public Spliterator<Character> trySplit() {

        int currentSize = string.length() - currentChar;

        if (currentSize < 10) {return null;}

        for (int splitPos = currentSize / 2 + currentChar;splitPos < string.length(); splitPos++) { //将试探拆分位置设定为要解析的String的中间

            if (Character.isWhitespace(string.charAt(splitPos))) { //让拆分位置前进直到下 一个空格

                Spliterator<Character> spliterator =

                    new WordCounterSpliterator(string.substring(currentChar, splitPos));//创建一个新WordCounterSpliterator来解析String从开始到拆分位置的部分

                currentChar = splitPos;//将这个WordCounterSpliterator 的起始位置设为拆分位置

                return spliterator;

            }

        }

        return null;

    }

    //Spliterator解析的String的总长度和当前遍历的位置的差

    @Override

    public long estimateSize() {return string.length() - currentChar;}

    @Override

    public int characteristics() {

        return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;

    }

}

tryAdvance方法把String中当前位置的Character传给了Consumer,并让位置加一。作为参数传递的Consumer是一个Java内部类,在遍历流时将要处理的Character传给了一系列要对其执行的函数。

trySplit方法是Spliterator中最重要的一个方法,它定义了拆分要遍历的数据结构的逻辑。就像分支/合并的例子那样,你肯定要用更高的下限来避免生成太多的任务。如果剩余的Character数量低于下限,你就返回null表示无需进一步拆分。相 反,如果你需要执行拆分,就把试探的拆分位置设在要解析的String块的中间。一旦找到了适当的拆分位置,就可以创建一个新的Spliterator来遍历从当前位置到拆分位置的子串;把当前位置this设为拆分位置,因为之前的部分将由新 Spliterator来处理,最后返回。

还需要遍历的元素的estimatedSize就是这个Spliterator解析的String的总长度和当前遍历的位置的差。

最后,characteristic方法告诉框架这个Spliterator是ORDERED(顺序就是String中各个Character的次序)、SIZED(estimatedSize方法的返回值是精确的) 、 SUBSIZED(trySplit方法创建的其他Spliterator也有确切大小)、NONNULL(String中不能有为null的Character)和IMMUTABLE的(在解析String时不能再添加Character,因为String本身是一个不可变类)。

运用WordCounterSpliterator:

Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE); Stream<Character> stream = StreamSupport.stream(spliterator, true); System.out.println("Found " + countWords(stream) + " words");

    --以上示例摘自《Java8实战》

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容