什么是Fork/Join框架
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
- Fork就是把一个大任务切分为若干子任务并行的执行。
- Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。
工作窃取算法
就是把一个大任务分成若干小任务,放到若干队列,一个队列对应一个线程。有的线程先把一个队列中的任务执行完了,那么这个线程从另一个队列中取任务来执行。
两个线程从一个队列中取任务来执行,那么存在竞争关系,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
Fork/Join框架的介绍
- 分割任务:一个fork类来把大任务分割成子任务,可能子任务还比较大,所以需要不同分割,直到分割的任务足够小。
- 合并结果:子任务执行的结果都放在一个队列中,启动一个线程从队列里拿数据然后合并数据。
怎么实现:
- 首先创建ForkJoin任务。不需要直接继承ForkJoinTask类,只需要继承它的子类。
- RecursiveAction:用于没有返回结果的任务
- RecursiveTask:用于有返回结果的任务
- ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,会随机从其他工作线程的队列尾部获取一个任务。
使用Fork/Join框架
/**
* ClassName: CountTask
* @Description: 计算1+2+3+4+5的结果。
* @author Panyk
* @date 2016年8月9日
*/
package com.forkjoin;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;//阈值
private int start;
private int end;
public CountTask(int start, int end) {
super();
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小,那么执行计算任务
boolean canCompute = (end-start)<THRESHOLD;
if(canCompute){
System.out.println(start + "--" + end);
for(int i=start; i<=end; i++){
sum+=i;
}
}else{
//如果任务大于阈值,分裂成两个子任务计算
int middle = (start + end)/2;
CountTask leftTask = new CountTask(start,middle);
CountTask rightTask = new CountTask(middle+1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
//等待子任务执行完,得到结果
int leftRes = leftTask.join();
int rightRes = rightTask.join();
//合并子任务
sum = leftRes + rightRes;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
//生成一个计算任务,计算1加到4
CountTask task = new CountTask(1,5);
//执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
if(task.isCompletedAbnormally()) {//是否有异常
System.out.println(task.getException());
}
try{
System.out.println(result.get());
}catch(Exception e){
e.printStackTrace();
}
}
}
Fork/Join框架的异常处理
ForkJoinTask在执行的时候可能会抛出异常,但是我们没办法在主线程里直接捕获异常,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。
getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。