1. 概念
什么时候使用ForkJoinPool线程池
适合能够进行拆分再拆分的计算型(CPU密集型)任务。服务器拥有多CPU,多核,用以提高计算能力。
单核、单CPU,不建议使用该线程池,会带来额外的性能开销,反而比单线程的执行效率低。
为什么不使用ThreadPoolExecutor线程池

ForkJoinPool可以看做是一个并行线程池,即多个线程(线程数一般等于CPU核数),并行的处理一个大任务。
通常来说,ThreadPoolExecutor中每个任务都由单个的线程独立处理。但是若是一个非常耗时的大任务(比如大数组排序),就可能出现线程池中只有一个线程在处理这个大任务。而其他线程却空闲着。最主要的是,若是多核服务器,只有一核在疯狂计算,其他处理器(CPU)无法援助繁忙的处理器(CPU)。
若是把这个大任务拆分成几个小任务,交由多个ThreadPoolExecutor的线程去处理,但是对于一个有明显父子关系的任务(比如大数组排序)来说,只有当它所有子任务执行完毕之后,它才能被执行。
使用ThreadPoolExecutor无法选择优先执行子任务。同时也无法开启太多的线程去处理子任务。
为什么ForkJoinPool适合CPU密集型任务,I/O密集型不可以吗?
通常来说,若是线程I/O阻塞,CPU会抛弃这个线程。而ForkJoinPool的宗旨是使用少量的线程来处理大量的任务。若是I/O密集型任务,该任务执行时间之缓慢便可想而知。
而CPU密集型任务,当一个大任务分解成多个子任务后,多个线程获取到多个处理器的时间分片,可以并行的执行子任务。
那说到底,什么是ForkJoinPool线程池?
该线程池的核心思想:分治法和工作窃取模式。
分治法:就是将一个任务,切分成多个父子关系的子任务。
工作窃取:若线程空闲,将窃取其他线程的任务。即多个线程并行执行任务。
来一个小故事:
若接到一个紧急大需求,在公司有4个人的情况下,不可能将任务只交由一个员工做,而其他三个员工处于空闲状态。并且不能随意中断他们,应该让他们几个专心的处理这件大任务。
当效率高的员工执行完手头的任务后,不应该闲着,应该帮忙去别的同事那领取一些任务。为了避免两个员工同时开发一个功能,公司规定,员工正常是在工作队列的尾部获取任务,而援助的员工,在工作队列的头部获取任务。
工作窃取的理论

