Java 并发编程笔记:如何使用 ForkJoinPool 以及原理

转自 http://blog.dyngr.com/blog/2016/09/15/java-forkjoinpool-internals/

前言

Java 1.7 引入了一种新的并发框架—— Fork/Join Framework。

本文的主要目的是介绍 ForkJoinPool 的适用场景,实现原理,以及示例代码。

TLDR; 如果觉得文章太长的话,以下就是结论

ForkJoinPool 不是为了替代 ExecutorService,而是它的补充,在某些应用场景下性能比 ExecutorService 更好。(见 Java Tip: When to use ForkJoinPool vs ExecutorService )

ForkJoinPool 主要用于实现“分而治之”的算法,特别是分治之后递归调用的函数,例如 quick sort 等。

ForkJoinPool 最适合的是计算密集型的任务,如果存在 I/O,线程间同步,sleep() 等会造成线程长时间阻塞的情况时,最好配合使用 ManagedBlocker。

使用

首先介绍的是大家最关心的 Fork/Join Framework 的使用方法,如果对使用方法已经很熟悉的话,可以跳过这一节,直接阅读原理

用一个特别简单的求整数数组所有元素之和来作为我们现在需要解决的问题吧。

问题

计算1至1000的正整数之和。

解决方法

For-loop

最简单的,显然是不使用任何并行编程的手段,只用最直白的 for-loop 来实现。下面就是具体的实现代码。

不过为了便于横向对比,也为了让代码更加 Java Style,首先我们先定义一个 interface。

publicinterfaceCalculator{longsumUp(long[] numbers);}

这个 interface 非常简单,只有一个函数 sumUp,就是返回数组内所有元素的和。

再写一个 main 方法。

