严格来说,Fork/join是并行而非并发的。之所以放到并发这块,是因为并发和并行大部分情况下是不需要程序员去关心的。大牛Linus Torvalds更是曾经讳言:
推崇并行只不过是浪费大家的时间,“并行更高效”这种理论纯属胡说八道。大容量缓存是高效的,如果缺少缓存,并行一些低等级微内核可以说是毫无意义,除下特定类型上大规模规则计算,比如图形处理。
没有人会回到过去,那些复杂的乱序运行内核不会消失。扩展不可能无休止的进行,人们需求更多的移动性,那些叫嚣扩展到上千核心的论调纯属扯淡,无需理会。
是有多么奇葩的思维才能幻想出这些神奇等等并行算法的用武之地?!
对于并行来说,唯一的用武之地就是图形计算和服务器端,而并行计算在这些领域确实也得到了大量的应用。但是没有任何疑问,并行在其他领域毫无用武之地。
所以,忘掉并行吧,它永远都不可能被大规模推广。
...
放弃吧。“并行就是未来”的说法纯属胡说八道。
我们还能说什么呢?深以为然。
回到主题,Fork/join采用了分治思想,其实分治对我们来说很熟悉了,大学算法课程总会接触到,比如分治排序:

分治
基于这个思想,我们先看下Fork/join的几个重要类。
-
ForkJoinPool :An ExecutorService for running ForkJoinTasks.执行
ForkJoinTasks的线程池。整个框架的主导者,它负责创建WorkQueue和ForkJoinWorkerThread,并进行任务分配。
构造方法源码:
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;//任务名称
this.factory = factory;//ForkJoinWorkerThread的创建工厂,所以可以自己实现?
this.ueh = handler;//异常处理器
//后面三个都是记录线程数和控制队列边界等信息的变量
this.config = (parallelism & SMASK) | mode;//并发数
long np = (long)(-parallelism);
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
另外还有个比较重要的变量runState: 用来标记当前线程池的运行状态,使用二进制表示。这就是值得我们学习借鉴的地方
-
WorkQueue:
ForkJoinPool的内部类,也就是存储任务的双端队列。
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
this.pool = pool;//当前队列属于哪个pool
this.owner = owner;//负责的WorkerThread是哪个,如果当前队列是共享的,设置为null
// 将索引放在队列数组的中间。INITIAL_QUEUE_CAPACITY 默认8K
base = top = INITIAL_QUEUE_CAPACITY >>> 1;
}
-
ForkJoinWorkerThread: 执行任务的线程。它从负责的
WorkQueue的top取出执行的ForkJoinTask进行处理(LIFO)。自己的队列empty的时候,会进行work stealing(任务窃取),去别的线程的地盘的base端偷取任务或者去shared queue去拿任务。目的是为了更高效地利用资源。
protected ForkJoinWorkerThread(ForkJoinPool pool) {
// Use a placeholder until a useful name can be set in registerWorker
super("aForkJoinWorkerThread");
this.pool = pool;//所属的线程池
//维护的任务队列,可以看到,每产生一个线程,就会有一个任务队列。
this.workQueue = pool.registerWorker(this);
}
-
ForkJoinTask: 存储在
WorkQueue里需要执行的任务。它有两个子类:RecursiveTask、RecursiveAction。区别在于Task任务是有返回值,Action则没有返回值。自定义任务时我们只需要继承二者其一,重写compute()方法即可。
我们用图来表示他们之间的关系:
Fork/Join
还有重要的几个方法,我们用示例来说明吧,如下为求和示例:
public class ForkJoinTest {
private static class SumTask extends RecursiveTask<Integer> {
static final int THRESHOLD = 10;
Integer[] array;
int start;
int end;
SumTask(Integer[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if (end - start <= THRESHOLD) {
// 原子任务,直接结算结果
Integer sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
System.out.println(String.format(Thread.currentThread().getName()+ "-->计算 %d~%d = %d", start, end, sum));
return sum;
}
//还需要再分
int middle = (end + start) / 2;
System.out.println(String.format(Thread.currentThread().getName()+ " -->拆分 %d~%d --> %d~%d, %d~%d", start, end, start, middle, middle, end));
SumTask left = new SumTask(this.array, start, middle);
SumTask right = new SumTask(this.array, middle, end);
left.fork();
right.fork();
//invokeAll(subtask1, subtask2);//事实上不用分别fork(),这种才是正确的写法
Integer subresult1 = left.join();
Integer subresult2 = right.join();
Integer result = subresult1 + subresult2;
return result;
}
}
public static void fillArray(Integer[] array) {
Random random = new Random();
for (int i = 0; i < array.length; i++) {
array[i] = i + 1;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
Integer[] array = new Integer[100];
fillArray(array);
ForkJoinPool fjp = new ForkJoinPool(4); // 最大并发(行)数4
ForkJoinTask<Integer> task = new SumTask(array, 0, array.length);
long startTime = System.currentTimeMillis();
Integer result = fjp.invoke(task);
long endTime = System.currentTimeMillis();
System.out.println("最终结果: " + result + " 花费时间:" + (endTime - startTime) + " ms.");
fjp.shutdown();
}
}
结果如下:

结果
总共产生了3个线程执行这个任务,用了32ms,根据结果可以看出,fork()的时候,会先去找空闲线程(空闲
WorkQueue),没有的话才会创建线程。
我们看看程序中几个重要的方法:
1、fork()
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
//如果当前线程是ForkJoinWorkerThread,则把任务加入到自身的workQueue中
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);//否则,重新创建一个WorkQueue并push进去
return this;
}
2、join()
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();//返回结果
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) ://等待任务完成
externalAwaitDone();
}
3、invoke()
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
//和fork()的else调用同样的方法,事实上看源码,
//这个方法出现的频率很高,作用是给当前线程创建对应的WorkQueue
externalPush(task);
return task.join();//实际上也是调用了join()方法。
}
4、invokeAll()
//可以看出,该方法会给把任务也分配给当前线程一份。不至于当前线程等待,造成资源浪费
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();//创建新的线程开执行任务
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}
//t1调用此方法,给当前线程的WorkQueue添加该任务。
private int doInvoke() {
int s; Thread t; ForkJoinWorkerThread wt;
return (s = doExec()) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(wt = (ForkJoinWorkerThread)t).pool.
awaitJoin(wt.workQueue, this, 0L) :
externalAwaitDone();
}
以上。

祈祷
希望还有几天就到预产期的我媳妇和尚未出世的儿子都能健健康康。
