概述
fork/join 框架在 Java 7 中呈现。它提供了一些工具,通过尝试使用所有可用的处理器内核来帮助加速并行处理 - 这是通过分而治之的方法实现的——分治算法。
Java 8 的并行流背后使用的基础架构就是该框架。
在实践中,这意味着框架首先“fork(分叉)”,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。
之后,“join(并入)”部分开始,其中所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待直到执行每个子任务。
为了提供有效的并行执行,fork/join 框架使用一个名为 ForkJoinPool
的线程池,它管理 ForkJoinWorkerThread
类型的工作线程。
ForkJoinPool
ForkJoinPool 是框架的核心。它是 ExecutorService
的一个实现,它管理工作线程并为我们提供工具来获取有关线程池状态和性能的信息。
工作线程当时只能执行一个任务,但 ForkJoinPool 不会为每个子任务创建单独的线程。相反,池中的每个线程都有自己的双端队列(或deque),用于存储任务。
这种架构对于在工作窃取算法的帮助下平衡线程的工作负载至关重要。
工作窃取算法
简单地说 - 空闲线程试图从繁忙线程的双端队列中“窃取”工作。
默认情况下,工作线程从其自己的双端队列头部获取任务。当它为空时,线程从另一个忙线程的双端队列尾部或全局入口队列中获取任务。
这种方法最大限度地减少了线程竞争任务的可能性。它还减少了线程必须寻找工作的次数,因为它首先在最大可用工作块上工作。
ForkJoinPool 实例化
在 Java 8 中,访问 ForkJoinPool 实例的最方便方法是使用其静态方法commonPool()。顾名思义,这将提供对公共池的引用,公共池是每个 ForkJoinTask 的默认线程池。
根据 Oracle 文档,使用预定义的公共池可以减少资源消耗,因为这会阻止为每个任务创建单独的线程池。
ForkJoinPool commonPool = ForkJoinPool.commonPool();
通过创建 ForkJoinPool 并将其分配给工具类的公共静态字段,可以在 Java 7 中实现相同的行为:
public static ForkJoinPool forkJoinPool = new ForkJoinPool();
然后,可以很容易的访问:
ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;
使用 ForkJoinPool 的构造函数,可以创建具有特定级别的并行性,线程工厂和异常处理程序的自定义线程池。默认使用当前可用的处理器核心。
ForkJoinTask<V>
ForkJoinTask 是 ForkJoinPool 中执行的任务的基本类型。在实践中,应扩展其两个子类中的一个:
- void 任务的 RecursiveAction
- 返回值的任务的 RecursiveTask <V>。
它们都有一个抽象方法 compute(),该方法定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便拆分时,生成单个子任务结果的逻辑。该方法的实现类似于下面的伪代码:
if (任务足够小或不可分) {
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}
没有确切的标准决定一个任务是否应该再拆分。递归的任务拆分过程如图所示:
RecursiveAction – 例子
为了演示框架的分支行为,如果使用 createSubtask() 方法,workload.length() 大于指定的阈值,则该示例将分割任务。
String 被递归地划分为子串,创建基于这些子串的 CustomRecursiveTask 实例。
因此,该方法返回 List <CustomRecursiveAction>。 该列表使用 invokeAll() 方法提交给 ForkJoinPool:
public class CustomRecursiveAction extends RecursiveAction {
private String workload = "";
private static final int THRESHOLD = 4;
private static Logger logger =
Logger.getAnonymousLogger();
public CustomRecursiveAction(String workload) {
this.workload = workload;
}
@Override
protected void compute() {
if (workload.length() > THRESHOLD) {
ForkJoinTask.invokeAll(createSubtasks());
} else {
processing(workload);
}
}
private List<CustomRecursiveAction> createSubtasks() {
List<CustomRecursiveAction> subtasks = new ArrayList<>();
String partOne = workload.substring(0, workload.length() / 2);
String partTwo = workload.substring(workload.length() / 2, workload.length());
subtasks.add(new CustomRecursiveAction(partOne));
subtasks.add(new CustomRecursiveAction(partTwo));
return subtasks;
}
private void processing(String work) {
String result = work.toUpperCase();
logger.info("This result - (" + result + ") - was processed by "
+ Thread.currentThread().getName());
}
}
此模式可用于开发自己的 RecursiveAction 类。要执行此操作,请创建一个表示工作总量的对象,选择合适的阈值,定义分割工作的方法,并定义执行工作的方法。
RecursiveTask<V>
对于返回值的任务,此处的逻辑类似,除了每个子任务的结果在一个结果中合并:
public class CustomRecursiveTask extends RecursiveTask<Integer> {
private int[] arr;
private static final int THRESHOLD = 20;
public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}
@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {
return ForkJoinTask.invokeAll(createSubtasks())
.stream()
.mapToInt(ForkJoinTask::join)
.sum();
} else {
return processing(arr);
}
}
private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(
Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}
private Integer processing(int[] arr) {
return Arrays.stream(arr)
.filter(a -> a > 10 && a < 27)
.map(a -> a * 10)
.sum();
}
}
在此示例中,工作由存储在 CustomRecursiveTask 类的 arr 字段中的数组表示。createSubtask() 方法递归地将任务划分为较小的工作,直到每个部分小于阈值。然后,invokeAll() 方法将子任务提交给公共拉取并返回 Future 列表。
要触发执行,为每个子任务调用 join() 方法。
在这个例子中,这是使用 Java 8 的 Stream API 完成的; sum() 方法用于将子结果组合到最终结果中。
将任务提交到 ForkJoinPool
要将任务提交到线程池,只有很少的方法可以使用。 submit() 或 execute() 方法(它们的用例是相同的):
forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();
invoke() 方法分叉任务并等待结果,不需要任何手动加入:
int result = forkJoinPool.invoke(customRecursiveTask);
invokeAll() 方法是将 ForkJoinTasks 序列提交给 ForkJoinPool 的最方便的方法。 它将任务作为参数(两个任务,var args 或集合),forks 它们按照生成它们的顺序返回 Future 对象的集合。
或者,您可以使用单独的 fork()
和 join()
方法。 fork() 方法将任务提交给池,但它不会触发它的执行。 join() 方法用于此目的。在 RecursiveAction 的情况下,join() 只返回 null;对于 RecursiveTask <V>,它返回任务执行的结果:
customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();
在我们的 RecursiveTask <V> 示例中,我们使用 invokeAll() 方法向池提交一系列子任务。使用 fork() 和 join() 可以完成相同的工作,但这会对结果的排序产生影响。
为避免混淆,使用 invokeAll() 方法向 ForkJoinPool 提交多个任务通常是个好主意。
结论
使用 fork/join 框架可以加速处理大型任务,但要实现这一结果,应遵循一些指导原则:
- 使用尽可能少的线程池 - 在大多数情况下,最好的决定是为每个应用程序或系统使用一个线程池
- 请使用默认的公共线程池 - 如果不需要特定调整
- 使用合理的阈值将 ForkJoingTask 拆分为子任务
- 避免在 ForkJoingTasks 中出现任何阻塞
本文中使用的示例可在链接的GitHub存储库中找到。