线程池-工作单元ForkJoinTask

1.官方文档

Abstract base class for tasks that run within a ForkJoinPool. A 
ForkJoinTask is a thread-like entity that is much lighter weight than a 
normal thread. Huge numbers of tasks and subtasks may be hosted 
by a small number of actual threads in a ForkJoinPool, at the price of 
some usage limitations.

A "main" ForkJoinTask begins execution when it is explicitly submitted 
to a ForkJoinPool, or, if not already engaged in a ForkJoin 
computation, commenced in the ForkJoinPool.commonPool() via 
fork(), invoke(), or related methods. Once started, it will usually in turn 
start other subtasks. As indicated by the name of this class, many 
programs using ForkJoinTask employ only methods fork() and join(), 
or derivatives such as invokeAll. However, this class also provides a 
number of other methods that can come into play in advanced 
usages, as well as extension mechanics that allow support of new 
forms of fork/join processing.

在ForkJoinPool中运行的任务的抽象基类。ForkJoinTask是一个类线程的实体,比普通线程轻得多。量的任务和子任务可能由ForkJoinPool中的少量实际线程管理,代价是一些使用限制。

一个“main” ForkJoinTask在显式提交给ForkJoinPool时开始执行,或者,如果尚未参与ForkJoin计算,则通过fork()、invoke()或相关方法在ForkJoinPool.commonPool()中开始执行。一旦启动,它通常会依次启动其他子任务。正如此类的名称所示,许多使用ForkJoinTask的程序仅使用方法fork()和join(),或者使用invokeAll等派生类。但是,这个类还提供了许多其他方法,可以在高级用法中发挥作用,以及允许支持新形式的fork / join处理的扩展机制。

A ForkJoinTask is a lightweight form of Future. The efficiency of 
ForkJoinTasks stems from a set of restrictions (that are only partially 
statically enforceable) reflecting their main use as computational 
tasks calculating pure functions or operating on purely isolated 
objects. The primary coordination mechanisms are fork(), that 
arranges asynchronous execution, and join(), that doesn't proceed 
until the task's result has been computed. Computations should 
ideally avoid synchronized methods or blocks, and should minimize 
other blocking synchronization apart from joining other tasks or using
 synchronizers such as Phasers that are advertised to cooperate with 
fork/join scheduling. Subdividable tasks should also not perform 
blocking I/O, and should ideally access variables that are completely 
independent of those accessed by other running tasks. These 
guidelines are loosely enforced by not permitting checked exceptions
 such as IOExceptions to be thrown. However, computations may still 
encounter unchecked exceptions, that are rethrown to callers 
attempting to join them. These exceptions may additionally include 
RejectedExecutionException stemming from internal resource 
exhaustion, such as failure to allocate internal task queues. Rethrown 
exceptions behave in the same way as regular exceptions, but, when 
possible, contain stack traces (as displayed for example using 
ex.printStackTrace()) of both the thread that initiated the computation 
as well as the thread actually encountering the exception; minimally 
only the latter.

ForkJoinTask是Future的轻量级形式。 ForkJoinTasks的效率源于一系列限制(仅部分静态可执行),反映了它们主要用于计算纯函数的计算任务或在纯孤立对象上操作。主要的协调机制是fork(),它安排异步执行、join(),在计算完成任务结果之前不会继续。理想情况下,计算应该避免同步的方法或块,并且应该最小化其他阻塞同步,除了joining其他任务或使用同步器:例如Phasers,配合fork / join调度。子分类任务也不应该执行阻塞I/O,理想情况下访问变量应该完全独立于其他正在运行的任务。松散地强制执行这些准则:不允许抛出IOExceptions之类的受查异常。但是,计算可能仍会遇到未经检查的异常,这些异常会被尝试join它们的调用者重新抛出。这些异常可能还包括源于内部资源耗尽的RejectedExecutionException,例如无法分配内部任务队列。 Rethrown异常的行为方式与常规异常相同,但是,如果可能,包含启动计算的线程以及实际遇到异常的线程的堆栈跟踪(例如使用ex.printStackTrace()显示);至少要包含后者。

