Java并行计算框架之ForkJoin

使用线程池框架可以大大提高资源利用率,做到任务的提交与执行分离,每一个worker线程执行完自己的任务就算结束了,这里面有一种情况,假如某一个线程执行的特别慢,因为它内部又不断的生成新的task,所以任务比较大,在这种情况下就算其他线程池里面有空闲的线程也帮不上忙,在这类问题下TheadPoolEexcutor处理起来就比较乏力,所以在Java7之后,新增了一种并行计算框架ForkJoinPool,当然这个类也归属Java线程池里面的一个实现,作为ForkJoinPool不会为每个子任务创建单独的线程。相反,池中的每个线程都有自己的双端队列(或deque,发音甲板),用于存储任务。
TheadPoolEexcutor一个有力补充,其在解决分治问题场景下的效率比较好,因为这个框架采用了work-steal算法,可以充分利用线程池里面的cpu资源。

什么是ForkJoin

ForkJoin框架的核心思想分治思想,对于非常大的问题,把它的切分成多个小问题,然后在多个小问题里面继续切分成更小的部分,递归切分直到问题的粒度足够小,可以被直接处理。这种切分任务的操作我们就叫fork,然后对于这些小问题因为是可以并行运行在多核处理上,所以我们需要联合这些问题的结果从而得到最终的计算结果,这种方式我们叫做join,这就是Fork/Join的名字含义。


采用伪代码来描述这一过程如下:


ForkJoin的API介绍

在Java的api里面ForkJoinPool线程池是ForkJoin框架的核心,这个线程池首先维护了一个总的任务队列,每个线程会从总的任务队列里面获取任务执行,得到自己的任务之后,由该任务产生的子任务会放在自己线程维护的一个双端队列里面,然后从头部获取任务依次处理,直到完毕,此时如果别的线程处理完了自己队列的任务,那么会帮着还没有处理完任务的线程处理任务,具体是从尾部读取任务,依次处理,这样就达到了work-steal的理念,从而有效的利用了cpu资源。

work stealing算法的优点:
利用了线程进行并行计算,减少了线程间的竞争。

work stealing算法的缺点:
如果双端队列中只有一个任务时,线程间会存在竞争。
额外的开销,例如双端队列

ForkJoin框架的api支持:

  • ForkJoinPool: 管理ForkJoin框架里面线程的执行
  • ForkJoinTask:一个抽象的类定义了task在ForkJoinPool里面的运行
  • RecursiveAction: ForkJoinTask的子类运行一个没有返回值的任务
  • RecursiveTask: ForkJoinTask的子类运行一个有返回值的任务
    其中ForkJoinTask类代表了抽象的切成的任务,而不是实际的执行的线程,
    这种机制允许一小部分数量的线程去管理执行一大部分任务。

ForkJoinTask的关键方法有:

 final   ForkJoinTask < V >  fork () 
 final  V join ()
 final  V invoke ()

fork方法提交一个任务异步的去执行,这个方法返回this会持续的调用线程去运行。
join方法,会等待直到任务结束并返回其结果,对于没有返回值的任务返回的值Void类型。
invoke方法,包装了fork和join方法,我们只需要调用invoke方法,它就会启动任务然后返回结果。
额外的这里面还有两个静态方法:

static   void  invokeAll ( ForkJoinTask <?>  task1 ,   ForkJoinTask <?>  task2 ):  static   void  invokeAll ( ForkJoinTask <?>…  taskList )

前者执行两个任务,后者执行一个任务列表。
ForkJoinTask还有一个compute方法,这个方法用来编写主要的计算逻辑:
ForkJoinPool类主要有两个提交任务的方法:
第一个是invoke会同步等待直到所有的任务返回并得到执行结果。
第二个是异步的提交使用execute方法,对于结果的获取需要调用任务的get方法来阻塞获取。或者使用get超时方法来轮询获取结果。
最后值得一提的是,对于ForkJoinPool方法,我们不需要显式的调用shutdown来关闭,其使用的是守护线程,只要所有的线程运行完毕,线程池会自动退出。

ForkJoinPool组成类:

ForkJoinPool

充当fork/join框架里面的管理者,最原始的任务都要交给它才能处理。它负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。然后它把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。

ForkJoinWorkerThread

fork/join里面真正干活的"工人",本质是一个线程。里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。

ForkJoinPool.WorkQueue

双端队列就是它,它负责存储接收的任务。
这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。

 Mode bits for ForkJoinPool.config and WorkQueue.config
    static final int MODE_MASK    = 0xffff << 16;  // top half of int
    static final int LIFO_QUEUE   = 0;
    static final int FIFO_QUEUE   = 1 << 16;
    static final int SHARED_QUEUE = 1 << 31;       // must be negative

控制是FIFO还是LIFO

        /**
         * Takes next task, if one exists, in order specified by mode.
         */
        final ForkJoinTask<?> nextLocalTask() {
            return (config & FIFO_QUEUE) == 0 ? pop() : poll();
        }
  • ForkJoinTask:代表fork/join里面任务类型,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。

配置参数

