并发多个任务执行最终把结果合并,想到了Fork/Join框架。在JDK1.7 Concurrent包提供了一种Fork/Join框架,使用的是线程窃取算法和分而治之的思想,是ExecutorService接口的另一种实现。
分而治之:是Executor框架的思想,也是Fork/Join框架的思想之一,就是把多任务切割为小任务,并发执行每个小任务。需要确定一个任务大小限定值,低于该限定值,就执行任务,大于该限定值,就继续分割任务。
工作窃取算法:当工作线程做完自己的工作,发现还有未被领取执行的任务,则拿过来执行。
Fork/Join框架核心有两个类组成:
- ForkJoinPool: 实现ExecutorService和工作窃取算法,它对工作线程进行管理,也管理ForkJoinTask的执行。
- ForkJoinTask: 实现Future接口的抽象类,是由ForkJoinPool管理和执行,所有子类必须重写compute方法,任务的分割和执行,逻辑的调用都在该方法里面实现。而在fork/join中的在调用fork()方法时,重新进入ForkJoinTask里的compute方法,层层细分直到符合限定条件。 该类有两个子类:RecursiveTask类有返回值,RecursiveAction类无返回值。
ForkJoinTask中的任务是否使用工作窃取算法:
- 如invokeAll(), 使用工作窃取算法,不会闲置线程;
- 如fork()然后join(),也会工作窃取算法,可能会有闲置线程 不能充分利用;
我们来看看真是如此吗?
使用Intellij idea调试,因ForkJoinPool有个toString方法,用于打印线程池的运行参数,参数说明如下:
. parallelism: 当前ForkJoinPool设定的并行级别
. size: 当前ForkJoinPool线程池内部的所有线程数量,包括阻塞状态,运行状态;
. active: 当前线程池内部, 正在进行compute计算的线程(不代表没被阻塞);
. running: 当前线程池内部,正在进程compute计算并且没有被阻塞的线程数量;
. steals: 当前ForkJoinPool线程池内部各个work queue间发生的“工作窃取”操作的总次数。
. tasks: 当前ForkJoinPool线程池内部各个work queue中等待处理的子任务总数;
. submissions: 通过submit方式或者其他方式提交到ForkJoinPool中,准备进行归并计算但ForkJoinPool还没有开始处理的任务数量(ForkJoinTask任务或者其子任务)数量;
invokeAll和fork() & join()方式下,未提交任务前,forkjoinPool的运行参数:
提交任务后,invokeAll()方式调试得出:
提交任务后,fork() & join()方式调试得出:
从得到结果后的forkjoinpool运行参数可知道,在同一环境同一任务情况下,线程池里线程数fork() & join()方法比invokeall()方法多,fork() & join()可能有一个或者两个线程浪费,而invokeall没有浪费;因为fork() & join()方法中线程把子任务分给其他线程,然后自己就不工作了。
join()和get()方法的区别:
这两个方法都是在等待任务结束,获取返回结果。
- join,不能被中断,如中断,抛出InterruptException异常,用在compute方法内部;
- 如果任务抛出运行时异常,get()抛出ExecutionException,而join会抛出RuntimeException异常。
ForkJoinPool类中 execute方法和invoke()方法:
execute方法 没有返回值,非阻塞的;方法如下:
invoke()方法 需要等待最终计算的结果返回,是阻塞的;
参考:
http://blog.csdn.net/chenchaofuck1/article/details/51637202
http://www.jianshu.com/p/0120b3dd255f