It is possible to define and use ForkJoinTasks that may block, but 
doing do requires three further considerations: (1) Completion of few 
if any other tasks should be dependent on a task that blocks on 
external synchronization or I/O. Event-style async tasks that are 
never joined (for example, those subclassing CountedCompleter) 
often fall into this category. (2) To minimize resource impact, tasks 
should be small; ideally performing only the (possibly) blocking action. 
(3) Unless the ForkJoinPool.ManagedBlocker API is used, or the 
number of possibly blocked tasks is known to be less than the pool's 
ForkJoinPool.getParallelism() level, the pool cannot guarantee that 
enough threads will be available to ensure progress or good 
performance.

The primary method for awaiting completion and extracting results of 
a task is join(), but there are several variants: The Future.get() 
methods support interruptible and/or timed waits for completion and 
report results using Future conventions. Method invoke() is 
semantically equivalent to fork(); join() but always attempts to begin 
execution in the current thread. The "quiet" forms of these methods 
do not extract results or report exceptions. These may be useful when 
a set of tasks are being executed, and you need to delay processing 
of results or exceptions until all complete. Method invokeAll (available 
in multiple versions) performs the most common form of parallel 
invocation: forking a set of tasks and joining them all.

可以定义和使用可能阻塞的ForkJoinTasks,但这样做需要进一步考虑:(1)如果任何其他任务应该依赖于阻止外部同步或I / O的任务,则完成很少。从未join的事件类型异步任务(例如,那些子类化CountedCompleter)通常属于此类别。 (2)为了尽量减少资源影响,任务应该很小;理想情况下只执行(可能)阻塞action。 (3)除非使用ForkJoinPool.ManagedBlocker API,或者已知可能阻塞的任务数小于池的ForkJoinPool.getParallelism()级别,否则池无法保证有足够的线程可用于确保进度或良好性能。

等待完成和提取任务结果的主要方法是join(),但有几种变体:Future.get()方法支持使用Future约定完成和报告结果的可中断和/或定时等待。方法invoke()在语义上等同于fork(); 但join()总是尝试在当前线程中开始执行。这些方法的“quiet”形式不会提取结果或报告异常。这可能很有用:在执行一组任务时,需要延迟处理结果或异常,直到完成所有任务。方法invokeAll(在多个版本中可用)执行最常见的并行调用形式:forking一组任务并将它们全部join起来。

In the most typical usages, a fork-join pair act like a call (fork) and 
return (join) from a parallel recursive function. As is the case with 
other forms of recursive calls, returns (joins) should be performed 
innermost-first. For example, a.fork(); b.fork(); b.join(); a.join(); is likely 
to be substantially more efficient than joining a before b.

The execution status of tasks may be queried at several levels of 
detail: isDone() is true if a task completed in any way (including the 
case where a task was cancelled without executing); 
isCompletedNormally() is true if a task completed without cancellation
 or encountering an exception; isCancelled() is true if the task was 
cancelled (in which case getException() returns a 
CancellationException); and isCompletedAbnormally() is true if a task 
was either cancelled or encountered an exception, in which case 
getException() will return either the encountered exception or 
CancellationException.

The ForkJoinTask class is not usually directly subclassed. Instead, 
you subclass one of the abstract classes that support a particular 
style of fork/join processing, typically RecursiveAction for most 
computations that do not return results, RecursiveTask for those that 
do, and CountedCompleter for those in which completed actions 
trigger other actions. Normally, a concrete ForkJoinTask subclass 
declares fields comprising its parameters, established in a 
constructor, and then defines a compute method that somehow uses 
the control methods supplied by this base class.

在最典型的用法中,fork-join pair就像一个调用(fork),并从并行递归函数返回(join)。与其他形式的递归调用一样,返回(连接)应该在最里面执行。例如,a.fork(); b.fork(); b.join(); a.join();可能比join a在join b之前更有效率。

可在几个详细级别查询任务的执行状态:如果任务以任何方式完成(包括未执行任务被取消的情况),则isDone()为真;如果任务在没有取消或遇到异常的情况下完成,则isCompletedNormally()为true;如果任务被取消,则isCancelled()为true(在这种情况下,getException()返回CancellationException);如果任务被取消或遇到异常,则isCompletedAbnormally()为true,在这种情况下,getException()将返回遇到的异常或CancellationException。

