TODO
持续补充中...
前言
以下内容都是基于JDK1.8介绍
什么是ForkJoin(FJ)
ForkJoinPool 是 JDK7 引入的,由 Doug Lea 编写的高性能线程池。
核心
- 分治算法(Divide-and-Conquer):
把任务递归的拆分为各个子任务,这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。

图片来源:https://www.jianshu.com/p/32a15ef2f1bf
- work-stealing(工作窃取)算法:
线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。
以下翻译自 A Java Fork/Join Framework (Doug Lea)
fork/join框架的核心在于它的轻量级调度机制。FJTask采用了Cilk的work - stealing调度器中首创的基本策略 :
- 每个工作线程在自己的线程中维护可运行的任务调度队列。
- 队列被维护为双端队列(deques),支持LIFO,push、pop操作,以及FIFO的poll/take
- 工作线程(FJThread)运行的任务中生成子任务被push自己的deques上。
- 工作线程默认情况下以LIFO的模式处理自己的deque,通过pop获取任务。
- 当工作线程没有本地任务要运行时,它会尝试,从随机选择的其他队列那里,使用FIFO(最早的优先)规则偷取一个任务,。
- 当一个工作线程遇到一个join操作时,它处理其他任务(如果可用),直到目标任务完成。否则,所有任务都会在没有阻塞的情况下运行完成。 TODO
- 当一个工作线程没有任务并且偷取不到任何任务时,对于其他类型,它会back off(通过yield、sleep和/或优先级调节),并稍后再次尝试,除非所有
的worker都空闲,在这种情况下,所有的worker都block,直到从顶层调用另一个任务。

图片来源:A Java Fork/Join Framework (Doug Lea)
核心类介绍
- ForkJoinPool 这是线程池的核心类,也是提供方法供我们使用的入口类。基本上forkJoin的核心操作及线程池的管理方法都由这个类提供。

ForkJoinPool.WorkQueue 这是ForkJoinPool类的内部类。也是线程池核心的组成部分。ForkJoinPool线程池将由WorkQueue数组组成。为了进一步提高性能,与ThreadPoolExecutor不一样的是,这没有采用外部传入的任务队列,而是作者自己实现了一个阻塞队列。奇数位是带工作线程的存放fork出的子任务的队列,偶数队列存放的是外部提交的任务。
ForkJoinWorkerThread 线程池中运行的thread也是作者重新定义的。这个类的目的是在于将外部的各种形式的task都转换为统一的ForkJoinTask格式。
ForkJoinTask 这是ForkJoinPool支持运行的task抽象类,我们一般使用其子类如RecursiveTask(有返回值)或者RecursiveAction(无返回值)。
https://blog.51cto.com/u_14014612/6031659
Fork-join任务运行机制

