1.官方文档
A ForkJoinTask with a completion action performed when triggered
and there are no remaining pending actions. CountedCompleters are
in general more robust in the presence of subtask stalls and blockage
than are other forms of ForkJoinTasks, but are less intuitive to
program. Uses of CountedCompleter are similar to those of other
completion based components (such as CompletionHandler) except
that multiple pending completions may be necessary to trigger the
completion action onCompletion(CountedCompleter), not just one.
Unless initialized otherwise, the pending count starts at zero, but may
be (atomically) changed using methods setPendingCount(int),
addToPendingCount(int), and compareAndSetPendingCount(int, int).
Upon invocation of tryComplete(), if the pending action count is
nonzero, it is decremented; otherwise, the completion action is
performed, and if this completer itself has a completer, the process is
continued with its completer. As is the case with related
synchronization components such as Phaser and Semaphore, these
methods affect only internal counts; they do not establish any further
internal bookkeeping. In particular, the identities of pending tasks are
not maintained. As illustrated below, you can create subclasses that
do record some or all pending tasks or their results when needed. As
illustrated below, utility methods supporting customization of
completion traversals are also provided. However, because
CountedCompleters provide only basic synchronization mechanisms,
it may be useful to create further abstract subclasses that maintain
linkages, fields, and additional support methods appropriate for a set
of related usages.
A concrete CountedCompleter class must define method compute(),
that should in most cases (as illustrated below), invoke tryComplete()
once before returning. The class may also optionally override method
onCompletion(CountedCompleter) to perform an action upon normal
completion, and method onExceptionalCompletion(Throwable,
CountedCompleter) to perform an action upon any exception.
CountedCompleters most often do not bear results, in which case
they are normally declared as CountedCompleter<Void>, and will
always return null as a result value. In other cases, you should
override method getRawResult() to provide a result from join(),
invoke(), and related methods. In general, this method should return
the value of a field (or a function of one or more fields) of the
CountedCompleter object that holds the result upon completion.
Method setRawResult(T) by default plays no role in
CountedCompleters. It is possible, but rarely applicable, to override
this method to maintain other objects or fields holding result data.
A CountedCompleter that does not itself have a completer (i.e., one
for which getCompleter() returns null) can be used as a regular
ForkJoinTask with this added functionality. However, any completer
that in turn has another completer serves only as an internal helper
for other computations, so its own task status (as reported in methods
such as ForkJoinTask.isDone()) is arbitrary; this status changes only
upon explicit invocations of complete(T),
ForkJoinTask.cancel(boolean),
ForkJoinTask.completeExceptionally(Throwable) or upon exceptional
completion of method compute. Upon any exceptional completion,
the exception may be relayed to a task's completer (and its
completer, and so on), if one exists and it has not otherwise already
completed. Similarly, cancelling an internal CountedCompleter has
only a local effect on that completer, so is not often useful.
一个ForkJoinTask,在触发时并且没有剩余的待处理操作时执行完成操作。与其他形式的ForkJoinTasks相比,CountedCompleters在子任务停顿和阻塞的情况下通常更强大,但编程不太直观。 CountedCompleter的使用类似于其他基于完成的组件(例如CompletionHandler)的使用,除了可能需要多个挂起的完成来触发完成操作onCompletion(CountedCompleter),而不仅仅是一个。除非另有初始化,否则挂起计数从零开始,但可以使用方法setPendingCount(int),addToPendingCount(int)和compareAndSetPendingCount(int, int)进行(原子)更改。在调用tryComplete()时,如果挂起的操作计数非零,则递减;否则,执行完成动作,并且如果该完成者本身具有completer,则该过程继续其completer。与Phaser和Semaphore等相关同步组件的情况一样,这些方法仅影响内部计数;他们没有建立任何进一步的内部簿记。特别是,未维护待处理任务的身份。如下所示,可以创建在需要时记录部分或全部待处理任务或其结果的子类。如下所示,还提供了支持完成遍历的定制的实用方法。但是,由于CountedCompleters仅提供基本同步机制,因此创建更多抽象子类可能很有用,这些子类维护适链接、字段和其他支持方法。
具体的CountedCompleter类必须定义方法compute(),在大多数情况下(如下所示),在返回之前调用tryComplete()一次。该类还可以选择覆盖方法onCompletion(CountedCompleter)以在正常完成时执行操作,并使用onExceptionalCompletion(Throwable,CountedCompleter)方法对任何异常执行操作。
CountedCompleters通常不会产生结果,在这种情况下,它们通常被声明为CountedCompleter <Void>,并且将始终返回null作为结果值。在其他情况下,应该重写方法getRawResult()以提供join()、invoke()和相关方法的结果。通常,此方法应返回CountedCompleter对象的在完成时保存结果字段值(一个或多个字段的函数)。默认情况下,方法setRawResult(T)在CountedCompleters中不起任何作用。可以(但很少适用)覆盖此方法以维护保存结果数据的其他对象或字段。
一个不具有completer(即getCompleter()返回null的completer)的CountedCompleter可以用作具有此附加功能的常规ForkJoinTask。但是,任何具有另一个completer的completer仅用作其他计算的内部帮助者,因此其自身的任务状态(如ForkJoinTask.isDone()等方法中所报告的)是任意的;只有在显式调用complete(T)、ForkJoinTask.cancel(boolean)、ForkJoinTask.completeExceptionally(Throwable)或方法计算异常完成时,此状态才会更改。在任何异常完成时,异常可以被转发到任务的completer(及completer的completer,等等),如果存在并且尚未完成。同样,取消内部的CountedCompleter只对该completer产生局部影响,因此通常没有多大作用。
2.示例用法
2.1 Parallel recursive decomposition并行递归分解
CountedCompleters可以安排在树中,类似于RecursiveActions,尽管设置它们的结构通常会有所不同。这里,每个任务的完成者是计算树中它们的父亲。尽管它们需要更多的簿记,但在对数组或集合的每个元素应用可能耗时的操作(无法进一步细分)时,CountedCompleters可能是更好的选择;特别是当某些元素的操作完成时间与其他元素完全不同时,或者由于内在的变化(例如I / O)或者诸如垃圾收集之类的辅助效果。因为CountedCompleters提供了自己的continuation,所以其他线程不需要阻塞等待执行它们。
例如,如下是一个类的初始版本,它使用二分递归分解将工作分成单个部分(叶子任务)。即使工作被分成单独的调用,基于树的技术通常比直接forking叶子任务更可取,因为它们减少了线程间的通信并改善了负载平衡。在递归的情况下,要完成的每对子任务中的第二个触发其父项的完成(因为没有执行结果组合,方法onCompletion的默认空操作实现没有被覆盖)。静态实用方法设置基本任务并调用它(此处,隐式使用ForkJoinPool.commonPool())。
class MyOperation<E> { void apply(E e) { ... } }
class ForEach<E> extends CountedCompleter<Void> {
public static <E> void forEach(E[] array, MyOperation<E> op) {
new ForEach<E>(null, array, op, 0, array.length).invoke();
}
final E[] array; final MyOperation<E> op; final int lo, hi;
ForEach(CountedCompleter<?> p, E[] array, MyOperation<E> op, int lo, int hi) {
super(p);
this.array = array; this.op = op; this.lo = lo; this.hi = hi;
}
public void compute() { // version 1
if (hi - lo >= 2) {
int mid = (lo + hi) >>> 1;
setPendingCount(2); // must set pending count before fork
new ForEach(this, array, op, mid, hi).fork(); // right child
new ForEach(this, array, op, lo, mid).fork(); // left child
}
else if (hi > lo)
op.apply(array[lo]);
tryComplete();
}
}
注意在递归的情况下,任务在forking right任务之后什么也没做,因此可以在返回之前直接调用其左任务来改进此设计。 (这是尾递归删除的类比。)另外,因为任务在执行其左任务时返回(而不是直接调用tryComplete),挂起的计数被设置为1。
class ForEach<E> ...
public void compute() { // version 2
if (hi - lo >= 2) {
int mid = (lo + hi) >>> 1;
setPendingCount(1); // only one pending
new ForEach(this, array, op, mid, hi).fork(); // right child
new ForEach(this, array, op, lo, mid).compute(); // direct invoke
}
else {
if (hi > lo)
op.apply(array[lo]);
tryComplete();
}
}
作为进一步的改进,请注意左任务甚至不需要存在。 我们可以使用原始任务进行迭代,并为每个fork添加一个挂起计数,而不是创建一个新任务。 此外,由于此树中的任务都没有实现onCompletion(CountedCompleter)方法,因此tryComplete()可以替换为propagateCompletion()。
class ForEach<E> ...
public void compute() { // version 3
int l = lo, h = hi;
while (h - l >= 2) {
int mid = (l + h) >>> 1;
addToPendingCount(1);
new ForEach(this, array, op, mid, h).fork(); // right child
h = mid;
}
if (h > l)
op.apply(array[l]);
propagateCompletion();
}
这些类的其他改进可能需要预先计算挂起的计数,以便它们可以在构造器中建立,专门用于叶子步骤的类,例如通过细分四次,而不是两次,并使用自适应阈值而不是总是细分为单个元素。
2.2 Searching搜索
CountedCompleters树可以在数据结构的不同部分中搜索值或属性,并在找到时立即在AtomicReference中报告结果。 其他的可以poll结果以避免不必要的工作。 (还可以取消其他任务,但通常让他们注意到结果已设置会更简单、更高效,如果值已设置,则跳过进一步处理。)再次使用完全分区的数组说明(实践中,叶子任务几乎总是处理多个元素):
class Searcher<E> extends CountedCompleter<E> {
final E[] array; final AtomicReference<E> result; final int lo, hi;
Searcher(CountedCompleter<?> p, E[] array, AtomicReference<E> result, int lo, int hi) {
super(p);
this.array = array; this.result = result; this.lo = lo; this.hi = hi;
}
public E getRawResult() { return result.get(); }
public void compute() { // similar to ForEach version 3
int l = lo, h = hi;
while (result.get() == null && h >= l) {
if (h - l >= 2) {
int mid = (l + h) >>> 1;
addToPendingCount(1);
new Searcher(this, array, result, mid, h).fork();
h = mid;
}
else {
E x = array[l];
if (matches(x) && result.compareAndSet(null, x))
quietlyCompleteRoot(); // root task is now joinable
break;
}
}
tryComplete(); // normally complete whether or not found
}
boolean matches(E e) { ... } // return true if found
public static <E> E search(E[] array) {
return new Searcher<E>(null, array, new AtomicReference<E>(), 0, array.length).invoke();
}
}
在这个例子中,除了compareAndSet一个公共结果之外没有其他任何效果,tryComplete的尾部无条件调用可以是有条件的(if(result.get()== null)tryComplete();)因为完成根任务后,管理完成无需进一步簿记。
2.3 Recording subtasks记录子任务
组合多个子任务结果的CountedCompleter任务通常需要在方法onCompletion(CountedCompleter)中访问这些结果。 如下面的类所示(执行map-reduce的简化形式,其中映射和归约都是E类型),在分治的设计中执行此操作的一种方法是在每个子任务记录其兄弟,以便它可以在方法onCompletion中访问。 这种技术适用于左右结果的组合顺序无关紧要的归约; 有序归约需要明确的左/右指定。 在上述示例中看到的其他流型的变体也可以适用。
class MyMapper<E> { E apply(E v) { ... } }
class MyReducer<E> { E apply(E x, E y) { ... } }
class MapReducer<E> extends CountedCompleter<E> {
final E[] array; final MyMapper<E> mapper;
final MyReducer<E> reducer; final int lo, hi;
MapReducer<E> sibling;
E result;
MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
MyReducer<E> reducer, int lo, int hi) {
super(p);
this.array = array; this.mapper = mapper;
this.reducer = reducer; this.lo = lo; this.hi = hi;
}
public void compute() {
if (hi - lo >= 2) {
int mid = (lo + hi) >>> 1;
MapReducer<E> left = new MapReducer(this, array, mapper, reducer, lo, mid);
MapReducer<E> right = new MapReducer(this, array, mapper, reducer, mid, hi);
left.sibling = right;
right.sibling = left;
setPendingCount(1); // only right is pending
right.fork();
left.compute(); // directly execute left
}
else {
if (hi > lo)
result = mapper.apply(array[lo]);
tryComplete();
}
}
public void onCompletion(CountedCompleter<?> caller) {
if (caller != this) {
MapReducer<E> child = (MapReducer<E>)caller;
MapReducer<E> sib = child.sibling;
if (sib == null || sib.result == null)
result = child.result;
else
result = reducer.apply(child.result, sib.result);
}
}
public E getRawResult() { return result; }
public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
return new MapReducer<E>(null, array, mapper, reducer,
0, array.length).invoke();
}
}
这里,方法onCompletion采用了一种许多完成设计的共同形式来组合结果。 这个回调式方法在每个任务中被触发一次,在如下两个不同的上下文中,挂起计数变为零:
- 1)任务本身,如果在调用tryComplete时其挂起计数为零
- 2)当任何子任务完成并将挂起计数减少到零时
调用者参数区分这些情况。 大多数情况下,当调用者是这样时,不需要采取任何行动。 否则,可以使用调用者参数(通常通过强制转换)来提供要组合的值(和/或指向其他值的链接)。 假设正确使用挂起计数,onCompletion内的操作在完成任务及其子任务时发生(一次)。 此方法中不需要其他额外的同步,以确保访问此任务的字段或其他已完成任务的线程安全性。
2.4 Completion Traversals
如果使用onCompletion处理完成不适用或不方便,则可以使用方法firstComplete()和nextComplete()来创建自定义遍历。 例如,要定义一个以上面的2.1中ForEach示例的第三种形式拆分right任务的MapReducer,completions必须合作归约未使用的子任务链接,这可以通过以下方式完成:
class MapReducer<E> extends CountedCompleter<E> { // version 2
final E[] array; final MyMapper<E> mapper;
final MyReducer<E> reducer; final int lo, hi;
MapReducer<E> forks, next; // record subtask forks in list
E result;
MapReducer(CountedCompleter<?> p, E[] array, MyMapper<E> mapper,
MyReducer<E> reducer, int lo, int hi, MapReducer<E> next) {
super(p);
this.array = array; this.mapper = mapper;
this.reducer = reducer; this.lo = lo; this.hi = hi;
this.next = next;
}
public void compute() {
int l = lo, h = hi;
while (h - l >= 2) {
int mid = (l + h) >>> 1;
addToPendingCount(1);
(forks = new MapReducer(this, array, mapper, reducer, mid, h, forks)).fork();
h = mid;
}
if (h > l)
result = mapper.apply(array[l]);
// process completions by reducing along and advancing subtask links
for (CountedCompleter<?> c = firstComplete(); c != null; c = c.nextComplete()) {
for (MapReducer t = (MapReducer)c, s = t.forks; s != null; s = t.forks = s.next)
t.result = reducer.apply(t.result, s.result);
}
}
public E getRawResult() { return result; }
public static <E> E mapReduce(E[] array, MyMapper<E> mapper, MyReducer<E> reducer) {
return new MapReducer<E>(null, array, mapper, reducer,
0, array.length, null).invoke();
}
}
2.5 Triggers
一些CountedCompleters本身从不forked,而是在其他设计中用作管道的一部分; 包括那些:完成一个或多个异步任务触发另一个异步任务的。 例如:
class HeaderBuilder extends CountedCompleter<...> { ... }
class BodyBuilder extends CountedCompleter<...> { ... }
class PacketSender extends CountedCompleter<...> {
PacketSender(...) { super(null, 1); ... } // trigger on second completion
public void compute() { } // never called
public void onCompletion(CountedCompleter<?> caller) { sendPacket(); }
}
// sample use:
PacketSender p = new PacketSender();
new HeaderBuilder(p, ...).fork();
new BodyBuilder(p, ...).fork();
3.源码分析
3.1 构造器和域
/** This task's completer, or null if none */
final CountedCompleter<?> completer;
/** The number of pending tasks until completion */
volatile int pending;
/**
* Creates a new CountedCompleter with the given completer
* and initial pending count.
*
* @param completer this task's completer, or {@code null} if none
* @param initialPendingCount the initial pending count
*/
protected CountedCompleter(CountedCompleter<?> completer,
int initialPendingCount) {
this.completer = completer;
this.pending = initialPendingCount;
}
/**
* Creates a new CountedCompleter with the given completer
* and an initial pending count of zero.
*
* @param completer this task's completer, or {@code null} if none
*/
protected CountedCompleter(CountedCompleter<?> completer) {
this.completer = completer;
}
/**
* Creates a new CountedCompleter with no completer
* and an initial pending count of zero.
*/
protected CountedCompleter() {
this.completer = null;
}
3.2 exec和compute
/**
* Implements execution conventions for CountedCompleters.
*/
protected final boolean exec() {
compute();
return false;
}
/**
* The main computation performed by this task.
*/
public abstract void compute();
参考ForkJoinTask中的doExec:
final int doExec() {
int s; boolean completed;
if ((s = status) >= 0) {
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
if ((s >>> 16) != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}
当出现异常时,流程不变,但当compute方式正常完成的情况,将不可能进行后续的设置完成和唤醒操作。因此它必须由CountedCompleter自定义的完成。
3.2 tryComplete
/**
* If the pending count is nonzero, decrements the count;
* otherwise invokes {@link #onCompletion(CountedCompleter)}
* and then similarly tries to complete this task's completer,
* if one exists, else marks this task as complete.
*/
public final void tryComplete() {
CountedCompleter<?> a = this, s = a;
for (int c;;) {
if ((c = a.pending) == 0) {
a.onCompletion(s);
if ((a = (s = a).completer) == null) {
s.quietlyComplete();
return;
}
}
else if (U.compareAndSwapInt(a, PENDING, c, c - 1))
return;
}
}
/**
* Completes this task normally without setting a value. The most
* recent value established by {@link #setRawResult} (or {@code
* null} by default) will be returned as the result of subsequent
* invocations of {@code join} and related operations.
*
* @since 1.8
*/
public final void quietlyComplete() {
setCompletion(NORMAL);
}
tryComplete方法只会对root进行quietlyComplete,进而setComplete(NORMAL),对于链上的其他任务,最多会帮助挂起数减一,而不会把它们置为完成态。
每一个CountedCompleter都可能有自己的completer,最终组成倒树形,任何一条链只能有一个root,root的completer为null。
4.总结
CountedCompleter使用普通树的结构存放动作,但是它又是另类的树,因为子节点能找到父节点,父节点却找不到子节点,而只知道子节点代表的动作未执行的数量。