ForkJoinTask类通常不直接子类化。相反,一般子类化一个支持特定样式的fork / join处理的抽象类,对于大多数不返回结果的计算,通常是RecursiveAction,对于那些返回任务的来说是RecursiveTask,而对于那些已完成的操作触发其他操作的,则使用CountedCompleter。通常,具体的ForkJoinTask子类声明包含其参数的字段,在构造函数中建立,然后定义以某种方式使用此基类提供的控制方法的计算方法。

Method join() and its variants are appropriate for use only when 
completion dependencies are acyclic; that is, the parallel computation 
can be described as a directed acyclic graph (DAG). Otherwise, 
executions may encounter a form of deadlock as tasks cyclically wait
 for each other. However, this framework supports other methods and 
techniques (for example the use of Phaser, helpQuiesce(), and 
complete(V)) that may be of use in constructing custom subclasses 
for problems that are not statically structured as DAGs. To support 
such usages, a ForkJoinTask may be atomically tagged with a short 
value using setForkJoinTaskTag(short) or 
compareAndSetForkJoinTaskTag(short, short) and checked using 
getForkJoinTaskTag(). The ForkJoinTask implementation does not 
use these protected methods or tags for any purpose, but they may 
be of use in the construction of specialized subclasses. For example, 
parallel graph traversals can use the supplied methods to avoid 
revisiting nodes/tasks that have already been processed. (Method 
names for tagging are bulky in part to encourage definition of 
methods that reflect their usage patterns.)

Most base support methods are final, to prevent overriding of 
implementations that are intrinsically tied to the underlying lightweight 
task scheduling framework. Developers creating new basic styles of 
fork/join processing should minimally implement protected methods 
exec(), setRawResult(V), and getRawResult(), while also introducing 
an abstract computational method that can be implemented in its 
subclasses, possibly relying on other protected methods provided by 
this class.

方法join()及其变体仅适用于完成依赖关系是非循环的时候;也就是说,并行计算可以描述为有向无环图(DAG)。否则,执行可能会遇到一种形式的死锁,因为任务会循环地等待彼此。但是,此框架支持其他方法和技术(例如,使用Phaser,helpQuiesce()和complete(V)),这些方为法和技术可用于静态结构化不是DAG的问题构造自定义子类。了支持这种用法,可以使用setForkJoinTaskTag(short)或compareAndSetForkJoinTaskTag(short,short)对forkJoinTask进行原子标记,并使用getForkJoinTaskTag()进行检查。 ForkJoinTask实现不会出于任何目的使用这些受保护的方法或标记,但它们可能用于构造专门的子类。例如,并行图遍历可以使用提供的方法来避免重新访问已经处理的节点/任务。 (标记的方法名称很长,部分是为了鼓励定义反映其使用模式的方法。)

大多数基本支持方法都是final,以防止覆盖,因为其实现与底层轻量级任务调度框架有内在联系。创建新的基本样式的fork / join处理的开发人员应该至少实现受保护的方法exec(),setRawResult(V)和getRawResult(),同时还引入一个可以在其子类中实现的抽象计算方法,可能依赖于由本类提供的其他受保护方法。

ForkJoinTasks should perform relatively small amounts of 
computation. Large tasks should be split into smaller subtasks, 
usually via recursive decomposition. As a very rough rule of thumb, a 
task should perform more than 100 and less than 10000 basic 
computational steps, and should avoid indefinite looping. If tasks are 
too big, then parallelism cannot improve throughput. If too small, then 
memory and internal task maintenance overhead may overwhelm 
processing.

This class provides adapt methods for Runnable and Callable, that 
may be of use when mixing execution of ForkJoinTasks with other 
kinds of tasks. When all tasks are of this form, consider using a pool 
constructed in asyncMode.

ForkJoinTasks are Serializable, which enables them to be used in 
extensions such as remote execution frameworks. It is sensible to 
serialize tasks only before or after, but not during, execution. 
Serialization is not relied on during execution itself.

ForkJoinTasks应该执行相对少量的计算。 应将大型任务拆分为较小的子任务,通常通过递归分解。 作为一个非常粗略的经验法则,任务应该执行超过100个且少于10000个基本计算步骤,并且应该避免无限循环。 如果任务太大,那么并行性无法提高吞吐量。 如果太小,那么内存和内部任务维护开销可能会超过处理。

