更多 Java 并发编程方面的文章,请参见文集《Java 并发编程》
什么是 Fork/Join 框架
Fork/Join 框架是 Java7 提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
- Fork 就是把一个大任务切分为若干子任务并行的执行
- Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。
比如计算 1+2+...+10000
,可以分割成若干个子任务,每个子任务分别对 10 个数进行求和,最终汇总这若干个子任务的结果。
工作窃取算法
fork-join 最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的 CPU,那么如何利用好这个空闲的 CPU 就成了提高性能的关键。
fork-join 框架通过一种称作工作窃取(work stealing) 的技术减少了工作队列的争用情况。
每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的。当一个任务划分一个新线程时,它将自己推到 deque 的头部。当一个任务执行与另一个未完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任务完成。当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。
可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争用和窃取。
因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用;因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用。
Fork/Join 框架的介绍
- 第一步分割任务。首先我们需要有一个 fork 类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
- 第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
Fork/Join 使用两个类来完成以上两件事情:
-
ForkJoinTask
:我们要使用 ForkJoin 框架,必须首先创建一个 ForkJoin 任务。它提供在任务中执行fork()
和join()
操作的机制,通常情况下我们不需要直接继承ForkJoinTask
类,而只需要继承它的子类,Fork/Join 框架提供了以下两个子类:-
RecursiveAction
:用于没有返回结果的任务。 -
RecursiveTask
:用于有返回结果的任务。
-
-
ForkJoinPool
:ForkJoinTask
需要通过ForkJoinPool
来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
示例:计算 1+2+...+10000
public class SumTask extends RecursiveTask<Integer> {
private static int THRESHOLD = 10;
private int start;
private int end;
public SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就计算任务
if (end - start <= THRESHOLD) {
for (int i = start; i <= end; i++) {
sum = sum + i;
}
}
// 否则,分割成2个子任务的计算
else {
int middle = (start + end) / 2;
SumTask left = new SumTask(start, middle);
SumTask right = new SumTask(middle + 1, end);
// 执行子任务
left.fork();
right.fork();
// 等待子任务执行结束,获得结果
int leftResult = left.join();
int rightResult = right.join();
// 合并子任务的结果
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(1, 10000);
Future<Integer> future = pool.submit(task);
try {
System.out.println(future.get());
} catch (Exception e) {
}
}
}
Fork/Join框架的实现原理
ForkJoinPool
由 ForkJoinTask
数组和 ForkJoinWorkerThread
数组组成:
-
ForkJoinTask
数组负责存放程序提交给ForkJoinPool
的任务public abstract class ForkJoinTask<V> implements Future<V>, Serializable
-
ForkJoinWorkerThread
数组负责执行这些任务
当我们调用 ForkJoinTask
的 fork
方法时,程序会调用 ForkJoinWorkerThread
的 push
方法异步的执行这个任务,然后立即返回结果。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
push
方法把当前任务存放在 ForkJoinTask
数组 queue 里。然后再调用 ForkJoinPool
的 signalWork()
方法唤醒或创建一个工作线程来执行任务。
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
Fork/Join VS ThreadPoolExecutor
ForkJoin 同 ThreadPoolExecutor
一样,也实现了 Executor
和 ExecutorService
接口。
public class ForkJoinPool extends AbstractExecutorService
它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么当前计算机可用的 CPU 数量会被设置为线程数量作为默认值。
ForkJoinPool
主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。
当使用 ThreadPoolExecutor
时,使用分治法会存在问题,因为 ThreadPoolExecutor
中的线程无法向任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用 ForkJoinPool
时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。
使用 ForkJoinPool
能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用 4 个线程来完成超过 200 万个任务。但是,使用 ThreadPoolExecutor
时,是不可能完成的,因为 ThreadPoolExecutor
中的 Thread
无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。
引用:
聊聊并发(八)——Fork/Join框架介绍
应用 fork-join 框架
深入浅出parallelStream