java concurrent 之 ForkJoinPool
ForkJoinPool在Java 7中被引入。ForkJoinPool类似于Java ExecutorService,但有一个区别。 ForkJoinPool可以轻松将任务分解成较小的任务,然后将其提交给ForkJoinPool。 任务可以将其工作分成较小的子任务,只要它能够分解任务即可。 它可能听起来有点抽象,所以在这个fork和join教程中,我将解释ForkJoinPool如何工作,以及分裂任务如何工作。
解释Fork和Join
在我们看看ForkJoinPool之前,我想解释一下fork和join的原理。
Fork和Join原则由递归执行的两个步骤组成。 这两个步骤是fork步骤和join步骤。
Fork
使用fork和join原理的任务可以将(自己)分割成更小的子任务
通过将其自身分解为子任务,每个子任务可以由不同的CPU或同一CPU上的不同线程并行执行。
如果任务给出的工作足够大,任务只会分解成子任务,这样才有意义。 将任务分解为子任务有一个开销,因此对于少量工作,此开销可能大于通过并发执行子任务而实现的加速。
将任务分解为子任务的时间限制也称为阈值。 每个任务都由决定一个明智的门槛决定。 这在很大程度上取决于正在做的工作。
其实阈值的问题就是在递归算法中的退出条件相似
Join
当一个任务已经分裂成子任务时,任务等待直到子任务完成执行。
子任务完成执行后,任务可以将所有结果加入(合并)为一个结果。 如下图所示:
ForkJoinPool
ForkJoinPool是一个特殊的线程池,旨在使用fork-and-join任务拆分工作。 ForkJoinPool位于java.util.concurrent包中,因此完整的类名称为java.util.concurrent.ForkJoinPool。
创建ForkJoinPool
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
ForkJoinTask任务分为两类
RecursiveAction 用于没有返回结果的任务。
RecursiveTask 用于有返回结果的任务
ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
使用Fork/Join框架 Demo
package com.viashare.forkjoin;
import java.util.concurrent.*;
/**
* Created by Jeffy on 16/01/12.
*/
public class ForkJoinMain {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> future = forkJoinPool.submit(new CountTask(1, 5));
System.err.println(future.get());
}
static class CountTask extends RecursiveTask<Integer> {
private static final int Threshold = 3;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
int temp = end - start;
System.err.println(temp);
if (temp <= Threshold) {
for (int i = start; i <=end; i++) {
sum += i;
}
} else {
int millde = (end+start)/Threshold;
CountTask task1 = new CountTask(start, millde);
CountTask task2 = new CountTask(millde+1, end);
task1.fork();
task2.fork();
int sum1 = task1.join();
int sum2 = task2.join();
System.err.println("sum1 "+sum1);
System.err.println("sum2 "+sum2);
sum = sum1 + sum2;
}
return sum;
}
}
}
Demo2
package com.viashare.forkjoin;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
/**
* Created by Jeffy on 16/01/12.
*/
public class ForkJoinmain2 {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
MyRecursiveTask myRecursiveTask = new MyRecursiveTask(128);
long mergedResult = forkJoinPool.invoke(myRecursiveTask);
System.out.println("mergedResult = " + mergedResult);
}
static class MyRecursiveTask extends RecursiveTask<Long> {
private long workLoad = 0;
public MyRecursiveTask(long workLoad) {
this.workLoad = workLoad;
}
protected Long compute() {
//if work is above threshold, break tasks up into smaller tasks
if (this.workLoad > 16) {
System.out.println("Splitting workLoad : " + this.workLoad);
List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();
subtasks.addAll(createSubtasks());
for (MyRecursiveTask subtask : subtasks) {
subtask.fork();
}
long result = 0;
for (MyRecursiveTask subtask : subtasks) {
result += subtask.join();
}
return result;
} else {
System.out.println("Doing workLoad myself: " + this.workLoad);
return workLoad * 3;
}
}
private List<MyRecursiveTask> createSubtasks() {
List<MyRecursiveTask> subtasks = new ArrayList<MyRecursiveTask>();
MyRecursiveTask subtask1 = new MyRecursiveTask(this.workLoad / 2);
MyRecursiveTask subtask2 = new MyRecursiveTask(this.workLoad / 2);
subtasks.add(subtask1);
subtasks.add(subtask2);
return subtasks;
}
}
}
Fork/Join框架的异常处理
ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。使用如下代码:
if(task.isCompletedAbnormally())
{
System.out.println(task.getException());
}
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。