此类为Runnable和Callable提供了适配方法,在将ForkJoinTasks的执行与其他类型的任务混合时可能会有用。 当所有任务都是这种形式时,请考虑使用asyncMode构造的池。

ForkJoinTasks是Serializable,它使它们可以用于扩展,例如远程执行框架。 在执行之前或之后,但不是在执行期间序列化任务是明智的。 执行过程中不依赖序列化。

2.域

    /*
     * The status field holds run control status bits packed into a
     * single int to minimize footprint and to ensure atomicity (via
     * CAS).  Status is initially zero, and takes on nonnegative
     * values until completed, upon which status (anded with
     * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
     * undergoing blocking waits by other threads have the SIGNAL bit
     * set.  Completion of a stolen task with SIGNAL set awakens any
     * waiters via notifyAll. Even though suboptimal for some
     * purposes, we use basic builtin wait/notify to take advantage of
     * "monitor inflation" in JVMs that we would otherwise need to
     * emulate to avoid adding further per-task bookkeeping overhead.
     * We want these monitors to be "fat", i.e., not use biasing or
     * thin-lock techniques, so use some odd coding idioms that tend
     * to avoid them, mainly by arranging that every synchronized
     * block performs a wait, notifyAll or both.
     *
     * These control bits occupy only (some of) the upper half (16
     * bits) of status field. The lower bits are used for user-defined
     * tags.
     */

    /** The run status of this task */
    volatile int status; // accessed directly by pool and workers
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags

status字段将运行控制状态位打包到单个int中,以最小化占用空间并确保原子性(通过CAS)。状态最初为零,并且在完成之前采用非负值,完成状态值为NORMAL、CANCELLED或EXCEPTIONAL。正在阻塞等待的任务设置了SIGNAL位。完成一项SIGNAL的被盗任务通过notifyAll唤醒任何等待线程。尽管对于某些目的而言不是最理想的,但使用基本的内置wait/notify机制来利用JVM中的“监视器膨胀”,否则需要模拟它们以避免进一步增加每个任务的簿记开销。希望这些监视器“fat”,即不使用偏置或瘦锁技术,因此使用一些易于避免它们的奇怪编码习语,主要是通过安排每个同步块执行wait、notifyAll或两者。

这些控制位仅占用状态字段的上半部分(16位)(部分)。较低的位用于用户定义的标记。

