ForkJoinPool线程池—独门专访

1. 概念

什么时候使用ForkJoinPool线程池

适合能够进行拆分再拆分的计算型(CPU密集型)任务。服务器拥有多CPU,多核,用以提高计算能力。

单核、单CPU,不建议使用该线程池,会带来额外的性能开销,反而比单线程的执行效率低。

为什么不使用ThreadPoolExecutor线程池

ForkJoinPool的继承结构

cpu个数、核数、线程数、java多线程之间的联系

ForkJoinPool可以看做是一个并行线程池,即多个线程(线程数一般等于CPU核数),并行的处理一个大任务。

通常来说,ThreadPoolExecutor中每个任务都由单个的线程独立处理。但是若是一个非常耗时的大任务(比如大数组排序),就可能出现线程池中只有一个线程在处理这个大任务。而其他线程却空闲着。最主要的是,若是多核服务器,只有一核在疯狂计算,其他处理器(CPU)无法援助繁忙的处理器(CPU)。

若是把这个大任务拆分成几个小任务,交由多个ThreadPoolExecutor的线程去处理,但是对于一个有明显父子关系的任务(比如大数组排序)来说,只有当它所有子任务执行完毕之后,它才能被执行。

使用ThreadPoolExecutor无法选择优先执行子任务。同时也无法开启太多的线程去处理子任务。

为什么ForkJoinPool适合CPU密集型任务,I/O密集型不可以吗?

通常来说,若是线程I/O阻塞,CPU会抛弃这个线程。而ForkJoinPool的宗旨是使用少量的线程来处理大量的任务。若是I/O密集型任务,该任务执行时间之缓慢便可想而知。

而CPU密集型任务,当一个大任务分解成多个子任务后,多个线程获取到多个处理器的时间分片,可以并行的执行子任务。

那说到底,什么是ForkJoinPool线程池?

该线程池的核心思想:分治法工作窃取模式

分治法:就是将一个任务,切分成多个父子关系的子任务。
工作窃取:若线程空闲,将窃取其他线程的任务。即多个线程并行执行任务。

来一个小故事:

若接到一个紧急大需求,在公司有4个人的情况下,不可能将任务只交由一个员工做,而其他三个员工处于空闲状态。并且不能随意中断他们,应该让他们几个专心的处理这件大任务。

当效率高的员工执行完手头的任务后,不应该闲着,应该帮忙去别的同事那领取一些任务。为了避免两个员工同时开发一个功能,公司规定,员工正常是在工作队列的尾部获取任务,而援助的员工,在工作队列的头部获取任务。

工作窃取的理论

ForkJoinPool工作原理.png
  1. ForkJoinPool 的每个工作线程(ForkJoinWorkerThread下文简称worker都维护着一个工作队列(WorkQueue),这是一个双端队列(Deque),里面存放的对象是任务(ForkJoinTask)。

  2. 每个工作线程在运行中产生新的任务(通常是因为调用了fork())时,会放入工作队列的队尾,并且会在队尾取出任务。worker在处理自己的工作队列时,使用的是LIFO方式)。

  3. 每个工作线程在处理自己的工作队列同时,会尝试窃取 steal一个任务(或是来自于刚刚提交到 pool 的任务,或是来自于其他工作线程的工作队列),窃取的任务位于其他线程的工作队列的队首(也就是说工作线程在窃取其他工作线程的任务时,使用的是 FIFO方式)

  4. 在遇到join()时,如果需要 join 的任务尚未完成,则会先处理其他任务,直到目标的任务方法被告知已经结束(通过isDone()方法),所有的任务都是无阻塞的完成。

在既没有自己的任务,也没有可以窃取的任务时,进入休眠。

2. 实践

ForkJoinPool线程池的创建

在JDK1.8中,推荐使用下面方法创建线程池,可以满足大多数场景。

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

但是我们通过静态方法创建的线程池,它内部的参数是怎样的?

commonPool源码解析:

static{
    //...
    common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
    //...
}

private static ForkJoinPool makeCommonPool() {
    //...
    return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
                            "ForkJoinPool.commonPool-worker-");
}

本质上是通过makeCommonPool()方法创建的线程池。

最终调用

    private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode;
        long np = (long)(-parallelism); // offset ctl counts
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }
  • parallelism:(相似度[ˈpærəlelɪzəm])默认为CPU核心数Runtime.getRuntime().availableProcessors(),ForkJoinPool里的线程数量依赖于它,但是它并不代表最大线程数。不要等同于ThreadPoolExecutor中的corePoolSize或者maximumPoolSize。

  • ForkJoinWorkerThreadFactory:线程工厂,默认实现 DefaultForkJoinWorkerThreadFactory。

  • workerNamePrefix:线程池创建线程的前缀,默认使用“ForkJoinPool-*”

  • config:保存了不变的参数,包括了parallelism和mode,供后续读取。mode可取FIFO_QUEUE(先进先出队列)和LIFO_QUEUE(后进先出队列)。默认LIFO_QUEUE。

ForkJoinPool创建子任务

子任务由ForkJoinTask的实例来代表,它是一个抽象类,JDK为我们提供了两个实现:RecursiveTaskRecursiveAction [递归的] [rɪˈkɜːsɪv],分别用于需要和不需要返回计算结果的子任务。

/**
 * @description: 计算从1加到100
 * @create: 2019-08-29 14:10
 */
@Slf4j
public class CountTask extends RecursiveTask<Integer> {

    //拆分任务的阈值
    private static final int threshold = 2;
    private int start;
    private int end;

