并发(行)之Fork/Join

严格来说,Fork/join是并行而非并发的。之所以放到并发这块,是因为并发和并行大部分情况下是不需要程序员去关心的。大牛Linus Torvalds更是曾经讳言:

推崇并行只不过是浪费大家的时间,“并行更高效”这种理论纯属胡说八道。大容量缓存是高效的,如果缺少缓存,并行一些低等级微内核可以说是毫无意义,除下特定类型上大规模规则计算,比如图形处理。
没有人会回到过去,那些复杂的乱序运行内核不会消失。扩展不可能无休止的进行,人们需求更多的移动性,那些叫嚣扩展到上千核心的论调纯属扯淡,无需理会。
是有多么奇葩的思维才能幻想出这些神奇等等并行算法的用武之地?!
对于并行来说,唯一的用武之地就是图形计算和服务器端,而并行计算在这些领域确实也得到了大量的应用。但是没有任何疑问,并行在其他领域毫无用武之地。
所以,忘掉并行吧,它永远都不可能被大规模推广。
...
放弃吧。“并行就是未来”的说法纯属胡说八道。

我们还能说什么呢?深以为然。
回到主题,Fork/join采用了分治思想,其实分治对我们来说很熟悉了,大学算法课程总会接触到,比如分治排序:


分治

基于这个思想,我们先看下Fork/join的几个重要类。

  • ForkJoinPool :An ExecutorService for running ForkJoinTasks.执行ForkJoinTasks的线程池。整个框架的主导者,它负责创建WorkQueueForkJoinWorkerThread,并进行任务分配。
    构造方法源码:
private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;//任务名称
        this.factory = factory;//ForkJoinWorkerThread的创建工厂,所以可以自己实现?
        this.ueh = handler;//异常处理器
        //后面三个都是记录线程数和控制队列边界等信息的变量
        this.config = (parallelism & SMASK) | mode;//并发数
        long np = (long)(-parallelism); 
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

另外还有个比较重要的变量runState: 用来标记当前线程池的运行状态,使用二进制表示。这就是值得我们学习借鉴的地方

  • WorkQueue: ForkJoinPool的内部类,也就是存储任务的双端队列。
WorkQueue(ForkJoinPool pool, ForkJoinWorkerThread owner) {
            this.pool = pool;//当前队列属于哪个pool
            this.owner = owner;//负责的WorkerThread是哪个,如果当前队列是共享的,设置为null
            // 将索引放在队列数组的中间。INITIAL_QUEUE_CAPACITY 默认8K
            base = top = INITIAL_QUEUE_CAPACITY >>> 1;
        }
  • ForkJoinWorkerThread: 执行任务的线程。它从负责的WorkQueue的top取出执行的ForkJoinTask进行处理(LIFO)。自己的队列empty的时候,会进行work stealing(任务窃取),去别的线程的地盘的base端偷取任务或者去shared queue去拿任务。目的是为了更高效地利用资源。
protected ForkJoinWorkerThread(ForkJoinPool pool) {
        // Use a placeholder until a useful name can be set in registerWorker
        super("aForkJoinWorkerThread");
        this.pool = pool;//所属的线程池
        //维护的任务队列,可以看到,每产生一个线程,就会有一个任务队列。
        this.workQueue = pool.registerWorker(this);
    }
  • ForkJoinTask: 存储在WorkQueue里需要执行的任务。它有两个子类:RecursiveTaskRecursiveAction。区别在于Task任务是有返回值,Action则没有返回值。自定义任务时我们只需要继承二者其一,重写compute()方法即可。
    我们用图来表示他们之间的关系:
    Fork/Join

还有重要的几个方法,我们用示例来说明吧,如下为求和示例:

