Java并发编程源码分析系列:
- 分析Java线程池的创建
- 分析Java线程池执行原理
- 分析Java线程池Callable任务执行原理
- 分析ReentrantLock的实现原理
- 分析CountDownLatch的实现原理
- 分析同步工具Semaphore和CyclicBarrier的实现原理
- 分析Java延迟与周期任务的实现原理
看Kotlin的协程时,看到内部实现使用了ForkJoinPool,它实现于ExecutorService,但又和我们常用的ThreadPoolExecutor原理不同,是另一种实现方式。顾名思义,ForkJoinPool运用了Fork/Join原理,使用“分而治之”的思想,将大任务分拆成小任务分配给多个线程执行,最后合并得到最终结果,加快运算。
最核心的思想可以这样描述:
if(任务很小){
直接计算得到结果
}else{
分拆成N个子任务
调用子任务的fork()进行计算
调用子任务的join()合并计算结果
}
这也是使用ForkJoinPool的模板,举一个简单的例子,累加1到100,单线程用循环计算易如反掌,那用多线程怎么写?这时,用ForkJoinPool正合适。
private class CountTask(val start: Int, val end: Int) : RecursiveTask<Int>() {
override fun compute(): Int {
var sum = 0
if (end - start <= 5) {
for (i in start..end) {
Thread.sleep(500)
sum += i
}
println("${Thread.currentThread().name}-sum from $start to $end with result $sum")
} else {
val mid = (start + end) / 2
val leftTask = CountTask(start, mid - 1)
val rightTask = CountTask(mid, end)
//切分大任务
leftTask.fork()
rightTask.fork()
//合并小任务结果
sum += leftTask.join()
sum += rightTask.join()
}
return sum
}
}
fun main(args: Array<String>) {
val pool = ForkJoinPool.commonPool()
println("Pool init:$pool")
val task = pool.submit(CountTask(1, 100))
println("total:${task.get()}")
try {
pool.awaitTermination(10, TimeUnit.SECONDS)
} catch (e: InterruptedException) {
println(e.message)
}
pool.shutdown()
}
ForkJoinPool的使用很简单,教程就不在这里说了,本文目标是了解它的执行原理。对比前面几篇文章,ForkJoinPool的代码难得多,里面有很多位运算需要细细琢磨才能弄懂,我也花了两个星期时间查资料看代码。
ForkJoinPool在jdk1.7引入,在jdk1.8进行了优化,本文的源码基于jdk1.8最新版本。
work-stealing
ForkJoinPool核心是work-stealing算法,翻译过来叫"工作窃取"算法,有点别扭,还是叫work-stealing吧。
ForkJoinPool里有三个重要的角色:
- ForkJoinWorkerThread(下文简称worker):包装Thread;
- WorkQueue:任务队列,双向;
- ForkJoinTask:worker执行的对象,实现了Future。两种类型,一种叫submission,另一种就叫task。
ForkJoinPool使用数组保存所有WorkQueue(下文经常出现的WorkQueue[]),每个worker有属于自己的WorkQueue,但不是每个WorkQueue都有对应的worker。
- 没有worker的WorkQueue:保存的是submission,来自外部提交,在WorkQueue[]的下标是偶数;
- 属于worker的WorkQueue:保存的是task,在WorkQueue[]的下标是奇数。
WorkQueue是一个双端队列,同时支持LIFO(last-in-first-out)的push和pop操作,和FIFO(first-in-first-out)的poll操作,分别操作top端和base端。worker操作自己的WorkQueue是LIFO操作(可选FIFO),除此之外,worker会尝试steal其他WorkQueue里的任务,这个时候执行的是FIFO操作。
分开两端取任务的好处:
- LIFO操作只有对应的worker才能执行,push和pop不需要考虑并发;
- 拆分时,越大的任务越在WorkQueue的base端,尽早分解,能够尽快进入计算。
光看概念一知半解,我们进入ForkJoinPool的代码。本文首先从构造函数和类开始了解ForkJoinPool的基本参数,下篇再详细过一遍流程。
初始化一个ForkJoinPool
val pool = ForkJoinPool.commonPool()
public static ForkJoinPool commonPool() {
// assert common != null : "static init error";
return common;
}
获取ForkJoinPool很简单,直接调用commonPool()。注意,这个方法是jdk1.8才加的,也是推荐的方法,满足大部分场景。
static{
//...
common = java.security.AccessController.doPrivileged
(new java.security.PrivilegedAction<ForkJoinPool>() {
public ForkJoinPool run() { return makeCommonPool(); }});
//...
}
private static ForkJoinPool makeCommonPool() {
//...
return new ForkJoinPool(parallelism, factory, handler, LIFO_QUEUE,
"ForkJoinPool.commonPool-worker-");
}
common在static{}里创建,调用的是makeCommonPool(),最终调用ForkJoinPool的构造函数。
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode;
long np = (long)(-parallelism); // offset ctl counts
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
parallelism默认是cpu核心数,ForkJoinPool里线程数量依据于它,但不表示最大线程数,不要等同于ThreadPoolExecutor里的corePoolSize或者maximumPoolSize。
factory是线程工厂,不是新东西了,默认实现是DefaultForkJoinWorkerThreadFactory。workerNamePrefix是其中线程名称的前缀,默认使用“ForkJoinPool-*”
config保存不变的参数,包括了parallelism和mode,供后续读取。mode可选FIFO_QUEUE和LIFO_QUEUE,默认是LIFO_QUEUE,具体用哪种,就要看业务。
ctl是ForkJoinPool中最重要的控制字段,将下面信息按16bit为一组封装在一个long中。
- AC: 活动的worker数量;
- TC: 总共的worker数量;
- SS: WorkQueue状态,第一位表示active的还是inactive,其余十五位表示版本号(对付ABA);
- ID: 这里保存了一个WorkQueue在WorkQueue[]的下标,和其他worker通过字段stackPred组成一个TreiberStack。后文讲的栈顶,指这里下标所在的WorkQueue。
TreiberStack:这个栈的pull和pop使用了CAS,所以支持并发下的无锁操作。
AC和TC初始化时取的是parallelism负数,后续代码可以直接判断正负,为负代表还没有达到目标数量。另外ctl低32位有个技巧可以直接用sp=(int)ctl取得,为负代表存在空闲worker。
线程池缺不了状态的变化,记录字段是runState,具体介绍在后面的“ForkJoinPool状态修改”。
任务ForkJoinTask
ForkJoinPool执行任务的对象是ForkJoinTask,它是一个抽象类,有两个具体实现类RecursiveAction和RecursiveTask。
public abstract class RecursiveAction extends ForkJoinTask<Void> {
protected abstract void compute();
public final Void getRawResult() { return null; }
protected final void setRawResult(Void mustBeNull) { }
protected final boolean exec() {
compute();
return true;
}
}
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
V result;
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
protected final boolean exec() {
result = compute();
return true;
}
}
ForkJoinTask的抽象方法exec由RecursiveAction和RecursiveTask实现,它被定义为final,具体的执行步骤compute延迟到子类实现。很容易看出RecursiveAction和RecursiveTask的区别,前者没有result,getRawResult返回空,它们对应不需要返回结果和需要返回结果两种场景。
ForkJoinTask里很重要的字段是它的状态status,默认是0,当得出结果时变更为负数,有三种结果:
- NORMAL
- CANCELLED
- EXCEPTIONAL
除此之外,在得出结果之前,任务状态能够被设置为SIGNAL,表示有线程等待这个任务的结果,执行完成后需要notify通知,具体看后文的join。
ForkJoinTask在触发执行后,并不支持其他什么特别操作,只能等待任务执行完成。CountedCompleter是ForkJoinTask的子类,它在子任务协作方面扩展了更多操作。我们聚焦ForkJoinPool主线流程,CountedCompleter相关内容另文再介绍。
WorkQueue
WorkQueue是一个双端队列,它定义在ForkJoinPool类里。
scanState描述WorkQueue当前状态:
- 偶数表示RUNNING
- 奇数表示SCANNING
- 负数表示inactive
stackPred是WorkQueue组成TreiberStack时,保存前者的字段。
volatile int base;
int top;
base和top分别指向WorkQueue的两端,小小区别是base带上了volatile,回答了对top端push和pop不需要考虑并发这个优点。
操作WorkQueue前需要锁定,记录在字段qlock:
- 1:锁定;
- 0:未锁定;
- 负数:对应的worker已经撤销注册,WorkQueue也就终止使用。
WorkQueue也有config,不要和ForkJoinPool的config混淆了。WorkQueue的config记录了在WorkQueue[]的下标和当前mode。
打印ForkJoinPool信息
上面讲了这么多ForkJoinPool参数,可以使用get方法单独获取,也可以直接打印ForkJoinPool对象,toString被重写了。
public String toString() {
// Use a single pass through workQueues to collect counts
long qt = 0L, qs = 0L; int rc = 0;
AtomicLong sc = stealCounter;
long st = (sc == null) ? 0L : sc.get();
long c = ctl;
WorkQueue[] ws; WorkQueue w;
if ((ws = workQueues) != null) {
for (int i = 0; i < ws.length; ++i) {
if ((w = ws[i]) != null) {
int size = w.queueSize();
if ((i & 1) == 0)
qs += size;
else {
qt += size;
st += w.nsteals;
if (w.isApparentlyUnblocked())
++rc;
}
}
}
}
int pc = (config & SMASK);
int tc = pc + (short)(c >>> TC_SHIFT);
int ac = pc + (int)(c >> AC_SHIFT);
if (ac < 0) // ignore transient negative
ac = 0;
int rs = runState;
String level = ((rs & TERMINATED) != 0 ? "Terminated" :
(rs & STOP) != 0 ? "Terminating" :
(rs & SHUTDOWN) != 0 ? "Shutting down" :
"Running");
return super.toString() +
"[" + level +
", parallelism = " + pc +
", size = " + tc +
", active = " + ac +
", running = " + rc +
", steals = " + st +
", tasks = " + qt +
", submissions = " + qs +
"]";
}
这些参数都好理解,上面都说过了。有一点,要注意active和running的区别。一般来说,两者是相等的,正在running的worker肯定是active的,但不能排除在compute里主动阻塞线程,这个时候,worker虽然是active,但没有running。判断running用了下面这个方法:
final boolean isApparentlyUnblocked() {
Thread wt; Thread.State s;
return (scanState >= 0 &&
(wt = owner) != null &&
(s = wt.getState()) != Thread.State.BLOCKED &&
s != Thread.State.WAITING &&
s != Thread.State.TIMED_WAITING);
}
ForkJoinPool状态修改
- STARTED
- STOP
- TERMINATED
- SHUTDOWN
- RSLOCK
- RSIGNAL
runState记录了ForkJoinPool的运行状态,除了SHUTDOWN是负数,其他都是正数。前面四种不用说了,线程池标准状态流转。在多线程环境修改runState,不能简单想改就改,需要先获取锁,RSLOCK和RSIGNAL就用在这里。
private int lockRunState() {
int rs;
return ((((rs = runState) & RSLOCK) != 0 ||
!U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
awaitRunStateLock() : rs);
}
修改前调用lockRunState锁定,检查当前状态,尝试一次使用CAS修改runState为RSLOCK。需要状态变化的机会很少,大多数时间一次就能成功,但不能排除少几率的竞争,这时候进入awaitRunStateLock。
private int awaitRunStateLock() {
Object lock;
boolean wasInterrupted = false;
for (int spins = SPINS, r = 0, rs, ns;;) {
//1
if (((rs = runState) & RSLOCK) == 0) {
if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
if (wasInterrupted) {
try {
Thread.currentThread().interrupt();
} catch (SecurityException ignore) {
}
}
return ns;
}
}
else if (r == 0)
r = ThreadLocalRandom.nextSecondarySeed();
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
if (r >= 0)
--spins;
}
//2
else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
Thread.yield(); // initialization race
//3
else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
synchronized (lock) {
if ((runState & RSIGNAL) != 0) {
try {
lock.wait();
} catch (InterruptedException ie) {
if (!(Thread.currentThread() instanceof
ForkJoinWorkerThread))
wasInterrupted = true;
}
}
else
lock.notifyAll();
}
}
}
}
在自旋中,第一步,mark1再次尝试修改runState为RSLOCK,成功直接返回。
mark2检查ForkJoinPool初始化情况,这里没有额外多写个变量做锁,直接利用了stealCounter这个原子变量。因为初始化时(后文的externalSubmit),才会对stealCounter赋值。所以当状态不是STARTED或者stealCounter为空时,让出线程等待。
mark3处,线程不会无限制自旋尝试,会利用wait/notify进入阻塞等待。RSIGNAL代替原状态,表示有线程进入了等待,解锁时要处理。在高并发下,这不是一个好的设计,但进入这里的几率很低,作为兜底还是可以的。
private void unlockRunState(int oldRunState, int newRunState) {
if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
Object lock = stealCounter;
runState = newRunState; // clears RSIGNAL bit
if (lock != null)
synchronized (lock) { lock.notifyAll(); }
}
}
解锁的逻辑就比较简单,如果顺利将状态修改为目标状态,成功大吉。否则表示有别的线程进入了wait,需要调用notifyAll唤醒,重新尝试竞争。
辅助对象
ForkJoinPool代码用到了一些支持并发的类,先学习储备着。
ThreadLocalRandom
我们以前常用的Random,在并发下,多个线程同时计算种子需要用到同一个原子变量。由于更新操作使用CAS,同时执行只有一个线程成功,其他线程的大量自旋造成性能损失,ThreadLocalRandom继承Random,对此进行了改进。
顾名思义,ThreadLocalRandom运用了ThreadLocal,每个线程内部维护一个种子变量,多线程下计算新种子时使用线程自己的种子变量进行更新,避免了竞争。
Thread为ThreadLocalRandom新增了三个变量:
- threadLocalRandomSeed
- threadLocalRandomProbe
- threadLocalRandomSecondarySeed
每个线程默认的probe是0,当线程调用ThreadLocalRandom.current时,会初始化seed和probe,维护在线程内部。
Unsafe
ForkJoinPool经常使用的U对象实质是Unsafe类,主要提供了native的硬件原子操作,包括内存操作、CAS、线程挂起和恢复等函数。这个类我们不能直接使用,了解它的常用函数即可。
伪共享
@sun.misc.Contended标记,避免伪共享,具体自行了解。