

Abstract base class for tasks that run within a ForkJoinPool. A 
ForkJoinTask is a thread-like entity that is much lighter weight than a 
normal thread. Huge numbers of tasks and subtasks may be hosted 
by a small number of actual threads in a ForkJoinPool, at the price of 
some usage limitations.

A "main" ForkJoinTask begins execution when it is explicitly submitted 
to a ForkJoinPool, or, if not already engaged in a ForkJoin 
computation, commenced in the ForkJoinPool.commonPool() via 
fork(), invoke(), or related methods. Once started, it will usually in turn 
start other subtasks. As indicated by the name of this class, many 
programs using ForkJoinTask employ only methods fork() and join(), 
or derivatives such as invokeAll. However, this class also provides a 
number of other methods that can come into play in advanced 
usages, as well as extension mechanics that allow support of new 
forms of fork/join processing.


一个“main” ForkJoinTask在显式提交给ForkJoinPool时开始执行,或者,如果尚未参与ForkJoin计算,则通过fork()、invoke()或相关方法在ForkJoinPool.commonPool()中开始执行。一旦启动,它通常会依次启动其他子任务。正如此类的名称所示,许多使用ForkJoinTask的程序仅使用方法fork()和join(),或者使用invokeAll等派生类。但是,这个类还提供了许多其他方法,可以在高级用法中发挥作用,以及允许支持新形式的fork / join处理的扩展机制。

A ForkJoinTask is a lightweight form of Future. The efficiency of 
ForkJoinTasks stems from a set of restrictions (that are only partially 
statically enforceable) reflecting their main use as computational 
tasks calculating pure functions or operating on purely isolated 
objects. The primary coordination mechanisms are fork(), that 
arranges asynchronous execution, and join(), that doesn't proceed 
until the task's result has been computed. Computations should 
ideally avoid synchronized methods or blocks, and should minimize 
other blocking synchronization apart from joining other tasks or using
 synchronizers such as Phasers that are advertised to cooperate with 
fork/join scheduling. Subdividable tasks should also not perform 
blocking I/O, and should ideally access variables that are completely 
independent of those accessed by other running tasks. These 
guidelines are loosely enforced by not permitting checked exceptions
 such as IOExceptions to be thrown. However, computations may still 
encounter unchecked exceptions, that are rethrown to callers 
attempting to join them. These exceptions may additionally include 
RejectedExecutionException stemming from internal resource 
exhaustion, such as failure to allocate internal task queues. Rethrown 
exceptions behave in the same way as regular exceptions, but, when 
possible, contain stack traces (as displayed for example using 
ex.printStackTrace()) of both the thread that initiated the computation 
as well as the thread actually encountering the exception; minimally 
only the latter.

ForkJoinTask是Future的轻量级形式。 ForkJoinTasks的效率源于一系列限制(仅部分静态可执行),反映了它们主要用于计算纯函数的计算任务或在纯孤立对象上操作。主要的协调机制是fork(),它安排异步执行、join(),在计算完成任务结果之前不会继续。理想情况下,计算应该避免同步的方法或块,并且应该最小化其他阻塞同步,除了joining其他任务或使用同步器:例如Phasers,配合fork / join调度。子分类任务也不应该执行阻塞I/O,理想情况下访问变量应该完全独立于其他正在运行的任务。松散地强制执行这些准则:不允许抛出IOExceptions之类的受查异常。但是,计算可能仍会遇到未经检查的异常,这些异常会被尝试join它们的调用者重新抛出。这些异常可能还包括源于内部资源耗尽的RejectedExecutionException,例如无法分配内部任务队列。 Rethrown异常的行为方式与常规异常相同,但是,如果可能,包含启动计算的线程以及实际遇到异常的线程的堆栈跟踪(例如使用ex.printStackTrace()显示);至少要包含后者。

It is possible to define and use ForkJoinTasks that may block, but 
doing do requires three further considerations: (1) Completion of few 
if any other tasks should be dependent on a task that blocks on 
external synchronization or I/O. Event-style async tasks that are 
never joined (for example, those subclassing CountedCompleter) 
often fall into this category. (2) To minimize resource impact, tasks 
should be small; ideally performing only the (possibly) blocking action. 
(3) Unless the ForkJoinPool.ManagedBlocker API is used, or the 
number of possibly blocked tasks is known to be less than the pool's 
ForkJoinPool.getParallelism() level, the pool cannot guarantee that 
enough threads will be available to ensure progress or good 

