1.落笔缘由
由于之前希望对Java异步操作进行一次梳理,碰巧看到了Fork/Join,之前并没有了解过,所以借这次机会来了解一下它的用途。
2.Fork/Join作用
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。
和ExecutorService接口的其他实现一样(其实我正是在整理ExecutorService的SingleThreadExecutor等子类的时候,看到了WorkStealingPool,才进而接触到Fork/Join框架),Fork/Join会将任务分发给线程池中的工作线程。Fork/Join使用工作窃取(work-stealing)算法。那么什么是工作窃取方法?简单来说就是某个线程从其他队列里窃取任务来执行。我们知道Fork/Join的作用就是将一个大任务分为若干小任务,最后将这些小任务的执行结果整合起来得到大任务的结果。而Fork/Join把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务。那么这里就会出现一种情况,有些线程的任务队列里的任务已经完成,但其他线程的队列还有任务没完成,这样就造成已完成任务线程闲置,这也太浪费了吧,所以为了提高效率,完成自己的任务而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
Fork/Join框架的核心是ForkJoinPool
类:
//ForkJoinPool继承抽象类AbstractExecutorService
ForkJoinPool extends AbstractExecutorService
//而AbstractExecutorService实现的是ExecutorService接口
AbstractExecutorService implements ExecutorService
//创建ForkJoinPool对象:
//使用Runtime.availableProcessors()获取的数值作为并行级别;使用默认的default thread factory;UncaughtExceptionHandler为空;非异步LIFO模式。
public ForkJoinPool()
//使用指定数值(parallelism)作为并行级别;使用默认的default thread factory;UncaughtExceptionHandler为空;非异步LIFO模式。
public ForkJoinPool(int parallelism)
//通过Executors创建ForkJoinPool对象
ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();
ForkJoinPool实现了工作偷取算法,通过执行ForkJoinTask任务来实现。
3.执行过程
Fork/Join的执行过程主要有两步:
第一步分割任务。通过ForkJoinTask对象,调用fork()方法把大任务分割成子任务。
第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据,而子任务执行的结果是通过ForkJoinTask的join()方法获取的。
下面说一下ForkJoinTask,ForkJoinTask有点像Thread的Runable,都是用来定义要执行的任务的。
ForkJoinTask有两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
具体可以看一下oracle的Fork and Join: Java Can Excel at Painless Parallel Programming Too!
这篇文章,里面有介绍使用Fork/Join框架的例子(是关于计算文档中的单词出现次数),下面这个类就是这个例子中的,他继承了RecursiveTask ,返回一个Long型的结果。
public class FolderSearchTask extends RecursiveTask<Long> {
private final Folder folder;
private final String searchedWord;
FolderSearchTask(Folder folder, String searchedWord) {
super();
this.folder = folder;
this.searchedWord = searchedWord;
}
@Override
protected Long compute() {
long count = 0L;
List<RecursiveTask<Long>> forks = new LinkedList<>();
for (Folder subFolder : folder.getSubFolders()) {
FolderSearchTask task = new FolderSearchTask(subFolder, searchedWord);
forks.add(task);
task.fork();
}
for (Document document : folder.getDocuments()) {
DocumentSearchTask task = new DocumentSearchTask(document, searchedWord);
forks.add(task);
task.fork();
}
for (RecursiveTask<Long> task : forks) {
count = count + task.join();
}
return count;
}
}
4.Fork/Join的基本用法
在Oracle的文档里有关于Fork/Join的用法。基本思维就是给出是否需要将当前任务分成小任务的条件。
if (当前这个任务工作量足够小)
直接完成这个任务
else
将这个任务分解成两个部分
分别触发(invoke)这两个子任务的执行,并等待结果
这个操作是发生在ForkJoinTask里的compute()方法里,不管是继承RecursiveAction还是RecursiveTask ,都要重写compute()。
5.例子解析
1)定义任务
下面是一个关于计算斐波那契数列的例子,由于我吗要计算指定长度斐波那契数列的和,所以我们的任务是需要有返回值的,所以继承RecursiveTask,定义返回值是Long型。而在compute()方法里,就是当斐波那契数列数字的个数小于10就直接返回这10个值的和,而大于10的时候,就将这个任务分成两个小任务。
public class FibonacciTask extends RecursiveTask<Long>
{
private static final long serialVersionUID = 1L;
private List<Long> mList = null;
private int size = 0;
public FibonacciTask(List<Long> list, int size)
{
mList = list;
this.size = size;
System.out.println("num:"+(++Num.num));
}
@Override
protected Long compute()
{
if (mList!=null)
{
if (mList.size()<10)
{
return cal(mList);
}else
{
List<List<Long>> lists = averageAssign(mList,2);
FibonacciTask fibonacciTask1 = new FibonacciTask(lists.get(0), 5);
FibonacciTask fibonacciTask2 = new FibonacciTask(lists.get(1), 5);
fibonacciTask1.fork();
fibonacciTask2.fork();
System.out.println("list0:"+lists.get(0).size()+" list1:"+lists.get(1).size());
return fibonacciTask1.join()+fibonacciTask2.join();
}
}
return null;
}
private long cal(List<Long> list)
{
long total = 0;
for (int i = 0; i < list.size(); i++)
{
total = list.get(i)+total;
}
return total;
}
/**
* 将一个list均分成n个list,主要通过偏移量来实现的
* @param source
* @return
*/
public static <T> List<List<T>> averageAssign(List<T> source,int n){
List<List<T>> result=new ArrayList<List<T>>();
int remaider=source.size()%n; //(先计算出余数)
int number=source.size()/n; //然后是商
int offset=0;//偏移量
for(int i=0;i<n;i++){
List<T> value=null;
if(remaider>0){
value=source.subList(i*number+offset, (i+1)*number+offset+1);
remaider--;
offset++;
}else{
value=source.subList(i*number+offset, (i+1)*number+offset);
}
result.add(value);
}
return result;
}
}
2)执行任务
在执行任务之前,我们先生成通过下面的createFibonacci方法生成一个斐波那契数列。然后通过ForkJoinPool 对象调用submit方法执行FibonacciTask 任务。
/**
* @author LGY
* @time 2017-4-8
* @action
*/
public class TestFibonacci
{
public static void main(String[] args)
{
long startTime;
long stopTime;
long singleThreadTimes;
FibonacciTask fibonacciTask = new FibonacciTask(createFibonacci(0, 1, 100), 5);
ForkJoinPool forkJoinPool = new ForkJoinPool();
startTime = System.currentTimeMillis();
Future<Long> future = forkJoinPool.submit(fibonacciTask);
stopTime = System.currentTimeMillis();
singleThreadTimes = (stopTime - startTime);
System.out.println( " fork / join search took "
+ singleThreadTimes + "ms");
try
{
System.out.println("result:"+future.get());
} catch (Exception e)
{
e.printStackTrace();
}
}
/**
* @author LGY
* @action 生成指定长度斐波那契数列
* @time 2017-4-9
* @param first 第一个值
* @param secend 第一个值
* @param size 斐波那契数列长度
* @return
*/
private static List<Long> createFibonacci(long first , long secend , int size)
{
List<Long> list = new ArrayList<Long>();
long total = 0;
long startTime;
long stopTime;
long singleThreadTimes;
if (size == 1)
{
list.add(first);
}else if (size == 2)
{
list.add(first);
list.add(secend);
}else if (size>2) {
list.add(first);
list.add(secend);
for (int i = 0; i < size-2; i++)
{
list.add(list.get(i)+list.get(i+1));
}
}
System.out.print("[");
startTime = System.currentTimeMillis();
for (int i = 0; i < list.size(); i++)
{
System.out.print(list.get(i)+" ");
total = list.get(i)+total;
}
System.out.print("]"+"result:"+total);
stopTime = System.currentTimeMillis();
singleThreadTimes = (stopTime - startTime);
System.out.println();
System.out.println( " single thread search took "
+ singleThreadTimes + "ms");
return list;
}
}
6.总结
关于Fork/Join的Fork and Join: Java Can Excel at Painless Parallel Programming Too!一文的例子源码可以到http://www.oracle.com/technetwork/articles/java/forkjoinsources-430155.zip下载,也可以在本文文章的源码地址处获取。
在这个源码的WordCounter类中,需要传三个参数给main方法,右击WordCounter类,点击Run As-->Run Configurations,选择页卡Arguments
输入三个参数,第一个是文档地址,第二个是要计算出现了多少次的单词,最后一个是要重复执行任务的次数
7.参考文章
http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
Fork and Join: Java Can Excel at Painless Parallel Programming Too!译文资料