Fork/Join框架浅谈

什么是Fork/Join框架

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架,采用类似于分治算法,就是把一个复杂的问题分成两个或更多的相同或相似的子问题,直到最后子问题可以简单的直接求解,原问题的解即子问题的解的合并。

Fork/Join框架

在这个框架中值得注意的一个重要概念是在理想状态下是没有空闲的工作线程。 它们实现了一种工作窃取算法,闲置的工作线程可以从忙碌的工作线程拿工作执行。

Fork/Join框架处理复杂的线程问题,你只需向框架指出哪些部分工作可以分解并递归处理。伪代码来自Doug Lea's的论文

Result solve(Problem problem) {
    if (problem is small)
        directly solve problem
    else {
        split problem into independent parts
        fork new subtasks to solve each part
        join all subtasks
        compose result from subresults
    }
}

Fork/Join框架的核心类

ForkJoinPool和ForkJoinTask是支持Fork/Join机制的核心类。

ForkJoinPool

ForkjoinPool是实现了ExecutorService和work-stealing(工作窃取)算法。如下所示,新建ForkJoinPool实例,指定并行等级(处理器的个数)。

ForkJoinPool pool = new ForkJoinPool(numberOfProcessors);
Where numberOfProcessors = Runtime.getRunTime().availableProcessors();

如果使用无参构造函数,默认创建pool的大小为上面所示的可用的处理器个数。尽管你可以指定任意大小的pool,但pool会动态调整大小来尝试获得足够的活动线程。与ExecutorService另一个重要的不同,pool不需要在程序退出时显式关闭,因为它的所有线程都处于守护进程模式。

三种提交任务到ForkJoinPool的方法:

  1. execute():期望异步执行,调用其fork方法在多个线程之间拆分工作。
  2. invoke():等待获得结果。
  3. submit():完成时返回一个future对象用于检查状态以及运行结果。
ForkJoinTask

ForkJoinTask是在ForkJoinPool创建工作的抽象类,RecursiveAction 和RecursiveTask是ForkJoinTask的直接子类,都要实现compute方法,两者唯一的不同点是:RecursiveAction没有返回任务的结果,而 RecursiveTask有返回任务的结果(可以自己指定类型的对象)。

ForkJoinTask类提供了几个方法用于检查任务运行的状态. 无论以什么方式结束任务,isDone() 方法返回true;如果完成任务过程中没有被取消或者发生异常,CompletedNormally() 方法返回true;如果任务被取消, isCancelled() 方法返回true;如果任务被取消或者遇到异常,isCompletedabnormally() 方法返回true。
异常处理代码如下:

if(task.isCompletedAbnormally())
{
    System.out.println(task.getException());
}

getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

Fork/Join框架和ExecutorService的区别

Fork/Join框架和ExecutorService最主要的区别是工作窃取算法。与Executor框架不同,当有线程完成了自己的所有子任务,而其他正在执行的线程(称为工作线程)还有子任务等待处理,就去其他线程的队列里窃取一个任务来执行。通过这种方式,线程可以充分利用其运行时间,从而提高应用程序的性能。

Fork/Join框架实践例子

在这个例子中,我们使用ForkJoinPool和ForkJoinTask提供的异步方法来管理任务。我们将实现遍历文件夹查找指定扩展名的文件,ForkJoinTask实现处理一个文件夹内的查找,如果存在子文件夹,为每一个文件夹fork一个新异步任务到ForkJoinPool中去,每个子任务会查找自己文件夹的指定扩展名的文件。一旦任务已经处理了所有的指定文件夹的内容,利用ForkJoinPool的join()方法等待完成所有任务。join方法是等待执行完成并返回compute()方法的计算结果。任务组的所有任务,都将自己的结果返回添加到结果列表中。

详细代码如下:

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;

public class FolderProcessor extends RecursiveTask<List<String>> {
    private final String path;
    private final String extension;

    public FolderProcessor(String path,String extension) {
        this.extension = extension;
        this.path = path;
    }