The primary method for awaiting completion and extracting results of 
a task is join(), but there are several variants: The Future.get() 
methods support interruptible and/or timed waits for completion and 
report results using Future conventions. Method invoke() is 
semantically equivalent to fork(); join() but always attempts to begin 
execution in the current thread. The "quiet" forms of these methods 
do not extract results or report exceptions. These may be useful when 
a set of tasks are being executed, and you need to delay processing 
of results or exceptions until all complete. Method invokeAll (available 
in multiple versions) performs the most common form of parallel 
invocation: forking a set of tasks and joining them all.

可以定义和使用可能阻塞的ForkJoinTasks,但这样做需要进一步考虑:(1)如果任何其他任务应该依赖于阻止外部同步或I / O的任务,则完成很少。从未join的事件类型异步任务(例如,那些子类化CountedCompleter)通常属于此类别。 (2)为了尽量减少资源影响,任务应该很小;理想情况下只执行(可能)阻塞action。 (3)除非使用ForkJoinPool.ManagedBlocker API,或者已知可能阻塞的任务数小于池的ForkJoinPool.getParallelism()级别,否则池无法保证有足够的线程可用于确保进度或良好性能。

等待完成和提取任务结果的主要方法是join(),但有几种变体:Future.get()方法支持使用Future约定完成和报告结果的可中断和/或定时等待。方法invoke()在语义上等同于fork(); 但join()总是尝试在当前线程中开始执行。这些方法的“quiet”形式不会提取结果或报告异常。这可能很有用:在执行一组任务时,需要延迟处理结果或异常,直到完成所有任务。方法invokeAll(在多个版本中可用)执行最常见的并行调用形式:forking一组任务并将它们全部join起来。

In the most typical usages, a fork-join pair act like a call (fork) and 
return (join) from a parallel recursive function. As is the case with 
other forms of recursive calls, returns (joins) should be performed 
innermost-first. For example, a.fork(); b.fork(); b.join(); a.join(); is likely 
to be substantially more efficient than joining a before b.

The execution status of tasks may be queried at several levels of 
detail: isDone() is true if a task completed in any way (including the 
case where a task was cancelled without executing); 
isCompletedNormally() is true if a task completed without cancellation
 or encountering an exception; isCancelled() is true if the task was 
cancelled (in which case getException() returns a 
CancellationException); and isCompletedAbnormally() is true if a task 
was either cancelled or encountered an exception, in which case 
getException() will return either the encountered exception or 

The ForkJoinTask class is not usually directly subclassed. Instead, 
you subclass one of the abstract classes that support a particular 
style of fork/join processing, typically RecursiveAction for most 
computations that do not return results, RecursiveTask for those that 
do, and CountedCompleter for those in which completed actions 
trigger other actions. Normally, a concrete ForkJoinTask subclass 
declares fields comprising its parameters, established in a 
constructor, and then defines a compute method that somehow uses 
the control methods supplied by this base class.

在最典型的用法中,fork-join pair就像一个调用(fork),并从并行递归函数返回(join)。与其他形式的递归调用一样,返回(连接)应该在最里面执行。例如,a.fork(); b.fork(); b.join(); a.join();可能比join a在join b之前更有效率。


ForkJoinTask类通常不直接子类化。相反,一般子类化一个支持特定样式的fork / join处理的抽象类,对于大多数不返回结果的计算,通常是RecursiveAction,对于那些返回任务的来说是RecursiveTask,而对于那些已完成的操作触发其他操作的,则使用CountedCompleter。通常,具体的ForkJoinTask子类声明包含其参数的字段,在构造函数中建立,然后定义以某种方式使用此基类提供的控制方法的计算方法。

Method join() and its variants are appropriate for use only when 
completion dependencies are acyclic; that is, the parallel computation 
can be described as a directed acyclic graph (DAG). Otherwise, 
executions may encounter a form of deadlock as tasks cyclically wait
 for each other. However, this framework supports other methods and 
