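Java concurrency source-code analysis series:
- Analyzing the creation of Java thread pools
- Analyzing how Java thread pools execute tasks
- Analyzing how Java thread pools execute Callable tasks
- Analyzing the implementation of ReentrantLock
- Analyzing the implementation of CountDownLatch
- Analyzing the implementation of the synchronizers Semaphore and CyclicBarrier
- Analyzing the implementation of delayed and periodic tasks in Java
- Analyzing the implementation of ForkJoinPool in JDK 1.8 (Part 1)
The previous part introduced the basic structure and parameters of ForkJoinPool. This part goes into the code details to see how ForkJoinPool actually works.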
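The overall flow and the important methods are summarized below:
Task submission
- Submission entry points: submit, execute, invoke
- Full submission path: externalSubmit (including initialization)
- Fast submission path: externalPush
Worker management
- Activate or create: signalWork
- Create: tryAddWorker, createWorker
- Register and deregister: registerWorker, deregisterWorker
Worker execution (the three steps of runWorker)
- Obtain: scan
- Execute: runTask
- Wait: awaitWork
Fork
- Equivalent to submitting a task
Join (doJoin)
- Current thread is not a worker: externalAwaitDone
- Current thread is a worker: awaitJoin
Two waiting strategies in awaitJoin
- Helping: tryRemoveAndExec, helpStealer
- Compensating: tryCompensate
Waiting for all tasks to finish
- Quiescence: awaitQuiescence
- Termination: awaitTermination
Shutdown
- shutdown, shutdownNow
- tryTerminate
Exception handling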
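Submitting the first task
Tasks are normally submitted with submit, which comes from the ExecutorService interface. ForkJoinPool additionally provides execute and invoke:
- submit: submits the task and returns the task itself
- execute: only submits the task
- invoke: submits the task and returns its result (return task.join())
Internally the three work the same way and differ only in what they return, so looking at submit is enough: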
```java
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
    if (task == null)
        throw new NullPointerException();
    externalPush(task);
    return task;
}
```
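Besides ForkJoinTask, submit also accepts Runnable and Callable, which are wrapped by adapter classes into a ForkJoinTask. submit simply calls externalPush, a simplified enqueue method; when that does not succeed, it falls back to the full version, externalSubmit.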
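A minimal usage sketch of the three entry points, assuming a pool with parallelism 2. The class names SubmitDemo and Square are hypothetical; only the ForkJoinPool and RecursiveTask calls are the real API. It shows that submit hands back the very task it was given, that invoke returns the result, that execute returns nothing, and that a Callable is accepted and adapted into a ForkJoinTask.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

final class SubmitDemo {
    // Hypothetical task: returns n * n
    static final class Square extends RecursiveTask<Integer> {
        private final int n;
        Square(int n) { this.n = n; }
        @Override
        protected Integer compute() { return n * n; }
    }

    /** Returns {submit result, invoke result, execute result, Callable result, 1 if submit returned the same task}. */
    static int[] run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Square t3 = new Square(3);
            ForkJoinTask<Integer> returned = pool.submit(t3);   // submit returns the task itself
            int viaSubmit = returned.join();

            int viaInvoke = pool.invoke(new Square(4));         // submit, then join for the result

            Square t5 = new Square(5);
            pool.execute(t5);                                   // execute returns nothing
            int viaExecute = t5.join();

            ForkJoinTask<Integer> c = pool.submit(() -> 6 * 6); // Callable, wrapped by an adapter
            int viaCallable = c.join();

            return new int[] { viaSubmit, viaInvoke, viaExecute, viaCallable, returned == t3 ? 1 : 0 };
        } finally {
            pool.shutdown();
        }
    }
}
```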
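Let us look at externalSubmit first. It handles the unusual cases and performs initialization: the ForkJoinPool constructor initializes only some of the fields, and the rest, including WorkQueue[], are initialized lazily in externalSubmit.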
```java
private void externalSubmit(ForkJoinTask<?> task) {
    int r;                                    // initialize caller's probe
    if ((r = ThreadLocalRandom.getProbe()) == 0) {
        ThreadLocalRandom.localInit();
        r = ThreadLocalRandom.getProbe();
    }
    for (;;) {
        WorkQueue[] ws; WorkQueue q; int rs, m, k;
        boolean move = false;
        //1
        if ((rs = runState) < 0) {
            tryTerminate(false, false);     // help terminate
            throw new RejectedExecutionException();
        }
        //2
        else if ((rs & STARTED) == 0 ||     // initialize
                 ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
            int ns = 0;
            rs = lockRunState();
            try {
                if ((rs & STARTED) == 0) {
                    U.compareAndSwapObject(this, STEALCOUNTER, null,
                                           new AtomicLong());
                    // create workQueues array with size a power of two
                    int p = config & SMASK; // ensure at least 2 slots
                    int n = (p > 1) ? p - 1 : 1;
                    n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                    n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                    workQueues = new WorkQueue[n];
                    ns = STARTED;
                }
            } finally {
                unlockRunState(rs, (rs & ~RSLOCK) | ns);
            }
        }
        //3
        else if ((q = ws[k = r & m & SQMASK]) != null) {
            if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                ForkJoinTask<?>[] a = q.array;
                int s = q.top;
                boolean submitted = false; // initial submission or resizing
                try {                      // locked version of push
                    if ((a != null && a.length > s + 1 - q.base) ||
                        (a = q.growArray()) != null) {
                        int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                        U.putOrderedObject(a, j, task);
                        U.putOrderedInt(q, QTOP, s + 1);
                        submitted = true;
                    }
                } finally {
                    U.compareAndSwapInt(q, QLOCK, 1, 0);
                }
                if (submitted) {
                    signalWork(ws, q);
                    return;
                }
            }
            move = true;                   // move on failure
        }
        //4
        else if (((rs = runState) & RSLOCK) == 0) { // create new queue
            q = new WorkQueue(this, null);
            q.hint = r;
            q.config = k | SHARED_QUEUE;
            q.scanState = INACTIVE;
            rs = lockRunState();           // publish index
            if (rs > 0 &&  (ws = workQueues) != null &&
                k < ws.length && ws[k] == null)
                ws[k] = q;                 // else terminated
            unlockRunState(rs, rs & ~RSLOCK);
        }
        else
            move = true;                   // move if busy
        if (move)
            r = ThreadLocalRandom.advanceProbe(r);
    }
}
```
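mark1 checks whether the run state has entered SHUTDOWN (SHUTDOWN is the sign bit, so runState < 0) and, if so, throws RejectedExecutionException. Shutting down ForkJoinPool is covered later in the section "Shutting down ForkJoinPool".
On the first call to externalSubmit, the run state is not yet STARTED, so mark2 performs initialization:
- sets the length of WorkQueue[] to a power of two;
- sets the atomic stealCounter;
- moves the run state to STARTED.
In the second iteration, mark4 runs and creates the first WorkQueue.
In the third iteration, mark3 finds the WorkQueue just created, pushes the task at its top end, and calls signalWork (covered below) to activate or create a worker.
The index of this WorkQueue in WorkQueue[] is k = r & m & SQMASK. r is the thread's probe, a random value from ThreadLocalRandom; m is the length of WorkQueue[] minus one; SQMASK is the constant 0x007e, binary 1111110, whose lowest bit is 0, so k is always even. The first WorkQueue therefore has no owning worker: it holds submissions, and its scanState starts as INACTIVE.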
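The array sizing in mark2 and the even-index rule can be checked in isolation. The class and method names below are hypothetical; the arithmetic is copied from the code above, with parallelism standing in for config & SMASK.

```java
final class SharedQueueIndexSketch {
    static final int SQMASK = 0x007e; // value used by JDK 8: binary 1111110, lowest bit 0

    /** Length of WorkQueue[] chosen in externalSubmit (mark2) for a given parallelism. */
    static int workQueuesLength(int parallelism) {
        int p = parallelism;
        int n = (p > 1) ? p - 1 : 1;
        n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
        n |= n >>> 8; n |= n >>> 16; // smear the highest set bit into all lower bits
        return (n + 1) << 1;         // for p >= 2: twice the smallest power of two >= p
    }

    /** Slot of the shared (submission) queue selected by probe r. */
    static int sharedIndex(int r, int length) {
        return r & (length - 1) & SQMASK; // lowest bit of SQMASK is 0, so the result is even
    }
}
```

For example, parallelism 4 gives an array of 8 slots, and parallelism 5 gives 16; parallelism 1 also gets 4 slots because n starts at 1. Since SQMASK is at most 126, shared queues are confined to even slots 0 to 126.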
externalSubmit is a bit long, but its logic is clear. Apart from initialization it is rarely needed; most of the time the fast path externalPush is enough.
```java
final void externalPush(ForkJoinTask<?> task) {
    WorkQueue[] ws; WorkQueue q; int m;
    int r = ThreadLocalRandom.getProbe();
    int rs = runState;
    if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
        (q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
        U.compareAndSwapInt(q, QLOCK, 0, 1)) {
        ForkJoinTask<?>[] a; int am, n, s;
        if ((a = q.array) != null &&
            (am = a.length - 1) > (n = (s = q.top) - q.base)) {
            int j = ((am & s) << ASHIFT) + ABASE;
            U.putOrderedObject(a, j, task);
            U.putOrderedInt(q, QTOP, s + 1);
            U.putIntVolatile(q, QLOCK, 0);
            if (n <= 1)
                signalWork(ws, q);
            return;
        }
        U.compareAndSwapInt(q, QLOCK, 1, 0);
    }
    externalSubmit(task);
}
```
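Although the if in externalPush has a long list of conditions, they are easy to read given the analysis above:
- the thread's probe has been initialized by ThreadLocalRandom (r != 0);
- the run state is normal (rs > 0);
- WorkQueue[] is non-empty;
- the WorkQueue picked by the random probe exists and has been locked successfully.
When these conditions hold, the task is pushed at the top end, and if the queue held at most one task before the push (n <= 1), signalWork is called. This is essentially the same as mark3 of externalSubmit; if the array is missing or full, the lock is released and the call falls back to externalSubmit, which can grow the array.
Worker management
Worker management covers creation, activation, registration, and deregistration.
Continuing from the previous section: after the first WorkQueue is created and the first task added, signalWork is called with WorkQueue[] and the WorkQueue being operated on.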
```java
final void signalWork(WorkQueue[] ws, WorkQueue q) {
    long c; int sp, i; WorkQueue v; Thread p;
    while ((c = ctl) < 0L) {                       // too few active
        //1
        if ((sp = (int)c) == 0) {                  // no idle workers
            if ((c & ADD_WORKER) != 0L)            // too few workers
                tryAddWorker(c);
            break;
        }
        //2
        if (ws == null)                            // unstarted/terminated
            break;
        if (ws.length <= (i = sp & SMASK))         // terminated
            break;
        if ((v = ws[i]) == null)                   // terminating
            break;
        int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
        int d = sp - v.scanState;                  // screen CAS
        long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
        if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;                      // activate v
            if ((p = v.parker) != null)
                U.unpark(p);
            break;
        }
        if (q != null && q.base == q.top)          // no more work
            break;
    }
}
```
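The loop condition checks the sign of ctl. The top 16 bits of ctl hold AC, the number of active workers minus the parallelism, so ctl is negative exactly when AC is negative, i.e. fewer workers are active than the target parallelism and one must be added or activated. At mark1, sp == 0 means there is no idle worker; if ADD_WORKER is also set (too few workers in total), tryAddWorker is called.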
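To make the ctl layout concrete, here is a small decoding sketch. The class name CtlSketch is hypothetical; the shift, unit and mask values are the ones defined in JDK 8's ForkJoinPool (AC in bits 48 to 63, TC in bits 32 to 47, the idle-stack top in the low 32 bits), and the initial value follows the constructor, which starts AC and TC at minus the parallelism.

```java
final class CtlSketch {
    static final int  AC_SHIFT   = 48;
    static final long AC_UNIT    = 0x0001L << AC_SHIFT;
    static final long AC_MASK    = 0xffffL << AC_SHIFT;
    static final int  TC_SHIFT   = 32;
    static final long TC_UNIT    = 0x0001L << TC_SHIFT;
    static final long TC_MASK    = 0xffffL << TC_SHIFT;
    static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); // sign bit of the TC field

    /** Initial ctl: AC and TC both start at -parallelism, no idle worker. */
    static long initial(int parallelism) {
        long np = (long) (-parallelism);
        return ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

    static int ac(long c) { return (int) (c >> AC_SHIFT); }    // active workers - parallelism
    static int tc(long c) { return (short) (c >>> TC_SHIFT); } // total workers - parallelism
    static int sp(long c) { return (int) c; }                  // top of the idle-worker stack

    /** The CAS target computed by tryAddWorker: AC + 1 and TC + 1 (sp is 0 there). */
    static long addWorker(long c) {
        return (AC_MASK & (c + AC_UNIT)) | (TC_MASK & (c + TC_UNIT));
    }
}
```

With parallelism 4, a fresh ctl decodes to AC = -4 and TC = -4, so ctl < 0 and ADD_WORKER is set; after four addWorker steps ctl becomes 0, which is why signalWork stops adding workers at that point.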
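If there is an idle worker, mark2 runs: sp holds the index of the WorkQueue at the top of the idle stack. The way the worker is woken is almost identical to tryRelease, so the two are covered together.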
```java
private boolean tryRelease(long c, WorkQueue v, long inc) {
    int sp = (int)c, vs = (sp + SS_SEQ) & ~INACTIVE; Thread p;
    if (v != null && v.scanState == sp) {          // v is at top of stack
        long nc = (UC_MASK & (c + inc)) | (SP_MASK & v.stackPred);
        if (U.compareAndSwapLong(this, CTL, c, nc)) {
            v.scanState = vs;
            if ((p = v.parker) != null)
                U.unpark(p);
            return true;
        }
    }
    return false;
}
```
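For the WorkQueue at the top of the stack (sp), its state is switched from inactive to active and its version number is incremented, AC is increased by inc, the index of the previous stack entry is read from stackPred and written back into the low bits of ctl, and the worker's thread is unparked.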
```java
private void tryAddWorker(long c) {
    boolean add = false;
    do {
        long nc = ((AC_MASK & (c + AC_UNIT)) |
                   (TC_MASK & (c + TC_UNIT)));
        if (ctl == c) {
            int rs, stop;                          // check if terminating
            if ((stop = (rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            if (stop != 0)
                break;
            if (add) {
                createWorker();
                break;
            }
        }
    } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
```
Adding a worker increments both AC and TC by one; once the CAS succeeds, createWorker is called.
```java
private boolean createWorker() {
    ForkJoinWorkerThreadFactory fac = factory;
    Throwable ex = null;
    ForkJoinWorkerThread wt = null;
    try {
        if (fac != null && (wt = fac.newThread(this)) != null) {
            wt.start();
            return true;
        }
    } catch (Throwable rex) {
        ex = rex;
    }
    deregisterWorker(wt, ex);
    return false;
}
```
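createWorker is simple: it creates a worker through the thread factory and starts it. If no exception occurs, it just returns; otherwise it undoes the worker's registration. When was the worker registered? In the constructor of ForkJoinWorkerThread, which calls ForkJoinPool.registerWorker.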
```java
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
    UncaughtExceptionHandler handler;
    wt.setDaemon(true);                            // configure thread
    if ((handler = ueh) != null)
        wt.setUncaughtExceptionHandler(handler);
    WorkQueue w = new WorkQueue(this, wt);
    int i = 0;                                     // assign a pool index
    int mode = config & MODE_MASK;
    int rs = lockRunState();
    try {
        WorkQueue[] ws; int n;                     // skip if no array
        if ((ws = workQueues) != null && (n = ws.length) > 0) {
            //1
            int s = indexSeed += SEED_INCREMENT;   // unlikely to collide
            int m = n - 1;
            i = ((s << 1) | 1) & m;                // odd-numbered indices
            if (ws[i] != null) {                   // collision
                int probes = 0;                    // step by approx half n
                int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                while (ws[i = (i + step) & m] != null) {
                    if (++probes >= n) {
                        workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                        m = n - 1;
                        probes = 0;
                    }
                }
            }
            //2
            w.hint = s;                            // use as random seed
            w.config = i | mode;
            w.scanState = i;                       // publication fence
            ws[i] = w;
        }
    } finally {
        unlockRunState(rs, rs & ~RSLOCK);
    }
    wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
    return w;
}
```
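First of all, the thread is made a daemon thread. As a reminder, the JVM exits when only daemon threads remain; the garbage collector thread is a typical daemon thread.
At mark1: as mentioned earlier, WorkQueues that have an owning worker only occupy odd indices of WorkQueue[]. The initial index is computed as: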
```java
i = ((s << 1) | 1) & m;
```
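Shifting the seed left and OR-ing in 1 gives an odd number. m is the length of WorkQueue[] minus one; since the length is a power of two, m has all low bits set and is odd as well. AND-ing the two keeps the lowest bit, so i is guaranteed to be odd. If that slot is already taken by another WorkQueue, the next slot is probed with an even step (which keeps i odd), and WorkQueue[] is doubled if needed.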
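The following single-threaded sketch replays the index selection of registerWorker on a plain Object[] so the odd-index and probing rules can be observed. WorkerIndexSketch and register are hypothetical names, and the lock, WorkQueue objects and thread naming are left out; SEED_INCREMENT and EVENMASK use the values defined in JDK 8.

```java
import java.util.Arrays;

final class WorkerIndexSketch {
    static final int SEED_INCREMENT = 0x9e3779b9; // JDK 8 value
    static final int EVENMASK = 0xfffe;           // JDK 8 value

    Object[] slots;  // stand-in for WorkQueue[]; length must be a power of two
    int indexSeed;

    WorkerIndexSketch(int n) { slots = new Object[n]; }

    /** Mirrors mark1 of registerWorker: pick an odd free slot, doubling the array if needed. */
    int register(Object queue) {
        int n = slots.length, m = n - 1;
        int s = indexSeed += SEED_INCREMENT;
        int i = ((s << 1) | 1) & m;                       // odd starting index
        if (slots[i] != null) {                           // collision
            int probes = 0;
            int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2; // even step keeps i odd
            while (slots[i = (i + step) & m] != null) {
                if (++probes >= n) {                      // looks full: double the array
                    slots = Arrays.copyOf(slots, n <<= 1);
                    m = n - 1;
                    probes = 0;
                }
            }
        }
        slots[i] = queue;
        return i;
    }
}
```

Starting from 4 slots, only indices 1 and 3 are usable by workers, so registering a third worker forces the array to grow to 8; every index handed out stays odd.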
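mark2 sets the new WorkQueue's scanState to its index i, which conveys two things:
- it is non-negative, so the queue starts out active (unlike the shared queue created in externalSubmit, whose scanState is INACTIVE);
- since i is odd, its lowest bit, SCANNING, is set, so the worker starts in the SCANNING state.
That completes the picture of how a worker is created, how its WorkQueue is created, and how that queue is added to WorkQueue[].
A worker is registered as part of its creation; if creation fails with an exception, the registration has to be undone.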
```java
final void deregisterWorker(ForkJoinWorkerThread wt, Throwable ex) {
    WorkQueue w = null;
    //1
    if (wt != null && (w = wt.workQueue) != null) {
        WorkQueue[] ws;                            // remove index from array
        int idx = w.config & SMASK;
        int rs = lockRunState();
        if ((ws = workQueues) != null && ws.length > idx && ws[idx] == w)
            ws[idx] = null;
        unlockRunState(rs, rs & ~RSLOCK);
    }
    //2
    long c;                                        // decrement counts
    do {} while (!U.compareAndSwapLong
                 (this, CTL, c = ctl, ((AC_MASK & (c - AC_UNIT)) |
                                       (TC_MASK & (c - TC_UNIT)) |
                                       (SP_MASK & c))));
    //3
    if (w != null) {
        w.qlock = -1;                              // ensure set
        w.transferStealCount(this);
        w.cancelAll();                             // cancel remaining tasks
    }
    //4
    for (;;) {                                     // possibly replace
        WorkQueue[] ws; int m, sp;
        if (tryTerminate(false, false) || w == null || w.array == null ||
            (runState & STOP) != 0 || (ws = workQueues) == null ||
            (m = ws.length - 1) < 0)              // already terminating
            break;
        if ((sp = (int)(c = ctl)) != 0) {          // wake up replacement
            if (tryRelease(c, ws[sp & m], AC_UNIT))
                break;
        }
        else if (ex != null && (c & ADD_WORKER) != 0L) {
            tryAddWorker(c);                       // create replacement
            break;
        }
        else                                       // don't need replacement
            break;
    }
    //5
    if (ex == null)                                // help clean on way out
        ForkJoinTask.helpExpungeStaleExceptions();
    else                                           // rethrow
        ForkJoinTask.rethrow(ex);
}
```
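Deregistration proceeds step by step and, as one would expect, cleans up the WorkQueue and updates ctl:
- clears the worker's WorkQueue slot in WorkQueue[], using the index stored in WorkQueue.config;
- decrements AC and TC by one each;
- sets the WorkQueue's qlock to a negative value to mark it as terminating, and cancels all remaining tasks in the queue;
- checks the run state and tries to wake or create a replacement worker;
- handles the exception.
Worker execution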
```java
public void run() {
    if (workQueue.array == null) { // only run once
        Throwable exception = null;
        try {
            onStart();
            pool.runWorker(workQueue);
        } catch (Throwable ex) {
            exception = ex;
        } finally {
            try {
                onTermination(exception);
            } catch (Throwable ex) {
                if (exception == null)
                    exception = ex;
            } finally {
                pool.deregisterWorker(this, exception);
            }
        }
    }
}
```
Once started, ForkJoinWorkerThread calls ForkJoinPool.runWorker:
```java
final void runWorker(WorkQueue w) {
    w.growArray();                   // allocate queue
    int seed = w.hint;               // initially holds randomization hint
    int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
    for (ForkJoinTask<?> t;;) {
        if ((t = scan(w, r)) != null)
            w.runTask(t);
        else if (!awaitWork(w, r))
            break;
        r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
    }
}
```
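Worker execution consists of three steps:
- scan: try to obtain a task;
- runTask: run the task obtained;
- awaitWork: wait when there is no task.
If awaitWork returns false (no task will arrive), the worker leaves the runWorker loop, returns to run, executes the finally block, and finally calls deregisterWorker to deregister itself.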
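The comment "avoid 0 for xorShift" in runWorker matters because each xorshift step is an invertible linear map on 32-bit values: it sends 0 to 0 and any non-zero value to a non-zero value. A seed of 0 would therefore freeze the random scan origin. This tiny sketch (the class name XorShiftSketch is hypothetical) reproduces the step used by runWorker.

```java
final class XorShiftSketch {
    /** One step of the xorshift used at the end of each runWorker iteration (shifts 13, 17, 5). */
    static int next(int r) {
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
        return r;
    }

    /** runWorker replaces a zero hint with 1 before using it as the seed. */
    static int seed(int hint) {
        return (hint == 0) ? 1 : hint;
    }
}
```

For example, next(0) stays 0 forever, while next(1) is 0x42021 and the sequence started from seed(0) never returns to 0.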
First comes scan, which walks WorkQueue[] trying to steal a task.
```java
private ForkJoinTask<?> scan(WorkQueue w, int r) {
    WorkQueue[] ws; int m;
    if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
        int ss = w.scanState;                     // initially non-negative
        //1
        for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
            WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
            int b, n; long c;
            if ((q = ws[k]) != null) {
                if ((n = (b = q.base) - q.top) < 0 &&
                    (a = q.array) != null) {      // non-empty
                    long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    if ((t = ((ForkJoinTask<?>)
                              U.getObjectVolatile(a, i))) != null &&
                        q.base == b) {
                        //2
                        if (ss >= 0) {
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                q.base = b + 1;
                                if (n < -1)       // signal others
                                    signalWork(ws, q);
                                return t;
                            }
                        }
                        else if (oldSum == 0 &&   // try to activate
                                 w.scanState < 0)
                            tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                    }
                    if (ss < 0)                   // refresh
                        ss = w.scanState;
                    r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                    origin = k = r & m;           // move and rescan
                    oldSum = checkSum = 0;
                    continue;
                }
                checkSum += b;
            }
            //3
            if ((k = (k + 1) & m) == origin) {    // continue until stable
                if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                    oldSum == (oldSum = checkSum)) {
                    if (ss < 0 || w.qlock < 0)    // already inactive
                        break;
                    int ns = ss | INACTIVE;       // try to inactivate
                    long nc = ((SP_MASK & ns) |
                               (UC_MASK & ((c = ctl) - AC_UNIT)));
                    w.stackPred = (int)c;         // hold prev stack top
                    U.putInt(w, QSCANSTATE, ns);
                    if (U.compareAndSwapLong(this, CTL, c, nc))
                        ss = ns;
                    else
                        w.scanState = ss;         // back out
                }
                checkSum = 0;
            }
        }
    }
    return null;
}
```
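mark1 enters the loop: a WorkQueue is picked from WorkQueue[] using the random number, and the worker tries to steal a task from its base end. Reaching mark2 means a task has been located; what happens next depends on the scanState of the worker's own WorkQueue:
- active: dequeue the task from the base end and return it; if more tasks remain in that queue (n < -1), call signalWork to wake other workers;
- inactive: if nothing has been seen yet in this round (oldSum == 0), call tryRelease to activate the worker at the top of the idle stack (possibly the current worker itself), then move to a new random index and rescan.
At mark3, each iteration checks whether the next index has wrapped around to the starting index. If so, a full round has been made without stealing a task; if the checksum of the base values is also unchanged from the previous round, the current worker is surplus and does the following:
- sets its WorkQueue's scanState to inactive;
- pushes its WorkQueue onto the top of the idle stack and decrements AC by one.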
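The idle stack that mark3 pushes onto and tryRelease pops from is a Treiber-style stack threaded through ctl's low 32 bits and each queue's stackPred. Below is a single-threaded sketch of that bookkeeping, assuming plain fields instead of the volatile ctl and CAS; IdleStackSketch, inactivate and releaseTop are hypothetical names, while the constants match the ones used in JDK 8.

```java
final class IdleStackSketch {
    static final int  SMASK    = 0xffff;       // index part of scanState / sp
    static final int  SS_SEQ   = 1 << 16;      // version increment
    static final int  INACTIVE = 1 << 31;      // sign bit of scanState
    static final long AC_UNIT  = 0x0001L << 48;
    static final long SP_MASK  = 0xffffffffL;  // low 32 bits of ctl: stack top
    static final long UC_MASK  = ~SP_MASK;     // high 32 bits of ctl: AC and TC

    long ctl;               // plain stand-in for the volatile ctl
    final int[] scanState;  // per-slot scanState, slot index = array index
    final int[] stackPred;  // per-slot previous stack top

    IdleStackSketch(int slots) {
        scanState = new int[slots];
        stackPred = new int[slots];
    }

    /** Like scan() mark3: mark slot i INACTIVE, push it, AC - 1. */
    void inactivate(int i) {
        long c = ctl;
        int ns = scanState[i] | INACTIVE;
        stackPred[i] = (int) c;                       // remember the previous top
        scanState[i] = ns;
        ctl = (SP_MASK & ns) | (UC_MASK & (c - AC_UNIT));
    }

    /** Like tryRelease(c, v, AC_UNIT): pop the top, bump its version, AC + 1. Returns the slot or -1. */
    int releaseTop() {
        long c = ctl;
        int sp = (int) c;
        if (sp == 0) return -1;                       // no idle worker
        int i = sp & SMASK;
        if (scanState[i] != sp) return -1;            // stale top (only possible under concurrency)
        ctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & stackPred[i]);
        scanState[i] = (sp + SS_SEQ) & ~INACTIVE;     // active again, with a new version
        return i;
    }
}
```

With two active workers in slots 1 and 3 (ctl = 0 for parallelism 2), inactivating 1 and then 3 leaves AC at -2; releases then come back in LIFO order, 3 first and then 1, and ctl returns to 0. The version bits explain why signalWork and tryRelease compare the whole sp with v.scanState rather than just the index.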
```java
final void runTask(ForkJoinTask<?> task) {
    if (task != null) {
        scanState &= ~SCANNING; // mark as busy
        (currentSteal = task).doExec();
        U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
        execLocalTasks();
        ForkJoinWorkerThread thread = owner;
        if (++nsteals < 0)      // collect on overflow
            transferStealCount(pool);
        scanState |= SCANNING;
        if (thread != null)
            thread.afterTopLevelExec();
    }
}
```
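After stealing a task, the worker runs it:
- clears the SCANNING bit of the WorkQueue's scanState, switching it from SCANNING to busy (RUNNING);
- records the stolen task in currentSteal and runs it with doExec;
- runs the tasks in its own WorkQueue with execLocalTasks (the mode decides whether tasks are taken LIFO or FIFO; each is run with doExec until the WorkQueue is empty);
- increments the steal count;
- once everything it can run has been run, sets scanState back to SCANNING.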
```java
final int doExec() {
    int s; boolean completed;
    if ((s = status) >= 0) {
        try {
            completed = exec();
        } catch (Throwable rex) {
            return setExceptionalCompletion(rex);
        }
        if (completed)
            s = setCompletion(NORMAL);
    }
    return s;
}

private int setCompletion(int completion) {
    for (int s;;) {
        if ((s = status) < 0)
            return s;
        if (U.compareAndSwapInt(this, STATUS, s, s | completion)) {
            if ((s >>> 16) != 0)
                synchronized (this) { notifyAll(); }
            return completion;
        }
    }
}
```
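doExec eventually calls ForkJoinTask's core method exec. As covered in the previous part, RecursiveAction and RecursiveTask override exec to call compute, which ties the source code back to how the API is used.
When the task finishes, setCompletion sets its status to NORMAL. Note that the CAS writes s | completion, so any SIGNAL bit already in the old status s is preserved, and whether to notify is decided from that old value:
- if s is 0 (nobody is waiting), s >>> 16 is 0;
- if s has the SIGNAL bit (0x00010000) set, s >>> 16 is non-zero.
If the old status carries SIGNAL, some thread called join and is in wait() for the task to complete, so an extra notifyAll is needed to wake it.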
```java
private boolean awaitWork(WorkQueue w, int r) {
    if (w == null || w.qlock < 0)                 // w is terminating
        return false;
    for (int pred = w.stackPred, spins = SPINS, ss;;) {
        //1
        if ((ss = w.scanState) >= 0)
            break;
        //2
        else if (spins > 0) {
            r ^= r << 6; r ^= r >>> 21; r ^= r << 7;
            if (r >= 0 && --spins == 0) {         // randomize spins
                WorkQueue v; WorkQueue[] ws; int s, j; AtomicLong sc;
                if (pred != 0 && (ws = workQueues) != null &&
                    (j = pred & SMASK) < ws.length &&
                    (v = ws[j]) != null &&        // see if pred parking
                    (v.parker == null || v.scanState >= 0))
                    spins = SPINS;                // continue spinning
            }
        }
        else if (w.qlock < 0)                     // recheck after spins
            return false;
        //3
        else if (!Thread.interrupted()) {
            long c, prevctl, parkTime, deadline;
            int ac = (int)((c = ctl) >> AC_SHIFT) + (config & SMASK);
            //4
            if ((ac <= 0 && tryTerminate(false, false)) ||
                (runState & STOP) != 0)           // pool terminating
                return false;
            //5
            if (ac <= 0 && ss == (int)c) {        // is last waiter
                prevctl = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & pred);
                int t = (short)(c >>> TC_SHIFT);  // shrink excess spares
                if (t > 2 && U.compareAndSwapLong(this, CTL, c, prevctl))
                    return false;                 // else use timed wait
                parkTime = IDLE_TIMEOUT * ((t >= 0) ? 1 : 1 - t);
                deadline = System.nanoTime() + parkTime - TIMEOUT_SLOP;
            }
            else
                prevctl = parkTime = deadline = 0L;
            Thread wt = Thread.currentThread();
            U.putObject(wt, PARKBLOCKER, this);   // emulate LockSupport
            w.parker = wt;
            if (w.scanState < 0 && ctl == c)      // recheck before park
                U.park(false, parkTime);
            U.putOrderedObject(w, QPARKER, null);
            U.putObject(wt, PARKBLOCKER, null);
            if (w.scanState >= 0)
                break;
            if (parkTime != 0L && ctl == c &&
                deadline - System.nanoTime() <= 0L &&
                U.compareAndSwapLong(this, CTL, c, prevctl))
                return false;                     // shrink pool
        }
    }
    return true;
}
```
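The core of awaitWork is an infinite loop; we focus on how it waits and when it exits.
mark1 checks the WorkQueue's scanState: non-negative means the WorkQueue is either RUNNING or SCANNING (it has been reactivated), so the loop breaks. At mark2, SPINS is 0 in JDK 8, so spin-waiting is effectively disabled.
The key part is mark3, which runs on every iteration as long as the thread is not interrupted (tryTerminate interrupts all workers when terminating ForkJoinPool). Note the different meanings of return and break:
- break: go back to runWorker and continue with scan, runTask, awaitWork;
- return false: the worker must terminate.
mark4 checks ForkJoinPool's state: if the pool is heading toward termination, the current worker has no reason to exist, so it returns false.
mark5 decides whether the worker is still needed. If all of the following hold:
- no worker is active (ac <= 0);
- t, TC minus parallelism, is greater than 2, i.e. there are more than two surplus threads;
- the current WorkQueue is at the top of the idle stack (ss == (int)c), i.e. it is the last waiter;
then the worker is surplus and would have nothing to run, so it pops its WorkQueue off the stack with a CAS and returns false to terminate.
Otherwise the thread parks. Only the last waiter gets a timed park (IDLE_TIMEOUT, scaled up when there are fewer threads than the parallelism); other workers park without a timeout. A parked worker wakes up in one of two ways:
- woken by another thread: if scanState is non-negative, break out of the loop and go back to scan;
- the timed wait expires: as before, the worker is surplus, so it pops itself off the stack and returns false to terminate.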
Fork
```java
public final ForkJoinTask<V> fork() {
    Thread t;
    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        ((ForkJoinWorkerThread)t).workQueue.push(this);
    else
        ForkJoinPool.common.externalPush(this);
    return this;
}
```
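fork is simple: if the current thread is a worker, the task is pushed onto the top end of the worker's own WorkQueue. If the caller is not a worker, the task is submitted with externalPush to the common pool (ForkJoinPool.common); externalPush was analyzed in detail above.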
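The classic divide-and-conquer pattern ties fork and join together. Below is a minimal sketch (SumTask and its threshold of 1,000 are hypothetical choices for illustration) that forks the left half, computes the right half in the current thread, then joins the left half. Inside a worker, fork pushes left onto the worker's own queue; if no other worker has stolen it by the time join is called, it is back at the top of that queue and join can pop and run it directly, as described in the Join section.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

final class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // hypothetical cut-off; tune for real workloads
    private final long[] data;
    private final int lo, hi;                   // half-open range [lo, hi)

    SumTask(long[] data, int lo, int hi) {
        this.data = data;
        this.lo = lo;
        this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {             // small enough: sum sequentially
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        left.fork();                            // push onto this worker's own WorkQueue
        long rightSum = right.compute();        // work on the other half directly
        long leftSum = left.join();             // run it here if unstolen, otherwise wait/help
        return leftSum + rightSum;
    }

    static long sum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling right.compute() instead of forking both halves keeps the current worker busy and avoids an extra push and pop per split.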
Join
```java
public final V join() {
    int s;
    if ((s = doJoin() & DONE_MASK) != NORMAL)
        reportException(s);
    return getRawResult();
}

private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
    return (s = status) < 0 ? s :
        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
        (w = (wt = (ForkJoinWorkerThread)t).workQueue).
        tryUnpush(this) && (s = doExec()) < 0 ? s :
        wt.pool.awaitJoin(w, this, 0L) :
        externalAwaitDone();
}
```
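The purpose of join is to obtain the task's result. It relies on doJoin and, depending on the task status, either returns the result or throws an exception. Keep in mind that a task inside ForkJoinPool can be in many situations: it may be about to run, it may be waiting in a queue, or it may already have been stolen by another worker.
The return statement of doJoin is a dense chain of conditional expressions. The first two checks are:
- a negative status means the task already has an outcome, so the status is returned directly;
- otherwise, the code distinguishes whether the current thread is a worker.
First, the case where the current thread is not a worker, which calls externalAwaitDone: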
```java
private int externalAwaitDone() {
    int s = ((this instanceof CountedCompleter) ? // try helping
             ForkJoinPool.common.externalHelpComplete(
                 (CountedCompleter<?>)this, 0) :
             ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
    if (s >= 0 && (s = status) >= 0) {
        boolean interrupted = false;
        do {
            if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
                synchronized (this) {
                    if (status >= 0) {
                        try {
                            wait(0L);
                        } catch (InterruptedException ie) {
                            interrupted = true;
                        }
                    }
                    else
                        notifyAll();
                }
            }
        } while ((s = status) >= 0);
        if (interrupted)
            Thread.currentThread().interrupt();
    }
    return s;
}
```
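Setting aside the CountedCompleter cooperation (and the attempt to unpush the task from the common pool and run it directly), the thread sets the SIGNAL bit in the task's status and then waits using the wait/notify mechanism. Since the thread is not a worker, it is outside ForkJoinPool's management, so it simply blocks until it is notified.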
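The SIGNAL handshake between setCompletion and externalAwaitDone can be reproduced with an AtomicInteger in place of the Unsafe CAS. This is a simplified sketch, not the JDK class: SignalLatchSketch, complete and awaitDone are hypothetical names, and DONE stands in for NORMAL. The waiter announces itself with SIGNAL before waiting, and the completer calls notifyAll only if the status it replaced had SIGNAL set.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class SignalLatchSketch {
    static final int DONE   = 0xf0000000; // negative => completed (mirrors NORMAL)
    static final int SIGNAL = 0x00010000; // "a thread is waiting" bit

    private final AtomicInteger status = new AtomicInteger(0);

    /** Completion side, like setCompletion: set DONE, notify only if a waiter announced itself. */
    void complete() {
        for (int s;;) {
            if ((s = status.get()) < 0)
                return;                                   // already completed
            if (status.compareAndSet(s, s | DONE)) {
                if ((s >>> 16) != 0) {                    // old status had SIGNAL
                    synchronized (this) { notifyAll(); }
                }
                return;
            }
        }
    }

    /** Waiting side, like externalAwaitDone: set SIGNAL, then wait while still incomplete. */
    int awaitDone() {
        int s;
        boolean interrupted = false;
        while ((s = status.get()) >= 0) {
            if (status.compareAndSet(s, s | SIGNAL)) {
                synchronized (this) {
                    if (status.get() >= 0) {
                        try {
                            wait(0L);
                        } catch (InterruptedException ie) {
                            interrupted = true;           // remember and re-assert later
                        }
                    } else {
                        notifyAll();
                    }
                }
            }
        }
        if (interrupted)
            Thread.currentThread().interrupt();
        return s;
    }
}
```

No wake-up can be lost: if complete runs before the waiter enters the synchronized block, the waiter sees a negative status and does not wait; if it runs afterwards, it sees SIGNAL in the old status and notifies under the same monitor.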
If the current thread is a worker, things are much more involved:
```java
(w = (wt = (ForkJoinWorkerThread)t).workQueue).tryUnpush(this) && (s = doExec()) < 0 ? s :
    wt.pool.awaitJoin(w, this, 0L)
```
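It first calls tryUnpush: if the task at the top end of the worker's WorkQueue is exactly the task being joined, it is obviously the next one to run, so it is executed directly with doExec; otherwise ForkJoinPool.awaitJoin is called.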
```java
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
    int s = 0;
    if (task != null && w != null) {
        //1
        ForkJoinTask<?> prevJoin = w.currentJoin;
        U.putOrderedObject(w, QCURRENTJOIN, task);
        CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
            (CountedCompleter<?>)task : null;
        for (;;) {
            //2
            if ((s = task.status) < 0)
                break;
            //3
            if (cc != null)
                helpComplete(w, cc, 0);
            else if (w.base == w.top || w.tryRemoveAndExec(task))
                helpStealer(w, task);
            if ((s = task.status) < 0)
                break;
            long ms, ns;
            //4
            if (deadline == 0L)
                ms = 0L;
            else if ((ns = deadline - System.nanoTime()) <= 0L)
                break;
            else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                ms = 1L;
            if (tryCompensate(w)) {
                task.internalWait(ms);
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
        }
        U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
    }
    return s;
}
```
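At mark1, the worker records the task it is waiting for in its WorkQueue's currentJoin. Inside the loop, mark2 checks the task status and exits immediately if the task has completed.
The rest deals with how the worker waits. Sitting idle would be wasteful, so awaitJoin tries two strategies:
- Helping: find other tasks to run;
- Compensating: create or activate a spare worker; the original worker blocks, and the spare makes up for the lost parallelism until the original worker resumes.
mark3 and mark4 try these two strategies respectively.
Helping
The two help methods are straightforward. If the task is a CountedCompleter, helpComplete is called. Otherwise the worker looks at its own WorkQueue: tryRemoveAndExec scans all tasks in the queue to see whether the joined task is among them. helpStealer, which helps other workers, is called only when the worker's own queue is empty (w.base == w.top, or tryRemoveAndExec returns true, which it does when it finds the queue empty).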
```java
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
    ForkJoinTask<?>[] a; int m, s, b, n;
    if ((a = array) != null && (m = a.length - 1) >= 0 &&
        task != null) {
        while ((n = (s = top) - (b = base)) > 0) {
            for (ForkJoinTask<?> t;;) {      // traverse from s to b
                long j = ((--s & m) << ASHIFT) + ABASE;
                if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                    return s + 1 == top;     // shorter than expected
                else if (t == task) {
                    boolean removed = false;
                    if (s + 1 == top) {      // pop
                        if (U.compareAndSwapObject(a, j, task, null)) {
                            U.putOrderedInt(this, QTOP, s);
                            removed = true;
                        }
                    }
                    else if (base == b)      // replace with proxy
                        removed = U.compareAndSwapObject(
                            a, j, task, new EmptyTask());
                    if (removed)
                        task.doExec();
                    break;
                }
                else if (t.status < 0 && s + 1 == top) {
                    if (U.compareAndSwapObject(a, j, t, null))
                        U.putOrderedInt(this, QTOP, s);
                    break;                  // was cancelled
                }
                if (--n == 0)
                    return false;
            }
            if (task.status < 0)
                return false;
        }
    }
    return true;
}
```
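tryRemoveAndExec traverses the WorkQueue from top to base. If the joined task is found, there are two cases:
- it is right at the top end: pop it and run it;
- it is in the middle of the queue: replace its slot with an EmptyTask placeholder, which likewise lets the task be taken out and run.
Once the task has completed, tryRemoveAndExec returns false, so helpStealer is not called.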
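A single-threaded sketch of the two removal cases, using an ArrayList as the deque (index 0 is the base, the last element is the top) and Runnable in place of ForkJoinTask. RemoveAndExecSketch, removeAndExec and EMPTY are hypothetical names; note that here the return value means "found and run", which is not the same contract as the JDK method described above.

```java
import java.util.ArrayList;
import java.util.List;

final class RemoveAndExecSketch {
    static final Runnable EMPTY = () -> { };         // stands in for ForkJoinPool's EmptyTask proxy
    final List<Runnable> deque = new ArrayList<>();  // index 0 = base, last index = top

    /** Returns true if the task was found and run (differs from the JDK method's return contract). */
    boolean removeAndExec(Runnable task) {
        for (int s = deque.size() - 1; s >= 0; --s) {   // traverse from top to base
            if (deque.get(s) == task) {
                if (s == deque.size() - 1)
                    deque.remove(s);                    // at the top: pop it
                else
                    deque.set(s, EMPTY);                // in the middle: leave a placeholder
                task.run();
                return true;
            }
        }
        return false;
    }
}
```

Replacing a middle slot instead of compacting the array keeps base and top untouched, so concurrent thieves working from the base end are not disturbed; the placeholder is simply a no-op when it is later taken.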
```java
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
    WorkQueue[] ws = workQueues;
    int oldSum = 0, checkSum, m;
    if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
        task != null) {
        do {                                       // restart point
            checkSum = 0;                          // for stability check
            ForkJoinTask<?> subtask;
            WorkQueue j = w, v;                    // v is subtask stealer
            descent: for (subtask = task; subtask.status >= 0; ) {
                //1
                for (int h = j.hint | 1, k = 0, i; ; k += 2) {
                    if (k > m)                     // can't find stealer
                        break descent;
                    if ((v = ws[i = (h + k) & m]) != null) {
                        if (v.currentSteal == subtask) {
                            j.hint = i;
                            break;
                        }
                        checkSum += v.base;
                    }
                }
                //2
                for (;;) {                         // help v or descend
                    ForkJoinTask<?>[] a; int b;
                    checkSum += (b = v.base);
                    ForkJoinTask<?> next = v.currentJoin;
                    //3
                    if (subtask.status < 0 || j.currentJoin != subtask ||
                        v.currentSteal != subtask) // stale
                        break descent;
                    //4
                    if (b - v.top >= 0 || (a = v.array) == null) {
                        if ((subtask = next) == null)
                            break descent;
                        j = v;
                        break;
                    }
                    //5
                    int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = ((ForkJoinTask<?>)
                                         U.getObjectVolatile(a, i));
                    if (v.base == b) {
                        if (t == null)             // stale
                            break descent;
                        if (U.compareAndSwapObject(a, i, t, null)) {
                            v.base = b + 1;
                            ForkJoinTask<?> ps = w.currentSteal;
                            int top = w.top;
                            do {
                                U.putOrderedObject(w, QCURRENTSTEAL, t);
                                t.doExec();        // clear local tasks too
                            } while (task.status >= 0 &&
                                     w.top != top &&
                                     (t = w.pop()) != null);
                            U.putOrderedObject(w, QCURRENTSTEAL, ps);
                            if (w.base != w.top)
                                return;            // can't further help
                        }
                    }
                }
            }
        } while (task.status >= 0 && oldSum != (oldSum = checkSum));
    }
}
```
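helpStealer embodies mutual help: if you stole the very task I need to join, I will not sit idle, I will help you run your tasks. Both the outer do-while loop and the for loop labeled descent keep helping as long as the joined task has not completed.
First it has to find the WorkQueue to help, call it A: the one whose currentSteal is exactly the task being joined. At mark1, it walks the odd indices of WorkQueue[] checking currentSteal; a match is the stealer we are looking for.
- If A has pending tasks, the helper repeatedly takes tasks from A's base end and runs them (mark5). Note the return when w.base != w.top: once the helper's own WorkQueue has new tasks, it cannot keep helping;
- If A has no tasks, does that mean there is nothing to help with? No: the helper moves down the chain. At mark4, it takes A's currentJoin as the next subtask, finds the worker that stole it, and again takes tasks from that worker's base end.
Since all this data keeps changing, mark3 verifies that the currentJoin and currentSteal of the two WorkQueues still refer to the target task; if not, it breaks out of descent and searches for the WorkQueue again.
Compensating
Back at mark4 of awaitJoin, the second strategy, Compensating, is tried. Within the timeout, tryCompensate is called repeatedly to see whether compensation is possible. Once it succeeds, the task's status gets the SIGNAL bit and the worker enters wait (task.internalWait); after waking up, it adds its AC unit back.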
```java
private boolean tryCompensate(WorkQueue w) {
    boolean canBlock;
    WorkQueue[] ws; long c; int m, pc, sp;
    if (w == null || w.qlock < 0 ||           // caller terminating
        (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
        (pc = config & SMASK) == 0)           // parallelism disabled
        canBlock = false;
    //1
    else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
        canBlock = tryRelease(c, ws[sp & m], 0L);
    else {
        //2
        int ac = (int)(c >> AC_SHIFT) + pc;
        int tc = (short)(c >> TC_SHIFT) + pc;
        int nbusy = 0;                        // validate saturation
        for (int i = 0; i <= m; ++i) {        // two passes of odd indices
            WorkQueue v;
            if ((v = ws[((i << 1) | 1) & m]) != null) {
                if ((v.scanState & SCANNING) != 0)
                    break;
                ++nbusy;
            }
        }
        if (nbusy != (tc << 1) || ctl != c)
            canBlock = false;                 // unstable or stale
        //3
        else if (tc >= pc && ac > 1 && w.isEmpty()) {
            long nc = ((AC_MASK & (c - AC_UNIT)) |
                       (~AC_MASK & c));       // uncompensated
            canBlock = U.compareAndSwapLong(this, CTL, c, nc);
        }
        //4
        else if (tc >= MAX_CAP ||
                 (this == common && tc >= pc + commonMaxSpares))
            throw new RejectedExecutionException(
                "Thread limit exceeded replacing blocked worker");
        //5
        else {                                // similar to tryAddWorker
            boolean add = false; int rs;      // CAS within lock
            long nc = ((AC_MASK & c) |
                       (TC_MASK & (c + TC_UNIT)));
            if (((rs = lockRunState()) & STOP) == 0)
                add = U.compareAndSwapLong(this, CTL, c, nc);
            unlockRunState(rs, rs & ~RSLOCK);
            canBlock = add && createWorker(); // throws on exception
        }
    }
    return canBlock;
}
```
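At mark1, if there is an idle worker at the top of the stack, activating it is enough; inc is 0, so AC does not change, because the released worker takes over the blocked worker's active slot. Otherwise, creating a new worker is considered:
- mark2 counts busy workers (the loop visits the odd indices twice, so the count is compared against tc << 1). If not every worker is busy, i.e. some worker is still scanning and can pick up the work, or ctl has changed meanwhile, no compensation is done;
- mark3: if there are already at least parallelism threads (tc >= pc), more than one is active, and the worker's own WorkQueue is empty, it simply decrements AC and blocks without compensation;
- mark4 checks whether TC has reached its limit, which is MAX_CAP (or parallelism plus commonMaxSpares for the common pool), not the parallelism;
- mark5 adds a worker much like tryAddWorker, except that only TC is incremented; AC stays unchanged because the new worker stands in for the blocked one, so the number of active workers does not change.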
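The compensation path is also reachable from user code: in JDK 8, ForkJoinPool.managedBlock, when called from a worker of the pool, calls tryCompensate before blocking on the supplied ManagedBlocker. The sketch below assumes a pool with parallelism 1, so that without compensation the second task could not run while the first one is blocked. ManagedBlockDemo and LatchBlocker are hypothetical names, and the 5-second timeout is only there so a failure shows up as a wrong result instead of a hang.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;

final class ManagedBlockDemo {
    /** Adapts a CountDownLatch to the ManagedBlocker interface. */
    static final class LatchBlocker implements ForkJoinPool.ManagedBlocker {
        private final CountDownLatch latch;
        LatchBlocker(CountDownLatch latch) { this.latch = latch; }

        @Override
        public boolean block() throws InterruptedException {
            latch.await();
            return true;                               // no further blocking needed
        }

        @Override
        public boolean isReleasable() { return latch.getCount() == 0; }
    }

    /** Returns {result of the blocked task, result of the task that ran meanwhile (-1 on timeout)}. */
    static int[] run() {
        ForkJoinPool pool = new ForkJoinPool(1);
        CountDownLatch latch = new CountDownLatch(1);
        try {
            ForkJoinTask<Integer> blocked = pool.submit(() -> {
                ForkJoinPool.managedBlock(new LatchBlocker(latch)); // may add a spare worker first
                return 42;
            });
            ForkJoinTask<Integer> other = pool.submit(() -> 7);
            int otherResult;
            try {
                otherResult = other.get(5, TimeUnit.SECONDS);       // needs a compensating worker
            } catch (Exception e) {
                otherResult = -1;
            } finally {
                latch.countDown();                                  // release the blocked task
            }
            return new int[] { blocked.join(), otherResult };
        } finally {
            pool.shutdown();
        }
    }
}
```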
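Waiting for all tasks to finish
After submitting a batch of tasks to ForkJoinPool, we usually want to wait until all of them have finished before moving on. ForkJoinPool provides two blocking await methods:
- awaitQuiescence
- awaitTermination
The former waits for the pool to become quiescent, the latter for it to terminate; both are easy to understand.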
```java
public boolean isQuiescent() {
    return (config & SMASK) + (int)(ctl >> AC_SHIFT) <= 0;
}
```
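Quiescence is decided by whether the number of active workers, parallelism plus AC, is less than or equal to zero: when no worker is active, all tasks submitted so far have been completed.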
```java
public boolean awaitQuiescence(long timeout, TimeUnit unit) {
    long nanos = unit.toNanos(timeout);
    ForkJoinWorkerThread wt;
    Thread thread = Thread.currentThread();
    if ((thread instanceof ForkJoinWorkerThread) &&
        (wt = (ForkJoinWorkerThread)thread).pool == this) {
        helpQuiescePool(wt.workQueue);
        return true;
    }
    long startTime = System.nanoTime();
    WorkQueue[] ws;
    int r = 0, m;
    boolean found = true;
    //1
    while (!isQuiescent() && (ws = workQueues) != null &&
           (m = ws.length - 1) >= 0) {
        if (!found) {
            //2
            if ((System.nanoTime() - startTime) > nanos)
                return false;
            Thread.yield(); // cannot block
        }
        //3
        found = false;
        for (int j = (m + 1) << 2; j >= 0; --j) {
            ForkJoinTask<?> t; WorkQueue q; int b, k;
            if ((k = r++ & m) <= m && k >= 0 && (q = ws[k]) != null &&
                (b = q.base) - q.top < 0) {
                found = true;
                if ((t = q.pollAt(b)) != null)
                    t.doExec();
                break;
            }
        }
    }
    return true;
}
```
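awaitQuiescence again distinguishes whether the current thread is a worker. If it is a worker of this pool, it calls helpQuiescePool, covered next. If it is not a worker, interestingly, the calling thread also takes part in running tasks.
A non-worker thread enters the loop at mark1 while ForkJoinPool is not yet quiescent. At mark2, if the previous pass found nothing and the timeout has not elapsed, it yields (it cannot block); then at mark3 it looks for a WorkQueue that has tasks, takes one from its base end, and runs it.
ForkJoinPool really goes to great lengths to use every available resource to speed up task execution.
As the name helpQuiescePool suggests, a worker that calls awaitQuiescence takes tasks from other WorkQueues itself, speeding up overall progress.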
```java
final void helpQuiescePool(WorkQueue w) {
    ForkJoinTask<?> ps = w.currentSteal; // save context
    for (boolean active = true;;) {
        long c; WorkQueue q; ForkJoinTask<?> t; int b;
        //1
        w.execLocalTasks();     // run locals before each scan
        //2
        if ((q = findNonEmptyStealQueue()) != null) {
            if (!active) {      // re-establish active count
                active = true;
                U.getAndAddLong(this, CTL, AC_UNIT);
            }
            if ((b = q.base) - q.top < 0 && (t = q.pollAt(b)) != null) {
                U.putOrderedObject(w, QCURRENTSTEAL, t);
                t.doExec();
                if (++w.nsteals < 0)
                    w.transferStealCount(this);
            }
        }
        //3
        else if (active) {      // decrement active count without queuing
            long nc = (AC_MASK & ((c = ctl) - AC_UNIT)) | (~AC_MASK & c);
            if ((int)(nc >> AC_SHIFT) + (config & SMASK) <= 0)
                break;          // bypass decrement-then-increment
            if (U.compareAndSwapLong(this, CTL, c, nc))
                active = false;
        }
        //4
        else if ((int)((c = ctl) >> AC_SHIFT) + (config & SMASK) <= 0 &&
                 U.compareAndSwapLong(this, CTL, c, c + AC_UNIT))
            break;
    }
    U.putOrderedObject(w, QCURRENTSTEAL, ps);
}
```
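At mark1, the worker first runs execLocalTasks to finish every task in its own WorkQueue.
At mark2, findNonEmptyStealQueue looks for a non-empty WorkQueue. The variable active records whether this worker currently counts as active, so that AC can be adjusted. If a non-empty WorkQueue is found while the worker is inactive, it becomes active again and AC is incremented; it then takes a task from the base end of that queue and runs it.
At mark3, the worker is active but no non-empty WorkQueue exists, so it becomes inactive and AC is decremented. If the decrement would bring the number of active workers to 0, this worker is the last active one and the whole ForkJoinPool has reached quiescence, which is exactly the goal, so it exits the loop directly without performing the decrement.
At mark4, the worker is inactive, no non-empty WorkQueue exists, and the number of active workers is 0, so there is nothing left to do: it increments AC again (undoing its own earlier decrement) and exits the loop.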
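A usage sketch of awaitQuiescence from a non-worker thread, assuming a pool with parallelism 4 and a batch of trivial tasks that only increment a counter. QuiescenceDemo is a hypothetical name. Because the calling thread is not a worker, it takes the non-worker path shown above and may itself poll and run some of the queued tasks while it waits.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class QuiescenceDemo {
    /** Submits n tiny tasks, waits for quiescence, returns {1 if quiescent in time, tasks completed}. */
    static int[] run(int n) {
        ForkJoinPool pool = new ForkJoinPool(4);
        AtomicInteger done = new AtomicInteger();
        try {
            for (int i = 0; i < n; i++)
                pool.execute(done::incrementAndGet);    // Runnable, adapted into a ForkJoinTask
            boolean quiesced = pool.awaitQuiescence(10, TimeUnit.SECONDS);
            return new int[] { quiesced ? 1 : 0, done.get() };
        } finally {
            pool.shutdown();
        }
    }
}
```

Unlike awaitTermination, this does not require shutting the pool down first, so the pool can be reused for the next batch.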
```java
public boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (this == common) {
        awaitQuiescence(timeout, unit);
        return false;
    }
    long nanos = unit.toNanos(timeout);
    if (isTerminated())
        return true;
    if (nanos <= 0L)
        return false;
    long deadline = System.nanoTime() + nanos;
    synchronized (this) {
        for (;;) {
            if (isTerminated())
                return true;
            if (nanos <= 0L)
                return false;
            long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
            wait(millis > 0L ? millis : 1L);
            nanos = deadline - System.nanoTime();
        }
    }
}
```
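Within the time limit, awaitTermination waits for the run state to reach TERMINATED; there is nothing special here. For the common pool, it behaves like awaitQuiescence and then returns false.
Shutting down ForkJoinPool
The two shutdown methods, shutdown and shutdownNow, both call tryTerminate; the difference is whether now is true. tryTerminate is long, so we look at it in several parts.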
```java
int rs;
if (this == common)                       // cannot shut down
    return false;
if ((rs = runState) >= 0) {
    if (!enable)
        return false;
    rs = lockRunState();                  // enter SHUTDOWN phase
    unlockRunState(rs, (rs & ~RSLOCK) | SHUTDOWN);
}
```
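shutdown and shutdownNow call tryTerminate with enable == true, which sets the run state to SHUTDOWN. Other callers pass enable == false, so tryTerminate only checks whether ForkJoinPool is shutting down or already shut down. The common pool can never be shut down.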
```java
if ((rs & STOP) == 0) {
    if (!now) {                           // check quiescence
        for (long oldSum = 0L;;) {        // repeat until stable
            WorkQueue[] ws; WorkQueue w; int m, b; long c;
            long checkSum = ctl;
            if ((int)(checkSum >> AC_SHIFT) + (config & SMASK) > 0)
                return false;             // still active workers
            if ((ws = workQueues) == null || (m = ws.length - 1) <= 0)
                break;                    // check queues
            for (int i = 0; i <= m; ++i) {
                if ((w = ws[i]) != null) {
                    if ((b = w.base) != w.top || w.scanState >= 0 ||
                        w.currentSteal != null) {
                        tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                        return false;     // arrange for recheck
                    }
                    checkSum += b;
                    if ((i & 1) == 0)
                        w.qlock = -1;     // try to disable external
                }
            }
            if (oldSum == (oldSum = checkSum))
                break;
        }
    }
    if ((runState & STOP) == 0) {
        rs = lockRunState();              // enter STOP phase
        unlockRunState(rs, (rs & ~RSLOCK) | STOP);
    }
}
```
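The next part is clearly about entering STOP. If now is true, the state is changed without hesitation. Otherwise the code checks whether the pool can really enter STOP right away.
It checks AC: while workers are still active, it cannot enter STOP. It checks every WorkQueue: if any still has tasks or is still running one, it cannot enter STOP. Along the way it sets qlock of the even-indexed (shared) WorkQueues to a negative value to block external submissions.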
```java
int pass = 0;                             // 3 passes to help terminate
for (long oldSum = 0L;;) {                // or until done or stable
    WorkQueue[] ws; WorkQueue w; ForkJoinWorkerThread wt; int m;
    long checkSum = ctl;
    //1
    if ((short)(checkSum >>> TC_SHIFT) + (config & SMASK) <= 0 ||
        (ws = workQueues) == null || (m = ws.length - 1) <= 0) {
        if ((runState & TERMINATED) == 0) {
            rs = lockRunState();          // done
            unlockRunState(rs, (rs & ~RSLOCK) | TERMINATED);
            synchronized (this) { notifyAll(); } // for awaitTermination
        }
        break;
    }
    //2
    for (int i = 0; i <= m; ++i) {
        if ((w = ws[i]) != null) {
            checkSum += w.base;
            w.qlock = -1;                 // try to disable
            if (pass > 0) {
                w.cancelAll();            // clear queue
                if (pass > 1 && (wt = w.owner) != null) {
                    if (!wt.isInterrupted()) {
                        try {             // unblock join
                            wt.interrupt();
                        } catch (Throwable ignore) {
                        }
                    }
                    if (w.scanState < 0)
                        U.unpark(wt);     // wake up
                }
            }
        }
    }
    if (checkSum != oldSum) {             // unstable
        oldSum = checkSum;
        pass = 0;
    }
    else if (pass > 3 && pass > m)        // can't further help
        break;
    else if (++pass > 1) {                // try to dequeue
        long c; int j = 0, sp;            // bound attempts
        while (j++ <= m && (sp = (int)(c = ctl)) != 0)
            tryRelease(c, ws[sp & m], AC_UNIT);
    }
}
```
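The last part performs the actual termination of ForkJoinPool.
mark1 is the loop's exit condition: it checks TC and WorkQueue[]; if the thread count has dropped to zero or the array is empty, everything that should stop has stopped, and the state moves to TERMINATED.
mark2 walks over all WorkQueues, with pass counting the rounds; each round does a bit more:
- first round: set qlock to a negative value;
- second round: additionally cancel all tasks in each WorkQueue, after which idle workers are released from the top of the stack;
- third round: additionally, for each WorkQueue that has an owning worker, interrupt and unpark that thread.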
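The visible effects of the shutdown sequence can be checked from outside. The sketch below (ShutdownDemo is a hypothetical name) shows that a submission after shutdown is rejected, which is mark1 of externalSubmit seeing runState < 0; that awaitTermination returns true once tryTerminate has reached TERMINATED; and that calling shutdown on the common pool has no effect, because tryTerminate returns false for it.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

final class ShutdownDemo {
    /** Returns {rejected after shutdown, terminated in time, common pool still running}. */
    static boolean[] run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        pool.execute(() -> { });                  // starts at least one worker
        pool.shutdown();                          // tryTerminate(false, true): enter SHUTDOWN
        boolean rejected = false;
        try {
            pool.execute(() -> { });              // externalSubmit sees runState < 0
        } catch (RejectedExecutionException e) {
            rejected = true;
        }
        boolean terminated;
        try {
            terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            terminated = false;
        }
        ForkJoinPool.commonPool().shutdown();     // no effect on the common pool
        return new boolean[] { rejected, terminated, !ForkJoinPool.commonPool().isShutdown() };
    }
}
```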
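Exception handling
With the normal flow covered, let us add a few words on exception handling.
An exception thrown inside a task cannot be caught directly by the submitting thread at the time of submission. ForkJoinTask therefore provides isCompletedAbnormally to check the task status and getException to retrieve the exception; join and invoke also rethrow it.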
```java
if (task.isCompletedAbnormally()) {
    System.out.println(task.getException().getMessage());
}
```
When doExec catches an exception from the task, it calls setExceptionalCompletion, which in turn calls recordExceptionalCompletion.
```java
final int recordExceptionalCompletion(Throwable ex) {
    int s;
    if ((s = status) >= 0) {
        int h = System.identityHashCode(this);
        final ReentrantLock lock = exceptionTableLock;
        lock.lock();
        try {
            expungeStaleExceptions();
            ExceptionNode[] t = exceptionTable;
            int i = h & (t.length - 1);
            for (ExceptionNode e = t[i]; ; e = e.next) {
                if (e == null) {
                    t[i] = new ExceptionNode(this, ex, t[i]);
                    break;
                }
                if (e.get() == this) // already present
                    break;
            }
        } finally {
            lock.unlock();
        }
        s = setCompletion(EXCEPTIONAL);
    }
    return s;
}
```
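An ExceptionNode holds the failed task (through a weak reference, hence e.get()) and its exception. All ExceptionNodes live in a single ExceptionNode array, exceptionTable, bucketed by the task's identity hash code, and nodes in the same bucket form a chain through next.
The exception of a task is obtained with getException, which checks the status and then calls getThrowableException. For the CANCELLED status it directly creates a new CancellationException, which differs from the EXCEPTIONAL path.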
```java
public final Throwable getException() {
    int s = status & DONE_MASK;
    return ((s >= NORMAL)    ? null :
            (s == CANCELLED) ? new CancellationException() :
            getThrowableException());
}
```
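The code of getThrowableException is not shown here; it looks up the task's exception in exceptionTable and returns it. It also handles the case where the exception was thrown by a thread other than the current one: to give a more accurate result, it uses reflection to create an exception of the same type in the current thread (with the original as its cause) and returns that.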
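A small end-to-end sketch of the exception path using only the public API. ExceptionDemo and Failing are hypothetical names. Because the exception is thrown on a worker thread and observed from the main thread, getThrowableException may hand back a reflectively created copy rather than the original object, so the sketch checks only the exception type, which is preserved either way.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

final class ExceptionDemo {
    // Hypothetical task that always fails
    static final class Failing extends RecursiveAction {
        @Override
        protected void compute() { throw new IllegalStateException("boom"); }
    }

    /** Returns {join rethrew an ISE, isCompletedAbnormally(), getException() is an ISE}. */
    static boolean[] run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Failing task = new Failing();
            pool.execute(task);
            boolean joinRethrew = false;
            try {
                task.join();                          // reportException -> rethrow
            } catch (IllegalStateException e) {
                joinRethrew = true;                   // may be a copy whose cause is the original
            }
            return new boolean[] {
                joinRethrew,
                task.isCompletedAbnormally(),
                task.getException() instanceof IllegalStateException
            };
        } finally {
            pool.shutdown();
        }
    }
}
```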
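Afterword
This article only walks through the code flow; many of the design ideas are not covered here and are hard to explain in writing, so reading the comments in the source is recommended. ForkJoinPool is intricate in its details, and this article surely contains mistakes and omissions. Corrections are welcome, thanks.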