    @Override
    protected List<String> compute() {
        List<String> list = new ArrayList<String>();
        List<FolderProcessor> tasks = new ArrayList<FolderProcessor>();
        File file = new File(path);
        File[] content= file.listFiles();
        if(content != null){
            for (File aContent : content) {
                if (aContent.isDirectory()) {
                    FolderProcessor task = 
              new FolderProcessor(aContent.getAbsolutePath(), extension);
                    task.fork();
                    tasks.add(task);
                } else {
                    if (checkFile(aContent.getName())) {
                        list.add(aContent.getAbsolutePath());
                    }
                }
            }
        }

        if (tasks.size() > 50)
        {
            System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size());
        }
        addResultsFromTasks(list, tasks);
        return list;
    }

    private void addResultsFromTasks(List<String> list, List<FolderProcessor> tasks) {
        for (FolderProcessor item : tasks)
        {
            list.addAll(item.join());
        }
    }

    private boolean checkFile(String name) {
        return name.endsWith(extension);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        FolderProcessor system = new FolderProcessor("/System", "log");
        FolderProcessor library = new FolderProcessor("/Library", "log");
        FolderProcessor users = new FolderProcessor("/Users", "log");
        pool.execute(system);
        pool.execute(library);
        pool.execute(users);
        do
        {
            System.out.printf("******************************************\n");
            System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
            System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
            System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
            System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
            System.out.printf("******************************************\n");
            try
            {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e)
            {
                e.printStackTrace();
            }
        } while ((!system.isDone()) || (!library.isDone()) || (!users.isDone()));
        pool.shutdown();
        List<String> results;
        results = system.join();
        System.out.printf("System: %d files found.\n", results.size());
        results = library.join();
        System.out.printf("Library: %d files found.\n", results.size());
        results = users.join();
        System.out.printf("Users: %d files found.\n", results.size());
    }
}

结果输出类似如下:

******************************************
Main: Parallelism: 8
Main: Active Threads: 60
Main: Task Count: 62370
Main: Steal Count: 81261
******************************************
******************************************
Main: Parallelism: 8
Main: Active Threads: 0
Main: Task Count: 19295
Main: Steal Count: 160629
******************************************

JDK中的使用实现

Java SE中有一些通用的功能,它们已经使用fork/join框架来实现。
1.在Java 8的java.util.Arrays中的parallelSort方法采用了fork/join框架,在多处理器系统上,并行排序大量数据比顺序排序更快
2.在Stream.parallel()中使用并行,更多请参考parallel stream operation in java 8

总结:

设计优秀的多线程算法是非常困难的,fork/join框架并不适用于所有情况,但是在它的适用范围之内,能够轻松的利用多个CPU提供的计算资源来协作完成一个复杂的计算任务。最终还是看你的问题是否符合框架特性,若不符合,你可以使用基于java.util.concurrent包基础工具方法实现自己的解决方案。

参考

  1. Fork/Join Framework Tutorial: ForkJoinPool Example
  2. 聊聊并发(八)——Fork/Join框架介绍
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,332评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,508评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,812评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,607评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,728评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,919评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,071评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,802评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,256评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,576评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,712评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,389评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,032评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,798评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,026评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,473评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,606评论 2 350

推荐阅读更多精彩内容

  • 作者: 一字马胡 转载标志 【2017-11-01】 更新日志 日期更新内容备注2017-11-01新建文章V1...
    一字马胡阅读 7,323评论 9 134
  • 摘要 这篇论文描述了Fork/Join框架的设计、实现以及性能。这个框架通过(递归的)把问题划分为子任务,然后并行...
    itonyli阅读 1,157评论 0 5
  • 一、多线程 说明下线程的状态 java中的线程一共有 5 种状态。 NEW:这种情况指的是,通过 New 关键字创...
    Java旅行者阅读 4,665评论 0 44
  • 秋天我想起了关羽 风扬起麦子 有三粒 落在盆地 漂在湖面 飞向雪山 秋天, 麦城还有马尾 扫着苍蝇 秋天,我想起了...
    西红柿tomato阅读 212评论 0 1
  • 用时:1h 总评: 如果你看过其他的时间管理的书籍,那么本书很值得你快速的看一看,里面提到的时间模式化、格式化的观...
    tuionf阅读 840评论 0 2