techniques (for example the use of Phaser, helpQuiesce(), and 
complete(V)) that may be of use in constructing custom subclasses 
for problems that are not statically structured as DAGs. To support 
such usages, a ForkJoinTask may be atomically tagged with a short 
value using setForkJoinTaskTag(short) or 
compareAndSetForkJoinTaskTag(short, short) and checked using 
getForkJoinTaskTag(). The ForkJoinTask implementation does not 
use these protected methods or tags for any purpose, but they may 
be of use in the construction of specialized subclasses. For example, 
parallel graph traversals can use the supplied methods to avoid 
revisiting nodes/tasks that have already been processed. (Method 
names for tagging are bulky in part to encourage definition of 
methods that reflect their usage patterns.)

Most base support methods are final, to prevent overriding of 
implementations that are intrinsically tied to the underlying lightweight 
task scheduling framework. Developers creating new basic styles of 
fork/join processing should minimally implement protected methods 
exec(), setRawResult(V), and getRawResult(), while also introducing 
an abstract computational method that can be implemented in its 
subclasses, possibly relying on other protected methods provided by 
this class.

方法join()及其变体仅适用于完成依赖关系是非循环的时候;也就是说,并行计算可以描述为有向无环图(DAG)。否则,执行可能会遇到一种形式的死锁,因为任务会循环地等待彼此。但是,此框架支持其他方法和技术(例如,使用Phaser,helpQuiesce()和complete(V)),这些方为法和技术可用于静态结构化不是DAG的问题构造自定义子类。了支持这种用法,可以使用setForkJoinTaskTag(short)或compareAndSetForkJoinTaskTag(short,short)对forkJoinTask进行原子标记,并使用getForkJoinTaskTag()进行检查。 ForkJoinTask实现不会出于任何目的使用这些受保护的方法或标记,但它们可能用于构造专门的子类。例如,并行图遍历可以使用提供的方法来避免重新访问已经处理的节点/任务。 (标记的方法名称很长,部分是为了鼓励定义反映其使用模式的方法。)

大多数基本支持方法都是final,以防止覆盖,因为其实现与底层轻量级任务调度框架有内在联系。创建新的基本样式的fork / join处理的开发人员应该至少实现受保护的方法exec(),setRawResult(V)和getRawResult(),同时还引入一个可以在其子类中实现的抽象计算方法,可能依赖于由本类提供的其他受保护方法。

ForkJoinTasks should perform relatively small amounts of 
computation. Large tasks should be split into smaller subtasks, 
usually via recursive decomposition. As a very rough rule of thumb, a 
task should perform more than 100 and less than 10000 basic 
computational steps, and should avoid indefinite looping. If tasks are 
too big, then parallelism cannot improve throughput. If too small, then 
memory and internal task maintenance overhead may overwhelm 

This class provides adapt methods for Runnable and Callable, that 
may be of use when mixing execution of ForkJoinTasks with other 
kinds of tasks. When all tasks are of this form, consider using a pool 
constructed in asyncMode.

ForkJoinTasks are Serializable, which enables them to be used in 
extensions such as remote execution frameworks. It is sensible to 
serialize tasks only before or after, but not during, execution. 
Serialization is not relied on during execution itself.

ForkJoinTasks应该执行相对少量的计算。 应将大型任务拆分为较小的子任务,通常通过递归分解。 作为一个非常粗略的经验法则,任务应该执行超过100个且少于10000个基本计算步骤,并且应该避免无限循环。 如果任务太大,那么并行性无法提高吞吐量。 如果太小,那么内存和内部任务维护开销可能会超过处理。

此类为Runnable和Callable提供了适配方法,在将ForkJoinTasks的执行与其他类型的任务混合时可能会有用。 当所有任务都是这种形式时,请考虑使用asyncMode构造的池。