通过代码指定,必须得在commonPool初始化之前(parallel的stream被调用之前,一般可在系统启动后设置)注入进去,否则无法生效。
通过启动参数指定无此限制,较为安全

  • parallelism(即配置线程池个数)
    可以通过java.util.concurrent.ForkJoinPool.common.parallelism进行配置,最大值不能超过MAX_CAP,即32767.
    static final int MAX_CAP = 0x7fff; //32767
    如果没有指定,则默认为Runtime.getRuntime().availableProcessors() - 1.
    代码指定(必须得在commonPool初始化之前注入进去,否则无法生效)

System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

或者参数指定

-Djava.util.concurrent.ForkJoinPool.common.parallelism=8

Fork Join注意的点

protected Long compute() {
    if (任务足够小?) {
        return computeDirect();
    }
    // 任务太大,一分为二:
    SumTask subtask1 = new SumTask(...);
    SumTask subtask2 = new SumTask(...);
    // 分别对子任务调用fork():
    subtask1.fork();
    subtask2.fork();
    // 合并结果:
    Long subresult1 = subtask1.join();
    Long subresult2 = subtask2.join();
    return subresult1 + subresult2;
}


很遗憾,这种写法是错!误!的!这样写没有正确理解Fork/Join模型的任务执行逻辑。

JDK用来执行Fork/Join任务的工作线程池大小等于CPU核心数。在一个4核CPU上,最多可以同时执行4个子任务。对400个元素的数组求和,执行时间应该为1秒。但是,换成上面的代码,执行时间却是两秒。

这是因为执行compute()方法的线程本身也是一个Worker线程,当对两个子任务调用fork()时,这个Worker线程就会把任务分配给另外两个Worker,但是它自己却停下来等待不干活了!这样就白白浪费了Fork/Join线程池中的一个Worker线程,导致了4个子任务至少需要7个线程才能并发执行。

正确的写法:

SumTask subtask1 = new SumTask(this.array, start, middle);
        SumTask subtask2 = new SumTask(this.array, middle, end);
        invokeAll(subtask1, subtask2);
        Long subresult1 = subtask1.join();
        Long subresult2 = subtask2.join();
        Long result = subresult1 + subresult2;

其实,我们查看JDK的invokeAll()方法的源码就可以发现,invokeAll的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,但是,它还会留一个任务自己执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程。

https://www.liaoxuefeng.com/article/1146802219354112

例子

下面的代码是测试forkjoin和for循环的耗时。
测试结果:首次forkjoin的耗时会远远比for循环的耗时高,这是因为首次需要去初始化资源,比如创建线程池等。所以耗时会远远比for循环高,但是第二次的耗时会比for循环少很多,这是因为所需要的资源都已经创建好了,所以耗时会很低。
结论:forkjoin首次运行会需要创建线程池等资源所以会比实际运行任务的时间会高一些,接下里在使用forkjoin运行任务就会和运行任务本身耗时差不多。

public static void main(String[] args) {

        //测试forkjoin耗时
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 8; i++) {
            list.add(i);
        }
        long s = System.currentTimeMillis();
        list.parallelStream().forEach(i -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        long e = System.currentTimeMillis();
        System.out.println("forkjoin:" + (e - s));

        //测试for循环耗时
        s = System.currentTimeMillis();
        for (int i : list) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
        }
        e = System.currentTimeMillis();
        System.out.println("for:" + (e - s));

        //再次测试forkjoin耗时
        s = System.currentTimeMillis();
        list.parallelStream().forEach(i -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e2) {
                e2.printStackTrace();
            }
        });
        e = System.currentTimeMillis();
        System.out.println("forkjoin:" + (e - s));
    }

结果

forkjoin:81
for:69
forkjoin:12

上面的代码有一个细节要注意的是,由于我本机的机器是8核,通过以下代码可以获取forkjoin默认启动线程池大小:

ForkJoinPool.getCommonPoolParallelism();

可以得出结果是7,也就是可以同时运行7个任务。
但是我上面的循环次数确实8,也就是有8个任务同时运行,按理来说任务耗时要大于20ms,可实际打印结果确实12ms。这是为什么呢?原因是因为main线程也会运行任务,因为最开始分解任务的线程是main线程,main线程分解任务后自己也会运行任务,这样就不会白白浪费一个线程,所以这也就是为什么 ForkJoinPool.getCommonPoolParallelism();得到可以同时并行运行任务的是 = 本机CPU核数-1 的原因,这是因为main线程已经占用了一个线程运行任务。

我把上面的循环次数改为9,然后再次运行,得到结果:

forkjoin:116
for:98
forkjoin:23

可以看到第二次forkjoin耗费了23s。因为我的CPU是8核,每次最多可以运行8个任务,这次有9个任务,所以只能等其他8个任务运行完了之后,第9个任务在运行,所以耗时会是23ms。

参考

https://mp.weixin.qq.com/s/yvmWS4cCwTPzyEB3AxK5MQ

https://kaimingwan.com/post/java/forkjoinpooljie-du

https://segmentfault.com/a/1190000008470012

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容