今天来介绍一下java并发框架之Fork/Join
初探Fork/Join:
- 分割任务
使用一个fork类来把大任务分割成子任务,当子任务还较大时,不停的分割,直到分割到子任务足够小。(类似于归并排序的分化阶段)
- 执行任务合并结果
分割后的子任务分别放在双端队列里,然后开启多线程分别从双端队列获取任务执行,子任务执行完的结果都放在一个队列里,然后启动一个线程从队列里获取数据,最后合并这些数据。(类似于归并排序的合并阶段)
- 举一个栗子
一个周末,哥们都来家里做客,我们一共七个人,而只有我会做饭,用了两个小时才做好了所有的饭菜,大家都快饿的半死才吃上了饭。
但是如果我把每一个菜的做法告诉其中两个人,而那两个人又去分别告诉两个人,这样他们四个人同时去做菜(假设我家比较富裕~有很多锅),我就可以去愉快的编程了,并且只用了二十分钟所有的菜就都做好了,最后大家很开心的坐在一起吃饭。
上面的两个小例子:当我把要做很多个菜的任务分进行分发的时候,就相当于我们今天要讲的Fork/Join框架。
- 下面通过一幅图来更加深刻的理解
看完上图是不是觉得更加像归并排序了呢?
当然与递归也是同一个道理,都是为了划分大问题。
- 实现原理:
ForkJoinTask的fork的实现原理
当我们调用fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步的执行任务,然后立即返回结果
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
push方法把当前任务存放在ForkJoinTask数组队列里,然后调用ForkJoinPool的signalWork()方法唤醒或创建一个工作线程来执行任务
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
ForkJoinTask的join方法的实现原理
调用join方法时阻塞当前线程并等待获取结果
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
通过调用doJoin方法来获取当前任务的状态
已完成:NORMAL,直接返回任务结果
被取消:CANCELLED,直接抛出CancellationException
信号:SIGNAL
出现异常:EXCEPTIONAL,抛出对应异常
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
在doJoin()方法中,首先查看任务是否已经完成,如果执行完成则直接返回状态,没有完成,则从任务数组中取出任务并执行,顺利完成,则返回NORMAL状态,出现异常则返回EXCEPTIONAL
Fork/Join的使用
当我们遇到一个大的耗时操作时,我们就可以对任务进行拆分,最后进行汇总,下面给出一个使用的Demo
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
/**
* ForkJoinDemo
*
* @author lirongqian
* @since 2018/02/08
*/
public class ForkJoinDemo extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2;
private int start;
private int end;
public ForkJoinDemo(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
try {
// 睡一秒来模拟耗时操作
TimeUnit.SECONDS.sleep(1);
sum += i;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else {
int mid = (start + end) / 2;
ForkJoin.Wait waitLeft = new ForkJoin.Wait(start, mid);
ForkJoin.Wait waitRight = new ForkJoin.Wait(mid + 1, end);
waitLeft.fork();
waitRight.fork();
int leftResult = waitLeft.join();
int rightResult = waitRight.join();
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
int start = 1;
int end = 10;
long start2 = System.currentTimeMillis();
int sum = 0;
for (int i = start; i <= end; i++) {
try {
// 睡一秒来模拟耗时操作
TimeUnit.SECONDS.sleep(1);
sum += i;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(sum);
System.out.println("for time: " + (System.currentTimeMillis() - start2));
long start1 = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个任务
ForkJoinDemo task = new ForkJoinDemo(start, end);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
System.out.println("Fork/Join time: " + (System.currentTimeMillis() - start1));
} catch (Exception e) {
e.printStackTrace();
}
}
}
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
/**
* ForkJoin
*
* @author lirongqian
* @since 2018/02/08
*/
public class ForkJoin extends RecursiveTask<String> {
static final int THRESHOLD = 2;
private int start;
private int end;
public ForkJoin(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected String compute() {
String sum = "";
// 判断任务是否大于阈值
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum = sum + "ForkJoin: " + "第" + i + "次请求结果为:" + i + "\n";
}
} else {
// 大于阈值时,分裂成两个自任务进行计算
int mid = (start + end) / 2;
ForkJoin left = new ForkJoin(start, mid);
ForkJoin right = new ForkJoin(mid + 1, end);
// 执行两个自任务
left.fork();
right.fork();
// 获取子任务结果
String leftResult = left.join();
String rightResult = right.join();
// 合并结果
sum = leftResult + rightResult;
}
return sum;
}
static class Wait extends RecursiveTask<Integer> {
private int start;
private int end;
public Wait(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
try {
TimeUnit.SECONDS.sleep(1);
sum += i;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} else {
int mid = (start + end) / 2;
Wait waitLeft = new Wait(start, mid);
Wait waitRight = new Wait(mid + 1, end);
waitLeft.fork();
waitRight.fork();
int leftResult = waitLeft.join();
int rightResult = waitRight.join();
sum = leftResult + rightResult;
}
return sum;
}
}
}
执行结果:
55
for time: 10041
55
Fork/Join time: 3014
我们可以发现,使用Fork/Join大大缩短了任务执行的时间,是不是很牛逼?大家快去自己动手试一试吧。
编程的乐趣就在于接触到新事物时的眼前一亮