ForkJoinTasks是Serializable,它使它们可以用于扩展,例如远程执行框架。 在执行之前或之后,但不是在执行期间序列化任务是明智的。 执行过程中不依赖序列化。


     * The status field holds run control status bits packed into a
     * single int to minimize footprint and to ensure atomicity (via
     * CAS).  Status is initially zero, and takes on nonnegative
     * values until completed, upon which status (anded with
     * DONE_MASK) holds value NORMAL, CANCELLED, or EXCEPTIONAL. Tasks
     * undergoing blocking waits by other threads have the SIGNAL bit
     * set.  Completion of a stolen task with SIGNAL set awakens any
     * waiters via notifyAll. Even though suboptimal for some
     * purposes, we use basic builtin wait/notify to take advantage of
     * "monitor inflation" in JVMs that we would otherwise need to
     * emulate to avoid adding further per-task bookkeeping overhead.
     * We want these monitors to be "fat", i.e., not use biasing or
     * thin-lock techniques, so use some odd coding idioms that tend
     * to avoid them, mainly by arranging that every synchronized
     * block performs a wait, notifyAll or both.
     * These control bits occupy only (some of) the upper half (16
     * bits) of status field. The lower bits are used for user-defined
     * tags.

    /** The run status of this task */
    volatile int status; // accessed directly by pool and workers
    static final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits
    static final int NORMAL      = 0xf0000000;  // must be negative
    static final int CANCELLED   = 0xc0000000;  // must be < NORMAL
    static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED
    static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16
    static final int SMASK       = 0x0000ffff;  // short bits for tags




  • fork()提交,返回task,可以进行控制:比如获取、取消等操作
  • invoke要等着返回结果
    public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        return this;
     * Commences performing this task, awaits its completion if
     * necessary, and returns its result, or throws an (unchecked)
     * {@code RuntimeException} or {@code Error} if the underlying
     * computation did so.
     * @return the computed result
    public final V invoke() {
        int s;
        if ((s = doInvoke() & DONE_MASK) != NORMAL)
        return getRawResult();
     * Implementation for invoke, quietlyInvoke.
     * @return status upon completion
    private int doInvoke() {
        int s; Thread t; ForkJoinWorkerThread wt;
        return (s = doExec()) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (wt = (ForkJoinWorkerThread)t).pool.
            awaitJoin(wt.workQueue, this, 0L) :
     * Primary execution method for stolen tasks. Unless done, calls
     * exec and records status if completed, but doesn't wait for
     * completion otherwise.
     * @return status on exit from this method
    final int doExec() {
        int s; boolean completed;
        if ((s = status) >= 0) {
            try {
                completed = exec();
            } catch (Throwable rex) {
                return setExceptionalCompletion(rex);
            if (completed)
                s = setCompletion(NORMAL);
        return s;


3.1 awaitJoin

     * Helps and/or blocks until the given task is done or timeout.
     * @param w caller
     * @param task the task
     * @param deadline for timed waits, if nonzero
     * @return task status on exit
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
        int s = 0;
        if (task != null && w != null) {
            ForkJoinTask<?> prevJoin = w.currentJoin;
            U.putOrderedObject(w, QCURRENTJOIN, task);
            CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                (CountedCompleter<?>)task : null;
            for (;;) {
                if ((s = task.status) < 0)
                if (cc != null)
                    helpComplete(w, cc, 0);
                else if (w.base == w.top || w.tryRemoveAndExec(task))
                    helpStealer(w, task);
                if ((s = task.status) < 0)
                long ms, ns;
                if (deadline == 0L)
                    ms = 0L;
                else if ((ns = deadline - System.nanoTime()) <= 0L)
                else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                    ms = 1L;
                if (tryCompensate(w)) {
                    U.getAndAddLong(this, CTL, AC_UNIT);
            U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
        return s;

  • step1.更新WorkQueue的currentJoin。

  • step2.如果任务已经结束((s = task.status) < 0),则break。
    step5.如果任务已经结束((s = task.status) < 0),则break。

  • step8.当前任务完成后,替换currentJoin为以前的值

         * If present, removes from queue and executes the given task,
         * or any other cancelled task. Used only by awaitJoin.
         * @return true if queue empty and task not known to be done
        final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; int m, s, b, n;
            if ((a = array) != null && (m = a.length - 1) >= 0 &&
                task != null) {
                while ((n = (s = top) - (b = base)) > 0) {
                    for (ForkJoinTask<?> t;;) {      // traverse from s to b
                        long j = ((--s & m) << ASHIFT) + ABASE;
                        if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                            return s + 1 == top;     // shorter than expected
                        else if (t == task) {
                            boolean removed = false;
                            if (s + 1 == top) {      // pop
                                if (U.compareAndSwapObject(a, j, task, null)) {
                                    U.putOrderedInt(this, QTOP, s);
                                    removed = true;
                            else if (base == b)      // replace with proxy
                                removed = U.compareAndSwapObject(
                                    a, j, task, new EmptyTask());
                            if (removed)
                        else if (t.status < 0 && s + 1 == top) {
                            if (U.compareAndSwapObject(a, j, t, null))
                                U.putOrderedInt(this, QTOP, s);
                            break;                  // was cancelled
                        if (--n == 0)
                            return false;
                    if (task.status < 0)
                        return false;
            return true;
  • 从top遍历到base
  • 如果找到给定任务,在下面执行完后,break;
    如果删除成功,则执行任务: task.doExec()
  • 栈顶任务不是给定任务,但是已经完成,则从栈中清除出去


     * Tries to locate and execute tasks for a stealer of the given
     * task, or in turn one of its stealers, Traces currentSteal ->
     * currentJoin links looking for a thread working on a descendant
     * of the given task and with a non-empty queue to steal back and
     * execute tasks from. The first call to this method upon a
     * waiting join will often entail scanning/search, (which is OK
     * because the joiner has nothing better to do), but this method
     * leaves hints in workers to speed up subsequent calls.
     * @param w caller
     * @param task the task to join
    private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
        WorkQueue[] ws = workQueues;
        int oldSum = 0, checkSum, m;
        if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
            task != null) {
            do {                                       // restart point
                checkSum = 0;                          // for stability check
                ForkJoinTask<?> subtask;
                WorkQueue j = w, v;                    // v is subtask stealer
                descent: for (subtask = task; subtask.status >= 0; ) {
                    for (int h = j.hint | 1, k = 0, i; ; k += 2) {
                        if (k > m)                     // can't find stealer
                            break descent;
                        if ((v = ws[i = (h + k) & m]) != null) {
                            if (v.currentSteal == subtask) {
                                j.hint = i;
                            checkSum += v.base;
                    for (;;) {                         // help v or descend
                        ForkJoinTask<?>[] a; int b;
                        checkSum += (b = v.base);
                        ForkJoinTask<?> next = v.currentJoin;
                        if (subtask.status < 0 || j.currentJoin != subtask ||
                            v.currentSteal != subtask) // stale
                            break descent;
                        if (b - v.top >= 0 || (a = v.array) == null) {
                            if ((subtask = next) == null)
                                break descent;
                            j = v;
                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                        ForkJoinTask<?> t = ((ForkJoinTask<?>)
                                             U.getObjectVolatile(a, i));
                        if (v.base == b) {
                            if (t == null)             // stale
                                break descent;
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                v.base = b + 1;
                                ForkJoinTask<?> ps = w.currentSteal;
                                int top = w.top;
                                do {
                                    U.putOrderedObject(w, QCURRENTSTEAL, t);
                                    t.doExec();        // clear local tasks too
                                } while (task.status >= 0 &&
                                         w.top != top &&
                                         (t = w.pop()) != null);
                                U.putOrderedObject(w, QCURRENTSTEAL, ps);
                                if (w.base != w.top)
                                    return;            // can't further help
            } while (task.status >= 0 && oldSum != (oldSum = checkSum));

试图找到并执行给定任务的窃取者的任务,跟踪currentSteal - > currentJoin链接查找在给定任务的后代上工作的并使用非空队列的线程,从中窃取任务并执行。 在等待joini时对此方法的第一次调用通常需要扫描/搜索(这是可以的,因为joiner没有更好的事情可做),但是这种方法在工作线程中留下提示以加速后续调用。

  • step1.遍历workQueues中的WorkQueue,找到其currentSteal为subtask(初始时为给定的task)的WorkQueue
  • step2.获取偷取者的currentJoin任务
  • step3.从偷取者队列的栈底进行操作
     * Tries to decrement active count (sometimes implicitly) and
     * possibly release or create a compensating worker in preparation
     * for blocking. Returns false (retryable by caller), on
     * contention, detected staleness, instability, or termination.
     * @param w caller
    private boolean tryCompensate(WorkQueue w) {
        boolean canBlock;
        WorkQueue[] ws; long c; int m, pc, sp;
        if (w == null || w.qlock < 0 ||           // caller terminating
            (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
            (pc = config & SMASK) == 0)           // parallelism disabled
            canBlock = false;
        else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
            canBlock = tryRelease(c, ws[sp & m], 0L);
        else {
            int ac = (int)(c >> AC_SHIFT) + pc;
            int tc = (short)(c >> TC_SHIFT) + pc;
            int nbusy = 0;                        // validate saturation
            for (int i = 0; i <= m; ++i) {        // two passes of odd indices
                WorkQueue v;
                if ((v = ws[((i << 1) | 1) & m]) != null) {
                    if ((v.scanState & SCANNING) != 0)
            if (nbusy != (tc << 1) || ctl != c)
                canBlock = false;                 // unstable or stale
            else if (tc >= pc && ac > 1 && w.isEmpty()) {
                long nc = ((AC_MASK & (c - AC_UNIT)) |
                           (~AC_MASK & c));       // uncompensated
                canBlock = U.compareAndSwapLong(this, CTL, c, nc);
            else if (tc >= MAX_CAP ||
                     (this == common && tc >= pc + commonMaxSpares))
                throw new RejectedExecutionException(
                    "Thread limit exceeded replacing blocked worker");
            else {                                // similar to tryAddWorker
                boolean add = false; int rs;      // CAS within lock
                long nc = ((AC_MASK & c) |
                           (TC_MASK & (c + TC_UNIT)));
                if (((rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                canBlock = add && createWorker(); // throws on exception
        return canBlock;

尝试减少活动计数(有时是隐式)并可能释放或创建补偿工作线程以准备阻塞。 在争用、已经过时、不稳定性或终止时返回false(调用者可重试)

config是ForkJoinPool的域,其低16位表示并行度,高16位表示mode。pc = config & SMASK获取的就是并行度。

  • step1.如果调用者终止,或者workQueues为空,或者线程池的并行度为0,即被终止,则赋值canBlock = false
  • step2.如果有空闲的工作线程,释放它
  • step3.没有空闲线程
    2)总线程数大于并行度 && 活动线程数大于1 && 调用者任务队列为空,不需要补偿


  • 调用者队列不为空,并且有空闲工作线程,这种情况会唤醒空闲线程(调用tryRelease方法)
  • 池尚未停止,活跃线程数不足,这时会新建一个工作线程(调用createWorker方法)


  • 调用者已终止或池处于不稳定状态
  • 总线程数大于并行度 && 活动线程数大于1 && 调用者任务队列为空
     * Signals and releases worker v if it is top of idle worker
     * stack.  This performs a one-shot version of signalWork only if
     * there is (apparently) at least one idle worker.
     * @param c incoming ctl value
     * @param v if non-null, a worker
     * @param inc the increment to active count (zero when compensating)
     * @return true if successful
    private boolean tryRelease(long c, WorkQueue v, long inc) {
        int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
        if (v != null && v.scanState == sp) {          // v is at top of stack
            long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
            if (U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;
                if ((p = v.parker) != null)
                return true;
        return false;

3.2 externalAwaitDone

     * Blocks a non-worker-thread until completion.
     * @return status upon completion
    private int externalAwaitDone() {
        int s = ((this instanceof CountedCompleter) ? // try helping
                     (CountedCompleter<?>)this, 0) :
                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
        if (s >= 0 && (s = status) >= 0) {
            boolean interrupted = false;
            do {
                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                    synchronized (this) {
                        if (status >= 0) {
                            try {
                            } catch (InterruptedException ie) {
                                interrupted = true;
            } while ((s = status) >= 0);
            if (interrupted)
        return s;
  • 如果是CountedCompleter,则调用ForkJoinPool.common.externalHelpComplete
  • 其他类型,则调用ForkJoinPool.common.tryExternalUnpush(this),如果在栈顶,则弹出并后续执行doExec()
  • 如果执行失败,进入等待


     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     * @return the computed result
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
        return getRawResult();
     * Implementation for join, get, quietlyJoin. Directly handles
     * only cases of already-completed, external wait, and
     * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin.
     * @return status upon completion
    private int doJoin() {
        int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
        return (s = status) < 0 ? s :
            ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
            (w = (wt = (ForkJoinWorkerThread)t).workQueue).
            tryUnpush(this) && (s = doExec()) < 0 ? s :
            wt.pool.awaitJoin(w, this, 0L) :

         * Pops the given task only if it is at the current top.
         * (A shared version is available only via FJP.tryExternalUnpush)
        final boolean tryUnpush(ForkJoinTask<?> t) {
            ForkJoinTask<?>[] a; int s;
            if ((a = array) != null && (s = top) != base &&
                (a, (((a.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
                U.putOrderedInt(this, QTOP, s);
                return true;
            return false;

如果给定任务位于栈顶,则弹出任务。top也要减一;并且随后要执行该任务&& s = doExec()。


5.1 RecursiveTask

 class Fibonacci extends RecursiveTask<Integer> {
   final int n;
   Fibonacci(int n) { this.n = n; }
   Integer compute() {
     if (n <= 1)
       return n;
     Fibonacci f1 = new Fibonacci(n - 1);
     Fibonacci f2 = new Fibonacci(n - 2);
     return f2.compute() + f1.join();

然而,除了作为计算Fibonacci函数的性能不好的愚蠢方法(实践中使用简单的快速线性算法),因为最小的子任务太小而不值得拆分。 相反,就像几乎所有fork / join应用程序的情况一样,选择一些最小粒度大小(例如此处为10),不要继续在细分下去。

5.2 RecursiveAction

一个递归无结果的ForkJoinTask。约定Void ForkJoinTasks。 因为null是Void类型的唯一有效值,所以诸如join之类的方法在完成时始终返回null。

对给定的long []数组进行排序的示例程序:

 static class SortTask extends RecursiveAction {
   final long[] array; final int lo, hi;
   SortTask(long[] array, int lo, int hi) {
     this.array = array; this.lo = lo; this.hi = hi;
   SortTask(long[] array) { this(array, 0, array.length); }
   protected void compute() {
     if (hi - lo < THRESHOLD)
       sortSequentially(lo, hi);
     else {
       int mid = (lo + hi) >>> 1;
       invokeAll(new SortTask(array, lo, mid),
                 new SortTask(array, mid, hi));
       merge(lo, mid, hi);
   // implementation details follow:
   static final int THRESHOLD = 1000;
   void sortSequentially(int lo, int hi) {
     Arrays.sort(array, lo, hi);
   void merge(int lo, int mid, int hi) {
     long[] buf = Arrays.copyOfRange(array, lo, mid);
     for (int i = 0, j = lo, k = mid; i < buf.length; j++)
       array[j] = (k == hi || buf[i] < array[k]) ?
         buf[i++] : array[k++];


 class IncrementTask extends RecursiveAction {
   final long[] array; final int lo, hi;
   IncrementTask(long[] array, int lo, int hi) {
     this.array = array; this.lo = lo; this.hi = hi;
   protected void compute() {
     if (hi - lo < THRESHOLD) {
       for (int i = lo; i < hi; ++i)
     else {
       int mid = (lo + hi) >>> 1;
       invokeAll(new IncrementTask(array, lo, mid),
                 new IncrementTask(array, mid, hi));


 double sumOfSquares(ForkJoinPool pool, double[] array) {
   int n = array.length;
   Applyer a = new Applyer(array, 0, n, null);
   return a.result;

 class Applyer extends RecursiveAction {
   final double[] array;
   final int lo, hi;
   double result;
   Applyer next; // keeps track of right-hand-side tasks
   Applyer(double[] array, int lo, int hi, Applyer next) {
     this.array = array; this.lo = lo; this.hi = hi;
     this.next = next;

   double atLeaf(int l, int h) {
     double sum = 0;
     for (int i = l; i < h; ++i) // perform leftmost base step
       sum += array[i] * array[i];
     return sum;

   protected void compute() {
     int l = lo;
     int h = hi;
     Applyer right = null;
     while (h - l > 1 && getSurplusQueuedTaskCount() <= 3) {
       int mid = (l + h) >>> 1;
       right = new Applyer(array, mid, h, right);
       h = mid;
     double sum = atLeaf(l, h);
     while (right != null) {
       if (right.tryUnfork()) // directly calculate if not stolen
         sum += right.atLeaf(right.lo, right.hi);
       else {
         sum += right.result;
       right = right.next;
     result = sum;

5.3 CountedCompleter


并行流是ForkJoin框架的一个典型应用,JAVA8 Stream api中的并行流定义了大量的以CountedCompleter为基础的操作。利用分割/合并和周边组件实现了基于ForkJoin框架的并行计算调度。


  • 任务状态
  • 内部提交
  • 任务类型
