java concurrent 之 ForkJoinPool

java concurrent 之 ForkJoinPool

ForkJoinPool在Java 7中被引入。ForkJoinPool类似于Java ExecutorService,但有一个区别。 ForkJoinPool可以轻松将任务分解成较小的任务,然后将其提交给ForkJoinPool。 任务可以将其工作分成较小的子任务,只要它能够分解任务即可。 它可能听起来有点抽象,所以在这个fork和join教程中,我将解释ForkJoinPool如何工作,以及分裂任务如何工作。

解释Fork和Join

在我们看看ForkJoinPool之前,我想解释一下fork和join的原理。

Fork和Join原则由递归执行的两个步骤组成。 这两个步骤是fork步骤和join步骤。

Fork

使用fork和join原理的任务可以将(自己)分割成更小的子任务

image

通过将其自身分解为子任务,每个子任务可以由不同的CPU或同一CPU上的不同线程并行执行。

如果任务给出的工作足够大,任务只会分解成子任务,这样才有意义。 将任务分解为子任务有一个开销,因此对于少量工作,此开销可能大于通过并发执行子任务而实现的加速。

将任务分解为子任务的时间限制也称为阈值。 每个任务都由决定一个明智的门槛决定。 这在很大程度上取决于正在做的工作。

其实阈值的问题就是在递归算法中的退出条件相似

Join

当一个任务已经分裂成子任务时,任务等待直到子任务完成执行。

子任务完成执行后,任务可以将所有结果加入(合并)为一个结果。 如下图所示:

image

ForkJoinPool

ForkJoinPool是一个特殊的线程池,旨在使用fork-and-join任务拆分工作。 ForkJoinPool位于java.util.concurrent包中,因此完整的类名称为java.util.concurrent.ForkJoinPool。

创建ForkJoinPool


ForkJoinPool forkJoinPool = new ForkJoinPool(4);

ForkJoinTask任务分为两类

  • RecursiveAction 用于没有返回结果的任务。

  • RecursiveTask 用于有返回结果的任务

ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。

使用Fork/Join框架 Demo

package com.viashare.forkjoin;

import java.util.concurrent.*;

/**
 * Created by Jeffy on 16/01/12.
 */
public class ForkJoinMain {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> future = forkJoinPool.submit(new CountTask(1, 5));
        System.err.println(future.get());

    }

    static class CountTask extends RecursiveTask<Integer> {

        private static final int Threshold = 3;

        private int start;

        private int end;

        public CountTask(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Integer compute() {
            int sum = 0;
            int temp = end - start;
            System.err.println(temp);
            if (temp <= Threshold) {
                for (int i = start; i <=end; i++) {
                    sum += i;
                }
            } else {
                int millde = (end+start)/Threshold;
                CountTask task1 = new CountTask(start, millde);
                CountTask task2 = new CountTask(millde+1, end);
                task1.fork();
                task2.fork();

                int sum1 = task1.join();
                int sum2 = task2.join();
                System.err.println("sum1  "+sum1);
                System.err.println("sum2  "+sum2);
                sum = sum1 + sum2;
            }

            return sum;
        }
    }
}


Demo2


package com.viashare.forkjoin;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

/**
 * Created by Jeffy on 16/01/12.
 */
public class ForkJoinmain2 {

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool(4);
        MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);

        long mergedResult = forkJoinPool.invoke(myRecursiveTask);

        System.out.println("mergedResult = " + mergedResult);
    }

    static class MyRecursiveTask extends RecursiveTask<Long> {

        private long workLoad = 0;

        public MyRecursiveTask(long workLoad) {
            this.workLoad = workLoad;
        }

        protected Long compute() {
            //if work is above threshold, break tasks up into smaller tasks
            if (this.workLoad > 16) {
                System.out.println("Splitting workLoad : " + this.workLoad);
                List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();
                subtasks.addAll(createSubtasks());

                for (MyRecursiveTask subtask : subtasks) {
                    subtask.fork();
                }

                long result = 0;
                for (MyRecursiveTask subtask : subtasks) {
                    result += subtask.join();
                }
                return result;

            } else {
                System.out.println("Doing workLoad myself: " + this.workLoad);
                return workLoad * 3;
            }
        }

        private List<MyRecursiveTask> createSubtasks() {
            List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();

            MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
            MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);

            subtasks.add(subtask1);
            subtasks.add(subtask2);

            return subtasks;
        }
    }
}

Fork/Join框架的异常处理

ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码:

if(task.isCompletedAbnormally())
{
    System.out.println(task.getException());
}

getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

Fork/Join框架的实现原理

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352

推荐阅读更多精彩内容

  • 一、多线程 说明下线程的状态 java中的线程一共有 5 种状态。 NEW:这种情况指的是,通过 New 关键字创...
    Java旅行者阅读 4,676评论 0 44
  • 进程和线程 进程 所有运行中的任务通常对应一个进程,当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中...
    小徐andorid阅读 2,806评论 3 53
  • 鸡年第一场雪来得如此早,润雪兆丰年,颇有大吉的征兆。 韩愈的一首春雪,把那种二月雪的欣喜与惊奇刻画得妙...
    4af85c91ffc8阅读 223评论 0 1
  • 如果你已经很好的记下你的发现,以及对产生的想法进行批判性的思考,为了交流而进行的写作会变得更简单也会更享受。...
    张添雅阅读 265评论 0 1
  • 永远都不会对你说 我爱你 亦或者,我只说一次。 我不怕错过你 因为面对你,我从里到外满是羞涩 如果你懂我, 你知道...
    小小的田阅读 96评论 0 1