Fork/Join 框架入门

概述

fork/join 框架在 Java 7 中呈现。它提供了一些工具,通过尝试使用所有可用的处理器内核来帮助加速并行处理 - 这是通过分而治之的方法实现的——分治算法。

Java 8 的并行流背后使用的基础架构就是该框架。

在实践中,这意味着框架首先“fork(分叉)”,递归地将任务分解为较小的独立子任务,直到它们足够简单以便异步执行。

之后,“join(并入)”部分开始,其中所有子任务的结果递归地连接成单个结果,或者在返回 void 的任务的情况下,程序只是等待直到执行每个子任务。

为了提供有效的并行执行,fork/join 框架使用一个名为 ForkJoinPool 的线程池,它管理 ForkJoinWorkerThread 类型的工作线程。

ForkJoinPool

ForkJoinPool 是框架的核心。它是 ExecutorService 的一个实现,它管理工作线程并为我们提供工具来获取有关线程池状态和性能的信息。

工作线程当时只能执行一个任务,但 ForkJoinPool 不会为每个子任务创建单独的线程。相反,池中的每个线程都有自己的双端队列(或deque),用于存储任务。

这种架构对于在工作窃取算法的帮助下平衡线程的工作负载至关重要。

工作窃取算法

简单地说 - 空闲线程试图从繁忙线程的双端队列中“窃取”工作。

默认情况下,工作线程从其自己的双端队列头部获取任务。当它为空时,线程从另一个忙线程的双端队列尾部或全局入口队列中获取任务。

这种方法最大限度地减少了线程竞争任务的可能性。它还减少了线程必须寻找工作的次数,因为它首先在最大可用工作块上工作。

ForkJoinPool 实例化

在 Java 8 中,访问 ForkJoinPool 实例的最方便方法是使用其静态方法commonPool()。顾名思义,这将提供对公共池的引用,公共池是每个 ForkJoinTask 的默认线程池。

根据 Oracle 文档,使用预定义的公共池可以减少资源消耗,因为这会阻止为每个任务创建单独的线程池。

ForkJoinPool commonPool = ForkJoinPool.commonPool();

通过创建 ForkJoinPool 并将其分配给工具类的公共静态字段,可以在 Java 7 中实现相同的行为:

public static ForkJoinPool forkJoinPool = new ForkJoinPool();

然后,可以很容易的访问:

ForkJoinPool forkJoinPool = PoolUtil.forkJoinPool;

使用 ForkJoinPool 的构造函数,可以创建具有特定级别的并行性,线程工厂和异常处理程序的自定义线程池。默认使用当前可用的处理器核心。

ForkJoinTask<V>

ForkJoinTask 是 ForkJoinPool 中执行的任务的基本类型。在实践中,应扩展其两个子类中的一个:

  • void 任务的 RecursiveAction
  • 返回值的任务的 RecursiveTask <V>。

它们都有一个抽象方法 compute(),该方法定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便拆分时,生成单个子任务结果的逻辑。该方法的实现类似于下面的伪代码:

if (任务足够小或不可分) {
    顺序计算该任务
} else {
    将任务分成两个子任务
    递归调用本方法,拆分每个子任务,等待所有子任务完成
    合并每个子任务的结果
}

没有确切的标准决定一个任务是否应该再拆分。递归的任务拆分过程如图所示:

fork-join.jpg

RecursiveAction – 例子

为了演示框架的分支行为,如果使用 createSubtask() 方法,workload.length() 大于指定的阈值,则该示例将分割任务。

String 被递归地划分为子串,创建基于这些子串的 CustomRecursiveTask 实例。

因此,该方法返回 List <CustomRecursiveAction>。 该列表使用 invokeAll() 方法提交给 ForkJoinPool:

public class CustomRecursiveAction extends RecursiveAction {
 
    private String workload = "";
    private static final int THRESHOLD = 4;
 
    private static Logger logger = 
      Logger.getAnonymousLogger();
 
    public CustomRecursiveAction(String workload) {
        this.workload = workload;
    }
 
    @Override
    protected void compute() {
        if (workload.length() > THRESHOLD) {
            ForkJoinTask.invokeAll(createSubtasks());
        } else {
           processing(workload);
        }
    }
 
