Fork/Join框架学习

什么是Fork/Join框架

Fork/Join框架是一组允许程序员利用多核处理器支持的并行执行的API。它使用了“分而治之”策略:把非常大的问题分成更小的部分,反过来,小部分又可以进一步分成更小的部分,递归地直到一个部分可以直接解决。这被叫做“fork”。

然后所有部件在多个处理核心上并行执行。每个部分的结果被“join”在一起以产生最终结果。因此,框架的名称是“Fork/Join”。

下面的为代码展示了分治策略如何与Fork/Join框架一起工作:

if (problemSize < threshold)
    solve problem directly
else {
    break problem into subproblems
    recursively solve each problem
    combine the results
}

Fork/Join框架在JDk7中被加入,并在JDK8中进行了改进。它用了Java语言中的几个新特性,包括并行的Stream API和排序。

Fork/Join框架简化了并行程序的原因有:

  • 它简化了线程的创建,在框架中线程是自动被创建和管理。
  • 它自动使用多个处理器,因此程序可以扩展到使用可用处理器。

由于支持真正的并行执行,Fork/Join框架可以显著减少计算时间,并提高解决图像处理、视频处理、大数据处理等非常大问题的性能。

关于Fork/Join框架的一个有趣的地方是:它使用工作窃取算法来平衡线程之间的负载:如果一个工作线程没有事情要做,它可以从其他仍然忙碌的线程窃取任务。

理解Fork/Join框架API

Fork/Join框架在java.util.concurrent包下被实现。它的核心有4个类:

  • ForkJoinTask<V>: 这是一个抽象任务类,并且运行在ForkJoinPool中。
  • ForkJoinPool:这是一个线程池管理并运行众多ForkJoinTask任务。
  • RecursiveAction: ForkJoinTask的子类,这个类没有返回值。
  • RecursiveTask<V>: ForkJoinTask的子类,有返回值。