3.内部任务提交

  • fork()提交,返回task,可以进行控制:比如获取、取消等操作
  • invoke要等着返回结果
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }
    /**
     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.
     *
     * @return the computed result
     */
    public final V invoke() {
        int s;
        if ((s = doInvoke() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
    /**
     * Implementation for invoke, quietlyInvoke.
     *
     * @return status upon completion
     */
    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
            externalAwaitDone();
    }
    /**
     * Primary execution method for stolen tasks. Unless done, calls
     * exec and records status if completed, but doesn't wait for
     * completion otherwise.
     *
     * @return status on exit from this method
     */
    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            }
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

doExec()会调用子类实现的保护exec方法完成计算。

3.1 awaitJoin

    /**
     * Helps and/or blocks until the given task is done or timeout.
     *
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
     */
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (task != null && w != null) {
            ForkJoinTask<?> prevJoin = w.currentJoin;
            U.putOrderedObject(w, QCURRENTJOIN, task);
            CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                (CountedCompleter<?>)task : null;
            for (;;) {
                if ((s = task.status) < 0)
                    break;
                if (cc != null)
                    helpComplete(w, cc, 0);
                else if (w.base == w.top || w.tryRemoveAndExec(task))
                    helpStealer(w, task);
                if ((s = task.status) < 0)
                    break;
                long ms, ns;
                if (deadline == 0L)
                    ms = 0L;
                else if ((ns = deadline - System.nanoTime()) <= 0L)
                    break;
                else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                    ms = 1L;
                if (tryCompensate(w)) {
                    task.internalWait(ms);
                    U.getAndAddLong(this, CTL, AC_UNIT);
                }
            }
            U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
        }
        return s;
    }

  • step1.更新WorkQueue的currentJoin。

  • step2.如果任务已经结束((s = task.status) < 0),则break。
    step3.如果为CountedCompleter类型,则helpComplete
    step4.队列为空或者执行失败,任务可能被偷,帮助偷取者执行任务(互相帮助)helpStealer
    step5.如果任务已经结束((s = task.status) < 0),则break。
    step6.如果超时结束,则break
    step7.执行失败的情况下,执行补偿操作tryCompensate

  • step8.当前任务完成后,替换currentJoin为以前的值

        /**
         * If present, removes from queue and executes the given task,
         * or any other cancelled task. Used only by awaitJoin.
         *
         * @return true if queue empty and task not known to be done
         */
        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; int m, s, b, n;
            if ((a = array) != null && (m = a.length - 1) >= 0 &&
                task != null) {
                while ((n = (s = top) - (b = base)) > 0) {
                    for (ForkJoinTask<?> t;;) {      // traverse from s to b
                        long j = ((--s & m) << ASHIFT) + ABASE;
                        if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                            return s + 1 == top;     // shorter than expected
                        else if (t == task) {
                            boolean removed = false;
                            if (s + 1 == top) {      // pop
                                if (U.compareAndSwapObject(a, j, task, null)) {
                                    U.putOrderedInt(this, QTOP, s);
                                    removed = true;
                                }
                            }
                            else if (base == b)      // replace with proxy
                                removed = U.compareAndSwapObject(
                                    a, j, task, new EmptyTask());
                            if (removed)
                                task.doExec();
                            break;
                        }
                        else if (t.status < 0 && s + 1 == top) {
                            if (U.compareAndSwapObject(a, j, t, null))
                                U.putOrderedInt(this, QTOP, s);
                            break;                  // was cancelled
                        }
                        if (--n == 0)
                            return false;
                    }
                    if (task.status < 0)
                        return false;
                }
            }
            return true;
        }
  • 从top遍历到base
  • 如果找到给定任务,在下面执行完后,break;
    1)任务在栈顶,则从队列中弹出任务
    2)任务在栈底,则替换为EmptyTask
    如果删除成功,则执行任务: task.doExec()
  • 栈顶任务不是给定任务,但是已经完成,则从栈中清除出去

如果任务完成,则返回false。

    /**
     * Tries to locate and execute tasks for a stealer of the given
     * task, or in turn one of its stealers, Traces currentSteal ->
     * currentJoin links looking for a thread working on a descendant
     * of the given task and with a non-empty queue to steal back and
     * execute tasks from. The first call to this method upon a
     * waiting join will often entail scanning/search, (which is OK
     * because the joiner has nothing better to do), but this method
     * leaves hints in workers to speed up subsequent calls.
     *
     * @param w caller
     * @param task the task to join
     */
    private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
        WorkQueue[] ws = workQueues;
        int oldSum = 0, checkSum, m;
        if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
            task != null) {
            do {                                       // restart point
                checkSum = 0;                          // for stability check
                ForkJoinTask<?> subtask;
                WorkQueue j = w, v;                    // v is subtask stealer
                descent: for (subtask = task; subtask.status >= 0; ) {
                    for (int h = j.hint | 1, k = 0, i; ; k += 2) {
                        if (k > m)                     // can't find stealer
                            break descent;
                        if ((v = ws[i = (h + k) & m]) != null) {
                            if (v.currentSteal == subtask) {
                                j.hint = i;
                                break;
                            }
                            checkSum += v.base;
                        }
                    }
                    for (;;) {                         // help v or descend
                        ForkJoinTask<?>[] a; int b;
                        checkSum += (b = v.base);
                        ForkJoinTask<?> next = v.currentJoin;
                        if (subtask.status < 0 || j.currentJoin != subtask ||
                            v.currentSteal != subtask) // stale
                            break descent;
                        if (b - v.top >= 0 || (a = v.array) == null) {
                            if ((subtask = next) == null)
                                break descent;
                            j = v;
                            break;
                        }
                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                        ForkJoinTask<?> t = ((ForkJoinTask<?>)
                                             U.getObjectVolatile(a, i));
                        if (v.base == b) {
                            if (t == null)             // stale
                                break descent;
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                v.base = b + 1;
                                ForkJoinTask<?> ps = w.currentSteal;
                                int top = w.top;
                                do {
                                    U.putOrderedObject(w, QCURRENTSTEAL, t);
                                    t.doExec();        // clear local tasks too
                                } while (task.status >= 0 &&
                                         w.top != top &&
                                         (t = w.pop()) != null);
                                U.putOrderedObject(w, QCURRENTSTEAL, ps);
                                if (w.base != w.top)
                                    return;            // can't further help
                            }
                        }
                    }
                }
            } while (task.status >= 0 && oldSum != (oldSum = checkSum));
        }
    }

