并行数据处理与性能

  • 并行流parallelStream
    并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让这些处理器都忙起来。
    例如:下面的方法,接收数字n作为参数,返回从1到给定参数的所有数字的和

    public static long sequentialSum(long n) {
         return Stream.iterate(1L, i -> i + 1)//生成自然数无限流
                      .limit(n)//限制前n个数
                      .reduce(0L, Long::sum);//对所有数字求和
    

  • iterate,Stream API提供由两个静态方法操你个函数生成流(常见的还有三种方法构建流,由值构建、由数组构建、由文件构建),Stream.iterateStream.generate可以创建无限流,会根据给定的函数按需创建。注意与generate区别:
    generate不是一次对每个新生成的值应用函数,例如以下使用的供应源是无状态的,即它不会在任何地方记录任何值,以备以后计算使用。

    iterate供应源不一定是无状态的。可以创建存储状态的供应源,它可以修改状态,并在为流生成下一个值时使用

     Stream.generate(Math::random)
           .limit(5)
           .forEach(System.out::println);
    

这时,若n特别大的时候,就可以做并行处理,将顺序流转换为并行流:对顺序流调用parallel即可:在处理时,会把Stream在内部分成几块,在不同的块独立并行归纳操作,最后,同一个归纳操作会将各个子流的部分归纳结果合并起来,得到整个原始流的归纳结果。

  public static long parallelSum(long n) {
       return Stream.iterate(1L, i -> i + 1)
                    .limit(n)
                    .parallel()//将流转换为并行流
                    .reduce(0L, Long::sum);
  } 
image.png

但是,在实际操作时,对顺序流调用parallel方法并不意味着对流有任何实际的变化。它内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。对并行流调用sequential方法就可以把它变成顺序流。将两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。例如:

  stream.parallel()
        .filter(...)
        .sequential()
        .map(...)
        .parallel()//最后一次parallel或sequential调用会影响整个流水线
        .reduce(); 

并行流默认使用的线程使用了默认的ForkJoinPool,它默认的线程数量就是你的处理器数量。

  • 性能
    实际上,上面利用并行版本会比顺序版本满很多,主要有两方面原因:

    1. iterate生成的是装箱的对象,必须拆箱成数字才能求和;
    2. 很难把iterate分成多个独立块来并行执行,由于某些流操作会比其他车操作更容易并行化,之所以iterate很难被分成独立执行的小块,是因为每次应用这个函数都要依赖前一次应用的结果,也就说,在这种情况下,归纳进程不像图7.1那样进行,整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。

    所以说,并行编程可能很复杂,如果用的不对(iterate),会让整体的性能变差。

  • 解决方法
    使用LongStream.rangeClosed方法,和iterate相比有两个优点:

    1. LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的开销。

    2. LongStream.rangeClosed会生成数字范围,很容易拆分为独立的小块。

      public static long parallelRangedSum(long n) {
           return LongStream.rangeClosed(1, n)
                            .parallel()
                            .reduce(0L, Long::sum);
       } 
      

并行化是需要付出代价的,并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。


  • 正确使用并行流
    上面直接使用并行化的错误原因就是,使用的算法改变了某些共享状态。
    1. 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStreamLongStreamDoubleStream)来避免这种操作,
    2. 有些操作本身在并行流上的性能就比顺序流差。特别是limitfindFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性能好,因为它不一定要按顺序来执行。
    3. 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
    4. 要考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。
    5. 还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。

根据可分解性总结的一些流数据源适不适合并行

可分解性
ArrayList 极佳
LinkedList
IntStream.range 极差
Stream.iterate
HashSet
TreeSet

分支/合并框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。

  1. 使用RecursiveTask
    把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型,则需要实现它的抽象方法compute:
    protected abstract R compute();
    这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。

     if (任务足够小或不可分) {
         顺序计算该任务
     } else {
          将任务分成两个子任务
          递归调用本方法,拆分每个子任务,等待所有子任务完成
          合并每个子任务的结果
     } 
    

例如:利用分支/合并框架执行并行求和