基本上,我们解决问题的代码是在RecursiveAction或者RecursiveTask中进行的,然后将任务提交由ForkJoinPool`执行,ForkJoinPool处理从线程管理到多核处理器的利用等各种事务。

我们先来理解一下这些类中的关键方法。

ForkJoinTask<V>

这是一个运行在ForkJoinPool中的抽象的任务类。类型V指定了任务的返回结果。ForkJoinTask是一个类似线程的实体,它表示任务的轻量级抽象,而不是实际的执行线程。该机制允许由ForkJoinPool中的少量实际线程管理大量任务。其关键方法是:

  • final ForkJoinTask<V> fork()
  • final V join()
  • final V invoke()

fork()方法提交并执行异步任务,该方法返回ForkJoinTask并且调用线程继续运行。

join()方法等待任务直到返回结果。

invoke()方法是组合了fork()join(),它开始一个任务并等待结束返回结果。

此外,ForkJoinTask中还提供了用于一次调用多个任务的两个静态方法

  • static void invokeAll(ForkJoinTask<?> task1, ForkJoinTask<?> task2) :执行两个任务
  • static void invokeAll(ForkJoinTask<?>… taskList):执行任务集合

RecursiveAction

这是一个递归的ForkJoinTask子类,不返回结果。Recursive意思是任务可以通过分治策略分成自己的子任务(在下面的下一节中,您将看到如何划分代码示例)。

我们必须重写compute()方法,并将计算代码写在其中:

protected abstract void compute();

RecursiveTask<V>

RecursiveAction一样,但是RecursiveTask有返回结果,结果类型由V指定。我们仍然需要重写compute()方法:

protected abstract V compute();

ForkJoinPool

这是Fork/Join框架的核心类。它负责线程的管理和ForkJoinTask的执行,为了执行ForkJoinTask,首先需要获取到ForkJoinPool的实例。

有两种构造器方式可以获取ForkJoinPool的实例,第一种使用构造器创建:

  • ForkJoinPool(): 使用默认的构造器创建实例,该构造器创建出的池与系统中可用的处理器数量相等。
  • ForkJoinPool(int parallelism):该构造器指定处理器数量,创建具有自定义并行度级别的池,该级别的并行度必须大于0,且不超过可用处理器的实际数量。

并行性的级别决定了可以并发执行的线程的数量。换句话说,它决定了可以同时执行的任务的数量——但不能超过处理器的数量。

但是,这并不限制池可以管理的任务的数量。ForkJoinPool可以管理比其并行级别多得多的任务。

获取ForkJoinPool实例的第二种方法是使用以下ForkJoinPool的静态方法获取公共池实例:

public static ForkJoinPool commonPool();

这种方式创建的池不受shutdown()或者shutdownNow()方法的影响,但是他会在System.exit()时会自动中止。任何依赖异步任务处理的程序在主体程序中止前都应该调用awaitQuiescence()方法。该方式是静态的,可以自动被使用。

ForkJoinPool中执行ForkJoinTasks

在创建好ForkJoinPool实例之后,可以使用下面的方法执行任务:

  • <T>T invoke(ForkJoinTask<T> task):执行指定任务并返回结果,该方法是异步的,调用的线程会一直等待直到该方法返回结果,对于RecursiveAction任务来说,参数类型是Void.
  • void execute(ForkJoinTask<?> task):异步执行指定的任务,调用的线程一直等待知道任务完成才会继续执行。

另外,也可以通过ForkJoinTask自己拥有的方法fork()invoke()执行任务。在这种情况下,如果任务还没在ForkJoinPool中运行,那么commonPool()将会自动被使用。

值得注意的一点是:ForkJoinPool使用的是守护线程,当所有的用户线程被终止是它也会被终止,这意味着可以不必显示的关闭ForkPoolJoin(虽然这样也可以)。如果是common pool的情况下,调用shutdown没有任何效果,应为这个池总是可用的。

好了,现在来看看一些例子。

案例

使用RecursiveAction

这里例子中,看一下如果使用Fork/Join框架去执行一个没有返回值的任务。

假设要对一个很大的数字数组进行变换,为了简单简单起见,转换只需要将数组中的每个元素乘以指定的数字。下面的代码用于转换任务:

import java.util.concurrent.*;
 
public class ArrayTransform extends RecursiveAction {
    int[] array;
    int number;
    int threshold = 100_000;
    int start;
    int end;
 
    public ArrayTransform(int[] array, int number, int start, int end) {
        this.array = array;
        this.number = number;
        this.start = start;
        this.end = end;
    }
    
    @Override
    protected void compute() {
        if (end - start < threshold) {
            computeDirectly();
        } else {
            int middle = (end + start) / 2;
 
            ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
            ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
 
            invokeAll(subTask1, subTask2);
        }
    }
 
    protected void computeDirectly() {
        for (int i = start; i < end; i++) {
            array[i] = array[i] * number;
        }
    }
}

可以看到,这是一个RecursiveAction的子类,我们重写了compute()方法。

数组和数字从它的构造函数传递。参数start和end指定要处理的数组中的元素的范围。如果数组的大小大于阈值,这有助于将数组拆分为子数组,否则直接对整个数组执行计算。

观察else中的代码片段:

protected void compute() {
    if (end - start < threshold) {
        computeDirectly();
    } else {
        int middle = (end + start) / 2;
 
        ArrayTransform subTask1 = new ArrayTransform(array, number, start, middle);
        ArrayTransform subTask2 = new ArrayTransform(array, number, middle, end);
 
        invokeAll(subTask1, subTask2);
    }
}

这里,将数组分成两个部分,并分别创建他们的子任务,反过来,子任务也可以递归的进一步划分为更小的子任务,直到其大小小于直接调用computeDirectly();方法的的阈值。

然后,在main函数中创建ForkJoinPool执行任务:

ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
ForkJoinPool pool = new ForkJoinPool();
pool.invoke(mainTask);

或者使用common pool执行任务:

ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
mainTask.invoke();

这里是全部的测试程序:

import java.util.*;
import java.util.concurrent.*;
 
public class ForkJoinRecursiveActionTest {
    static final int SIZE = 10_000_000;
    static int[] array = randomArray();
 
    public static void main(String[] args) {
 
        int number = 9;
 
        System.out.println("数组中的初始元素: ");
        print();
 
        ArrayTransform mainTask = new ArrayTransform(array, number, 0, SIZE);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(mainTask);
 
        System.out.println("并行计算之后的元素:");
        print();
    }
 
    static int[] randomArray() {
        int[] array = new int[SIZE];
        Random random = new Random();
 
        for (int i = 0; i < SIZE; i++) {
            array[i] = random.nextInt(100);
        }
 
        return array;
    }
 
    static void print() {
        for (int i = 0; i < 10; i++) {
            System.out.print(array[i] + ", ");
        }
        System.out.println();
    }
}

如您所见,使用随机生成的1,000万个元素数组进行测试。由于数组太大,我们在计算前后只打印前10个元素,看效果如何:

数组中的初始元素:
42, 98, 43, 14, 9, 92, 33, 18, 18, 76,
并行计算之后的元素:
378, 882, 387, 126, 81, 828, 297, 162, 162, 684,

使用RecursiveTask

这个例子中,展示了如何使用带有返回值的任务,下面的任务计算在一个大数组中出现偶数的次数:

import java.util.concurrent.*;
 
public class ArrayCounter extends RecursiveTask<Integer> {
    int[] array;
    int threshold = 100_000;
    int start;
    int end;
 
    public ArrayCounter(int[] array, int start, int end) {
        this.array = array;
        this.start = start;
        this.end = end;
    }
 
    protected Integer compute() {
        if (end - start < threshold) {
            return computeDirectly();
        } else {
            int middle = (end + start) / 2;
 
            ArrayCounter subTask1 = new ArrayCounter(array, start, middle);
            ArrayCounter subTask2 = new ArrayCounter(array, middle, end);
 
            invokeAll(subTask1, subTask2);
 
 
            return subTask1.join() + subTask2.join();
        }
    }
 
    protected Integer computeDirectly() {
        Integer count = 0;
 
        for (int i = start; i < end; i++) {
            if (array[i] % 2 == 0) {
                count++;
            }
        }
 
        return count;
    }
}

如你所见,这个类是RecursiveTask的子类并且重写了compute()方法,并且返回了一个整型的结果。

这里还使用了join()方法去合并子任务的结果:

return subTask1.join() + subTask2.join();

测试程序就和RecursiveAction的一样:

import java.util.*;
import java.util.concurrent.*;
 
public class ForkJoinRecursiveTaskTest {
    static final int SIZE = 10_000_000;
    static int[] array = randomArray();
 
    public static void main(String[] args) {
 
        ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE);
        ForkJoinPool pool = new ForkJoinPool();
        Integer evenNumberCount = pool.invoke(mainTask);
 
        System.out.println("偶数的个数: " + evenNumberCount);
    }
 
    static int[] randomArray() {
        int[] array = new int[SIZE];
        Random random = new Random();
 
        for (int i = 0; i < SIZE; i++) {
            array[i] = random.nextInt(100);
        }
 
        return array;
    }
 
}

运行程序就会看到如下的结果:

偶数的个数: 5000045

并行性试验

这个例子展示并行性的级别如何影响计算时间:

ArrayCounter类让阈值可以通过构造器传入:

import java.util.concurrent.*;
 
public class ArrayCounter extends RecursiveTask<Integer> {
    int[] array;
    int threshold;
    int start;
    int end;
 
    public ArrayCounter(int[] array, int start, int end, int threshold) {
        this.array = array;
        this.start = start;
        this.end = end;
        this.threshold = threshold;
    }
 
    protected Integer compute() {
        if (end - start < threshold) {
            return computeDirectly();
        } else {
            int middle = (end + start) / 2;
 
            ArrayCounter subTask1 = new ArrayCounter(array, start, middle, threshold);
            ArrayCounter subTask2 = new ArrayCounter(array, middle, end, threshold);
 
            invokeAll(subTask1, subTask2);
 
 
            return subTask1.join() + subTask2.join();
        }
    }
 
    protected Integer computeDirectly() {
        Integer count = 0;
 
        for (int i = start; i < end; i++) {
            if (array[i] % 2 == 0) {
                count++;
            }
        }
 
        return count;
    }
}

测试程序将并行度级别和阈值作为参数传递:

import java.util.*;
import java.util.concurrent.*;
 
public class ParallelismTest {
    static final int SIZE = 10_000_000;
 
    static int[] array = randomArray();
 
    public static void main(String[] args) {
        int threshold = Integer.parseInt(args[0]);
        int parallelism = Integer.parseInt(args[1]);
 
        long startTime = System.currentTimeMillis();
 
        ArrayCounter mainTask = new ArrayCounter(array, 0, SIZE, threshold);
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Integer evenNumberCount = pool.invoke(mainTask);
 
        long endTime = System.currentTimeMillis();
 
        System.out.println("偶数的个数: " + evenNumberCount);
 
        long time = (endTime - startTime);
        System.out.println("执行时间: " + time + " ms");
    }
 
    static int[] randomArray() {
        int[] array = new int[SIZE];
        Random random = new Random();
 
        for (int i = 0; i < SIZE; i++) {
            array[i] = random.nextInt(100);
        }
 
        return array;
    }
 
}

该程序允许您使用不同的并行度和阈值轻松测试性能。注意,它在最后打印执行时间。尝试用不同的参数多次运行这个程序,并观察执行时间。

结论

  • Fork/Join框架的设计简化了java语言的并行程序
  • ForkJoinPoolFork/Join框架的核心,它允许多个ForkJoinTask请求由少量实际线程执行,每个线程运行在单独的处理核心上
  • 既可以通过构造器也可以通过静态方法common pool去获取ForkJoinPool的实例
  • ForkJoinTask是一个抽象类,它表示的任务比普通线程更轻。通过覆盖其compute()方法实现计算逻辑
  • RecursiveAction是一个没有返回值的ForkJoinTask
  • RecursiveTask是一个有返回值的ForkJoinTask
  • ForkJoinPool与其它池的不同之处在于,它使用了工作窃取算法,该算法允许一个线程完成了可以做的事情,从仍然繁忙的其他线程窃取任务
  • ForkJoinPool中的线程是守护线程,不必显式地关闭池
  • 执行一个ForkJoinTask既可以通过调用它自己的invoke()fork()方法,也可以提交任务给ForkJoinPool并调用它的invoke()或者execute()方法
  • 直接使用ForkJoinTask自身的方法执行任务,如果它还没运行在ForkJoinPool中那么将运行在common pool
  • ForkJoinTask中使用join()方法,可以合并子任务的结果
  • invoke()方法会等待子任务完成,但是execute()方法不会
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容