[JAVA][Fork/Join实现并行编程]

1.落笔缘由

由于之前希望对Java异步操作进行一次梳理,碰巧看到了Fork/Join,之前并没有了解过,所以借这次机会来了解一下它的用途。

2.Fork/Join作用

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork/Join框架是ExecutorService接口的一种具体实现,目的是为了帮助你更好地利用多处理器带来的好处。它是为那些能够被递归地拆解成子任务的工作类型量身设计的。其目的在于能够使用所有可用的运算能力来提升你的应用的性能。
和ExecutorService接口的其他实现一样(其实我正是在整理ExecutorService的SingleThreadExecutor等子类的时候,看到了WorkStealingPool,才进而接触到Fork/Join框架),Fork/Join会将任务分发给线程池中的工作线程。Fork/Join使用工作窃取(work-stealing)算法。那么什么是工作窃取方法?简单来说就是某个线程从其他队列里窃取任务来执行。我们知道Fork/Join的作用就是将一个大任务分为若干小任务,最后将这些小任务的执行结果整合起来得到大任务的结果。而Fork/Join把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务。那么这里就会出现一种情况,有些线程的任务队列里的任务已经完成,但其他线程的队列还有任务没完成,这样就造成已完成任务线程闲置,这也太浪费了吧,所以为了提高效率,完成自己的任务而处于空闲的工作线程能够从其他仍然处于忙碌(busy)状态的工作线程处窃取等待执行的任务。为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
Fork/Join框架的核心是ForkJoinPool
类:

//ForkJoinPool继承抽象类AbstractExecutorService
ForkJoinPool extends AbstractExecutorService
//而AbstractExecutorService实现的是ExecutorService接口
AbstractExecutorService implements ExecutorService

//创建ForkJoinPool对象:
//使用Runtime.availableProcessors()获取的数值作为并行级别;使用默认的default thread factory;UncaughtExceptionHandler为空;非异步LIFO模式。
public ForkJoinPool()
//使用指定数值(parallelism)作为并行级别;使用默认的default thread factory;UncaughtExceptionHandler为空;非异步LIFO模式。
public ForkJoinPool(int parallelism)

//通过Executors创建ForkJoinPool对象
ForkJoinPool forkJoinPool = (ForkJoinPool) Executors.newWorkStealingPool();

ForkJoinPool实现了工作偷取算法,通过执行ForkJoinTask任务来实现。

3.执行过程

Fork/Join的执行过程主要有两步:
第一步分割任务。通过ForkJoinTask对象,调用fork()方法把大任务分割成子任务。
第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据,而子任务执行的结果是通过ForkJoinTask的join()方法获取的。
下面说一下ForkJoinTask,ForkJoinTask有点像Thread的Runable,都是用来定义要执行的任务的。
ForkJoinTask有两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
具体可以看一下oracle的Fork and Join: Java Can Excel at Painless Parallel Programming Too!
这篇文章,里面有介绍使用Fork/Join框架的例子(是关于计算文档中的单词出现次数),下面这个类就是这个例子中的,他继承了RecursiveTask ,返回一个Long型的结果。

public class FolderSearchTask extends RecursiveTask<Long> {
    private final Folder folder;
    private final String searchedWord;
    
    FolderSearchTask(Folder folder, String searchedWord) {
        super();
        this.folder = folder;
        this.searchedWord = searchedWord;
    }
    
    @Override
    protected Long compute() {
        long count = 0L;
        List<RecursiveTask<Long>> forks = new LinkedList<>();
        for (Folder subFolder : folder.getSubFolders()) {
            FolderSearchTask task = new FolderSearchTask(subFolder, searchedWord);
            forks.add(task);
            task.fork();
        }
        for (Document document : folder.getDocuments()) {
            DocumentSearchTask task = new DocumentSearchTask(document, searchedWord);
            forks.add(task);
            task.fork();
        }
        for (RecursiveTask<Long> task : forks) {
            count = count + task.join();
        }
        return count;
    }
}

4.Fork/Join的基本用法

在Oracle的文档里有关于Fork/Join的用法。基本思维就是给出是否需要将当前任务分成小任务的条件。

if (当前这个任务工作量足够小)
    直接完成这个任务
else
    将这个任务分解成两个部分
    分别触发(invoke)这两个子任务的执行,并等待结果

这个操作是发生在ForkJoinTask里的compute()方法里,不管是继承RecursiveAction还是RecursiveTask ,都要重写compute()。

5.例子解析

1)定义任务

下面是一个关于计算斐波那契数列的例子,由于我吗要计算指定长度斐波那契数列的和,所以我们的任务是需要有返回值的,所以继承RecursiveTask,定义返回值是Long型。而在compute()方法里,就是当斐波那契数列数字的个数小于10就直接返回这10个值的和,而大于10的时候,就将这个任务分成两个小任务。

public class FibonacciTask extends RecursiveTask<Long>
{
    private static final long serialVersionUID = 1L;
    private List<Long> mList = null;
    private int size = 0;
    