试图找到并执行给定任务的窃取者的任务,跟踪currentSteal - > currentJoin链接查找在给定任务的后代上工作的并使用非空队列的线程,从中窃取任务并执行。 在等待joini时对此方法的第一次调用通常需要扫描/搜索(这是可以的,因为joiner没有更好的事情可做),但是这种方法在工作线程中留下提示以加速后续调用。

  • step1.遍历workQueues中的WorkQueue,找到其currentSteal为subtask(初始时为给定的task)的WorkQueue
  • step2.获取偷取者的currentJoin任务
    有两个条件判断:
    1)如果当前状态已经变换了,例如subtask已经完成,workQueue的currentJoin已经不是subtask或者偷取者的currentSteal已经不是subtask,要break
    2)如果偷取者的为空,若currentJoin为null,则break;否则,偷取者任务为空,可能任务也被偷走了,需要继续查找偷取者。
  • step3.从偷取者队列的栈底进行操作
    从偷取者栈顶弹出任务;
    然后更新给定workQueue的currentSteal为偷取者的base任务,并执行该任务;
    在给定任务没有完成并且给定workQueue中有任务时,则依次弹出任务(LIFO)->更新currentSteal->执行该任务(注意这里是自己偷自己的任务执行);
    还原给定workQueue的currentSteal
    /**
     * Tries to decrement active count (sometimes implicitly) and
     * possibly release or create a compensating worker in preparation
     * for blocking. Returns false (retryable by caller), on
     * contention, detected staleness, instability, or termination.
     *
     * @param w caller
     */
    private boolean tryCompensate(WorkQueue w) {
        boolean canBlock;
        WorkQueue[] ws; long c; int m, pc, sp;
        if (w == null || w.qlock < 0 ||           // caller terminating
            (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
            (pc = config & SMASK) == 0)           // parallelism disabled
            canBlock = false;
        else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
            canBlock = tryRelease(c, ws[sp & m], 0L);
        else {
            int ac = (int)(c >> AC_SHIFT) + pc;
            int tc = (short)(c >> TC_SHIFT) + pc;
            int nbusy = 0;                        // validate saturation
            for (int i = 0; i <= m; ++i) {        // two passes of odd indices
                WorkQueue v;
                if ((v = ws[((i << 1) | 1) & m]) != null) {
                    if ((v.scanState & SCANNING) != 0)
                        break;
                    ++nbusy;
                }
            }
            if (nbusy != (tc << 1) || ctl != c)
                canBlock = false;                 // unstable or stale
            else if (tc >= pc && ac > 1 && w.isEmpty()) {
                long nc = ((AC_MASK & (c - AC_UNIT)) |
                           (~AC_MASK & c));       // uncompensated
                canBlock = U.compareAndSwapLong(this, CTL, c, nc);
            }
            else if (tc >= MAX_CAP ||
                     (this == common && tc >= pc + commonMaxSpares))
                throw new RejectedExecutionException(
                    "Thread limit exceeded replacing blocked worker");
            else {                                // similar to tryAddWorker
                boolean add = false; int rs;      // CAS within lock
                long nc = ((AC_MASK & c) |
                           (TC_MASK & (c + TC_UNIT)));
                if (((rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                canBlock = add && createWorker(); // throws on exception
            }
        }
        return canBlock;
    }

尝试减少活动计数(有时是隐式)并可能释放或创建补偿工作线程以准备阻塞。 在争用、已经过时、不稳定性或终止时返回false(调用者可重试)

config是ForkJoinPool的域,其低16位表示并行度,高16位表示mode。pc = config & SMASK获取的就是并行度。

  • step1.如果调用者终止,或者workQueues为空,或者线程池的并行度为0,即被终止,则赋值canBlock = false
  • step2.如果有空闲的工作线程,释放它
  • step3.没有空闲线程
    1)遍历两遍奇数索引的WorkQueue,查找scanState为SCANNING的队列。
    2)总线程数大于并行度 && 活动线程数大于1 && 调用者任务队列为空,不需要补偿
    3)更新总线程数,(活跃工作线程不足)并创建一个新的工作线程来补偿

需要补偿:

  • 调用者队列不为空,并且有空闲工作线程,这种情况会唤醒空闲线程(调用tryRelease方法)
  • 池尚未停止,活跃线程数不足,这时会新建一个工作线程(调用createWorker方法)

不需要补偿:

  • 调用者已终止或池处于不稳定状态
  • 总线程数大于并行度 && 活动线程数大于1 && 调用者任务队列为空
    /**
     * Signals and releases worker v if it is top of idle worker
     * stack.  This performs a one-shot version of signalWork only if
     * there is (apparently) at least one idle worker.
     *
     * @param c incoming ctl value
     * @param v if non-null, a worker
     * @param inc the increment to active count (zero when compensating)
     * @return true if successful
     */
    private boolean tryRelease(long c, WorkQueue v, long inc) {
        int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
        if (v != null && v.scanState == sp) {          // v is at top of stack
            long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
            if (U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;
                if ((p = v.parker) != null)
                    U.unpark(p);
                return true;
            }
        }
        return false;
    }

3.2 externalAwaitDone

    /**
     * Blocks a non-worker-thread until completion.
     * @return status upon completion
     */
    private int externalAwaitDone() {
        int s = ((this instanceof CountedCompleter) ? // try helping
                 ForkJoinPool.common.externalHelpComplete(
                     (CountedCompleter<?>)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                                wait(0L);
                            } catch (InterruptedException ie) {
                                interrupted = true;
                            }
                        }
                        else
                            notifyAll();
                    }
                }
            } while ((s = status) >= 0);
            if (interrupted)
                Thread.currentThread().interrupt();
        }
        return s;
    }
  • 如果是CountedCompleter,则调用ForkJoinPool.common.externalHelpComplete
  • 其他类型,则调用ForkJoinPool.common.tryExternalUnpush(this),如果在栈顶,则弹出并后续执行doExec()
  • 如果执行失败,进入等待

