externalSubmit
externalSubmit是externalPush的完整版本,从externalPush源码中可以看到,它把很多“疑难杂症”都交给externalSubmit处理,自己仅处理简单的情况。前面externalPush中的第一个if有这样一个条件:&&U.compareAndSwapInt(q, QLOCK, 0, 1),如果执行CAS失败,难道就要把任务抛弃?显然不是,所以看一下externalSubmit是怎么处理这种情况的。
从externalPush的注释中可以知道,每个线程都有自己的probe值,线程通过probe与workQueues中的一个(偶数下标的)共享队列绑定。externalSubmit除了负责在首次提交时初始化workQueues数组、在对应槽位为空时创建共享队列之外,核心策略很简单:对externalPush中因CAS失败而没能push的任务,再执行一次CAS;如果成功,就把任务放入该线程对应的队列;如果失败,说明该队列比较繁忙,于是调用ThreadLocalRandom.advanceProbe给该线程换一个probe,也就是换一个队列。这个过程是循环进行的:每次CAS失败都会换一个队列重试,直到任务提交成功(或者线程池已终止、抛出RejectedExecutionException)为止。
/**
 * Full (slow-path) version of externalPush: submits a task from a non-worker thread.
 * Loops until the task is pushed. On each iteration it either initializes the pool
 * (first use), pushes into the shared queue at this thread's probe slot under that
 * queue's qlock, or creates that shared queue if the slot is empty. When the slot's
 * lock (or the runState lock) is busy, it re-hashes the thread's probe with
 * ThreadLocalRandom.advanceProbe so the next iteration tries a different slot.
 *
 * @param task the task to submit
 * @throws RejectedExecutionException if runState is negative (pool terminating)
 */
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false;
// runState < 0: the pool is terminating; help terminate, then reject the task.
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
// STARTED bit not set, or workQueues not allocated yet: initialize the pool
// under the runState lock.
else if ((rs & STARTED) == 0 || // initialize
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
// Re-check under the lock: another thread may already have initialized.
if ((rs & STARTED) == 0) {
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
// p = config & SMASK; presumably the target parallelism kept in config's
// low bits — confirm against the constructor. p need NOT be a power of two.
// The shift/OR chain smears the highest set bit of (p - 1) into all lower
// bits, so (n + 1) is the smallest power of two >= p (for p > 1), and the
// final << 1 doubles it. Resulting length = 2 * nextPowerOfTwo(max(p, 2)),
// e.g. p <= 2 -> 4, p = 3 or 4 -> 8, p = 5 -> 16.
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
// Release the runState lock; if we initialized, set STARTED in the same write.
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
// Retry path for tasks externalPush could not push. Slot k = r & m & SQMASK;
// presumably SQMASK restricts k to even indices (shared queues — worker queues
// get odd indices in registerWorker) — confirm SQMASK's value.
else if ((q = ws[k = r & m & SQMASK]) != null) {
// Acquire the queue's spin lock (qlock 0 -> 1).
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a = q.array;
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
// Push if there is room for one more task; otherwise grow/allocate the array.
if ((a != null && a.length > s + 1 - q.base) ||
(a = q.growArray()) != null) {
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
submitted = true;
}
} finally {
// Always release the queue lock (qlock 1 -> 0).
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
if (submitted) {
// Wake or create a worker for the new task.
signalWork(ws, q);
return;
}
}
// Lock was busy (or the push did not happen): try another slot.
move = true; // move on failure
}
// Slot k is empty and runState is not locked: create a shared queue for slot k.
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
q = new WorkQueue(this, null);
q.hint = r;
q.config = k | SHARED_QUEUE;
q.scanState = INACTIVE;
rs = lockRunState(); // publish index
// Publish only if not terminating (rs > 0) and the slot is still empty;
// the next loop iteration then pushes into it.
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else
move = true; // move if busy
// On contention, re-hash this thread's probe so the next iteration uses another slot.
if (move)
r = ThreadLocalRandom.advanceProbe(r);
}
}
externalSubmit和externalPush都使用一个CAS操作来保证同步:
U.compareAndSwapInt(q, QLOCK, 0, 1)
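其实workQueues数组的长度并不是固定的2^16=65536,而是在初始化时由并行度决定(见上面的代码:取不小于并行度的最小2的幂,再乘以2)。外部线程提交任务时通过 r & m & SQMASK 只使用其中偶数下标的槽位(JDK 8 中 SQMASK 为 0x007e,因此共享队列最多 64 个)。当多个外部线程映射到同一个队列、CAS失败时,失败的线程会通过advanceProbe换一个队列重试;而QLOCK只在push一个任务(必要时扩容)期间被持有,持有时间极短。所以用一个CAS实现的简单自旋锁就足以保证同步,这也是这里只用CAS的原因。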
创建和唤醒worker
唤醒和创建worker:signalWork
/**
 * Tries to activate one idle worker, or to create a new worker when there are no
 * idle workers and too few workers overall. Loops while ctl reports too few
 * active workers, retrying after a lost CAS race.
 *
 * @param ws the workQueues array snapshot (may be null if unstarted/terminated)
 * @param q  the queue that just received work; if non-null and already empty the
 *           loop stops early (may be null)
 */
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
// ctl < 0 while there are too few active workers (presumably the sign bit is
// in ctl's active-count field — confirm against the ctl layout).
while ((c = ctl) < 0L) { // too few active
// Low 32 bits of ctl (sp) are 0: no idle worker is waiting.
if ((sp = (int)c) == 0) { // no idle workers
// ADD_WORKER bit set: too few workers in total, so try to add one.
if ((c & ADD_WORKER) != 0L) // too few workers
tryAddWorker(c);
break;
}
// Pool not started, or already terminated.
if (ws == null) // unstarted/terminated
break;
if (ws.length <= (i = sp & SMASK)) // terminated
break;
if ((v = ws[i]) == null) // terminating
break;
// v's next scanState: bump the version (SS_SEQ) and clear the INACTIVE bit.
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
// d == 0 only if v is still the waiter recorded at the top of ctl.
int d = sp - v.scanState; // screen CAS
// New ctl: upper bits get one more active worker (c + AC_UNIT); lower bits
// pop v by installing v's saved predecessor (stackPred) as the new top.
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; // activate v
// Unpark v's thread if its parker field is set.
if ((p = v.parker) != null)
U.unpark(p);
break;
}
// Lost the race: keep trying unless the signalling queue is already drained.
if (q != null && q.base == q.top) // no more work
break;
}
}
添加worker:tryAddWorker与createWorker
/**
 * Tries to add one worker. It CASes ctl to count one more active (AC) and one more
 * total (TC) worker. The CAS is done while holding the runState lock, and only if
 * STOP is not set. On success it calls createWorker. It retries while ctl still has
 * ADD_WORKER set and no idle workers.
 *
 * @param c the ctl value observed by the caller
 */
private void tryAddWorker(long c) {
boolean add = false;
do {
// Candidate ctl: active count and total count each bumped by one unit.
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
// Only attempt when ctl still equals the value c we computed from.
if (ctl == c) {
int rs, stop; // check if terminating
if ((stop = (rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
// Pool is stopping: give up.
if (stop != 0)
break;
// Counts reserved: actually create and start the thread.
if (add) {
createWorker();
break;
}
}
// Retry while ADD_WORKER is still set and no worker is idle (low 32 bits of ctl are 0).
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
/**
 * Creates and starts one worker thread using the pool's thread factory.
 *
 * @return true if a thread was created and started. Otherwise returns false after
 *         calling deregisterWorker with the (possibly null) thread and the (possibly
 *         null) exception. NOTE(review): presumably this rolls back the counts the
 *         caller reserved; deregisterWorker is not shown here.
 */
private boolean createWorker() {
    ForkJoinWorkerThreadFactory threadFactory = factory;
    ForkJoinWorkerThread worker = null;
    Throwable failure = null;
    try {
        if (threadFactory != null) {
            worker = threadFactory.newThread(this);
            if (worker != null) {
                worker.start();
                return true;
            }
        }
    } catch (Throwable t) {
        failure = t;
    }
    // Reached when there is no factory, the factory returned null,
    // or newThread/start threw.
    deregisterWorker(worker, failure);
    return false;
}
registerWorker
/**
 * Registers a worker thread with the pool. It configures the thread (daemon,
 * optional uncaught-exception handler) and creates its WorkQueue. Under the
 * runState lock it stores the queue in workQueues at an odd index, doubling the
 * array if no free slot is found. Finally it names the thread after that index.
 *
 * @param wt the worker thread to register
 * @return the new WorkQueue owned by wt
 */
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
// Daemon threads do not keep the JVM alive, so pool workers alone never block JVM exit.
wt.setDaemon(true); // configure thread
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
WorkQueue w = new WorkQueue(this, wt);
int i = 0; // assign a pool index
int mode = config & MODE_MASK;
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
if ((ws = workQueues) != null && (n = ws.length) > 0) {
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
// (s << 1) | 1 is always odd, so worker queues get odd slots.
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
// Probe with an even step (presumably EVENMASK clears bit 0 — confirm) so i stays odd.
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
// n probes without a free slot: double the array and keep probing.
if (++probes >= n) {
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
w.hint = s; // use as random seed
w.config = i | mode;
w.scanState = i; // publication fence
ws[i] = w;
}
} finally {
unlockRunState(rs, rs & ~RSLOCK);
}
// Name suffix is i >>> 1 (odd index 2k+1 -> k).
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
ForkJoinWorkerThread的run
/**
 * Worker thread entry point; runs at most once. runWorker allocates the queue
 * array via growArray, so a non-null array means this thread has already run.
 * The method calls onStart and then pool.runWorker. Afterwards it always calls
 * onTermination, and finally pool.deregisterWorker. deregisterWorker receives the
 * first exception thrown by onStart/runWorker/onTermination, or null if none was.
 */
public void run() {
    if (workQueue.array != null) {
        return; // only run once
    }
    Throwable failure = null;
    try {
        onStart();
        pool.runWorker(workQueue); // the worker's main loop
    } catch (Throwable t) {
        failure = t;
    } finally {
        try {
            onTermination(failure);
        } catch (Throwable t) {
            // Keep the earlier exception if there was one.
            if (failure == null) {
                failure = t;
            }
        } finally {
            pool.deregisterWorker(this, failure);
        }
    }
}
任务执行
线程启动后会执行pool的runWorker:
/**
 * Top-level loop of a worker thread. It first allocates the worker's queue array.
 * It then repeatedly scans for a task and runs it. When scan finds nothing,
 * awaitWork decides whether to continue (true) or exit the loop (false). The
 * random seed r is advanced with an xorshift step after every iteration.
 *
 * @param w the worker's own queue
 */
final void runWorker(WorkQueue w) {
    w.growArray(); // allocate queue
    int r = w.hint; // initially holds randomization hint
    if (r == 0) {
        r = 1; // xorshift maps 0 to 0 forever, so never start from 0
    }
    while (true) {
        ForkJoinTask<?> task = scan(w, r);
        if (task != null) {
            w.runTask(task);
        } else if (!awaitWork(w, r)) {
            break;
        }
        // xorshift step: next pseudo-random seed
        r ^= r << 13;
        r ^= r >>> 17;
        r ^= r << 5;
    }
}
/**
 * Runs a top-level task obtained by this worker, then runs this queue's local
 * tasks via execLocalTasks. The SCANNING bit of scanState is cleared (busy) for
 * the duration. Afterwards it bumps nsteals and invokes the owner thread's
 * afterTopLevelExec hook. Does nothing if task is null.
 *
 * @param task the task to run, or null
 */
final void runTask(ForkJoinTask<?> task) {
    if (task == null) {
        return;
    }
    scanState &= ~SCANNING; // mark as busy
    currentSteal = task;
    task.doExec();
    U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
    execLocalTasks();
    ForkJoinWorkerThread ownerThread = owner;
    // Count one more top-level task; on int overflow hand the count to the pool
    // (presumably added to its steal counter — transferStealCount is not shown).
    if (++nsteals < 0) // collect on overflow
        transferStealCount(pool);
    scanState |= SCANNING;
    if (ownerThread != null) {
        ownerThread.afterTopLevelExec();
    }
}
任务扫描(任务窃取)
scan的作用是扫描workQueues并窃取一个task。worker刚创建时,它自己的工作队列是空的,所以必须从其他队列中扫描一个task来执行。如果扫描到的task在执行时调用了task.fork生成新的task,新task会被push到这个worker自己的工作队列,worker随后会接着执行自身队列中的task(见runTask中的execLocalTasks)。自身队列执行完毕后进入下一次循环,再次扫描。如果扫描不到task,说明这个worker暂时多余了,即现有的worker已经足以处理workQueues中的任务,此时scan返回null。之后由awaitWork决定让worker休眠(产生新task时可能会被唤醒)还是终止,这部分逻辑可查看后续章节。
// Scans workQueues for a task to steal. w is the scanning worker's own queue (worker
// queues sit at odd indices). r is a random seed choosing the starting index
// (r & m); the sweep then visits every index, odd and even.
// Full sweep finds nothing, and the checksum equals the previous sweep's:
//   - an active worker is inactivated (pushed on the ctl idle stack) but keeps scanning;
//   - an already-inactive worker stops, and null is returned so the caller
//     (awaitWork) can park or terminate it.
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
// A queue exists at slot k: look for a task at its base.
if ((q = ws[k]) != null) {
// n = base - top < 0 means the queue is non-empty.
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
// Re-check q.base == b: if base moved, another thread already took this
// task, so this attempt is abandoned.
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
q.base == b) {
// This worker is active.
if (ss >= 0) {
// Claim the task. If the CAS fails another thread got it; we fall
// through, re-randomize r and move to another queue.
if (U.compareAndSwapObject(a, i, t, null)) {
q.base = b + 1;
// n < -1: the queue held more than one task, so signal for help.
if (n < -1) // signal others
signalWork(ws, q);
return t;
}
}
// This worker is inactive (w.scanState < 0) yet sees a task, and oldSum == 0:
// try to reactivate the top idle worker recorded in ctl.
// NOTE(review): tryRelease is not shown; presumably it wakes
// ws[m & (int)c] only if it is still the stack top — confirm.
else if (oldSum == 0 && // try to activate
w.scanState < 0)
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
// Inactive: refresh ss, because another thread may have reactivated w meanwhile.
if (ss < 0) // refresh
ss = w.scanState;
// Queue looked non-empty but nothing was taken (contention, or w inactive):
// pick a new random origin and restart the sweep there.
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
// Empty queue: add its base to this sweep's checksum.
checkSum += b;
}
// Advance to the next slot; arriving back at origin means a full sweep found nothing.
if ((k = (k + 1) & m) == origin) { // continue until stable
// Act only on a "stable" sweep. Both of these must hold:
//   - the checksum equals the previous sweep's (no base moved);
//   - w is active, or w is inactive and its scanState did not change during
//     the sweep (nobody reactivated it).
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
// Already inactive (second stable sweep), or qlock < 0 (presumably w is
// terminating): stop and return null.
if (ss < 0 || w.qlock < 0) // already inactive
break;
// Active: inactivate w by pushing it onto the ctl idle stack.
// New ctl = w's scanState with INACTIVE set as the stack top (low bits),
// with one fewer active worker (c - AC_UNIT) in the upper bits.
// w.stackPred remembers the previous top. The thread is NOT parked here;
// it keeps scanning.
int ns = ss | INACTIVE; // try to inactivate
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns);
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns;
else
w.scanState = ss; // back out
}
// Start a fresh checksum for the next sweep.
checkSum = 0;
}
}
}
return null;
}