在整个JUC框架中,ForkJoinPool 相对其他类会复杂很多,想吃透它需要有足够的耐心,ForkJoinPool两篇文章从草稿到发布,笔者前前后后使用了近两个月才完成。废话不多说了,下面正式开始。
ForkJoinPool 是JDK 7加入的一个线程池类。Fork/Join 技术是分治算法(Divide-and-Conquer)的并行实现,它是一项可以获得良好的并行性能的简单且高效的设计技术。目的是为了帮助我们更好地利用多处理器带来的好处,使用所有可用的运算能力来提升应用的性能。我们常用的数组工具类 Arrays 在JDK 8之后新增的并行排序方法(parallelSort)就运用了 ForkJoinPool 的特性,还有 ConcurrentHashMap 在JDK 8之后添加的函数式方法(如forEach等)也有运用。
注:由于 ForkJoinPool 涉及的知识太多,导致篇幅太长,所以我们对其的分析会分两篇进行,本篇我们主要来看一些比较概念性的东西,为后面我们的源码分析做准备工作。
概述
在 java.util.concurrent 包中,Fork/Join 框架主要由 ForkJoinPool、ForkJoinWorkerThread 和 ForkJoinTask 来实现,它们之间有着很复杂的联系。ForkJoinPool 中只可以运行 ForkJoinTask 类型的任务(在实际使用中,也可以接收 Runnable/Callable 任务,但在真正运行时,也会把这些任务封装成 ForkJoinTask 类型的任务);而 ForkJoinWorkerThread 是运行 ForkJoinTask 任务的工作线程。
作为线程池的一种,ForkJoinPool 并行的实现了分治算法(Divide-and-Conquer):把任务递归的拆分为各个子任务,这样可以更好的利用系统资源,尽可能的使用所有可用的计算能力来提升应用性能。首先看一下 Fork/Join 框架的任务运行机制:
ForkJoinPool 的另一个特性是它使用了work-stealing(工作窃取)算法:线程池内的所有工作线程都尝试找到并执行已经提交的任务,或者是被其他活动任务创建的子任务(如果不存在就阻塞等待)。这种特性使得 ForkJoinPool 在运行多个可以产生子任务的任务,或者是提交的许多小任务时效率更高。尤其是构建异步模型的 ForkJoinPool 时,对不需要合并(join)的事件类型任务也非常适用。
在 ForkJoinPool 中,线程池中每个工作线程(ForkJoinWorkerThread)都对应一个任务队列(WorkQueue),工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数 mode 决定),然后以FIFO的顺序随机窃取其他队列中的任务。
ForkJoinPool 中的任务分为两种:一种是本地提交的任务(Submission task,如 execute、submit 提交的任务);另外一种是 fork 出的子任务(Worker task)。两种任务都会存放在 WorkQueue 数组中,但是这两种任务并不会混合在同一个队列里,ForkJoinPool 内部使用了一种随机哈希算法(有点类似 ConcurrentHashMap 的桶随机算法)将工作队列与对应的工作线程关联起来,Submission 任务存放在 WorkQueue 数组的偶数索引位置,Worker 任务存放在奇数索引位。实质上,Submission 与 Worker 一样,只不过它被限制只能执行它们提交的本地任务,在后面的源码解析中,我们统一称之为“Worker”。
任务的分布情况如下图:
官方文档说明
单独阅读 FrokJoinPool 的源码可能会比较困难,建议大家结合官方文档。这里笔者翻译了一份中文的官方说明文档,内容比较长,有些地方翻译的比较生硬,如果有兴趣,还是推荐同学们去看原版英文文档。这里大家可以先大概看一下(也可以忽略以下内容),后面在源码解析时如果遇到不明白的地方再回到此文档中寻找答案。
1. 实现概览
ForkJoinPool 和它的内部类为一组工作线程提供主要功能和控制:非FJ线程任务放在 Submission queue,工作线程拿到这些任务并拆分为多个子任务,队列任务也可能被其他工作线程偷取。工作线程优先处理来自自身队列的任务(LIFO或FIFO顺序,参数mode决定),然后以 FIFO 的顺序随机窃取其他队列中的任务。work-stealing 框架在刚开始是为了支持树状结构并行操作,随着时间的推移它的可伸缩性优势得到扩展和优化,以更好地支持更多样化的使用。由于很多内部方法和嵌套类相互关联,它们的主要原理和描述都在下面进行说明;个别方法和嵌套类只包含关于细节的简短注释。
2. WorkQueues
大多数操作都发生在work-stealing队列中(内部类WorkQueue)。此workQueue支持三种形式的出列操作:push、pop、poll(也叫steal),push和pop只能被队列内部持有的线程调用,poll可被其他线程偷取任务时调用。和最初的 Work-Stealing 队列不同,考虑到GC问题,我们一有机会就将队列的获取槽位置空,即使是在生成大量任务的程序中,也要保持尽可能小的占用空间。为了实现这一点,我们使用CAS解决pop和poll(steal)的线程冲突问题。
关于pop和poll的区别,下面用一段简单的伪代码说明:
pop,持有队列的工作线程调用:
if ((base != top) and
(在top位的任务不为null) and
(通过CAS操作设置top位为null))
递减top值并返回任务;
poll,通常是偷取线程调用:
if ((base != top) and
(在base位的任务不为null) and
(base位没有被其他线程修改) and
(通过CAS操作设置base位为null))
增加base值并返回任务;
由于使用了 CAS 引用操作 workerQueue 数组,所以在队列 base 和 top 不必使用标志位。这些也是数组结构的队列的特性(例如ArrayDeque)。对索引的更新保证了当 top==base 时代表这个队列为空,但是如果在push、pop或poll没有完全完成的情况下,可能出现即使 base==top 但队列为非空的错误情况(isEmpty方法可以检查“移除最后一个元素操作”部分完成的情况)。所以,单独考虑 poll 操作,它并不是 wait-free 的(无等待算法)。在一个线程正在偷取任务时,另外一个线程是无法完成偷取操作的。大体上讲,我们起码有一定概率保证了阻塞性。如果一个偷取操作失败,偷取线程会选择另外一个随机目标继续尝试。所以,为了保证偷取线程能够正常执行,它必须能够满足任何正在执行的对 queue 的 poll 或 push 操作都能完成(这就是为什么我们通常使用 pollAt 方法和它的变体,在已知的 base 索引中先尝试一次,然后再考虑可替代的操作,而不直接使用可以重试的 poll 方法)。
这种方法同样也支持本地任务(Submission task)以 FIFO 模式运行(使用poll)。FIFO 和 LIFO 这两种模式都不会考虑共用性、加载、缓存地址等,所以很少能在给定的机器上提供最好的性能,但通过对这些因素进行平均,可以提供良好的吞吐量。更进一步来讲,即使我们尝试使用这些信息,也没有能利用它的基础。例如,一些任务集从缓存共用中获取到良好的性能收益,但其他任务集会因此受到它的影响。另外,虽然队列中提供了扫描功能,但是从长远看来为了吞吐量通常最好使用随机选择,而非直接选择。所以,在ForkJoinPool 中我们也就使用了一种 XorShifts(一种随机算法,有些带有不同的偏移量) 随机算法。
WorkQueue 对于提交到池中的任务也使用类似的随机插入方式。我们不能把这些被工作线程使用的任务混合同一个队列中,所以,我们会使用一种随机哈希算法(有点类似ConcurrentHashMap的随机算法)将工作队列与工作线程关联起来。ThreadLocalRandom的probe(探针值)会为选中的已存在的队列提供一个哈希值,在与其他提交任务的线程(submitters)竞争时,也可以利用probe来随机移位。实际上,submitters 就像工作线程(worker)一样,只不过他们被限制只能执行它们提交的本地任务(CountedCompleter类型任务也不能执行)。在共享模式里提交的任务需要锁来控制(在扩容情况下提供保护),我们使用了一个简单的自旋锁(qlock字段,后面详细讲解),因为 submitters 遇到一个繁忙队列时会继续尝试提交其他队列或创建新的队列-只有在 submitters 创建和注册新队列时阻塞。另外,"qlock"会在 shutdown 时饱和到不可锁定值(-1),但是解锁操作依然可以执行。
3. 池管理
work-stealing 模式的吞吐量优势来自于分散控制:工作线程主要从它们自己的队列或其他worker的队列中获取任务,速度可以超过每秒十亿。池本身的创建、激活(为运行中的任务提供扫描功能)、撤销、阻塞和销毁线程这些操作的信息都很小,只有一小部分属性我们可以全局追踪或维护,所以我们把他们封装成一系列小的数字变量,并且这些变量通常是没有阻塞或锁定的原子数。几乎所有基本的原子控制状态都被保存在两个volatile变量中(ctl和runState),这两个变量常被用来状态读取和一致性检查(此外,"config"字段持有了不可变的配置状态)。
ctl 定义为一个64位的原子类型字段,用来标识对工作线程进行添加、灭活、重新激活和对队列出列、入列操作。ctl 原子性地维护了活跃线程数、工作线程总数,和一个放置等待线程的队列。活跃线程数同样也担任静止指标的角色,当工作线程确认已经没有任务可以执行时,就递减这个值。工作队列其实是一个 Treiber 栈(无锁并发栈),它可以以最近使用的顺序来存储活跃线程。这改善了线程执行性能和任务定位,只要任务在栈的顶端,在发生争用时也可以很好的释放工作线程。当找不到 worker 时,我们使用 park/unpark 来操作已经被放到空闲 worker 栈的工作线程(使用 ctl 低32位表示)。栈顶状态也维护了工作线程的“scanState”子域:它的索引和状态(ctl),加上一个版本计数(SS_SEQ),再加上子域数(同样也可作为版本戳)提供对 Treiber 栈ABA问题的保护。
runState表示当前池的运行状态(STARTED、STOP等),同时也可作为锁保护workQueues数组的更新。当作为一个锁使用时,通常只能被一部分操作持有(唯一的异常是数组进行一次性初始化和非常规扩容),所以最多在一段自旋后总是处于可用状态。需要注意的是,awaitRunStateLock 方法(仅当使用CAS获取失败时调用)在自旋一段时间之后,使用 wait/notify 机制来实现阻塞(这种情况很少)。对于一个高竞争的锁来说这是一个很不好的设计,但多数pool在自旋限制之后都是在无锁竞争情况下运行的,所以作为一个保守的选择它运行的还是比较稳定的。因为我们没有其他的内部对象作为监视器使用,所以“stealCounter”(AtomicLong,也是在externalSubmit中延迟加载)充当了这个角色。
"runState" 和 "ctl"的用法只有在一种情况中会相互影响: 当添加一个新的工作线程时(tryAddWorker),只有在获得锁时才可以对ctl进行CAS操作。
scanState:工作线程(worker)和线程池(pool)都使用了 scanState,通过 scanState 可以管理和追踪工作线程是否为 INACTIVE(可能正在阻塞等待唤醒)状态,也可以判断任务是否为 SCANNING 状态(当两者都不是时,它就是正在运行的任务)。当一个工作线程处于灭活状态(INACTIVE),它的scanState被设置为禁止执行任务,但是即便如此它也必须扫描一次以避免队列争用。注意,scanState 的更新在队列CAS释放之后(会有延迟)。在工作线程入队后,scanState的低16位必须持有它在池中的索引,所以我们在初始化时就将索引设置好(参考 registerWorker 方法),并将其一直保存在那里或在必要时恢复它。
WorkQueue记录: ForkJoinPool 内部持有一个 WorkQueue 数组("workQueues")来记录任务工作队列。这个数组在第一次使用时(externalSubmit方法)创建,如果必要的话对其进行扩容。数组的更新操作受runState锁的保护,但可以并发读取。为了简化基于索引的操作,数组大小一定为2的幂,并且可存储null值。工作任务(Worker queues)存放在奇数位索引;共享任务(submission/external queues)存放在偶数位索引,最多64个槽位。以这种方式将它们组合在一起可以简化和加速任务扫描。
所有工作线程都是按需创建,被 submission 任务触发,然后被终止工作线程替代,或者作为阻塞线程的补偿线程。为了简化GC的工作,我们并不会直接持有工作线程的引用,而是通过索引的方式来访问工作队列(所以ForkJoinPool的代码结构看上去很麻烦)。实际上,工作队列数组充当弱引用机制,例如使用 ctl 也维护了索引的栈顶(top)域而不是直接存储引用。
闲置工作线程排队:跟高性能计算(HPC)work-stealing 框架不同,当没有立刻扫描到任务时,我们不能让工作线程无限制的自旋扫描, 除非有任务可用,否则不能开启/重启工作线程。另一方面,当新任务提交或生成时,我们必须快速地将它们推进行动。在许多情况下,通过增加时间来激活工作线程是限制整体性能的主要因素,并且JIT编译和分配使程序启动更加复杂,所以我们应尽可能地简化它。
内存排序:由于我们必须不定时的唤醒某个工作线程,为了不会丢失 signal 信号指令,就需要更强的内存排序规则(full-fence)来避免指令重排序。一些核心操作:如出列、更新ctl状态也需要 full-fence 来进行 CAS 操作。数组的读取则是使用 Unsafe 类提供的仿照 volatile 的方式。从其他线程访问 WorkQueue 的base、top 和任务数组时,也需要保证首次读取的可见性,所以,我们对"base"引用使用了 volatile 来修饰,并且在读取其他字段或引用之前总是先读取"base"。队列的持有线程也必须要保证变量的顺序更新,所以在更新时使用了内建指令(如 putOrderedObject)。
Worker(工作线程)创建:创建 worker 时,首先增加 worker 总数(作为一个保留字段),然后构造一个 ForkJoinWorkerThread。在构造新的 ForkJoinWorkerThread 期间,会调用 registerWorker 来创建对应的工作队列 workQueue,并在workQueues数组(如果需要就对数组扩容)中为之分配一个索引,然后启动线程。如果在这期间出现异常或线程工厂创建线程失败,则调用 deregisterWorker 回滚线程数和其他记录。如果工作线程创建失败,线程池就会使用比目标数更少的工作线程继续运行。如果发生异常,异常通常被传递给其他外部调用者。为每个工作线程都分配索引避免了在扫描时发生偏移。我们可以把这个数组看作是一个简单的2幂哈希表,根据元素需要来进行扩容。当需要扩容或一个工作线程被撤销并替换时,增加 seedIndex 值以保证不会发生碰撞,在这之后要尽可能的保证低碰撞。
灭活和等待:任务入队时可能会遭遇到内部竞争:当一个线程已经放弃查找任务,但还没有进入工作队列等待,此时另外一个生产任务的线程可能会错过查看(并唤醒)这个线程。当一个工作线程偷取不到任务,它就会被灭活并入队等待。很多时候,由于GC或OS调度,缺少任务只是暂时性的;为了减少这种类型的灭活,扫描者在扫描期间会计算队列的 checksum (每次偷取时,对每个队列的 base 索引进行求和)。在扫描完成后并且 checksum 已经稳定,工作线程才可以放弃查找并尝试灭活;为了避免错过唤醒,在成功入队等候再次稳定之后会再重复这个扫描过程。在这种状态下,工作线程不能 take/run 任务,除非这个任务在队列中被释放,所以工作线程本身最终会尝试释放自己或其他继承者(参见tryRelease)。另外,在一次空的扫描过程中,一个失活的工作线程在阻塞(park)前会自旋指定的次数(参见 awaitWork)。注意关于伴随着 parking 或其他阻塞的 Thread.interrupt 指令的不寻常的约定:线程的中断状态仅仅用来检查线程是否可以销毁,而且在线程阻塞时同样也会检查线程是否可销毁,所以在调用park阻塞线程之前我们会先清除中断状态(调用Thread.interrupted),这样做是主要是为了防止在用户代码中,会有一些与内部使用不相关的调用导致中断状态被设置,从而导致线程被错误地销毁。
唤醒和激活:在至少有一个可被找到并执行的任务时,工作线程才会被创建或激活。在一个先前为空的队列中进行push(工作线程或外部提交)操作时,如果有空闲工作线程正在等待就唤醒它,如果工作线程少于给定并行度(parallelism)就创建一个新的工作线程。在多数平台上,signal(unpark)指令的开销很大,在真正唤醒一个线程和它实际运行之间的时间会很长,因此,尽可能多地消除这些延迟是很有必要的。并且,失活的工作线程并非都是被阻塞,它们经常会进行二次扫描或自旋,我们通过设置/清除 WorkQueue 的"parker"引用来减少不必要的 unpark 操作。
缩减工作线程:资源空闲一定的时间后需要释放。如果池保持静止状态超过一个周期时长(IDLE_TIMEOUT,默认2秒),工作线程就会由于超时而被终止(参见 awaitWork),随着线程数量的减少和周期的增加,最终移除所有的工作线程。当剩余线程数多于2个,过量的线程在下一个静止点会被立即终止。
关闭和终止:
shutdownNow():直接终止,内部调用 tryTerminate 首先设置runState值,然后终止调用线程和其他所有工作线程,通过设置他们的 qlock 状态来帮助销毁,取消他们未处理的任务,并唤醒等待的线程,重复上述操作一直到池处于稳定状态(循环数被工作线程数量限制,至少3次)。
shutdown():首先检查池是否可以终止,依赖"ctl"内维护的的活动线程数—在awaitWork中如果池处于稳定状态并且活动线程数<=0也会调用tryTerminate进行销毁操作。不过,外部提交任务(submissions)并不依照上述条件。
tryTerminate 通过扫描队列(处于稳定状态),以确保在触发"STOP"阶段之前没有正在进行的submission任务和work任务需要处理。(注意:如果在池关闭时调用 helpQuiescePool 会发生内部冲突,因为他们都会等待静止,在 helpQuiescePool 完成执行之前 tryTerminate 都不会被触发)
4. Join任务
由于我们把许多任务都复用给一批工作线程,并且不能让它们阻塞,也不能为这些任务重新分配其他的运行时堆栈。所以就使用了一种"延续性"的形式,即使这可能不是一个好的方案,因为我们可能既需要一个非阻塞的任务,也需要它的延续性来继续运行。为此我们总结了两种策略:
Helping-帮助运行:如果偷取还未开始,为这些 joiner 安排一些它可以执行的其它任务。
Compensating-补偿运行:如果没有足够的活动线程,tryCompensate()可能创建或重新激活一个备用的线程来为被阻塞的 joiner 补偿运行。
第三种形式(在方法 tryRemoveAndExec 中实现)相当于帮助一个假想的补偿线程来运行任务:如果补偿线程窃取并执行的是被join的任务,那么 join 线程不需要补偿线程就可以直接执行它(尽管牺牲了更大的运行时堆栈,但这种权衡通常是值得的)。设想一下这种互相帮助的场景:补偿线程帮助 join 线程执行任务,反过来 join 线程也会帮助补偿线程执行任务。
helpStealer(补偿执行)使用了一种“线性帮助(linear helping)”的算法。每个工作线程都记录了最近一个从其他工作队列(或 submission 队列)偷取过来的任务("currentSteal"引用),同样也记录了当前被 join 的任务(currentJoin 引用)。helpStealer 方法使用这些标记去尝试找到偷取者并帮助它执行任务,(也就是说,从偷取任务中拿到任务并执行,“偷取者偷我的任务执行,我去偷偷取者的任务执行”),这样就可以加速任务的执行。这种算法在 ForkJoinPool 中的大概实现方式如下:
- 从 worker 到 steal 之间我们只保存依赖关系,而不是记录每个 steal 任务。有时可能需要对 workQueues 进行线性扫描来定位偷取者,但是一般不需要,因为偷取者在偷取任务时会把他的索引存放在在 hint 引用里。一个 worker 可能进行了多个偷取操作,但只记录了其中一个偷取者的索引(通常是最近的那个),为了节省开销,hint 在需要时才会记录。
- 它是相对“浅层的”,忽略了嵌套和可能发生的循环相互偷取。
- "currentJoin"引用只有在 join 的时候被更新,这意味着我们在执行生命周期比较长的任务时会丢失链接,导致GC停转(在这种情况下利用阻塞通常是一个好的方案)。
- 我们使用 checksum 限制查找任务的次数,然后挂起工作线程,必要时使用其他工作线程替换它。
注意:CountedCompleter 的帮助动作不需要追踪"currentJoin":helpComplete 方法会获取并执行在同一个父节点下等待的所有任务。不过,这仍然需要对 completer 的链表进行遍历,所以使用 CountedCompleters 相对于直接定位"currentJoin"效率要低。
补偿执行的目的不在于在任何给定的时间内保持未阻塞线程的目标并行数。这个类之前的版本为所有阻塞的join任务都提供即时补偿,然而,在实践中,绝大多数阻塞都是由GC和其他JVM或OS活动产生的瞬时的附带影响,这种情况下使用补偿线程替换工作线程会使情况变得更糟。现在,通过检查 WorkQueue.scanState 的状态确认所有活动线程都正在运行,然后使用补偿操作消除多余的活跃线程数。补偿操作通常情况下是被忽略运行的(容忍少数线程),因为它带来的利益很少:当一个有着空等待队列的工作线程在 join 时阻塞,它仍然会有足够的线程来保证活性,所以不需要进行补偿。补偿机制可能是有界限的。commonPool的界限(commonMaxSpares)使 JVM 在资源耗尽之前能更好的处理程序错误和资源滥用。用户可能通过自定义工厂来限制线程的构造,所以界限的作用在这种 pool 中是不精确的。当线程撤销(deregister)时,工作线程的总数会随之减少,不用等到他们退出并且资源被 JVM 和 OS 回收时才减少工作线程数,所以活跃线程在此瞬间可能会超过界限。
5. Common Pool
在 ForkJoinPool 静态初始化之后 commonPool 会一直存在,并且在应用中共享,使用 common pool 通常可以减少资源占用(它的线程在空闲一段时间后可以回收再使用)。大多数初始操作会发生在首次提交任务时,在方法 externalSubmit 中进行。
当外部线程提交任务到 commonPool 时,在 join 过程中他们也可以帮助执行子任务(参见 externalHelpComplete 和相关方法)。这种 caller-helps 策略是合理的,它会设置 commonPool 并行度为1(或更多,但小于可用内核的总数),或者对于纯粹的caller-runs(调用者运行)直接设置为0。我们不需要记录外部提交任务是否属于 commonPool—如果不是,外部帮助方法会快速返回结果。
当系统配置中有一个SecurityManager(安全管理)时,我们使用InnocuousForkJoinWorkerThread代替ForkJoinWorkerThread。这些工作线程没有设置许可,不属于任何明确的用户线程组,并且在执行完任何顶层任务后消除所有的ThreadLocal变量(参见 WorkQueue.runTask)。这些相关的机制(主要在ForkJoinWorkerThread中)都是依赖JVM的,而且为了达到预期效果,必须访问特定的线程域。
小结
本篇的主要目的是让同学们先对 ForkJoinPool 有一个简单的认识,源码分析我们放到下一篇来讲,也可以方便大家对照官方说明来看源码实现。