    private List<CustomRecursiveAction> createSubtasks() {
        List<CustomRecursiveAction> subtasks = new ArrayList<>();
 
        String partOne = workload.substring(0, workload.length() / 2);
        String partTwo = workload.substring(workload.length() / 2, workload.length());
 
        subtasks.add(new CustomRecursiveAction(partOne));
        subtasks.add(new CustomRecursiveAction(partTwo));
 
        return subtasks;
    }
 
    private void processing(String work) {
        String result = work.toUpperCase();
        logger.info("This result - (" + result + ") - was processed by "
          + Thread.currentThread().getName());
    }
}

此模式可用于开发自己的 RecursiveAction 类。要执行此操作,请创建一个表示工作总量的对象,选择合适的阈值,定义分割工作的方法,并定义执行工作的方法。

RecursiveTask<V>

对于返回值的任务,此处的逻辑类似,除了每个子任务的结果在一个结果中合并:

public class CustomRecursiveTask extends RecursiveTask<Integer> {
    private int[] arr;
 
    private static final int THRESHOLD = 20;
 
    public CustomRecursiveTask(int[] arr) {
        this.arr = arr;
    }
 
    @Override
    protected Integer compute() {
        if (arr.length > THRESHOLD) {
            return ForkJoinTask.invokeAll(createSubtasks())
              .stream()
              .mapToInt(ForkJoinTask::join)
              .sum();
        } else {
            return processing(arr);
        }
    }
 
    private Collection<CustomRecursiveTask> createSubtasks() {
        List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, 0, arr.length / 2)));
        dividedTasks.add(new CustomRecursiveTask(
          Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
        return dividedTasks;
    }
 
    private Integer processing(int[] arr) {
        return Arrays.stream(arr)
          .filter(a -> a > 10 && a < 27)
          .map(a -> a * 10)
          .sum();
    }
}

在此示例中,工作由存储在 CustomRecursiveTask 类的 arr 字段中的数组表示。createSubtask() 方法递归地将任务划分为较小的工作,直到每个部分小于阈值。然后,invokeAll() 方法将子任务提交给公共拉取并返回 Future 列表。

要触发执行,为每个子任务调用 join() 方法。

在这个例子中,这是使用 Java 8 的 Stream API 完成的; sum() 方法用于将子结果组合到最终结果中。

将任务提交到 ForkJoinPool

要将任务提交到线程池,只有很少的方法可以使用。 submit() 或 execute() 方法(它们的用例是相同的):

forkJoinPool.execute(customRecursiveTask);
int result = customRecursiveTask.join();

invoke() 方法分叉任务并等待结果,不需要任何手动加入:

int result = forkJoinPool.invoke(customRecursiveTask);

invokeAll() 方法是将 ForkJoinTasks 序列提交给 ForkJoinPool 的最方便的方法。 它将任务作为参数(两个任务,var args 或集合),forks 它们按照生成它们的顺序返回 Future 对象的集合。

或者,您可以使用单独的 fork()join() 方法。 fork() 方法将任务提交给池,但它不会触发它的执行。 join() 方法用于此目的。在 RecursiveAction 的情况下,join() 只返回 null;对于 RecursiveTask <V>,它返回任务执行的结果:

customRecursiveTaskFirst.fork();
result = customRecursiveTaskLast.join();

在我们的 RecursiveTask <V> 示例中,我们使用 invokeAll() 方法向池提交一系列子任务。使用 fork() 和 join() 可以完成相同的工作,但这会对结果的排序产生影响。

为避免混淆,使用 invokeAll() 方法向 ForkJoinPool 提交多个任务通常是个好主意。

结论

使用 fork/join 框架可以加速处理大型任务,但要实现这一结果,应遵循一些指导原则:

  • 使用尽可能少的线程池 - 在大多数情况下,最好的决定是为每个应用程序或系统使用一个线程池
  • 请使用默认的公共线程池 - 如果不需要特定调整
  • 使用合理的阈值将 ForkJoingTask 拆分为子任务
  • 避免在 ForkJoingTasks 中出现任何阻塞

本文中使用的示例可在链接的GitHub存储库中找到。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容