publicclassMain{publicstaticvoidmain(String[] args){long[] numbers = LongStream.rangeClosed(1,1000).toArray();        Calculator calculator =newMyCalculator();        System.out.println(calculator.sumUp(numbers));// 打印结果500500}}

接下来就是我们的 Plain Old For-loop Calculator,简称 POFLC 的实现了。(这其实是个段子,和主题完全无关,感兴趣的请见文末的彩蛋

publicclassForLoopCalculatorimplementsCalculator{publiclongsumUp(long[] numbers){longtotal =0;for(longi : numbers) {            total += i;        }returntotal;    }}

这段代码毫无出奇之处,也就不多解释了,直接跳入下一节——并行计算。

ExecutorService

在 Java 1.5 引入 ExecutorService 之后,基本上已经不推荐直接创建 Thread 对象,而是统一使用 ExecutorService。毕竟从接口的易用程度上来说 ExecutorService 就远胜于原始的 Thread,更不用提 java.util.concurrent 提供的数种线程池,Future 类,Lock 类等各种便利工具。

使用 ExecutorService 的实现

publicclassExecutorServiceCalculatorimplementsCalculator{privateintparallism;privateExecutorService pool;publicExecutorServiceCalculator(){        parallism = Runtime.getRuntime().availableProcessors();// CPU的核心数pool = Executors.newFixedThreadPool(parallism);    }privatestaticclassSumTaskimplementsCallable{privatelong[] numbers;privateintfrom;privateintto;publicSumTask(long[] numbers,intfrom,intto){this.numbers = numbers;this.from = from;this.to = to;        }@OverridepublicLongcall()throwsException{longtotal =0;for(inti = from; i <= to; i++) {                total += numbers[i];            }returntotal;        }    }@OverridepubliclongsumUp(long[] numbers){        List> results =newArrayList<>();// 把任务分解为 n 份,交给 n 个线程处理intpart = numbers.length / parallism;for(inti =0; i < parallism; i++) {intfrom = i * part;intto = (i == parallism -1) ? numbers.length -1: (i +1) * part -1;            results.add(pool.submit(newSumTask(numbers, from, to)));        }// 把每个线程的结果相加,得到最终结果longtotal =0L;for(Future f : results) {try{                total += f.get();            }catch(Exception ignore) {}        }returntotal;    }}

如果对 ExecutorService 不太熟悉的话,推荐阅读《七天七并发模型》的第二章,对 Java 的多线程编程基础讲解得比较清晰。当然著名的《Java并发编程实战》也是不可多得的好书。

ForkJoinPool

前面花了点时间讲解了 ForkJoinPool 之前的实现方法,主要为了在代码的编写难度上进行一下对比。现在就列出本篇文章的重点——ForkJoinPool 的实现方法。

publicclassForkJoinCalculatorimplementsCalculator{privateForkJoinPool pool;privatestaticclassSumTaskextendsRecursiveTask{privatelong[] numbers;privateintfrom;privateintto;publicSumTask(long[] numbers,intfrom,intto){this.numbers = numbers;this.from = from;this.to = to;        }@OverrideprotectedLongcompute(){// 当需要计算的数字小于6时,直接计算结果if(to - from <6) {longtotal =0;for(inti = from; i <= to; i++) {                    total += numbers[i];                }returntotal;// 否则,把任务一分为二,递归计算}else{intmiddle = (from + to) /2;                SumTask taskLeft =newSumTask(numbers, from, middle);                SumTask taskRight =newSumTask(numbers, middle+1, to);                taskLeft.fork();                taskRight.fork();returntaskLeft.join() + taskRight.join();            }        }    }publicForkJoinCalculator(){// 也可以使用公用的 ForkJoinPool:// pool = ForkJoinPool.commonPool()pool =newForkJoinPool();    }@OverridepubliclongsumUp(long[] numbers){returnpool.invoke(newSumTask(numbers,0, numbers.length-1));    }}

可以看出,使用了 ForkJoinPool 的实现逻辑全部集中在了 compute() 这个函数里,仅用了14行就实现了完整的计算过程。特别是,在这段代码里没有显式地“把任务分配给线程”,只是分解了任务,而把具体的任务到线程的映射交给了 ForkJoinPool 来完成。

原理

如果你除了 ForkJoinPool 的用法以外,对 ForkJoinPoll 的原理也感兴趣的话,那么请接着阅读这一节。在这一节中,我会结合 ForkJoinPool 的作者 Doug Lea 的论文——《A Java Fork/Join Framework》,尽可能通俗地解释 Fork/Join Framework 的原理。

我一直以为,要理解一样东西的原理,最好就是自己尝试着去实现一遍。根据上面的示例代码,可以看出 fork() 和 join() 是 Fork/Join Framework “魔法”的关键。我们可以根据函数名假设一下 fork() 和 join() 的作用:

fork():开启一个新线程(或是重用线程池内的空闲线程),将任务交给该线程处理。

join():等待该任务的处理线程处理完毕,获得返回值。

以上模型似乎可以(?)解释 ForkJoinPool 能够多线程执行的事实,但有一个很明显的问题

当任务分解得越来越细时,所需要的线程数就会越来越多,而且大部分线程处于等待状态。

但是如果我们在上面的示例代码加入以下代码

System.out.println(pool.getPoolSize());

这会显示当前线程池的大小,在我的机器上这个值是4,也就是说只有4个工作线程。甚至即使我们在初始化 pool 时指定所使用的线程数为1时,上述程序也没有任何问题——除了变成了一个串行程序以外。

publicForkJoinCalculator(){    pool =newForkJoinPool(1);}

这个矛盾可以导出,我们的假设是错误的,并不是每个 fork() 都会促成一个新线程被创建,而每个 join() 也不是一定会造成线程被阻塞。Fork/Join Framework 的实现算法并不是那么“显然”,而是一个更加复杂的算法——这个算法的名字就叫做 work stealing 算法。

work stealing 算法在 Doung Lea 的论文中有详细的描述,以下是我在结合 Java 1.8 代码的阅读以后——现有代码的实现有一部分相比于论文中的描述发生了变化——得到的相对通俗的解释:

基本思想

ForkJoinPool 的每个工作线程都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。

每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。

每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。

在遇到 join() 时,如果需要 join 的任务尚未完成,则会先处理其他任务,并等待其完成。

在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

下面来介绍一下关键的两个函数:fork() 和 join() 的实现细节,相比来说 fork() 比 join() 简单很多,所以先来介绍 fork()。

fork

fork() 做的工作只有一件事,既是把任务推入当前工作线程的工作队列里。可以参看以下的源代码:

publicfinalForkJoinTaskfork(){    Thread t;if((t = Thread.currentThread())instanceofForkJoinWorkerThread)        ((ForkJoinWorkerThread)t).workQueue.push(this);elseForkJoinPool.common.externalPush(this);returnthis;}

join

join() 的工作则复杂得多,也是 join() 可以使得线程免于被阻塞的原因——不像同名的 Thread.join()。

检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。

查看任务的完成状态,如果已经完成,直接返回结果。

如果任务尚未完成,但处于自己的工作队列内,则完成它。

如果任务已经被其他的工作线程偷走,则窃取这个小偷的工作队列内的任务(以 FIFO 方式),执行,以期帮助它早日完成欲 join 的任务。

如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 join 的任务时,则找到小偷的小偷,帮助它完成它的任务。

递归地执行第5步。

将上述流程画成序列图的话就是这个样子:

以上就是 fork() 和 join() 的原理,这可以解释 ForkJoinPool 在递归过程中的执行逻辑,但还有一个问题

最初的任务是 push 到哪个线程的工作队列里的?

这就涉及到 submit() 函数的实现方法了

submit

其实除了前面介绍过的每个工作线程自己拥有的工作队列以外,ForkJoinPool 自身也拥有工作队列,这些工作队列的作用是用来接收由外部线程(非 ForkJoinThread 线程)提交过来的任务,而这些工作队列被称为 submitting queue 。

submit() 和 fork() 其实没有本质区别,只是提交对象变成了 submitting queue 而已(还有一些同步,初始化的操作)。submitting queue 和其他 work queue 一样,是工作线程”窃取“的对象,因此当其中的任务被一个工作线程成功窃取时,就意味着提交的任务真正开始进入执行阶段。

总结

在了解了 Fork/Join Framework 的工作原理之后,相信很多使用上的注意事项就可以从原理中找到原因。例如:为什么在 ForkJoinTask 里最好不要存在 I/O 等会阻塞线程的行为?,这个我姑且留作思考题吧 :)

还有一些延伸阅读的内容,在此仅提及一下:

ForkJoinPool 有一个 Async Mode ,效果是工作线程在处理本地任务时也使用 FIFO 顺序。这种模式下的 ForkJoinPool 更接近于是一个消息队列,而不是用来处理递归式的任务。

在需要阻塞工作线程时,可以使用 ManagedBlocker。

Java 1.8 新增加的 CompletableFuture 类可以实现类似于 Javascript 的 promise-chain,内部就是使用 ForkJoinPool 来实现的。

彩蛋

之所以煞有介事地取名为 POFLC,显然是为了模仿 POJO 。而 POJO —— Plain Old Java Object 这个词是如何产生的,在 stackoverflow 上有个帖子讨论过,摘录一下就是

I’ve come to the conclusion that people forget about regular Java objects because they haven’t got a fancy name. That’s why, while preparing for a talk in 2000, Rebecca Parsons, Josh Mackenzie, and I gave them one: POJOs (plain old Java objects).

我得出一个结论:人们之所以总是忘记使用标准的 Java 对象是因为缺少一个足够装逼的名字(译注:类似于 Java Bean 这样的名字)。因此,在准备2000年的演讲时,Rebecca Parsons,Josh Mackenzie 和我给他们起了一个名字叫做 POJO (平淡无奇的 Java 对象)。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容