前言
ForkJoin框架是Java 7 提供的把一个大任务分割成若干个小任务,最终汇总每一个任务结果后得到大任务结果的框架。ForkJoinPool继承AbstractExecutorService,实现了Executor,ExecutorService。ForkJoinPool用来实现 工作窃取 算法。
工作窃取算法
- fork:将一个大任务根据条件不断的分割为互不依赖的小任务,并将这些小任务放入不同的队列,每个工作线程维护一个任务队列,当自己维护的任务队列为空时随机从别的任务队列获取任务执行。
- join:最后再将这些小任务的结果都放在一个统一的队列当中,最后启动一个工作线程从这个结果队列中取数据合并得到最终的结果。
工作窃取算法中分割任务和合并结果的思想和Hadoop的MapReduce编程模型一样,优点是充分提高程序的并行度,有效的减少了线程之间的竞争,但是也会消耗过多的系统资源。
ForkJoinTask及其子类
ForkJoinTask是提交给ForkJoinPool执行的任务类型
ForkJoinTask是一个实现了Future接口的抽象类,在实际开发中,不需要继承ForkJoinTask,而是重写其子类的compute方法即可,在compute方法里实现任务的切分和结果的合并。
两个子类
- RecursiveAction:用于没有返回值的任务。
- RecursiveTask:用于有返回值的任务。
fork方法
此方法将任务放入ForkJoinWorkerThread线程的工作队列,异步执行,立即返回,此方法不会阻塞,异步执行会调用** ForkJoinPool的signalWork方法唤醒或创建一个线程来执行这个任务的compute方法**,如果还是大于切分的阀值,则会继续切分然后调用fork添加任务直到任务不再可切分。
join方法
join方法会阻塞当前线程并等待获取结果。
示例
需要注意的是Future.get方法中会抛出InterruptedException、ExecutionException两种异常,即使在try/catch中尝试捕获ExecutionException异常,任务执行发生异常时也可能不会抛出异常,需要调用isCompletedAbnormally进行判断,然后调用getException获取这个异常。
总结
compute方法有点类似于递归思想,先判断结束分割的条件是否满足,满足则执行计算并返回结果,不满足则切分为多个小任务,然后在调用小任务的fork方法,并等待小任务的结果返回,小任务结果返回则合并小任务的结果并将此任务的最终结果返回,从而再被更大的任务合并。