1. ForkJoinPool Usage Notes
An ExecutorService for running ForkJoinTasks. A ForkJoinPool
provides the entry point for submissions from non-ForkJoinTask
clients, as well as management and monitoring operations.
A ForkJoinPool differs from other kinds of ExecutorService mainly by
virtue of employing work-stealing: all threads in the pool attempt to
find and execute tasks submitted to the pool and/or created by other
active tasks (eventually blocking waiting for work if none exist). This
enables efficient processing when most tasks spawn other subtasks
(as do most ForkJoinTasks), as well as when many small tasks are
submitted to the pool from external clients. Especially when setting
asyncMode to true in constructors, ForkJoinPools may also be
appropriate for use with event-style tasks that are never joined.
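For illustration, a minimal sketch of constructing such an async-mode pool with the standard four-argument constructor (assume the declaration appears inside a class):

    import java.util.concurrent.ForkJoinPool;

    // A sketch: an asyncMode pool for event-style tasks that are never
    // joined; asyncMode = true selects FIFO scheduling of each worker's
    // local queue instead of the default LIFO.
    ForkJoinPool asyncPool = new ForkJoinPool(
        Runtime.getRuntime().availableProcessors(),       // parallelism
        ForkJoinPool.defaultForkJoinWorkerThreadFactory,  // default factory
        null,                                             // no handler
        true);                                            // asyncMode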
A static commonPool() is available and appropriate for most
applications. The common pool is used by any ForkJoinTask that is
not explicitly submitted to a specified pool. Using the common pool
normally reduces resource usage (its threads are slowly reclaimed
during periods of non-use, and reinstated upon subsequent use).
For applications that require separate or custom pools, a
ForkJoinPool may be constructed with a given target parallelism level;
by default, equal to the number of available processors. The pool
attempts to maintain enough active (or available) threads by
dynamically adding, suspending, or resuming internal worker threads,
even if some tasks are stalled waiting to join others. However, no such
adjustments are guaranteed in the face of blocked I/O or other
unmanaged synchronization. The nested
ForkJoinPool.ManagedBlocker interface enables extension of the
kinds of synchronization accommodated.
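As a concrete illustration, the ManagedLocker example from the ManagedBlocker javadoc wraps a ReentrantLock acquisition so that the pool may activate a spare thread while a worker blocks:

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.locks.ReentrantLock;

    // From the ManagedBlocker javadoc example: the pool consults
    // isReleasable(), possibly compensates with a spare worker, then
    // calls block() on behalf of the blocking worker.
    class ManagedLocker implements ForkJoinPool.ManagedBlocker {
        final ReentrantLock lock;
        boolean hasLock = false;
        ManagedLocker(ReentrantLock lock) { this.lock = lock; }
        public boolean block() {            // may block; true when done
            if (!hasLock)
                lock.lock();
            return true;
        }
        public boolean isReleasable() {     // true if blocking unnecessary
            return hasLock || (hasLock = lock.tryLock());
        }
    }

A worker then acquires the lock via ForkJoinPool.managedBlock(new ManagedLocker(lock)) rather than calling lock.lock() directly.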
In addition to execution and lifecycle control methods, this class
provides status check methods (for example getStealCount()) that are
intended to aid in developing, tuning, and monitoring fork/join
applications. Also, method toString() returns indications of pool state
in a convenient form for informal monitoring.
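For example, an informal monitoring snippet (assumed to run inside some method) might read:

    // A sketch of informal monitoring via the status-check methods.
    ForkJoinPool pool = ForkJoinPool.commonPool();
    System.out.println("parallelism=" + pool.getParallelism()
        + " poolSize=" + pool.getPoolSize()
        + " steals=" + pool.getStealCount());
    System.out.println(pool);   // toString() summarizes pool state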
As is the case with other ExecutorServices, there are three main task
execution methods summarized in the following table. These are
designed to be used primarily by clients not already engaged in
fork/join computations in the current pool. The main forms of these
methods accept instances of ForkJoinTask, but overloaded forms also
allow mixed execution of plain Runnable- or Callable- based activities
as well. However, tasks that are already executing in a pool should
normally instead use the within-computation forms listed in the table
unless using async event-style tasks that are not usually joined, in
which case there is little difference among choice of methods.
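Summary of task execution methods:

                                     | Call from non-fork/join clients | Call from within fork/join computations
    Arrange async execution          | execute(ForkJoinTask)           | ForkJoinTask.fork()
    Await and obtain result          | invoke(ForkJoinTask)            | ForkJoinTask.invoke()
    Arrange exec and obtain Future   | submit(ForkJoinTask)            | ForkJoinTask.fork() (ForkJoinTasks are Futures)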
By default, the common pool is constructed with default parameters, but these may be controlled by setting three system properties:
- java.util.concurrent.ForkJoinPool.common.parallelism - the parallelism level, a non-negative integer
- java.util.concurrent.ForkJoinPool.common.threadFactory - the class name of a ForkJoinPool.ForkJoinWorkerThreadFactory
- java.util.concurrent.ForkJoinPool.common.exceptionHandler - the class name of a Thread.UncaughtExceptionHandler
If a SecurityManager is present and no factory is specified, then the default pool uses a factory supplying threads that have no Permissions enabled. The system class loader is used to load these classes. Upon any error in establishing these settings, default parameters are used. It is possible to disable or limit the use of threads in the common pool by setting the parallelism property to zero, and/or using a factory that may return null; however, doing so may cause unjoined tasks to never be executed.
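For instance, a sketch of configuring the common pool programmatically (the property must be set before the common pool is first touched; passing -D flags on the java command line is equivalent):

    import java.util.concurrent.ForkJoinPool;

    // A sketch: raise common-pool parallelism before first use.
    public class CommonPoolConfig {
        public static void main(String[] args) {
            System.setProperty(
                "java.util.concurrent.ForkJoinPool.common.parallelism", "4");
            // The first use of the common pool picks up the property.
            System.out.println(ForkJoinPool.commonPool().getParallelism());
        }
    }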
Implementation notes: this implementation restricts the maximum number of running threads to 32767. Attempts to create pools with greater than the maximum number result in IllegalArgumentException.
This implementation rejects submitted tasks (that is, by throwing RejectedExecutionException) only when the pool is shut down or internal resources have been exhausted.
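Putting the usage notes together, a minimal divide-and-conquer task might look like the following sketch (SumTask is a made-up example class, not part of the JDK):

    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.RecursiveTask;

    // Sum the half-open range [lo, hi) by recursive splitting; forked
    // subtasks land on the worker's deque, where idle workers steal them.
    class SumTask extends RecursiveTask<Long> {
        static final long THRESHOLD = 10_000;
        final long lo, hi;
        SumTask(long lo, long hi) { this.lo = lo; this.hi = hi; }
        protected Long compute() {
            if (hi - lo <= THRESHOLD) {          // small: sum directly
                long s = 0;
                for (long i = lo; i < hi; i++) s += i;
                return s;
            }
            long mid = (lo + hi) >>> 1;
            SumTask left = new SumTask(lo, mid);
            left.fork();                         // push for possible steal
            long right = new SumTask(mid, hi).compute();
            return right + left.join();
        }
    }

Invoked, for example, as ForkJoinPool.commonPool().invoke(new SumTask(0, 10_000_000)).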
2. Implementation Notes
2.1 Implementation Overview
This class and its nested classes provide the main
functionality and control for a set of worker threads:
Submissions from non-FJ threads enter into submission queues.
Workers take these tasks and typically split them into subtasks
that may be stolen by other workers. Preference rules give
first priority to processing tasks from their own queues (LIFO
or FIFO, depending on mode), then to randomized FIFO steals of
tasks in other queues. This framework began as a vehicle for
supporting tree-structured parallelism using work-stealing.
Over time, its scalability advantages led to extensions and
changes to better support more diverse usage contexts. Because
most internal methods and nested classes are interrelated,
their main rationale and descriptions are presented here;
individual methods and nested classes contain only brief
comments about details.
2.2 WorkQueues
Most operations occur within work-stealing queues (in nested
class WorkQueue). These are special forms of Deques that
support only three of the four possible end-operations -- push,
pop, and poll (aka steal), under the further constraints that
push and pop are called only from the owning thread (or, as
extended here, under a lock), while poll may be called from
other threads. (If you are unfamiliar with them, you probably
want to read Herlihy and Shavit's book "The Art of
Multiprocessor Programming", chapter 16 describing these in
more detail before proceeding.) The main work-stealing queue
design is roughly similar to those in the papers "Dynamic
Circular Work-Stealing Deque" by Chase and Lev, SPAA 2005
(http://research.sun.com/scalable/pubs/index.html) and
"Idempotent work stealing" by Michael, Saraswat, and Vechev,
PPoPP 2009 (http://portal.acm.org/citation.cfm?id=1504186).
The main differences ultimately stem from GC requirements that
we null out taken slots as soon as we can, to maintain as small
a footprint as possible even in programs generating huge
numbers of tasks. To accomplish this, we shift the CAS
arbitrating pop vs poll (steal) from being on the indices
("base" and "top") to the slots themselves.
Adding tasks then takes the form of a classic array push(task):

    q.array[q.top] = task; ++q.top;

(The actual code needs to null-check and size-check the array,
properly fence the accesses, and possibly signal waiting
workers to start scanning -- see below.) Both a successful pop
and poll mainly entail a CAS of a slot from non-null to null.
The pop operation (always performed by owner) is:

    if ((base != top) and
        (the task at top slot is not null) and
        (CAS slot to null))
            decrement top and return task;

And the poll operation (usually by a stealer) is:

    if ((base != top) and
        (the task at base slot is not null) and
        (base has not changed) and
        (CAS slot to null))
            increment base and return task;
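A simplified Java rendering of these three operations, using AtomicReferenceArray where the real WorkQueue uses Unsafe intrinsics, and ignoring resizing, fencing details, and signalling:

    import java.util.concurrent.atomic.AtomicReferenceArray;

    // Sketch only: push/pop are owner-only; poll may race with other
    // stealers, so it CASes the slot and re-checks base, as above.
    class SimpleWorkQueue<T> {
        final AtomicReferenceArray<T> slots = new AtomicReferenceArray<>(1 << 13);
        volatile int base;   // next slot to poll (steal) from
        int top;             // next slot to push into (owner only)

        void push(T task) {
            slots.set(top & (slots.length() - 1), task);
            top++;           // real code fences this and may signal waiters
        }
        T pop() {            // owner takes from the LIFO end
            int s = top - 1, i = s & (slots.length() - 1);
            T t;
            if (base != top && (t = slots.get(i)) != null
                    && slots.compareAndSet(i, t, null)) {
                top = s;
                return t;
            }
            return null;
        }
        T poll() {           // stealer takes from the FIFO end
            int b = base, i = b & (slots.length() - 1);
            T t;
            if (b != top && (t = slots.get(i)) != null
                    && b == base                    // base unchanged
                    && slots.compareAndSet(i, t, null)) {
                base = b + 1;
                return t;
            }
            return null;
        }
    }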
Because we rely on CASes of references, we do not need tag bits
on base or top. They are simple ints as used in any circular
array-based queue (see for example ArrayDeque). Updates to the
indices guarantee that top == base means the queue is empty,
but otherwise may err on the side of possibly making the queue
appear nonempty when a push, pop, or poll have not fully
committed. (Method isEmpty() checks the case of a partially
completed removal of the last element.) Because of this, the
poll operation, considered individually, is not wait-free. One
thief cannot successfully continue until another in-progress
one (or, if previously empty, a push) completes. However, in
the aggregate, we ensure at least probabilistic
non-blockingness. If an attempted steal fails, a thief always
chooses a different random victim target to try next. So, in
order for one thief to progress, it suffices for any
in-progress poll or new push on any empty queue to
complete. (This is why we normally use method pollAt and its
variants that try once at the apparent base index, else
consider alternative actions, rather than method poll, which
retries.)
This approach also enables support of a user mode in which
local task processing is in FIFO, not LIFO order, simply by
using poll rather than pop. This can be useful in
message-passing frameworks in which tasks are never joined.
However, neither mode considers affinities, loads, cache
localities, etc., so they rarely provide the best possible
performance on a given machine, but they portably provide good
throughput by averaging over these factors. Further, even if
we did try to use such information, we do not usually have a
basis for exploiting it. For example, some sets of tasks
profit from cache affinities, but others are harmed by cache
pollution effects. Additionally, even though it requires
scanning, long-term throughput is often best using random
selection rather than directed selection policies, so cheap
randomization of sufficient quality is used whenever
applicable. Various Marsaglia XorShifts (some with different
shift constants) are inlined at use points.
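The XorShift step in question is, in essence (these shift constants are one common variant; the seed must be nonzero):

    // Marsaglia XorShift: a cheap pseudo-random step, used here to pick
    // random steal victims.
    static int xorShift(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }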
WorkQueues are also used in a similar way for tasks submitted
to the pool. We cannot mix these tasks in the same queues used
by workers. Instead, we randomly associate submission queues
with submitting threads, using a form of hashing. The
ThreadLocalRandom probe value serves as a hash code for
choosing existing queues, and may be randomly repositioned upon
contention with other submitters. In essence, submitters act
like workers except that they are restricted to executing local
tasks that they submitted (or in the case of CountedCompleters,
others with the same root task). Insertion of tasks in shared
mode requires a lock (mainly to protect in the case of
resizing) but we use only a simple spinlock (using field
qlock), because submitters encountering a busy queue move on to
try or create other queues -- they block only when creating and
registering new queues. Additionally, "qlock" saturates to an
unlockable value (-1) at shutdown. Unlocking still can be and
is performed by cheaper ordered writes of "qlock" in successful
cases, but uses CAS in unsuccessful cases.
2.3 Management
The main throughput advantages of work-stealing stem from
decentralized control -- workers mostly take tasks from
themselves or each other, at rates that can exceed a billion
per second. The pool itself creates, activates (enables
scanning for and running tasks), deactivates, blocks, and
terminates threads, all with minimal central information.
There are only a few properties that we can globally track or
maintain, so we pack them into a small number of variables,
often maintaining atomicity without blocking or locking.
Nearly all essentially atomic control state is held in two
volatile variables that are by far most often read (not
written) as status and consistency checks. (Also, field
"config" holds unchanging configuration state.)
Field "ctl" contains 64 bits holding information needed to
atomically decide to add, inactivate, enqueue (on an event
queue), dequeue, and/or re-activate workers. To enable this
packing, we restrict maximum parallelism to (1<<15)-1 (which is
far in excess of normal operating range) to allow ids, counts,
and their negations (used for thresholding) to fit into 16bit
subfields.
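Schematically, the packing looks like the following sketch (an illustrative layout; the JDK's exact encoding differs in details):

    // Four 16-bit subfields packed into one long so that related counts
    // can be read and CASed atomically. AC = active worker count,
    // TC = total worker count; in the real pool the low 32 bits hold the
    // scanState of the top idle worker.
    static final int AC_SHIFT = 48, TC_SHIFT = 32;
    static long packCtl(int ac, int tc, long low32) {
        return ((long) (ac & 0xffff) << AC_SHIFT)
             | ((long) (tc & 0xffff) << TC_SHIFT)
             | (low32 & 0xffffffffL);
    }
    static int activeCount(long ctl) {
        return (short) (ctl >>> AC_SHIFT);   // sign-extended 16-bit field
    }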
Field "runState" holds lockable state bits (STARTED, STOP, etc)
also protecting updates to the workQueues array. When used as
a lock, it is normally held only for a few instructions (the
only exceptions are one-time array initialization and uncommon
resizing), so is nearly always available after at most a brief
spin. But to be extra-cautious, after spinning, method
awaitRunStateLock (called only if an initial CAS fails), uses a
wait/notify mechanics on a builtin monitor to block when
(rarely) needed. This would be a terrible idea for a highly
contended lock, but most pools run without the lock ever
contending after the spin limit, so this works fine as a more
conservative alternative. Because we don't otherwise have an
internal Object to use as a monitor, the "stealCounter" (an
AtomicLong) is used when available (it too must be lazily
initialized; see externalSubmit).
Usages of "runState" vs "ctl" interact in only one case:
deciding to add a worker thread (see tryAddWorker), in which
case the ctl CAS is performed while the lock is held.
Recording WorkQueues. WorkQueues are recorded in the
"workQueues" array. The array is created upon first use (see
externalSubmit) and expanded if necessary. Updates to the
array while recording new workers and unrecording terminated
ones are protected from each other by the runState lock, but
the array is otherwise concurrently readable, and accessed
directly. We also ensure that reads of the array reference
itself never become too stale. To simplify index-based
operations, the array size is always a power of two, and all
readers must tolerate null slots. Worker queues are at odd
indices. Shared (submission) queues are at even indices, up to
a maximum of 64 slots, to limit growth even if array needs to
expand to add more workers. Grouping them together in this way
simplifies and speeds up task scanning.
All worker thread creation is on-demand, triggered by task
submissions, replacement of terminated workers, and/or
compensation for blocked workers. However, all other support
code is set up to work with other policies. To ensure that we
do not hold on to worker references that would prevent GC, all
accesses to workQueues are via indices into the workQueues
array (which is one source of some of the messy code
constructions here). In essence, the workQueues array serves as
a weak reference mechanism. Thus for example the stack top
subfield of ctl stores indices, not references.
Queuing Idle Workers. Unlike HPC work-stealing frameworks, we
cannot let workers spin indefinitely scanning for tasks when
none can be found immediately, and we cannot start/resume
workers unless there appear to be tasks available. On the
other hand, we must quickly prod them into action when new
tasks are submitted or generated. In many usages, ramp-up time
to activate workers is the main limiting factor in overall
performance, which is compounded at program start-up by JIT
compilation and allocation. So we streamline this as much as
possible.
The "ctl" field atomically maintains active and total worker
counts as well as a queue to place waiting threads so they can
be located for signalling. Active counts also play the role of
quiescence indicators, so are decremented when workers believe
that there are no more tasks to execute. The "queue" is
actually a form of Treiber stack. A stack is ideal for
activating threads in most-recently used order. This improves
performance and locality, outweighing the disadvantages of
being prone to contention and inability to release a worker
unless it is topmost on stack. We park/unpark workers after
pushing on the idle worker stack (represented by the lower
32bit subfield of ctl) when they cannot find work. The top
stack state holds the value of the "scanState" field of the
worker: its index and status, plus a version counter that, in
addition to the count subfields (also serving as version
stamps) provide protection against Treiber stack ABA effects.
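A minimal Treiber stack sketch, to make the idea concrete (the pool's version packs the "nodes" into the low 32 bits of ctl, with the count subfields acting as the version stamps mentioned above):

    import java.util.concurrent.atomic.AtomicReference;

    // A lock-free stack: push/pop CAS the head reference, so the
    // most-recently pushed (idle) item is released first.
    class TreiberStack<E> {
        static final class Node<E> {
            final E item; Node<E> next;
            Node(E item) { this.item = item; }
        }
        final AtomicReference<Node<E>> head = new AtomicReference<>();

        void push(E item) {
            Node<E> n = new Node<>(item);
            do { n.next = head.get(); } while (!head.compareAndSet(n.next, n));
        }
        E pop() {
            Node<E> h;
            do {
                if ((h = head.get()) == null) return null;
            } while (!head.compareAndSet(h, h.next));
            return h.item;
        }
    }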
Field scanState is used by both workers and the pool to manage
and track whether a worker is INACTIVE (possibly blocked
waiting for a signal), or SCANNING for tasks (when neither holds,
it is busy running tasks). When a worker is inactivated, its
scanState field is set, and is prevented from executing tasks,
even though it must scan once for them to avoid queuing
races. Note that scanState updates lag queue CAS releases so
usage requires care. When queued, the lower 16 bits of
scanState must hold its pool index. So we place the index there
upon initialization (see registerWorker) and otherwise keep it
there or restore it when necessary.
Memory ordering. See "Correct and Efficient Work-Stealing for
Weak Memory Models" by Le, Pop, Cohen, and Nardelli, PPoPP 2013
(http://www.di.ens.fr/~zappa/readings/ppopp13.pdf) for an
analysis of memory ordering requirements in work-stealing
algorithms similar to the one used here. We usually need
stronger than minimal ordering because we must sometimes signal
workers, requiring Dekker-like full-fences to avoid lost
signals. Arranging for enough ordering without expensive
over-fencing requires tradeoffs among the supported means of
expressing access constraints. The most central operations,
taking from queues and updating ctl state, require full-fence
CAS. Array slots are read using the emulation of volatiles
provided by Unsafe. Access from other threads to WorkQueue
base, top, and array requires a volatile load of the first of
any of these read. We use the convention of declaring the
"base" index volatile, and always read it before other fields.
The owner thread must ensure ordered updates, so writes use
ordered intrinsics unless they can piggyback on those for other
writes. Similar conventions and rationales hold for other
WorkQueue fields (such as "currentSteal") that are only written
by owners but observed by others.
Creating workers. To create a worker, we pre-increment total
count (serving as a reservation), and attempt to construct a
ForkJoinWorkerThread via its factory. Upon construction, the
new thread invokes registerWorker, where it constructs a
WorkQueue and is assigned an index in the workQueues array
(expanding the array if necessary). The thread is then
started. Upon any exception across these steps, or null return
from factory, deregisterWorker adjusts counts and records
accordingly. If a null return, the pool continues running with
fewer than the target number of workers. If exceptional, the
exception is propagated, generally to some external caller.
Worker index assignment avoids the bias in scanning that would
occur if entries were sequentially packed starting at the front
of the workQueues array. We treat the array as a simple
power-of-two hash table, expanding as needed. The seedIndex
increment ensures no collisions until a resize is needed or a
worker is deregistered and replaced, and thereafter keeps
probability of collision low. We cannot use
ThreadLocalRandom.getProbe() for similar purposes here because
the thread has not started yet, but do so for creating
submission queues for existing external threads.
Deactivation and waiting. Queuing encounters several intrinsic
races; most notably that a task-producing thread can miss
seeing (and signalling) another thread that gave up looking for
work but has not yet entered the wait queue. When a worker
cannot find a task to steal, it deactivates and enqueues. Very
often, the lack of tasks is transient due to GC or OS
scheduling. To reduce false-alarm deactivation, scanners
compute checksums of queue states during sweeps. (The
stability checks used here and elsewhere are probabilistic
variants of snapshot techniques -- see Herlihy & Shavit.)
Workers give up and try to deactivate only after the sum is
stable across scans. Further, to avoid missed signals, they
repeat this scanning process after successful enqueuing until
again stable. In this state, the worker cannot take/run a task
it sees until it is released from the queue, so the worker
itself eventually tries to release itself or any successor (see
tryRelease). Otherwise, upon an empty scan, a deactivated
worker uses an adaptive local spin construction (see awaitWork)
before blocking (via park). Note the unusual conventions about
Thread.interrupts surrounding parking and other blocking:
Because interrupts are used solely to alert threads to check
termination, which is checked anyway upon blocking, we clear
status (using Thread.interrupted) before any call to park, so
that park does not immediately return due to status being set
via some other unrelated call to interrupt in user code.
Signalling and activation. Workers are created or activated
only when there appears to be at least one task they might be
able to find and execute. Upon push (either by a worker or an
external submission) to a previously (possibly) empty queue,
workers are signalled if idle, or created if fewer exist than
the given parallelism level. These primary signals are
buttressed by others whenever other threads remove a task from
a queue and notice that there are other tasks there as well.
On most platforms, signalling (unpark) overhead time is
noticeably long, and the time between signalling a thread and
it actually making progress can be very noticeably long, so it
is worth offloading these delays from critical paths as much as
possible. Also, because inactive workers are often rescanning
or spinning rather than blocking, we set and clear the "parker"
field of WorkQueues to reduce unnecessary calls to unpark.
(This requires a secondary recheck to avoid missed signals.)
Trimming workers. To release resources after periods of lack of
use, a worker starting to wait when the pool is quiescent will
time out and terminate (see awaitWork) if the pool has remained
quiescent for period IDLE_TIMEOUT, increasing the period as the
number of threads decreases, eventually removing all workers.
Also, when more than two spare threads exist, excess threads
are immediately terminated at the next quiescent point.
(Padding by two avoids hysteresis.)
Shutdown and Termination. A call to shutdownNow invokes
tryTerminate to atomically set a runState bit. The calling
thread, as well as every other worker thereafter terminating,
helps terminate others by setting their (qlock) status,
cancelling their unprocessed tasks, and waking them up, doing
so repeatedly until stable (but with a loop bounded by the
number of workers). Calls to non-abrupt shutdown() preface
this by checking whether termination should commence. This
relies primarily on the active count bits of "ctl" maintaining
consensus -- tryTerminate is called from awaitWork whenever
quiescent. However, external submitters do not take part in
this consensus. So, tryTerminate sweeps through queues (until
stable) to ensure lack of in-flight submissions and workers
about to process them before triggering the "STOP" phase of
termination. (Note: there is an intrinsic conflict if
helpQuiescePool is called when shutdown is enabled. Both wait
for quiescence, but tryTerminate is biased to not trigger until
helpQuiescePool completes.)
2.4 Joining Tasks
Any of several actions may be taken when one worker is waiting
to join a task stolen (or always held) by another. Because we
are multiplexing many tasks on to a pool of workers, we can't
just let them block (as in Thread.join). We also cannot just
reassign the joiner's run-time stack with another and replace
it later, which would be a form of "continuation", that even if
possible is not necessarily a good idea since we may need both
an unblocked task and its continuation to progress. Instead we
combine two tactics:
- Helping: Arranging for the joiner to execute some task that it
  would be running if the steal had not occurred.
- Compensating: Unless there are already enough live threads,
  method tryCompensate() may create or re-activate a spare
  thread to compensate for blocked joiners until they unblock.
A third form (implemented in tryRemoveAndExec) amounts to
helping a hypothetical compensator: If we can readily tell that
a possible action of a compensator is to steal and execute the
task being joined, the joining thread can do so directly,
without the need for a compensation thread (although at the
expense of larger run-time stacks, but the tradeoff is
typically worthwhile).
The ManagedBlocker extension API can't use helping so relies
only on compensation in method awaitBlocker.
The algorithm in helpStealer entails a form of "linear
helping". Each worker records (in field currentSteal) the most
recent task it stole from some other worker (or a submission).
It also records (in field currentJoin) the task it is currently
actively joining. Method helpStealer uses these markers to try
to find a worker to help (i.e., steal back a task from and
execute it) that could hasten completion of the actively joined
task. Thus, the joiner executes a task that would be on its
own local deque had the to-be-joined task not been stolen. This
is a conservative variant of the approach described in Wagner &
Calder "Leapfrogging: a portable technique for implementing
efficient futures" SIGPLAN Notices, 1993
(http://portal.acm.org/citation.cfm?id=155354). It differs in
that: (1) We only maintain dependency links across workers upon
steals, rather than use per-task bookkeeping. This sometimes
requires a linear scan of workQueues array to locate stealers,
but often doesn't because stealers leave hints (that may become
stale/wrong) of where to locate them. It is only a hint
because a worker might have had multiple steals and the hint
records only one of them (usually the most current). Hinting
isolates cost to when it is needed, rather than adding to
per-task overhead. (2) It is "shallow", ignoring nesting and
potentially cyclic mutual steals. (3) It is intentionally
racy: field currentJoin is updated only while actively joining,
which means that we miss links in the chain during long-lived
tasks, GC stalls etc (which is OK since blocking in such cases
is usually a good idea). (4) We bound the number of attempts
to find work using checksums and fall back to suspending the
worker and if necessary replacing it with another.
Helping actions for CountedCompleters do not require tracking
currentJoins: Method helpComplete takes and executes any task
with the same root as the task being waited on (preferring
local pops to non-local polls). However, this still entails
some traversal of completer chains, so is less efficient than
using CountedCompleters without explicit joins.
Compensation does not aim to keep exactly the target
parallelism number of unblocked threads running at any given
time. Some previous versions of this class employed immediate
compensations for any blocked join. However, in practice, the
vast majority of blockages are transient byproducts of GC and
other JVM or OS activities that are made worse by replacement.
Currently, compensation is attempted only after validating that
all purportedly active threads are processing tasks by checking
field WorkQueue.scanState, which eliminates most false
positives. Also, compensation is bypassed (tolerating fewer
threads) in the most common case in which it is rarely
beneficial: when a worker with an empty queue (thus no
continuation tasks) blocks on a join and there still remain
enough threads to ensure liveness.
The compensation mechanism may be bounded. Bounds for the
commonPool (see commonMaxSpares) better enable JVMs to cope
with programming errors and abuse before running out of
resources to do so. In other cases, users may supply factories
that limit thread construction. The effects of bounding in this
pool (like all others) is imprecise. Total worker counts are
decremented when threads deregister, not when they exit and
resources are reclaimed by the JVM and OS. So the number of
simultaneously live threads may transiently exceed bounds.
2.5 Common Pool
The static common pool always exists after static
initialization. Since it (or any other created pool) need
never be used, we minimize initial construction overhead and
footprint to the setup of about a dozen fields, with no nested
allocation. Most bootstrapping occurs within method
externalSubmit during the first submission to the pool.
When external threads submit to the common pool, they can
perform subtask processing (see externalHelpComplete and
related methods) upon joins. This caller-helps policy makes it
sensible to set common pool parallelism level to one (or more)
less than the total number of available cores, or even zero for
pure caller-runs. We do not need to record whether external
submissions are to the common pool -- if not, external help
methods return quickly. These submitters would otherwise be
blocked waiting for completion, so the extra effort (with
liberally sprinkled task status checks) in inapplicable cases
amounts to an odd form of limited spin-wait before blocking in
ForkJoinTask.join.
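So, for example, an application that wants pure caller-runs behavior might set (before the common pool is first used):

    // A sketch: parallelism 0 disables common-pool workers, so external
    // submitters run subtask processing themselves (caller-runs).
    System.setProperty(
        "java.util.concurrent.ForkJoinPool.common.parallelism", "0");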
As a more appropriate default in managed environments, unless
overridden by system properties, we use workers of subclass
InnocuousForkJoinWorkerThread when there is a SecurityManager
present. These workers have no permissions set, do not belong
to any user-defined ThreadGroup, and erase all ThreadLocals
after executing any top-level task (see WorkQueue.runTask).
The associated mechanics (mainly in ForkJoinWorkerThread) may
be JVM-dependent and must access particular Thread class fields
to achieve this effect.
2.6 Style notes
Memory ordering relies mainly on Unsafe intrinsics that carry
the further responsibility of explicitly performing null- and
bounds- checks otherwise carried out implicitly by JVMs. This
can be awkward and ugly, but also reflects the need to control
outcomes across the unusual cases that arise in very racy code
with very few invariants. So these explicit checks would exist
in some form anyway. All fields are read into locals before
use, and null-checked if they are references. This is usually
done in a "C"-like style of listing declarations at the heads
of methods or blocks, and using inline assignments on first
encounter. Array bounds-checks are usually performed by
masking with array.length-1, which relies on the invariant that
these arrays are created with positive lengths, which is itself
paranoically checked. Nearly all explicit checks lead to
bypass/return, not exception throws, because they may
legitimately arise due to cancellation/revocation during
shutdown.
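The masking idiom reads, schematically:

    // A sketch of index masking: with a power-of-two array length,
    // (i & (a.length - 1)) wraps any non-negative int index into
    // [0, a.length), standing in for an explicit bounds check.
    Object[] a = new Object[1 << 13];   // length is a power of two
    int m = a.length - 1;               // mask = 0x1fff
    Object slot = a[123_456 & m];       // always lands inside the array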
There is a lot of representation-level coupling among classes
ForkJoinPool, ForkJoinWorkerThread, and ForkJoinTask. The
fields of WorkQueue maintain data structures managed by
ForkJoinPool, so are directly accessed. There is little point
trying to reduce this, since any associated future changes in
representations will need to be accompanied by algorithmic
changes anyway. Several methods intrinsically sprawl because
they must accumulate sets of consistent reads of fields held in
local variables. There are also other coding oddities
(including several unnecessary-looking hoisted null checks)
that help some methods perform reasonably even when interpreted
(not compiled).
The order of declarations in this file is (with a few exceptions):
(1) Static utility functions
(2) Nested (static) classes
(3) Static fields
(4) Fields, along with constants used when unpacking some of them
(5) Internal control methods
(6) Callbacks and other support for ForkJoinTask methods
(7) Exported methods
(8) Static block initializing statics in minimally dependent order