ForkJoinPool 的每个工作线程(
ForkJoinWorkerThread下文简称worker都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。每个工作线程在运行中产生新的任务(通常是因为调用了
fork())时,会放入工作队列的队尾,并且会在队尾取出任务。(worker在处理自己的工作队列时,使用的是LIFO方式)。每个工作线程在处理自己的工作队列同时,会尝试
窃取 steal一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首(也就是说工作线程在窃取其他工作线程的任务时,使用的是FIFO方式)在遇到
join()时,如果需要 join 的任务尚未完成,则会先处理其他任务,直到目标的任务方法被告知已经结束(通过isDone()方法),所有的任务都是无阻塞的完成。
在既没有自己的任务,也没有可以窃取的任务时,进入休眠。
2. 实践
ForkJoinPool线程池的创建
在JDK1.8中,推荐使用下面方法创建线程池,可以满足大多数场景。
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
但是我们通过静态方法创建的线程池,它内部的参数是怎样的?
commonPool源码解析:
static{
//...
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
//...
}
private static ForkJoinPool makeCommonPool() {
//...
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
本质上是通过makeCommonPool()方法创建的线程池。
最终调用
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
parallelism:(相似度
[ˈpærəlelɪzəm])默认为CPU核心数Runtime.getRuntime().availableProcessors(),ForkJoinPool里的线程数量依赖于它,但是它并不代表最大线程数。不要等同于ThreadPoolExecutor中的corePoolSize或者maximumPoolSize。ForkJoinWorkerThreadFactory:线程工厂,默认实现 DefaultForkJoinWorkerThreadFactory。
workerNamePrefix:线程池创建线程的前缀,默认使用“ForkJoinPool-*”
config:保存了不变的参数,包括了parallelism和mode,供后续读取。mode可取FIFO_QUEUE(先进先出队列)和LIFO_QUEUE(后进先出队列)。默认LIFO_QUEUE。
ForkJoinPool创建子任务
子任务由ForkJoinTask的实例来代表,它是一个抽象类,JDK为我们提供了两个实现:RecursiveTask和RecursiveAction [递归的] [rɪˈkɜːsɪv],分别用于需要和不需要返回计算结果的子任务。
/**
* @description: 计算从1加到100
* @create: 2019-08-29 14:10
*/
@Slf4j
public class CountTask extends RecursiveTask<Integer> {
//拆分任务的阈值
private static final int threshold = 2;
private int start;
private int end;
public CountTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//任务足够小,停止拆分,直接开始计算
if (end - start < threshold) {
for (int i = start; i < end + 1; i++) {
//开始执行业务逻辑,累加
sum = sum + i;
}
} else {
//如果任务大于阈值,就继续拆分
int middle = (start + end) / 2;
CountTask left = new CountTask(start, middle);
CountTask right = new CountTask(middle + 1, end);
//执行子任务
left.fork();
right.fork();
//若是不要求得到执行结果,可以无后续的join方法
//等待子任务执行完毕,并得到子任务结果
Integer leftResult = left.join();
Integer rightResult = right.join();
//子方法运行完毕后,父方法开始汇总
sum = leftResult + rightResult;
}
return sum;
}
}
ForkJoinPool提交任务
ForkJoinPool和ThreadPoolExecutor相似,也是提供了三类方法来调度子线程。
| 方法 | 描述 |
|---|---|
| execute系列 | 只提交任务 |
| invoke和invokeAll | 提交并返回结果 |
| submit系列 | 提交任务并返回任务 |
fork方法
源码
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
- 任务被fork()分割之后,判断当前线程是否是Worker线程,如果是,则将该任务放到自己的双端队列的尾部。
- 否则的话,会随机提交任务线程池中的某个Worker队列中。
join方法
源码:
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
- 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
- 查看任务的完成状态,如果已经完成,直接返回结果。
- 如果任务尚未完成,但处于自己的工作队列内,则调用
doExec()完成它。 -
如果任务不再当前队列的top位置(已经被其他的工作线程偷走)调用
wt.pool.awaitJoin(w, this, 0L),窃取这个小偷的工作队列内的任务(以 FIFO 方式)执行,以期帮助它早日完成预 join 的任务。 - 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 Join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
- 递归地执行第 5 步。
fork和join的调用顺序
官方文档建议的ForkJoinPool的调用顺序
a.fork();
b.fork();
b.join();
a.join();
因为任务b是后面进入的队列,并且worker的调用顺序是LIFO(后进先出),于是可以在fork()后直接调用join()就可以直接执行而不会调用ForkJoinPool.awaitJoin 方法去等待。
优化1:避免不必要的fork
原理:fork()的逻辑是:将子任务提交到工作队列,线程又从工作队列中获取。
- 而invokeAll()实际上,将N-1个任务提交到工作队列,然后留下一个交由线程直接执行。多个任务执行的时候推荐使用invokeAll()
int middle = (start + end) / 2;
CountTask left = new CountTask(start, middle);
CountTask right = new CountTask(middle + 1, end);
//等待子任务执行完毕,并得到子任务结果
invokeAll(left,right);
Integer leftResult = left.join();
Integer rightResult = right.join();
sum = leftResult + rightResult;
- 若分割为两个子任务时,亦可直接调用
compute()方法。这样的话,避免了工作线程刚放进队列,立马去队列中获取的尴尬处境。
//工作线程将任务放到队列中。(这里其实是让其他线程 偷取 任务,若无线程偷取,则工作线程计算完左边任务,可继续执行右边任务)
right.fork();
//工作线程执行左边的结果。
long leftAns = left.compute();
//工作线程等待右边任务的结果。
long rightAns = right.join();
return leftAns + rightAns;
join()和get()的区别

在
ForkJoinTask的invoke()、join()方法及其衍生方法中都没有像 get() 方法那样抛出 ExecutionException 的受检异常。
ForkJoinTask 中把受检异常转换成了运行时异常
static void rethrow(Throwable ex) {
if (ex != null)
ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}
@SuppressWarnings("unchecked")
static <T extends Throwable> void uncheckedThrow(Throwable t) throws T {
throw (T)t; // rely on vacuous cast
}
- invoke()、join() 仍可能会抛出运行时异常,所以 ForkJoinTask 还提供了两个不提取结果和异常的方法:
-
quietlyInvoke()
(注:执行此任务并等待其完成(如有必要),而不返回其结果或抛出异常。) -
quietlyJoin()
(注:加入此任务,而不返回其结果或抛出异常。 当某些已被取消或以其他方式已知中止时,处理任务集合时,此方法可能是有用的。)
-
quietlyInvoke()
注1:使用 quitelyInvoke() 和 quietlyJoin() 时可以配合 isCompletedAbnormally() 和 isCompletedNormally() 方法使用。
注2:quietly 平静的 [快特里]
案例:
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
ForkJoinTask<Integer> task = forkJoinPool.submit(new CountTask(0, 100));
//关闭线程池
forkJoinPool.shutdown();
if(task.isCompletedAbnormally()){
System.out.println("异常"+task.getException());
}
ForkJoinPool阻塞
向ForkJoinPool提交了一堆任务之后,我们会希望等待所有任务执行完成后,继续下一步操作。ForkJoinPool提供了两个阻塞的await方法。
- awaitQuiescence:等待线程池静止;
- awaitTermination:等待线程池终止;
推荐阅读
【小家java】Java线程池之---ForkJoinPool线程池的使用以及原理