使用线程池框架可以大大提高资源利用率,做到任务的提交与执行分离,每一个worker线程执行完自己的任务就算结束了,这里面有一种情况,假如某一个线程执行的特别慢,因为它内部又不断的生成新的task,所以任务比较大,在这种情况下就算其他线程池里面有空闲的线程也帮不上忙,在这类问题下TheadPoolEexcutor处理起来就比较乏力,所以在Java7之后,新增了一种并行计算框架ForkJoinPool,当然这个类也归属Java线程池里面的一个实现,作为ForkJoinPool不会为每个子任务创建单独的线程。相反,池中的每个线程都有自己的双端队列(或deque,发音甲板),用于存储任务。
TheadPoolEexcutor一个有力补充,其在解决分治问题场景下的效率比较好,因为这个框架采用了work-steal算法,可以充分利用线程池里面的cpu资源。
什么是ForkJoin
ForkJoin框架的核心思想分治思想,对于非常大的问题,把它的切分成多个小问题,然后在多个小问题里面继续切分成更小的部分,递归切分直到问题的粒度足够小,可以被直接处理。这种切分任务的操作我们就叫fork,然后对于这些小问题因为是可以并行运行在多核处理上,所以我们需要联合这些问题的结果从而得到最终的计算结果,这种方式我们叫做join,这就是Fork/Join的名字含义。
采用伪代码来描述这一过程如下:
ForkJoin的API介绍
在Java的api里面ForkJoinPool线程池是ForkJoin框架的核心,这个线程池首先维护了一个总的任务队列,每个线程会从总的任务队列里面获取任务执行,得到自己的任务之后,由该任务产生的子任务会放在自己线程维护的一个双端队列里面,然后从头部获取任务依次处理,直到完毕,此时如果别的线程处理完了自己队列的任务,那么会帮着还没有处理完任务的线程处理任务,具体是从尾部读取任务,依次处理,这样就达到了work-steal的理念,从而有效的利用了cpu资源。
work stealing算法的优点:
利用了线程进行并行计算,减少了线程间的竞争。
work stealing算法的缺点:
如果双端队列中只有一个任务时,线程间会存在竞争。
额外的开销,例如双端队列
ForkJoin框架的api支持:
- ForkJoinPool: 管理ForkJoin框架里面线程的执行
- ForkJoinTask:一个抽象的类定义了task在ForkJoinPool里面的运行
- RecursiveAction: ForkJoinTask的子类运行一个没有返回值的任务
- RecursiveTask: ForkJoinTask的子类运行一个有返回值的任务
其中ForkJoinTask类代表了抽象的切成的任务,而不是实际的执行的线程,
这种机制允许一小部分数量的线程去管理执行一大部分任务。
ForkJoinTask的关键方法有:
final ForkJoinTask < V > fork ()
final V join ()
final V invoke ()
fork方法提交一个任务异步的去执行,这个方法返回this会持续的调用线程去运行。
join方法,会等待直到任务结束并返回其结果,对于没有返回值的任务返回的值Void类型。
invoke方法,包装了fork和join方法,我们只需要调用invoke方法,它就会启动任务然后返回结果。
额外的这里面还有两个静态方法:
static void invokeAll ( ForkJoinTask <?> task1 , ForkJoinTask <?> task2 ): static void invokeAll ( ForkJoinTask <?>… taskList )
前者执行两个任务,后者执行一个任务列表。
ForkJoinTask还有一个compute方法,这个方法用来编写主要的计算逻辑:
ForkJoinPool类主要有两个提交任务的方法:
第一个是invoke会同步等待直到所有的任务返回并得到执行结果。
第二个是异步的提交使用execute方法,对于结果的获取需要调用任务的get方法来阻塞获取。或者使用get超时方法来轮询获取结果。
最后值得一提的是,对于ForkJoinPool方法,我们不需要显式的调用shutdown来关闭,其使用的是守护线程,只要所有的线程运行完毕,线程池会自动退出。
ForkJoinPool组成类:
ForkJoinPool
充当fork/join框架里面的管理者,最原始的任务都要交给它才能处理。它负责控制整个fork/join有多少个workerThread,workerThread的创建,激活都是由它来掌控。它还负责workQueue队列的创建和分配,每当创建一个workerThread,它负责分配相应的workQueue。然后它把接到的活都交给workerThread去处理,它可以说是整个frok/join的容器。
ForkJoinWorkerThread
fork/join里面真正干活的"工人",本质是一个线程。里面有一个ForkJoinPool.WorkQueue的队列存放着它要干的活,接活之前它要向ForkJoinPool注册(registerWorker),拿到相应的workQueue。然后就从workQueue里面拿任务出来处理。它是依附于ForkJoinPool而存活,如果ForkJoinPool的销毁了,它也会跟着结束。
ForkJoinPool.WorkQueue
双端队列就是它,它负责存储接收的任务。
这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。
每个工作线程在运行中产生新的任务(通常是因为调用了 fork())时,会放入工作队列的队尾,并且工作线程在处理自己的工作队列时,使用的是 LIFO 方式,也就是说每次从队尾取出任务来执行。
每个工作线程在处理自己的工作队列同时,会尝试窃取一个任务(或是来自于刚刚提交到 pool的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首,也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO 方式。
Mode bits for ForkJoinPool.config and WorkQueue.config
static final int MODE_MASK = 0xffff << 16; // top half of int
static final int LIFO_QUEUE = 0;
static final int FIFO_QUEUE = 1 << 16;
static final int SHARED_QUEUE = 1 << 31; // must be negative
控制是FIFO还是LIFO
/**
* Takes next task, if one exists, in order specified by mode.
*/
final ForkJoinTask<?> nextLocalTask() {
return (config & FIFO_QUEUE) == 0 ? pop() : poll();
}
- ForkJoinTask:代表fork/join里面任务类型,我们一般用它的两个子类RecursiveTask、RecursiveAction。这两个区别在于RecursiveTask任务是有返回值,RecursiveAction没有返回值。任务的处理逻辑包括任务的切分都集中在compute()方法里面。
配置参数
通过代码指定,必须得在commonPool初始化之前(parallel的stream被调用之前,一般可在系统启动后设置)注入进去,否则无法生效。
通过启动参数指定无此限制,较为安全
- parallelism(即配置线程池个数)
可以通过java.util.concurrent.ForkJoinPool.common.parallelism进行配置,最大值不能超过MAX_CAP,即32767.
static final int MAX_CAP = 0x7fff; //32767
如果没有指定,则默认为Runtime.getRuntime().availableProcessors() - 1.
代码指定(必须得在commonPool初始化之前注入进去,否则无法生效)
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
或者参数指定
-Djava.util.concurrent.ForkJoinPool.common.parallelism=8
Fork Join注意的点
protected Long compute() {
if (任务足够小?) {
return computeDirect();
}
// 任务太大,一分为二:
SumTask subtask1 = new SumTask(...);
SumTask subtask2 = new SumTask(...);
// 分别对子任务调用fork():
subtask1.fork();
subtask2.fork();
// 合并结果:
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
return subresult1 + subresult2;
}
很遗憾,这种写法是错!误!的!这样写没有正确理解Fork/Join模型的任务执行逻辑。
JDK用来执行Fork/Join任务的工作线程池大小等于CPU核心数。在一个4核CPU上,最多可以同时执行4个子任务。对400个元素的数组求和,执行时间应该为1秒。但是,换成上面的代码,执行时间却是两秒。
这是因为执行compute()方法的线程本身也是一个Worker线程,当对两个子任务调用fork()时,这个Worker线程就会把任务分配给另外两个Worker,但是它自己却停下来等待不干活了!这样就白白浪费了Fork/Join线程池中的一个Worker线程,导致了4个子任务至少需要7个线程才能并发执行。
正确的写法:
SumTask subtask1 = new SumTask(this.array, start, middle);
SumTask subtask2 = new SumTask(this.array, middle, end);
invokeAll(subtask1, subtask2);
Long subresult1 = subtask1.join();
Long subresult2 = subtask2.join();
Long result = subresult1 + subresult2;
其实,我们查看JDK的invokeAll()方法的源码就可以发现,invokeAll的N个任务中,其中N-1个任务会使用fork()交给其它线程执行,但是,它还会留一个任务自己执行,这样,就充分利用了线程池,保证没有空闲的不干活的线程。
https://www.liaoxuefeng.com/article/1146802219354112
例子
下面的代码是测试forkjoin和for循环的耗时。
测试结果:首次forkjoin的耗时会远远比for循环的耗时高,这是因为首次需要去初始化资源,比如创建线程池等。所以耗时会远远比for循环高,但是第二次的耗时会比for循环少很多,这是因为所需要的资源都已经创建好了,所以耗时会很低。
结论:forkjoin首次运行会需要创建线程池等资源所以会比实际运行任务的时间会高一些,接下里在使用forkjoin运行任务就会和运行任务本身耗时差不多。
public static void main(String[] args) {
//测试forkjoin耗时
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 8; i++) {
list.add(i);
}
long s = System.currentTimeMillis();
list.parallelStream().forEach(i -> {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long e = System.currentTimeMillis();
System.out.println("forkjoin:" + (e - s));
//测试for循环耗时
s = System.currentTimeMillis();
for (int i : list) {
try {
Thread.sleep(10);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
e = System.currentTimeMillis();
System.out.println("for:" + (e - s));
//再次测试forkjoin耗时
s = System.currentTimeMillis();
list.parallelStream().forEach(i -> {
try {
Thread.sleep(10);
} catch (InterruptedException e2) {
e2.printStackTrace();
}
});
e = System.currentTimeMillis();
System.out.println("forkjoin:" + (e - s));
}
结果
forkjoin:81
for:69
forkjoin:12
上面的代码有一个细节要注意的是,由于我本机的机器是8核,通过以下代码可以获取forkjoin默认启动线程池大小:
ForkJoinPool.getCommonPoolParallelism();
可以得出结果是7,也就是可以同时运行7个任务。
但是我上面的循环次数确实8,也就是有8个任务同时运行,按理来说任务耗时要大于20ms,可实际打印结果确实12ms。这是为什么呢?原因是因为main线程也会运行任务,因为最开始分解任务的线程是main线程,main线程分解任务后自己也会运行任务,这样就不会白白浪费一个线程,所以这也就是为什么 ForkJoinPool.getCommonPoolParallelism();得到可以同时并行运行任务的是 = 本机CPU核数-1 的原因,这是因为main线程已经占用了一个线程运行任务。
我把上面的循环次数改为9,然后再次运行,得到结果:
forkjoin:116
for:98
forkjoin:23
可以看到第二次forkjoin耗费了23s。因为我的CPU是8核,每次最多可以运行8个任务,这次有9个任务,所以只能等其他8个任务运行完了之后,第9个任务在运行,所以耗时会是23ms。
参考
https://mp.weixin.qq.com/s/yvmWS4cCwTPzyEB3AxK5MQ