public class ForkJoinSumCalculator
             extends java.util.concurrent.RecursiveTask<Long> {

     private final long[] numbers;
     private final int start;
     private final int end;

     public static final long THRESHOLD = 10_000;//不再将任务分成子任务的大小

     public ForkJoinSumCalculator(long[] numbers) {
           this(numbers, 0, numbers.length);
     }

     private ForkJoinSumCalculator(long[] numbers, int start, int end) {
           this.numbers = numbers;
           this.start = start;
           this.end = end;
     }

   @Override
   protected Long compute() {
         int length = end - start;
         if (length <= THRESHOLD) {//如果大小小于或等于阈值,顺序计算结果
               return computeSequentially();
         }
         ForkJoinSumCalculator leftTask =
                   new ForkJoinSumCalculator(numbers, start, start + length/2);
         leftTask.fork();//利用另一个ForkJoinPool线程异步执行新创建的子任务
         ForkJoinSumCalculator rightTask =
                   new ForkJoinSumCalculator(numbers, start + length/2, end);//创建一个任务为数组的后一半数组求和
         Long rightResult = rightTask.compute();//同步执行第二个子任务,有可能允许进一步递归划分
         Long leftResult = leftTask.join();//读取第一个子任务的结果,如果尚未完成就等待
         return leftResult + rightResult;
   }

    //在子任务不再可分时计算结果的简单算法
   private long computeSequentially() {
         long sum = 0;
         for (int i = start; i < end; i++) {{
             sum += numbers[i];
         }
       return sum;
   }
} 

把数字数组传给ForkJoinSumCalculator的构造函数,即可实现并行对前n个自然数求和:

public static long forkJoinSum(long n) {
     long[] numbers = LongStream.rangeClosed(1, n).toArray();
     //ForkJoinTask为RecursiveTask的父类
     ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
     return new ForkJoinPool().invoke(task);//invoke用来执行某个对象的目标方法
} 
  • 工作窃取:
    分支/合并框架工程用一种称为工作窃取(work stealing)用于在线程池中的工作线程之间重新分配和平衡任务。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空,有助于更好地在工作线程之间平衡负载。

    image.png

  • Spliterator接口
    Iterator一样,Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。

    public interface Spliterator<T> {
         boolean tryAdvance(Consumer<? super T> action);//类似于Iterator,按顺序一个一个使用Spliterator中的元素,如果有其他元素需要遍历就返回true
         Spliterator<T> trySplit();//可以把一些元素划出去分给第二个Spliterator,让他们并行处理
         long estimateSize();//估计剩下还有多少元素需要遍历
         int characteristics();//代表Spliterator本身特征集的编码。可以用这些特征来更好地控制和优化它的使用。
    } 
    

    Stream拆分成多个部分的算法是一个递归过程,如图所示。第一步是对第一个Spliterator调用trySplit,生成第二个Spliterator。第二步对这两个Spliterator调用trysplit,这样总共就有了四个Spliterator。这个框架不断对Spliterator调用trySplit直到它返回null,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的Spliterator在调用trySplit时都返回了null

image.png

//7.3.2自定义Spliterator

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,384评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,845评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,148评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,640评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,731评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,712评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,703评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,473评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,915评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,227评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,384评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,063评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,706评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,302评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,531评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,321评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,248评论 2 352

推荐阅读更多精彩内容

  • 并行流:把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流。 先做一个简单的测试,测试传统for循环,...
    墙角的牵牛花阅读 267评论 0 0
  • 并行流 parallel() 如果每次应用函数都要依赖前一次应用的结果,并行只会比顺序处理增加开销。错用并行流的首...
    上海马超23阅读 559评论 0 0
  • 绪论 之前的几章中,我们已经看到了新的Stream接口可以让你以声明性方式处理数据集。我们还解释了将外部迭代换为内...
    浔它芉咟渡阅读 3,227评论 0 2
  • Java8 in action 没有共享的可变数据,将方法和函数即代码传递给其他方法的能力就是我们平常所说的函数式...
    铁牛很铁阅读 1,227评论 1 2
  • 素弦管笛起秋风。 梦巫山,仙子青葱。 舒袖展柔情, 飞天瞬若轻鸿。 肠声断,蹙泪微红。 离别去,非是箫郎弄曲, 月...
    断红尘阅读 152评论 0 0