workequeues为什么区分奇偶slot
外部提交任务放在pool.workequeues偶数slot
内部提交任务放在pool.workequeues奇数slot
在我看来是一般外部提交的任务初始化,即递归起始点,会是一个大任务(相对于fork之后的小任务),FJWorkerThread会把fork之后的小任务放在自己队列的top(即工作线程自己的队列都是相对于外部提交任务而言的小任务),在FJWorkerThread启动之后是通过scan即窃取其他队列的任务,通过poll即使用FIFO的方式窃取base位,即“大任务”开始,相当于“广度优先”,而FJWorker是通过LIFO方式pop本地队列,类似与一种“深度优先”的方式,两者结合有助于发挥多线程优势。
work-steal的优势
通过
List<Future<?>> list = new ArrayList<>();
long start = System.currentTimeMillis();
for (int j = 1; j <= 2; j++) {
int i = j;
Future<?> submit = threadPool.submit(() -> {
System.out.println(Thread.currentThread().getName() + ", level1 task " + i);
Future innerTask = threadPool.submit(() ->
System.out.println(Thread.currentThread().getName() + ", level2 task" + i));
try {
innerTask.get();
} catch (Exception e) {
e.printStackTrace();
}
});
list.add(submit);
}
Future[] outerTasks = list.toArray(new Future[0]);
System.out.println("waiting...");
try {
for (Future outerTask : outerTasks) {
outerTask.get();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("done");
System.out.println("cost:" + (System.currentTimeMillis() - start));
分别使用
//会死锁
testThreadPool(new ThreadPoolExecutor(2,2,10,TimeUnit.DAYS,new ArrayBlockingQueue<>(20)));
testThreadPool(Executors.newFixedThreadPool(2));
//下面两个一样,都是使用ForkJoin进行工作窃取
testThreadPool(new ForkJoinPool(2));
testThreadPool(Executors.newWorkStealingPool(2));
- 首先提交的两个任务把线程池中的两个线程都占满了,而它们又分别提交了子任务,并等待子任务完成才退出
子任务在工作队列中等待线程池中释放出空闲线程来执行,这是不可能的,所以两边互相等待,死锁了 - ForkJoinPool 与普通线程池的主要区别是它实现了工作窃取算法。明显的内部区别是:
- 普通线程池所有线程共享一个工作队列,有空闲线程时工作队列中的任务才能得到执行
- ForkJoinPool 中的每个线程有自己独立的工作队列,每个工作线程运行中产生新的任务,放在队尾,某个工作线程会尝试窃取别个工作线程队列中的任务,从队列头部窃取,遇到 join() 时,如前面的 future.get(),如果 join 的任务尚未完成,则可先处理其他任务,这就是 ForkJoinPool 不会像普通线程池那样被死锁的秘诀。
https://blog.csdn.net/weixin_42593549/article/details/114613629
FJ执行图示
public class AddNumTask extends RecursiveTask<Integer> {
private List<Integer> ints;
public AddNumTask(List<Integer> ints) {
this.ints = ints;
}
@Override
protected Integer compute() {
if (ints.size() <= 2) {
int sum = 0;
for (int in : ints) {
sum += in;
}
return sum;
} else {
AddNumTask task1 = new AddNumTask(ints.subList(0, ints.size() / 2));
AddNumTask task2 = new AddNumTask(ints.subList(ints.size() / 2, ints.size()));
invokeAll(task1,task2);
Integer join = task1.join();
join += task2.join();
return join;
}
}
}
public class ForkJoinMain {
public static void main(String[] args) {
List<Integer> ints = new ArrayList<>();
for (int i = 0; i <= 10; i++) {
ints.add(i);
}
ForkJoinPool pool = ForkJoinPool.commonPool();
ForkJoinTask<Integer> submit = pool.submit(new AddNumTask(ints));
try {
Integer integer = submit.get();
System.out.println(integer);
} catch (Exception e) {
e.printStackTrace();
}
}
- compute方法执行对半切为两个小task,t1(0-5),t2(6-10)后,使用invokeAll(t1,t2)的执行流程。
下去简要描述了在并发数2的情况下的FJ的运行情况

- 首先在 pool.submit(new AddNumTask(ints))之后,把任务0-10放在了外部提交队列,create了worker1
- worker1获取到任务0-10,执行compute,切分为t1(0-5),t2(6-10)后调用invokeAll
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
int s1, s2;
t2.fork();
if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
t1.reportException(s1);
if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
t2.reportException(s2);
}
- 首先t2.fork,把t2放到当前worker(FJWorkerThread)的top
- t1.doInvoke直接执行,递归处理compute方法
FJ与普通线程池效果对别
public class Test {
private static ExecutorService executorService = new ThreadPoolExecutor(20, 20, 120, TimeUnit.SECONDS, new LinkedBlockingQueue<>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) {
int count = 0;
int nums = 100;
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "0");
ForkJoinPool.commonPool();
for(int i = 0;i < nums;i++){
long start = System.currentTimeMillis();
testFJ(3000000);
// testThreadPool(3000000);
count += (System.currentTimeMillis() - start);
}
System.out.println(nums + "次,平均耗时:" + (count / nums) + "毫秒");
Scanner scanner = new Scanner(System.in);
scanner.nextLine();
}
//测试forkjoin
private static void testFJ(int size){
List<Integer> ints = new ArrayList<>();
for (int i = 0; i < size; i++) {
ints.add(i);
}
ForkJoinPool pool = ForkJoinPool.commonPool();
ForkJoinTask<Long> submit = pool.submit(new InvokeInterfaceTask(ints));
try {
Long aLong = submit.get();
System.out.println("count:" + aLong);
} catch (Exception e) {
e.printStackTrace();
}
}
//测试线程池
private static void testThreadPool(int size){
CountDownLatch latch = new CountDownLatch(size);
AtomicLong count = new AtomicLong();
for(int i = 0; i< size;i++){
executorService.execute(()->{
try{
TestTools.testMethod();
}finally{
latch.countDown();
}
});
}
try {
latch.await();
// System.out.println("count:" + count);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public class TestTools {
public static long testMethod(){
// try {
// Thread.sleep(1);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
return 1L<<12 & 0xfff | 1 + 1;
}
}
public class InvokeInterfaceTask extends RecursiveTask<Long> {
private List<Integer> ints;
public InvokeInterfaceTask(List<Integer> ints) {
this.ints = ints;
}
@Override
protected Long compute() {
// System.out.println(Thread.currentThread().getName() + "," + ints.get(0) + "," + ints.get(ints.size() - 1));
if (ints.size() <= 1) {
long sum = 0;
sum = TestTools.testMethod(enterpriseDictService);
return sum;
} else {
InvokeInterfaceTask task1 = new InvokeInterfaceTask(ints.subList(0, ints.size() / 2));
InvokeInterfaceTask task2 = new InvokeInterfaceTask(ints.subList(ints.size() / 2, ints.size()));
// System.out.println(Thread.currentThread().getName() + "---invokeAll begin");
invokeAll(task1, task2);
// System.out.println(Thread.currentThread().getName() + "---invokeAll end");
Long join = task1.join();
// System.out.println(Thread.currentThread().getName() + "---task1.join");
join += task2.join();
// System.out.println(Thread.currentThread().getName() + "---task2.join");
return join;
}
}
}
上述代码是测试在size数目(最细任务,因为FJ的任务肯定大于size,ThreadPool的任务=size)时,运行nums次,求平均耗时。
本机FJ的并发数为7
如下,random 10表示testMethod中使用了new Random.nextInt(10),随机休眠,模拟接口耗时

- 1.把ThreadPool的core和max线程数设置为7,队列长度2000时,运行结果二者耗时基本相等
- 2.把ThreadPool的core和max线程数设置为7,队列长度3000000时,二者并发线程数相等的情况下,FJ的耗时明显低于ThreadPool,即便增加ThreadPool的线程数,队列长度,耗时依然没有明显降低甚至增加。
总结
- 1.FJ对于“很多”的小任务处理优势是明显的。
- 2.FJ对于递归处理,执行结果进行join的支持友好,不必需要额外代码进行汇总结果,也无需特别注意并发问题
- 3.FJ是对于一般线程池的补充,对于多线程情况多提供了一种解决方案的选择
普通线程池使用问题
参考
Java并发编程——ForkJoinPool之外部提交及worker执行过程
https://blog.51cto.com/u_14014612/6031659
JUC源码分析-线程池篇(五):ForkJoinPool - 2
https://www.jianshu.com/p/6a14d0b54b8d
扩展阅读
CAS原理解析
https://blog.csdn.net/yyqhwr/article/details/106965444
volatile
https://www.cnblogs.com/dolphin0520/p/3920373.html
wait/sleep
https://blog.csdn.net/qq_45731021/article/details/116502429
Java线程池实现原理及其在美团业务中的实践
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html
ThreadLocalRandom.getProbe() 线程的探针哈希值
线程的探针哈希值说明
使用 ThreadLocalRandom.getProbe() 得到线程的探针哈希值,探针哈希值和 map 里使用的哈希值的区别是,当线程发生数组元素争用后,可以改变线程的探针哈希值,让线程去使用另一个数组元素,而 map 中 key 对象的哈希值,由于有定位 value 的需求,所以它是一定不能变的。
private static final long PROBE
= U.objectFieldOffset(Thread.class, "threadLocalRandomProbe");
//实际是获取的 Thread中的 threadLocalRandomProbe 属性
//可以看出这个属性是静态的,即类级别的,所有对象共用
static final int getProbe() {
return U.getInt(Thread.currentThread(), PROBE);
}
//更新线程探针哈希值
static final int advanceProbe(int probe) {
probe ^= probe << 13; // xorshift
probe ^= probe >>> 17;
probe ^= probe << 5;
U.putInt(Thread.currentThread(), PROBE, probe);
return probe;
}
守护线程
在Java中有两类线程:用户线程 (User Thread)、守护线程 (Daemon Thread)。
所谓守护 线程,是指在程序运行的时候在后台提供一种通用服务的线程,比如垃圾回收线程就是一个很称职的守护者,并且这种线程并不属于程序中不可或缺的部分。因此,当所有的非守护线程结束时,程序也就终止了,同时会杀死进程中的所有守护线程。反过来说,只要任何非守护线程还在运行,程序就不会终止。
用户线程和守护线程两者几乎没有区别,唯一的不同之处就在于虚拟机的离开:如果用户线程已经全部退出运行了,只剩下守护线程存在了,虚拟机也就退出了。 因为没有了被守护者,守护线程也就没有工作可做了,也就没有继续运行程序的必要了。
将线程转换为守护线程可以通过调用Thread对象的setDaemon(true)方法来实现。在使用守护线程时需要注意一下几点:
(1) thread.setDaemon(true)必须在thread.start()之前设置,否则会跑出一个IllegalThreadStateException异常。你不能把正在运行的常规线程设置为守护线程。
(2) 在Daemon线程中产生的新线程也是Daemon的。
(3) 守护线程应该永远不去访问固有资源,如文件、数据库,因为它会在任何时候甚至在一个操作的中间发生中断。
>> 是 有符号的 右移 操作符。
符号为正,高位插入 0
符号为负,高位插入 1
>>> 是 无符号的 右移 操作符。
不管符号为啥,高位插入0
负数左右移
https://blog.csdn.net/qq_41675265/article/details/126002069