ForkJoinPool
背景描述
过去我们在线程池解决问题时,通常维护了一个阻塞的任务队列。每个工作线程在任务完成后,就会去任务队列里面寻找任务。这种方式在我们执行数量较多且不互相依赖的任务时非常方便且高效。但是当我们需要执行一个很大的任务时,普通的线程池似乎就很难有什么帮助了。
在JDK7中新增了ForkJoinPool。ForkJoinPool采用分治+work-stealing的思想。可以让我们很方便地将一个大任务拆散成小任务,并行地执行,提高CPU的使用率。关于ForkJoinPool的精妙之处,我们将在后面的使用中慢慢说明。
如何使用
构造方法
Android官方文档中给出了三个构造方法。我们注意到在构造方法中,我们可以设置ForkJoinPool的最大工作线程数、工作线程工厂、拒绝任务的Handler和同步模式。
执行任务
ForkJoinPool提供了两套执行任务的API,它们的区别主要是返回的结果类型不同。invoke方法返回执行的结果,而submit方法返回执行的任务。
使用示例
需求
遍历系统所有文件,得到系统中文件的总数。
思路
通过递归的方法。任务在遍历中如果发现文件夹就创建新的任务让线程池执行,将返回的文件数加起来,如果发现文件则将计数加一,最终将该文件夹下的文件数返回。
代码实现
CountingTask countingTask = new CountingTask(Environment.getExternalStorageDirectory());
forkJoinPool.invoke(countingTask);
class CountingTask extends RecursiveTask<Integer> {
private File dir;
public CountingTask(File dir) {
this.dir = dir;
}
@Override
protected Integer compute() {
int count = 0;
File files[] = dir.listFiles();
if(files != null){
for (File f : files){
if(f.isDirectory()){
// 对每个子目录都新建一个子任务。
CountingTask countingTask = new CountingTask(f);
countingTask.fork();
count += countingTask.join();
}else {
Log.d("tag" , "current path = "+f.getAbsolutePath());
count++;
}
}
}
return count;
}
}
原理说明
所谓work-stealing模式,即每个工作线程都会有自己的任务队列。当工作线程完成了自己所有的工作后,就会去“偷”别的工作线程的任务。
那么这样的工作模式,有什么好处呢?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
上面的需求,如果我们用普通的线程池该如何完成?
如果我们使用newFixedThreadPool,当核心线程的路径下都有子文件夹时,它们会将路径下的子文件夹抛给任务队列,最终变成所有的核心线程都在等待子文件夹的返回结果,从而造成死锁。最终任务无法完成。
如果我们使用newCachedThreadPool,依然用上面的思路可以完成任务。但是每次子文件夹就会创建一个新的工作线程,这样消耗过大。
因此,在这样的情况下,ForkJoinPool的work-stealing的方式就体现出了优势。每个任务分配的子任务也由自己执行,只有自己的任务执行完成时,才会去执行别的工作线程的任务。
再来个例子
N项的Fibonacci数列求和,我们不再只能仰仗单个线程为我们执行任务。
package com.example;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MyClass {
static int computeCount = 0;
static class Fibonacci extends RecursiveTask<Integer> {
int n;
Fibonacci(int n) {
this.n = n;
}
@Override
protected Integer compute() {
computeCount ++;
System.out.printf("Current thread is " + Thread.currentThread()
+ "\n n = " + n + "\n");
if (n <= 2)
return 1;
Fibonacci f1 = new Fibonacci(n - 1);
f1.fork();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
Fibonacci f2 = new Fibonacci(n - 2);
f2.fork();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("wati temp answer is :" + n + "\n");
int answer = f1.join() + f2.join();
System.out.printf("temp answer is :" + answer + ", n is :" +n +"\n");
return answer;
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool(2);
Fibonacci task = new Fibonacci(5);
int answer = 0;
answer = pool.invoke(task);
System.out.printf("Hello answer is :" + answer + " , compute count is :" + computeCount);
}
}
结语
实测下来,当情况足够复杂时,ForkJoinPool的优势会愈加明显。但是,就像快排一样,最优策略并不是一个思路走到死,当分治的区域较小时,可以将小区域改用插入排序进行排序。同理,当我们递归到情况不再复杂时,就可以转而用别的线程池进行处理。
以上