基于 CAS自旋实现的轻量级锁在恶性自旋时会消费大量的CPU资源。解决这个问题有2种方案:分散操作热点和使用队列削峰。JUC并发包使用的是队列削峰的方案解决CAS性能问题,并提供了一个基于双向队列的削峰基类——抽象基础类AbstractQueuedSyncronizer(抽象同步器类,简称为AQS)。
AQS的内部队列
AQS是CLH队列的一个变种,原理和CLH类似。AQS队列内部维护的是一个FIFO双向链表,其中的每个节点其实是由线程封装的,当线程争抢锁失败后会封装成节点加入AQS队列中:当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点。
AQS的内部结构可以参考下图。
AQS核心成员
AQS基本模板模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模板方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,供各类锁去实现。
状态标志位
AQS 中维持了一个单一的 volatile 修饰的状态信息是 state,AQS使用int类型的state标志锁的状态,可以理解为锁的同步状态。
// 同步状态
private volatile int state;
state 因为使用了 volatile 保证了操作的可见性,AQS提供了 getState()
、setState()
来获取和设置同步状态。
// 获取同步状态
protected final int getState() {
return state;
}
// 设置同步状态
protected final void setState(int newState) {
state = neWState;
}
// 通过CAS设置同步状态
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.comapreAndSwapInt(this, stateOffset, expect, update);
}
由于 setState()
无法保证原子性,因此AQS给我们提供了 compareAndSetState()
方法利用底层 Unsafe
的 CAS机制来实现原子性。
AbstractQueueSynchronizer
继承了 AbstarctOwnableSynchronizer
,这个基类只有一个变量叫 exclusiveOwnerThread
,表示当前占用该锁的线程,并且提供了相应的 get()
和 set()
方法。具体如下:
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
// 表示当前占用该锁的线程
private transient Thread exlusiveOwnerThread;
...
}
队列节点类
AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类 Node 定义。
static final class Node {
// 节点等待状态值:取消状态
static final int CANCELLED = 1;
// 节点等待状态值:标识后继线程处于等待状态
static final int SIGNAL = -1;
// 节点等待状态值:标识当前线程正在进行条件等待
static final int CONDITION = -2;
// 节点等待状态值:标识下一次共享锁的acquireShared操作需要无条件传播
static final int PROPAGATE = -3;
// 节点状态
// 普通的同步节点初始值为0,条件等待节点的初始值为CONDITION(-2)
volatile int waitState;
// 节点所对应的线程,为抢锁线程或者条件等待线程
volatile Thread thread;
// 前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 此属性指向下一个条件等待节点
Node nextWaiter;
}
waitStatus 属性
每个节点与等待线程关联,每个节点维护一个状态 waitStatus,它的各个常量值具体如下:
- CANCELLED(1)。1表示该线程节点已经释放(超时、中断),已取消的节点不会再阻塞,不会参与竞争,会一直保持取消状态。
- SIGNAL(-1)。表示其后继的节点处于等待状态,当前节点对应的线程如果释放了同步状态或者被取消,就会通知后继节点,使后继节点的线程得以运行。
- CONDITION(-2)。表示该线程在条件队列中阻塞,表示节点在等待队列中。当持有锁的线程调用了CONDITION的
signal()
方法之后,节点会从该CONDITION的等待队列转移到该锁的同步队列上,去竞争锁。 - PROPAGATE(-3)。表示下一个线程获取共享锁后,自己的共享状态会被无条件地传播下去,因为共享锁可能出现同时有N个锁可以用。
- 0。表示当前节点处于初始状态。
thread成员
Node 的 thread 成员用来存放进入AQS队列中的线程引用,Node的nextWaiter成员用来执行自己的后继等待节点。
抢占类型常量标识
Node 节点还定义了两个抢占类型常量标识,SHARED和EXCLUSIVE,具体如下:
// 标识节点在抢占共享锁
static final Node SHARED = new Node();
// 表示节点在抢占独占锁
static final Node EXCLUSIVE = null;
FIFO双向同步队列
AQS通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点 head 和 tail 记录队首和队尾元素,元素的节点类型为 Node 类型,具体如下:
/* 首节点的引用 */
private transient volatile Node head;
/* 尾节点的引用 */
private transient volatile Node tail;
AQS 的首尾节点都是懒加载的,需要的时候才真正创建。只有在线程竞争失败的情况下,有新线程加入同步队列时,AQS才创建一个head节点。尾节点只在有新线程阻塞时才被创建。
JUC显式锁和AQS的关系
AQS是一个同步器,它实现了锁的基本抽象功能,支持独占锁与共享锁两种方式,该类是使用模板模式来实现的,成为构建锁和同步器的框架,使用该类可以简单且高效地构造出应用广泛的同步器。
JUC中的显式锁、线程同步工具等,内部都使用了AQS作为等待队列。
AQS中的模板模式
模板模式
模板模式是类的行为模式。准备一个抽象类,将部分的逻辑以具体方法的形式实现,然后声明一些抽象方法来迫使子类实现剩余的逻辑。不同的子类提供不同的方式实现这些抽象方法,从而对剩余的逻辑有不同的实现。
模板模式的关键在于:父类提供框架性的公共逻辑,子类提供个性化的定制逻辑。
模板方法(Template Method)也被称为骨架方法,主要定义了整个方法需要实现的业务操作的算法框架。
钩子方法(Hook Method)是被模板方法的算法框架所调用的,由子类提供具体的实现方法。
AQS的模板流程
AQS定义了两种资源共享方式:
- Exclusive(独占锁):只有一个线程能占有锁资源。
- Share(共享锁):多个线程可以同时占有锁资源。
AQS 为不同的资源共享方式提供了不同的模板流程。
AQS中的钩子方法
自定义同步器时,AQS中需要重写的钩子方法大致如下:
- tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true。
- tryRelease(int):独占锁钩子,尝试释放资源,若成功则返回true。
- tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败,0表示成功,但没有剩余可用资源,正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享锁钩子,尝试释放资源,若成功则返回true。
- isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。
以上钩子的默认实现会抛出 UnsupportOperationException 异常。
通过AQS实现一把简单的独占锁
本部分模拟 ReentrantLock 实现一个简单的独占锁,真实的 ReentrantLock 要复杂很多。
public class SimpleMockLock implements Lock {
private final Sync sync = new Sync();
@Override
public void lock() {
sync.acquire(1);
}
@Override
public void unlock() {
sync.release(1);
}
private static class Sync extends AbstractQueuedSynchronizer {
// 钩子方法
protected boolean tryAcquire(int arg) {
// CAS更新状态值为1
if(compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 钩子方法
protected boolean tryRelease(int arg) {
if(Thread.currentThread() != getExclusiveOwnerThread()) {
throw new IllegalMonitorStateException();
}
if(getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
}
AQS抢占锁的原理
下面通过AQS中的方法讲解一下AQS抢占锁的原理。
AQS模板方法:acquire
acquire 是AQS 封装好的获取资源的公共入口,它是AQS提供的利用独占的方式获取资源的方法,编码实现如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire 至少执行一次 tryAcquire 钩子方法。若调用成功,则 acquire 直接返回,表示已经抢到锁,若不成功,则将线程加入等待队列。
钩子实现: tryAcquire
tryAcquire 是需要实现的钩子方法,可以参照我们前面实现的 SimpleMockLock。
直接入队:addWaiter
在 acquire 模板方法中,如果钩子方法 tryAcquire 尝试获取同步状态失败的话,就构造同步节点,通过 addWaiter 方法将该节点加入同步队列的队尾。
private Node addWaiter(Node mode) {
// 创建新节点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 加入队列尾部,将目前的队列tail作为自己的前驱节点pred
Node pred = tail;
if (pred != null) {
node.prev = pred;
// 现场时通过AQS方式修改尾节点为最新的节点
// 如果修改陈宫,将节点加入队列的尾部
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 第一次尝试添加尾部失败,意味着有并发抢锁发生,需要进行自旋
enq(node);
return node;
}
addWaiter 方法中,首先需要构造一个 Node 对象,其中有两个参数,第一个是当前线程,第二个是Node节点,可能值为 SHARED 或 EXCLUSIVE。
自旋入队:enq
addWaiter 第一次尝试在尾部加节点失败,意味着有并发抢锁发生,需要进行自旋,enq 方法通过 CAS 自旋将节点添加到队列尾部。
/**
* 这里进行了循环,如果此时存在tail,就执行添加新队尾的操作
* 如果依然不存在,就把当前线程作为 head 节点
* 插入节点后,调用acquireQueued() 进行阻塞
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 队列为空,初始化尾节点和头节点为新节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
/**
* CAS 操作head指针,仅仅被enq()调用
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS 操作tail指针,仅仅被enq()调用
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
自旋抢占:acquireQueued
在节点入队之后,启动自旋抢锁的流程,aquireQueued方法的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋,只有当前驱节点是头节点时才能尝试获取锁,原因是:
- 头结点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点。
- 维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行for死循环。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自选检查当前节点的前驱节点是否为头节点,才能获取锁
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// 节点中的线程循环地检查自己的前驱节点是否为head节点
// 前驱节点是head时,进一步调用子类的 tryAcquire 实现
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检查前一个节点的状态,预判当前获取锁失败的线程是否要挂起
// 如果需要挂起,调用 parkAndCheckInterrupt 方法挂起当前线程,直到被唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true; // 若两个操作都是true,则置true
}
} finally {
// 如果等待过程中没有成功获取资源,
// 那么取消节点在队列中的等待
if (failed)
// 取消等待,将当前节点从队列中移除
cancelAcquire(node);
}
}
为了不浪费资源,acquireQueued自旋过程中会阻塞线程,等待被前驱节点唤醒后才启动循环。如果成功就返回,否则执行 shouldParkAfterFailedAcquire、parkAndCheckInterrupt来达到阻塞的效果。
挂起预判:shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire 方法的主要功能是,将当前节点的有效前驱节点(不是CANCELLED类型的节点)找到,并且将有效前驱节点的状态设置为SIGNAL,之后返回true代表当前线程可以马上被阻塞了。
具体分为三种情况:
- 如果前驱节点的状态为-1(SIGNAL),说明前驱的等待标志已设好,返回true表示设置完毕。
- 如果前驱节点的状态为1(CANCELLED),说明前驱节点本身不再等待了,需要跨越这些节点,然后找到一个有效节点,再把当前节点和这个有效节点的唤醒关系建立好,调整前驱节点的next指针为自己。
- 如果其他情况:-3、-2、0,那么通过CAS尝试设置前驱节点为SIGNAL,表示只要前驱节点释放锁,当前节点就可以抢占锁了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // 获取前驱节点的状态
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
线程挂起:parkAndCheckInterrupt
parkAndCheckInterrupt 的主要任务是暂停当前线程,具体如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 调用park()使线程进入waiting状态
return Thread.interrupted(); // 如果被唤醒,查看自己是否已经被中断
}
AQS 会把所有等待的线程构成一个阻塞等待队列,当一个线程执行完 lock.unlock()
,会激活其后继节点,通过 LockSupport.unpark(postThread)
完成后继线程的唤醒。
AQS 的两个关键点:节点的入队和出队
理解AQS的一个关键点是掌握节点的入队和出队。
节点的自旋入队
节点在第一次入队失败后,就会开始自旋入队,分为以下两种情况:
如果AQS的队列非空,新节点入队的插入位置在队列的尾部,并且通过CAS方式插入,插入之后AQS的tail将指向新的尾节点。
如果AQS的队列为空,新节点入队时,AQS 通过 CAS 方法将新节点设置为头节点 head,并将 tail 指针指向新节点。
private Node enq(final Node node) {
for (;;) { // 自旋入队
Node t = tail;
if (t == null) {
// 队列为空,初始化尾节点和头节点为新节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 如果队列不为空,将新节点插入队列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
节点的出队
节点的出队算法在 acquireQueued()
方法中,这是一个模板方法,acquireQueued()
方法不断在前驱节点上自旋(for 循环),如果前驱节点是头节点并且当前线程使用钩子方法 tryAcquire()
获得了锁,就移除头节点,将当前节点设置为头节点。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 中断标记
boolean interrupted = false;
for (;;) {
// 获取当前线程节点的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点,则使当前线程尝试获取锁资源(tryAcquire方法忘了回头看第五步)
if (p == head && tryAcquire(arg)) {
// 如果当前程线程获取锁资源成功,则将当前线程节点设置为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 根据前驱节点p的等待状态判断是否要将当前线程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 生成CANCELLED状态节点,并唤醒节点
if (failed)
cancelAcquire(node);
}
}
节点加入队列尾部后,如果其前驱节点不是头节点,通常情况下,该新节点所绑定的线程会被无限期阻塞,而不会去执行无效循环,从而导致CPU资源的浪费。
对于公平锁,头节点一般是占用锁的节点,在释放锁时,会唤醒其后稷街店所绑定的线程,后继节点的线程被唤醒后会重新执行自旋抢锁逻辑。
AQS 释放锁的原理
AQS 释放锁分成 3 个部分:
release 模板方法
首先会调用 AQS 的 release
模板方法,代码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
可以看到其中主要有 2 个方法,一个是 tryRelease
钩子方法,该方法会尝试释放当前线程持有的资源,由子类根据具体业务提供具体实现,执行成功后返回 true
。
另一个方法是 unparkSucessor()
,用来唤醒后继节点,代码如下。
private void unparkSuccessor(Node node) {
// 获取节点状态,释放锁的节点,也就是头节点
int ws = node.waitStatus;
// 若头节点状态小于0,则将其置为0,表示初始状态
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 找到后头的一个节点
if (s == null || s.waitStatus > 0) {
// 如果新节点已经被取消
s = null;
// 从队列尾部开始,往前去找醉千年的一个 waitStatus 小于0的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 唤醒后继节点对应的线程
if (s != null)
LockSupport.unpark(s.thread);
}
ReentrantLock 抢锁流程
下面分别对 ReentrantLock 的公平锁和非公平锁的实现进行讲述。
ReentrantLock 非公平锁的抢占流程
总体流程图如下:
非公平锁的同步器子类
ReentrantLock 为非公平锁实现了一个内部的同步器—— NofairSync
,器显式锁获取方法 lock()
的源码如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
首先用一个CAS操作判断 state
是不是0(表示当前锁未被占用),如果是0就把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功。当多个线程同时尝试占用一个锁时,CAS操作只能保证一个线程操作成功,剩下的要排队。
非公平体现在:如果占用锁的线程刚刚释放锁,state
置为0,而排队等待锁 的线程还未唤醒,新来的线程就直接抢占了该锁,那么久“插队”了。
非公平抢占的钩子方法:tryAcquire
如果非公平抢占没有成功,非公平锁的 lock
汇之星模板方法 acquire
,首先会调用钩子方法 tryAcquire
,非公平抢占的钩子方法实现如下:
static final class NonfairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 先直接获取锁的状态
int c = getState();
if (c == 0) {
// 如果内部队列首节点的线程执行晚了,它会将锁的state置为0
// 当前抢锁线程的下一步就是直接进行抢占
// 发现state是空的,就直接拿来加锁使用
if (compareAndSetState(0, acquires)) {
// 1.利用CAS自旋方式确认当前state确实为0,然后设置acquire(1)
setExclusiveOwnerThread(current);
// 设置当前执行的线程,直接返回true
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 2.当前线程和执行中的线程是同一个,也就意味着可重入操作
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
// 表示当前锁被1个线程重复获取了nextc次
return true;
}
// 否则就返回false,表示没有成功获取当前锁,进入排队过程
return false;
}
...
}
其核心思想是当前线程尝试获取锁的时候,如果发现锁的状态为0,则直接尝试将锁拿过来,而不会考虑其他排队节点。
ReentrantLock 公平锁的抢占流程
ReentrantLock 公平锁的抢占流程如下:
公平锁的同步器子类
ReentrantLock 为公平锁实现了一个内部的同步器——FairSync,其显式锁获取方法 lock
的代码如下:
static final class FairSync extends Sync {
final void lock() {
acquire(1);
}
...
}
其核心思想是通过 AQS 模板方法进入队列入队操作。
公平抢占的钩子方法:tryAcquire
公平锁的 lock
会执行模板方法 acquire
,该方法首先会调用钩子方法 tryAcquire
,其实现如下:
static final class FairSync extends Sync {
...
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 锁状态
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平锁的钩子方法中,首先判断是否有后继节点,如果有,并且当前线程不是锁的占有线程,钩子方法就返回 false
,模板方法会进入排队的执行流程。
是否有后继节点的判断
FairSync 是否有后继节点的判断代码如下:
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
执行场景大致如下:
当
h != t
不成立的时候,说明h
头节点,t
尾节点要么是同一节点。要么都是null
,此时返回false
,表示没有后继节点。当
h != t
成立的时候,进一步检查head.next
是否为null
,如果为null
,返回true
,这种场景在有其他线程第一次正在入队时可能会出现。如果
h != t
成立,head.next != null
,判断head.next
是不是当前线程,如果是就返回false
,不是就返回true
。
AQS 条件队列
Condition
是 JUC 用来代替传统 Object
的 wait/notify
线程间通信与写作机制的新组件,它更加的高效。
Condition 基本原理
ConditionObject 类是实现条件队列的关键,每个 ConditionObject 对象都维护一个单独的条件等待队列。每个 ConditionObject 对应一个条件队列,它记录该队列的头结点和尾节点。
public class ConditionObject implements Condition, java.io.Serializable {
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
}
一个 Condition 对象是一个单条件的等待队列.
在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java 内置锁上只有唯一的一个等待队列。比如,我们可以使用 newCondition 创建两个等待队列,具体如下:
private Lock lock = new ReentrantLock();
//创建第一个等待队列
private Condition firstCond = lock.newCondition();
//创建第二个等待队列
private Condition secondCond = lock.newCondition();
await等待方法原理
当线程调用 await
方法时,说明当前线程的节点为当前AQS队列的头节点,正好处于占有锁的状态,await
方法需要把该线程从 AQS 队列挪到 Condition 等待队列里。
在 await 方法将当前线程挪动到 Condition 等待队列后,还会唤醒 AQS 同步队列中的 head 节点的下一个节点,方法代码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 1
int savedState = fullyRelease(node); // 2
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 3
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 4
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // 5
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
整体流程如下:
- 执行
await
时,会新创建一个节点并放入 Condition 队列尾部。 - 然后释放锁,并唤醒AQS同步队列中的头节点的后一个节点。
- 然后执行 while 循环,将该节点的线程阻塞,直到该节点离开等待队列,重新回到同步队列成为同步节点后,线程才退出 while 循环。
- 退出循环后,开始调用
acquireQueued()
不断尝试拿锁。 - 拿到锁后,会清空 Condition 队列中被取消的节点。
创建一个新节点并放入 Condition 队列尾部的工作由 addCondtionWaiter 方法完成。
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点取消,重新定位尾节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 创建一个新Node,作为等待节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 将新Node加入等待队列
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
signal唤醒方法原理
线程在某个 ConditionObject 对象上调用 signal
方法后,等待队列中的 firstWaiter
会被加入同步队列中,等待节点被唤醒,流程如下:
public final void signal() {
// 如果当前线程不是持有该锁的线程,就抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); // 唤醒节点
}
// 执行唤醒
private void doSignal(Node first) {
do {
// 出队的代码写的很巧妙
// first出队,firstWaiter头部指向下一个节点,自己的nextWaiter
if ( (firstWaiter = first.nextWaiter) == null)
// 如果第二个节点为空,则尾部也为空
lastWaiter = null;
// 将原来头部first的后继置空,help for GC
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 将被唤醒的节点转移到同步队列
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
signal
方法的整体流程如下:
通过
enq()
方法自旋讲条件队列中的头节点放入 AQS 同步队列尾部,并获取它在 AQS 队列中的前驱节点。如果前驱节点的状态是取消状态,或者设置前驱节点为 Signal 状态失败,就唤醒当前节点的线程,否则节点在同步队列的尾部,参与排队。
同步队列中的线程被唤醒后,表示重新获取了显式锁,然后继续执行
condition.wait()
语句后面的临界区代码。
AQS实际应用
JUC 的同一架构如下图所示。
AQS 建立在 CAS 原子操作和 volatile 可见性变量的基础之上,为上层的显式锁、同步工具类、阻塞队列、线程池、并发容器、Future异步工具提供线程之间同步的基础设施。所以,AQS在JUC框架使用很广泛。