public class ForkJoinTest {
    private static class SumTask extends RecursiveTask<Integer> {
        static final int THRESHOLD = 10;
        Integer[] array;
        int start;
        int end;
        SumTask(Integer[] array, int start, int end) {
            this.array = array;
            this.start = start;
            this.end = end;
        }
        @Override
        protected Integer compute() {
            if (end - start <= THRESHOLD) {
                // 原子任务,直接结算结果
                Integer sum = 0;
                for (int i = start; i < end; i++) {
                    sum += array[i];
                }
                System.out.println(String.format(Thread.currentThread().getName()+ "-->计算 %d~%d = %d", start, end, sum));
                return sum;
            }
            //还需要再分
            int middle = (end + start) / 2;
            System.out.println(String.format(Thread.currentThread().getName()+ " -->拆分 %d~%d --> %d~%d, %d~%d", start, end, start, middle, middle, end));
            SumTask left = new SumTask(this.array, start, middle);
            SumTask right = new SumTask(this.array, middle, end);
            left.fork();
            right.fork();
            //invokeAll(subtask1, subtask2);//事实上不用分别fork(),这种才是正确的写法
            Integer subresult1 = left.join();
            Integer subresult2 = right.join();
            Integer result = subresult1 + subresult2;
            return result;
        }
    }
    public static void fillArray(Integer[] array) {
        Random random = new Random();
        for (int i = 0; i < array.length; i++) {
            array[i] = i + 1;
        }
    }
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Integer[] array = new Integer[100];
        fillArray(array);
        ForkJoinPool fjp = new ForkJoinPool(4); // 最大并发(行)数4
        ForkJoinTask<Integer> task = new SumTask(array, 0, array.length);
        long startTime = System.currentTimeMillis();
        Integer result = fjp.invoke(task);
        long endTime = System.currentTimeMillis();
        System.out.println("最终结果: " + result + " 花费时间:" + (endTime - startTime) + " ms.");
        fjp.shutdown();
    }
}

结果如下:

结果

总共产生了3个线程执行这个任务,用了32ms,根据结果可以看出,fork()的时候,会先去找空闲线程(空闲WorkQueue),没有的话才会创建线程。

我们看看程序中几个重要的方法:
1、fork()

public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            //如果当前线程是ForkJoinWorkerThread,则把任务加入到自身的workQueue中
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);//否则,重新创建一个WorkQueue并push进去
        return this;
    }

2、join()

public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();//返回结果
    }
private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) ://等待任务完成
            externalAwaitDone();
    }

3、invoke()

public <T> T invoke(ForkJoinTask<T> task) {
        if (task == null)
            throw new NullPointerException();
        //和fork()的else调用同样的方法,事实上看源码,
       //这个方法出现的频率很高,作用是给当前线程创建对应的WorkQueue
        externalPush(task);
        return task.join();//实际上也是调用了join()方法。
    }

4、invokeAll()

//可以看出,该方法会给把任务也分配给当前线程一份。不至于当前线程等待,造成资源浪费
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
        int s1, s2;
        t2.fork();//创建新的线程开执行任务
        if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL)
            t1.reportException(s1);
        if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL)
            t2.reportException(s2);
    }
//t1调用此方法,给当前线程的WorkQueue添加该任务。
private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
            externalAwaitDone();
    }

以上。


祈祷

希望还有几天就到预产期的我媳妇和尚未出世的儿子都能健健康康。

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

  • 在这篇文章中,将覆盖如下内容: 什么是Fork/Join框架 工作窃取算法 Fork/Join框架的设计 Recu...
    打铁大师阅读 4,039评论 0 3
  • 一、前言 Fork/Join框架是Java 1.7之后引入的基于分治算法的并行框架,官网文档是这么介绍的: For...
    骑着乌龟去看海阅读 5,869评论 1 4
  • 什么是Fork/Join框架 Fork/Join框架是一组允许程序员利用多核处理器支持的并行执行的API。它使用了...
    Ggx的代码之旅阅读 8,368评论 0 12
  • 年少轻狂的我们,从校园里走出来,考上工作岗位,都以为可以在社会上混的如鱼得水,处理好所有错综复杂的社会关系,一如当...
    甫禾阅读 7,921评论 0 0
  • 去年情人节,二宝迫不及待在过年前出来跟我们见面了,一位小千金,第二次当妈,按说起来应该是驾轻就熟,但实际中还...
    桦锜姐妹花阅读 1,760评论 2 2

友情链接更多精彩内容