4.join方法

    /**
     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     *
     * @return the computed result
     */
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }
    /**
     * Implementation for join, get, quietlyJoin. Directly handles
     * only cases of already-completed, external wait, and
     * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
     *
     * @return status upon completion
     */
    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :
            externalAwaitDone();
    }

        /**
         * Pops the given task only if it is at the current top.
         * (A shared version is available only via FJP.tryExternalUnpush)
        */
        final boolean tryUnpush(ForkJoinTask<?> t) {
            ForkJoinTask<?>[] a; int s;
            if ((a = array) != null && (s = top) != base &&
                U.compareAndSwapObject
                (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
                U.putOrderedInt(this, QTOP, s);
                return true;
            }
            return false;
        }

如果给定任务位于栈顶,则弹出任务。top也要减一;并且随后要执行该任务&& s = doExec()。

5.RecursiveTask、RecursiveAction及CountedCompleter


5.1 RecursiveTask

 class Fibonacci extends RecursiveTask<Integer> {
   final int n;
   Fibonacci(int n) { this.n = n; }
   Integer compute() {
     if (n <= 1)
       return n;
     Fibonacci f1 = new Fibonacci(n - 1);
     f1.fork();
     Fibonacci f2 = new Fibonacci(n - 2);
     return f2.compute() + f1.join();
   }
 }

然而,除了作为计算Fibonacci函数的性能不好的愚蠢方法(实践中使用简单的快速线性算法),因为最小的子任务太小而不值得拆分。 相反,就像几乎所有fork / join应用程序的情况一样,选择一些最小粒度大小(例如此处为10),不要继续在细分下去。

5.2 RecursiveAction

一个递归无结果的ForkJoinTask。约定Void ForkJoinTasks。 因为null是Void类型的唯一有效值,所以诸如join之类的方法在完成时始终返回null。

对给定的long []数组进行排序的示例程序:

 static class SortTask extends RecursiveAction {
   final long[] array; final int lo, hi;
   SortTask(long[] array, int lo, int hi) {
     this.array = array; this.lo = lo; this.hi = hi;
   }
   SortTask(long[] array) { this(array, 0, array.length); }
   protected void compute() {
     if (hi - lo < THRESHOLD)
       sortSequentially(lo, hi);
     else {
       int mid = (lo + hi) >>> 1;
       invokeAll(new SortTask(array, lo, mid),
                 new SortTask(array, mid, hi));
       merge(lo, mid, hi);
     }
   }
   // implementation details follow:
   static final int THRESHOLD = 1000;
   void sortSequentially(int lo, int hi) {
     Arrays.sort(array, lo, hi);
   }
   void merge(int lo, int mid, int hi) {
     long[] buf = Arrays.copyOfRange(array, lo, mid);
     for (int i = 0, j = lo, k = mid; i < buf.length; j++)
       array[j] = (k == hi || buf[i] < array[k]) ?
         buf[i++] : array[k++];
   }
 }

对数组的每个元素加1:

 class IncrementTask extends RecursiveAction {
   final long[] array; final int lo, hi;
   IncrementTask(long[] array, int lo, int hi) {
     this.array = array; this.lo = lo; this.hi = hi;
   }
   protected void compute() {
     if (hi - lo < THRESHOLD) {
       for (int i = lo; i < hi; ++i)
         array[i]++;
     }
     else {
       int mid = (lo + hi) >>> 1;
       invokeAll(new IncrementTask(array, lo, mid),
                 new IncrementTask(array, mid, hi));
     }
   }
 }

以下示例说明了可能导致更好性能的一些改进和习惯用法:RecursiveActions不需要完全递归,只要它们保持基本的分治算法即可。这里分为两部分后,左边是直接在任务中进行计算,而没有继续进行细分。

 double sumOfSquares(ForkJoinPool pool, double[] array) {
   int n = array.length;
   Applyer a = new Applyer(array, 0, n, null);
   pool.invoke(a);
   return a.result;
 }

 class Applyer extends RecursiveAction {
   final double[] array;
   final int lo, hi;
   double result;
   Applyer next; // keeps track of right-hand-side tasks
   Applyer(double[] array, int lo, int hi, Applyer next) {
     this.array = array; this.lo = lo; this.hi = hi;
     this.next = next;
   }

   double atLeaf(int l, int h) {
     double sum = 0;
     for (int i = l; i < h; ++i) // perform leftmost base step
       sum += array[i] * array[i];
     return sum;
   }

   protected void compute() {
     int l = lo;
     int h = hi;
     Applyer right = null;
     while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
       int mid = (l + h) >>> 1;
       right = new Applyer(array, mid, h, right);
       right.fork();
       h = mid;
     }
     double sum = atLeaf(l, h);
     while (right != null) {
       if (right.tryUnfork()) // directly calculate if not stolen
         sum += right.atLeaf(right.lo, right.hi);
       else {
         right.join();
         sum += right.result;
       }
       right = right.next;
     }
     result = sum;
   }
 }

5.3 CountedCompleter

CountedCompleter使用普通树的结构存放动作,但是它又是另类的树,因为子节点能找到父节点,父节点却找不到子节点,而只知道子节点代表的动作未执行的数量,因此或许从访问方式的角度来看还是用栈来理解更好。

并行流是ForkJoin框架的一个典型应用,JAVA8 Stream api中的并行流定义了大量的以CountedCompleter为基础的操作。利用分割/合并和周边组件实现了基于ForkJoin框架的并行计算调度。

6.总结

  • 任务状态
    1)当状态完成时,为负数,表示正常完成、取消或者异常
    2)阻塞等待的任务设置了SIGNAL
  • 内部提交
    fork——直接加入到当前线程的workQueue中
    invoke——提交任务等待任务完成并获取结果(当前线程执行)
    join——等待任务完成并获取结果,尝试在当前线程中开始执行。
  • 任务类型
    RecursiveAction——不返回结果的计算
    RecursiveTask——返回结果
    CountedCompleter——已完成的操作触发其他操作
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352