    public FibonacciTask(List<Long> list, int size)
    {
        mList = list;
        this.size = size;
        System.out.println("num:"+(++Num.num));
    }
    
    @Override
    protected Long compute()
    {

        if (mList!=null)
        {
            if (mList.size()<10)
            {
                return cal(mList);
            }else
            {
                List<List<Long>> lists = averageAssign(mList,2);
                FibonacciTask fibonacciTask1 = new FibonacciTask(lists.get(0), 5);
                FibonacciTask fibonacciTask2 = new FibonacciTask(lists.get(1), 5);
                fibonacciTask1.fork();
                fibonacciTask2.fork();
                System.out.println("list0:"+lists.get(0).size()+" list1:"+lists.get(1).size());
                return fibonacciTask1.join()+fibonacciTask2.join();
            }
        }
        return null;
    }

    private long cal(List<Long> list)
    {
        long total = 0;
        for (int i = 0; i < list.size(); i++)
        {
            total = list.get(i)+total;
        }
        return total;
    }
    
    /** 
     * 将一个list均分成n个list,主要通过偏移量来实现的 
     * @param source 
     * @return 
     */  
    public static <T> List<List<T>> averageAssign(List<T> source,int n){  
        List<List<T>> result=new ArrayList<List<T>>();  
        int remaider=source.size()%n;  //(先计算出余数)  
        int number=source.size()/n;  //然后是商  
        int offset=0;//偏移量  
        for(int i=0;i<n;i++){  
            List<T> value=null;  
            if(remaider>0){  
                value=source.subList(i*number+offset, (i+1)*number+offset+1);  
                remaider--;  
                offset++;  
            }else{  
                value=source.subList(i*number+offset, (i+1)*number+offset);  
            }  
            result.add(value);  
        }  
        return result;  
    } 
}

2)执行任务

在执行任务之前,我们先生成通过下面的createFibonacci方法生成一个斐波那契数列。然后通过ForkJoinPool 对象调用submit方法执行FibonacciTask 任务。

/**
 * @author LGY
 * @time 2017-4-8
 * @action 
 */
public class TestFibonacci
{

    public static void main(String[] args)
    {
        long startTime;
        long stopTime;
        long singleThreadTimes;
        FibonacciTask fibonacciTask = new FibonacciTask(createFibonacci(0, 1, 100), 5);
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        startTime = System.currentTimeMillis();
        Future<Long> future = forkJoinPool.submit(fibonacciTask);
        stopTime = System.currentTimeMillis();
        singleThreadTimes = (stopTime - startTime);
        System.out.println(  " fork / join search took "
                + singleThreadTimes + "ms");
        try
        {
            System.out.println("result:"+future.get());
        } catch (Exception e)
        {
            e.printStackTrace();
        }

    }
    
    
    /**
     * @author LGY
     * @action 生成指定长度斐波那契数列
     * @time 2017-4-9
     * @param first 第一个值
     * @param secend 第一个值
     * @param size 斐波那契数列长度
     * @return
     */
    private static List<Long> createFibonacci(long first , long secend , int size)
    {
        List<Long> list = new ArrayList<Long>();
        long total = 0;
        long startTime;
        long stopTime;
        long singleThreadTimes;
        if (size == 1)
        {
            list.add(first);
        }else if (size == 2)
        {
            list.add(first);
            list.add(secend);
        }else if (size>2) {
            list.add(first);
            list.add(secend);
            for (int i = 0; i < size-2; i++)
            {
                list.add(list.get(i)+list.get(i+1));
            }
        }
        System.out.print("[");
        startTime = System.currentTimeMillis();
        for (int i = 0; i < list.size(); i++)
        {
            System.out.print(list.get(i)+" ");
            total = list.get(i)+total;
        }
        System.out.print("]"+"result:"+total);
        stopTime = System.currentTimeMillis();
        singleThreadTimes = (stopTime - startTime);
        System.out.println();
        System.out.println(  " single thread search took "
                + singleThreadTimes + "ms");

        return list;
    }
}

6.总结

关于Fork/Join的Fork and Join: Java Can Excel at Painless Parallel Programming Too!一文的例子源码可以到http://www.oracle.com/technetwork/articles/java/forkjoinsources-430155.zip下载,也可以在本文文章的源码地址处获取。
在这个源码的WordCounter类中,需要传三个参数给main方法,右击WordCounter类,点击Run As-->Run Configurations,选择页卡Arguments
输入三个参数,第一个是文档地址,第二个是要计算出现了多少次的单词,最后一个是要重复执行任务的次数

设置参数

指定文件路径下的文档

7.参考文章

http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
http://www.oracle.com/technetwork/articles/java/fork-join-422606.html
Fork and Join: Java Can Excel at Painless Parallel Programming Too!译文资料

8.源码地址

http://download.csdn.net/detail/lgywsdy/9808408

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容