    public CountTask(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;
        //任务足够小,停止拆分,直接开始计算
        if (end - start < threshold) {
            for (int i = start; i < end + 1; i++) {
                //开始执行业务逻辑,累加
                sum = sum + i;
            }
        } else {
            //如果任务大于阈值,就继续拆分
            int middle = (start + end) / 2;
            CountTask left = new CountTask(start, middle);
            CountTask right = new CountTask(middle + 1, end);
            //执行子任务
            left.fork();
            right.fork();
            //若是不要求得到执行结果,可以无后续的join方法
            //等待子任务执行完毕,并得到子任务结果
            Integer leftResult = left.join();
            Integer rightResult = right.join();
            //子方法运行完毕后,父方法开始汇总
            sum = leftResult + rightResult;
        }
        return sum;
    }
}

ForkJoinPool提交任务

ForkJoinPool和ThreadPoolExecutor相似,也是提供了三类方法来调度子线程。

方法 描述
execute系列 只提交任务
invoke和invokeAll 提交并返回结果
submit系列 提交任务并返回任务

fork方法

源码

    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
  1. 任务被fork()分割之后,判断当前线程是否是Worker线程,如果是,则将该任务放到自己的双端队列的尾部。
  2. 否则的话,会随机提交任务线程池中的某个Worker队列中。

join方法

源码:

    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }
  1. 检查调用 join() 的线程是否是 ForkJoinThread 线程。如果不是(例如 main 线程),则阻塞当前线程,等待任务完成。如果是,则不阻塞。
  2. 查看任务的完成状态,如果已经完成,直接返回结果。
  3. 如果任务尚未完成,但处于自己的工作队列内,则调用doExec()完成它。
  4. 如果任务不再当前队列的top位置(已经被其他的工作线程偷走)调用wt.pool.awaitJoin(w, this, 0L),窃取这个小偷的工作队列内的任务(以 FIFO 方式)执行,以期帮助它早日完成预 join 的任务。
  5. 如果偷走任务的小偷也已经把自己的任务全部做完,正在等待需要 Join 的任务时,则找到小偷的小偷,帮助它完成它的任务。
  6. 递归地执行第 5 步。

fork和join的调用顺序

官方文档建议的ForkJoinPool的调用顺序

a.fork(); 
b.fork(); 
b.join(); 
a.join();

因为任务b是后面进入的队列,并且worker的调用顺序是LIFO(后进先出),于是可以在fork()后直接调用join()就可以直接执行而不会调用ForkJoinPool.awaitJoin 方法去等待。

优化1:避免不必要的fork

原理:fork()的逻辑是:将子任务提交到工作队列,线程又从工作队列中获取。

  • 而invokeAll()实际上,将N-1个任务提交到工作队列,然后留下一个交由线程直接执行。多个任务执行的时候推荐使用invokeAll()
            int middle = (start + end) / 2;
            CountTask left = new CountTask(start, middle);
            CountTask right = new CountTask(middle + 1, end);
            //等待子任务执行完毕,并得到子任务结果
            invokeAll(left,right);
            Integer leftResult = left.join();
            Integer rightResult = right.join();
            sum = leftResult + rightResult;
  • 若分割为两个子任务时,亦可直接调用compute()方法。这样的话,避免了工作线程刚放进队列,立马去队列中获取的尴尬处境。
//工作线程将任务放到队列中。(这里其实是让其他线程 偷取 任务,若无线程偷取,则工作线程计算完左边任务,可继续执行右边任务)
right.fork(); 
//工作线程执行左边的结果。
long leftAns = left.compute();
//工作线程等待右边任务的结果。 
long rightAns = right.join(); 
return leftAns + rightAns;

join()和get()的区别

API文档描述

ForkJoinTaskinvoke()、join()方法及其衍生方法中都没有像 get() 方法那样抛出 ExecutionException 的受检异常。

ForkJoinTask 中把受检异常转换成了运行时异常

static void rethrow(Throwable ex) {
    if (ex != null)
        ForkJoinTask.<RuntimeException>uncheckedThrow(ex);
}

@SuppressWarnings("unchecked")
static <T extends Throwable> void uncheckedThrow(Throwable t) throws T {
    throw (T)t; // rely on vacuous cast
}
  • invoke()、join() 仍可能会抛出运行时异常,所以 ForkJoinTask 还提供了两个不提取结果和异常的方法:
    • quietlyInvoke() (注:执行此任务并等待其完成(如有必要),而不返回其结果或抛出异常。)
    • quietlyJoin()(注:加入此任务,而不返回其结果或抛出异常。 当某些已被取消或以其他方式已知中止时,处理任务集合时,此方法可能是有用的。)

注1:使用 quitelyInvoke() 和 quietlyJoin() 时可以配合 isCompletedAbnormally() 和 isCompletedNormally() 方法使用。

注2:quietly 平静的 [快特里]

案例:

        ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
        ForkJoinTask<Integer> task = forkJoinPool.submit(new CountTask(0, 100));
        //关闭线程池
        forkJoinPool.shutdown();
        if(task.isCompletedAbnormally()){
            System.out.println("异常"+task.getException());
        }

ForkJoinPool阻塞

向ForkJoinPool提交了一堆任务之后,我们会希望等待所有任务执行完成后,继续下一步操作。ForkJoinPool提供了两个阻塞的await方法。

  • awaitQuiescence:等待线程池静止;
  • awaitTermination:等待线程池终止;

推荐阅读

分析jdk-1.8-ForkJoinPool实现原理(上)

分析jdk-1.8-ForkJoinPool实现原理(下)

ForkJoinPool入门篇

工作窃取算法

【小家java】Java线程池之---ForkJoinPool线程池的使用以及原理

【Java 并发笔记】Fork/Join 框架相关整理(上)

【Java 并发笔记】Fork/Join 框架相关整理(下)

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容