AQS是用来构建锁或者其他同步组件的基础框架,能够实现大部分同步需求的基础,AQS基于volatile变量提供的锁的内存语义和CAS原子操作指令来实现多线程的同步机制。state同步状态量为整型,并发locks包中还提供了AbstractQueuedSynchronizer的镜像long类型state版本AbstractQueuedLongSynchronizer,支持更多的线程获取同步状态。
原理解析
等待队列节点
static final class Node {
/**共享节点模式*/
static final Node SHARED = new Node();
/** 独占节点模式 */
static final Node EXCLUSIVE = null;
/** waitStatus 状态的常量表示 */
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// 前继节点
volatile Node prev;
// 后继节点
volatile Node next;
// 入队的线程
volatile Thread thread;
// 等待在condition上的下一个节点(独占模式)或者是特定的值SHARED用来标示此节点是共享模式
Node nextWaiter;
// 如果当前节点等待在共享模式上则返回true
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回前继节点,如果前继节点为null则抛出异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 默认构造函数:用来初始化头节点或者SHARED共享模式标志
Node() {
}
// 被addWaiter调用
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
// 被Condition使用
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
等待状态详解:
- CANCELLED 取消状态
由于在等待队列中等待的线程等待超时或者被中断,需要从等待队列中取消等待,之后这个状态的节点状态将不再发生变化。处于这种状态的节点会被踢出队列,被GC回收
- SIGNAL 通知状态
当前节点的线程释放了同步状态或者被取消,将会唤醒后继等待(阻塞)的节点,使得后继节点的线程得以运行。
- CONDITION 条件等待状态
当前节点等待(阻塞)在Condition上,当其他线程对Condition调用signal方法后,该节点将会从等待队列中转移到同步队列中,加入对同步状态的获取中。
- PROPAGATE 传播状态
表示在下一次共享模式同步状态获取将会无条件地被传播下去。使用共享模式的head节点有可能处于这种状态。
- INITIAL 初始状态
新建节点处于这种状态
分析思路:分析主要方法的调用流程,洞察实现原理
独占式同步状态获取与释放
主要方法是acquire和release
acquire方法流程
- acquire 独占式同步状态的获取,获取过程中忽略中断,但是最终会设置中断标志
/**
* 向外提供的独占式获取同步状态的方法
*
*
* 第一次尝试获取同步状态,成功则返回,
* 失败则将当前线程构造成节点并添加到等待队列
* 并死循环获取同步状态
* 如果线程被中断则设置当前线程的中断状态
*/
public final void acquire(long arg) {
// 第一次尝试获取同步状态,成功则返回
if (!tryAcquire(arg) &&
// 失败则构造节点,入队,死循环获取
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 设置中断标志
selfInterrupt();
}
- tryAcquire
/**
* 需要重写的自定义方法,该方法需要保证线程
* 安全的获取同步状态(CAS,用AQS提供的方法实现)
* @return 获取成功则返回true
*/
protected boolean tryAcquire(long arg) {
throw new UnsupportedOperationException();
}
- addWaiter
/**
* 为当前线程以及给定模式创建新节点并入队
*
* @param mode 节点等待模式
* @return node 新添加的节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 进行一次入队尝试,失败则有enq的死循环CAS来保证最后能入队
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 循环CAS来保证最后能入队
enq(node);
return node; // 返回新添加的等待节点
}
- enq
/**
* 线程安全地插入节点到等待队列中(死循环CAS)
* 当尾节点为null时插入前需要初始化头结点
* @param node 入队的节点
* @return t 入队节点的前继节点
*/
private Node enq(final Node node) {
// 循环CAS将节点入队
for (;;) {
Node t = tail;
// 当尾节点为null时表明:则说明等待队列为空
// 需要初始化一个头尾相等的等待队列。
// head和tail的延迟初始化。
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))// 初始化需要 CAS 设置头节点
tail = head; // 头尾相等
}
// 尾节点不为null,说明等待队列不为空,可以插入
else {
node.prev = t; // 插入节点和尾节点前向关联,防止断裂
if (compareAndSetTail(t, node)) { // CAS 插入尾节点
t.next = node; // 后向关联
return t; // 返回插入节点的前继节点
}
}
}
}
- acquireQueued
/**
* 每一个加入等待队列的节点线程通过自旋获取同步状态
* 判断自己的前继节点是否为头结点,如果是则尝试获取同步状态
* 成功则返回,否则根据前继节点是否为SIGNAL来阻塞线程
*
* @param node 入队的节点
* @return 等待过程中发生中断则返回true
*/
final boolean acquireQueued(final Node node, long arg) {
// 获取失败标志初始化为true(第一次确实失败了)
boolean failed = true;
try {
// 线程中断标志
boolean interrupted = false;
// 自旋获取同步状态
for (;;) {
// 如果为null抛出异常
final Node p = node.predecessor();
// 判断前继节点是否为head节点,
// 如果是则尝试获取同步状态
if (p == head && tryAcquire(arg)) {
// 成功获取同步状态,则将当前节点设置为head结点
// 在独占模式下设置head节点,不需要考虑线程问题,
// 因为同一时刻只有一个线程能获取同步状态
setHead(node);
// head节点的prev和thread引用为null,如果next引用也为null则将被GC
// 断开head节点的所有引用,帮助GC
p.next = null;
failed = false;
return interrupted;
}
// 如果当前节点前继不是head节点,或者是前继节点head节点但是
// 没有获取到同步状态
// 而且如果需要阻塞线程,则阻塞线程,并将线程中断标志
// 重置,并设置interrupted为重置前的中断状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
}
// 前面死循环自旋,只有成功获取才能退出循环,而且还会设置failed为false
// 所以我在想什么时候failed为true,并执行cancelAcquire操作
// 后来发现在 node.predecessor() 方法中当前继节点为null时会抛出
// NullPonitException异常,看来当前节点没有前继节点时需要取消获取的操作
// 这是由于取消状态的节点从等待队列中移除,没有关联的前节点而导致,
// 因为head节点一定是获取到同步状态的线程,在成功获取同步状态线程会
// 立刻返回。
finally {
if (failed)
cancelAcquire(node);
}
}
值得注意的是,在获取同步状态的主循环中,仅仅是记录中断状态,然后将中断状态返回,在acquire中则根据这个中断状态,在获取成功返回前设置这个中断标志,经历如下过程:
**parkAndCheckInterrupt重置返回→acquireQueued记录并返回→acquire重新设置 **
成功获取同步状态的线程在这个方法里** 只是调用setHead设置head节点,但是并没有修改head的等待状态。,当释放同步状态时会将head节点状态CAS设置为初始状态,然后唤醒后继阻塞的线程,被唤醒的线程会检查前继节点以及其状态,当获取失败(隐含有线程成功获取同步状态,并设置了新的head)**会确保前继节点的状态为SIGNAL
什么时候会执行finally语句块的内容?
比如fullyRelease方式失败,会将节点状态设置为CANCEL,详解见上面代码注释
- setHead
/**
* 将给定的节点设置为头结点
* 等待队列初始化后,只有获得同步状态的线程才能设置头结点
* 所以不需要循环CAS来设置头结点
* head节点只是获得同步状态的线程的通知阻塞线程的载体
* @param node 新的head节点
*/
private void setHead(Node node) {
head = node;
node.thread = null; // 并不保存获得同步状态线程信息
node.prev = null; // 前继节点为null
}
独占模式下获取到同步状态的线程的节点的状态不会在setHead之后发生变化,也不会影响后继节点。但是在共享模式下会将发生变化,见共享模式
- shouldParkAfterFailedAcquire
/**
* 检查并设置获取失败的节点的等待状态
* 新加入的节点要么获取同步状态要么阻塞,
* 如果当前节点是head结点,则tryAcquire获取同步状态
* 如果当前节点不是head结点,则需要判断是否需要阻塞
* 这个方法是整个acquire循环中主要的信号控制
* @param pred node的前继节点
* @param node
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前继节点的等待状态
int ws = pred.waitStatus;
// 如果前继节点为通知状态
// 表示后继阻塞的节点能够被前继节点唤醒(释放或取消),所以大胆的阻塞当前节点
// 则返回true,表示应该阻塞当前节点
if (ws == Node.SIGNAL)
return true;
// 如果为取消状态,则将当前节点的前继节点设置为
// 向前遍历不为取消状态的第一个节点,并关联彼此。
// 其实相当于在等待队列中删除取消状态的节点。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
}
// 如果前继节点不为取消状态,则表明需要一个通知的信号
// 设置前继节点为通知状态,但是不会阻塞线程。
// 并由调用者在下一次尝试保证
// 在阻塞之前不能进行获取同步状态的操作
else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
对当前节点的前继节点进行判断:
- 当为状态为SIGNAL时则返回true,
- 当为CANCEL时则删除当前节点的状态为CANCEL的前继节点,直到第一个非CANCEL节点并关联彼此。
- 状态为INITIAL、PROPAGATE和CONDITION时,则将前继节点设置为SIGNAL,确保前继节点能够通知到当前节点,并返回false,不阻塞,进行下一次循环获取。
前继节点为SIGNAL状态的线程都需要阻塞,当前继节点不为SIGNAL,要设置为SIGNAL,即始终保持前继节点的SIGNAL状态。
- parkAndCheckInterrupt
/**
* 阻塞当前线程
* @return 如果当前线程被中断则返回true
*/
private final boolean parkAndCheckInterrupt() {
// 参数this为阻塞对象,用来标识当前线程在等待的对象,
// 该对象用来问题排查和系统监控
LockSupport.park(this);
// 清除中断标志,并返回清除之前的中断状态
return Thread.interrupted();
}
- selfInterrupt
/**
* 设置当前线程的中断标志
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
- cancelAcquire
/**
* 取消CANCEL状态的节点尝试的获取操作
*
* @param node CANCEL状态的节点
*/
private void cancelAcquire(Node node) {
// 节点为null则返回
if (node == null)
return;
// 清除线程引用,帮助GC
node.thread = null;
// 跳过取消的前继节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 保存前继节点的后继节点引用
Node predNext = pred.next;
// 设置当前节点的状态为取消状态
node.waitStatus = Node.CANCELLED;
// 如果当前节点为尾节点,则将前继节点设置为尾节点
// 并将尾节点的后继节点设置为null
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
}
// 如果当前节点不为尾节点,或者为尾节点,但是设置尾节点时失败,
// 此时当前节点可能存在后继节点,替代它成为尾节点
else {
// 如果后继节点能够被通知,则将当前节点的前继节点的后继节点设置
// 为当前节点后继节点,否则直接唤醒当前节点的后继节点。
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
将取消获取的节点的状态设置为CANCEL,并剔除,然后确保调整后的节点的前继节点为SIGNAL状态。
release方法流程
- release
/**
* 释放同步状态
* @return tryRelease返回true则返回true
*/
public final boolean release(long arg) {
// 修改同步状态成功
if (tryRelease(arg)) {
Node h = head;
// 如果head节点不为null,而且状态不为初始状态
// 则唤醒head后继节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
独占模式下,只有获取到同步状态的线程才能释放,所以不存在线程安全的问题
- tryRelease
/**
* 独占模式释放同步状态,不需要保证线程并发
* 需要实现此方法,为释放同步状态修改state值
*/
protected boolean tryRelease(long arg) {
throw new UnsupportedOperationException();
}
- unparkSuccessor
/**
* 唤醒node节点存在的一个后继节点
* 注意:唤醒的节点并不一定能获取到同步状态,
* 所以setHead方法并在这里调用,而是在acquireQueued
* 中成功获取同步状态后调用
*/
private void unparkSuccessor(Node node) {
/*
* 当节点的等待状态waitStatus为负值时,比如可能通知后继阻塞节点执行等,
* 则需要重置状态值为初始值,这里没有采用循环CAS设置,即使这一次尝试由于
* CAS失败或者等待线程修改状态而导致失败都是允许的。
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 如果后继节点为null或者为CANCELLED取消状态则从尾节点开始向前遍历直到
* 找到一个为非CANCELLED状态的节点。
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 从尾节点向前遍历到node节点之前找到非CANCELLED节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果遍历后找到非CANCELLED状态的节点则唤醒节点的线程
if (s != null)
LockSupport.unpark(s.thread);// 唤醒非CANCELLED状态的线程
}
唤醒head后继的有效节点,如果head后继节点为null或者为取消状态,则从后向前遍历找到第一个有效的节点,被唤醒的节点会删除前继为取消状态的节点(shouldParkAfterFailedAcquire()方法中),唤醒的节点加入对同步状态的获取中
注意:被唤醒的线程并不一定能获得同步状态
独占式超时同步状态获取
队列同步器AbstractQueuedSynchronizer源码解析-续1
共享式同步状态获取与释放
队列同步器AbstractQueuedSynchronizer源码解析-续2
同步队列的使用方式
队列同步器AbstractQueuedSynchronizer源码解析-AQS使用
总结
从唤醒的线程竞争获取同步状态来看,AQS的获取并非绝对公平的,因为每一个新增的节点获取立马竞争同步状态,如果此时恰巧获取成功则这种获取是非公平的,失败才会添加到等待队列当中,等待队列内部竞争才是FIFO的,是公平的。
如何做到公平获取:需要tryAcquire和tryAcquireShared自定义方法中需要调用hasQueuedPredecessors方法来判断等待队列中是否含有不为当前线程的head的后继节点,如果有,则说明当前线程并不能立马去获取同步状态,而是加入等待队列中以FIFO来公平获取同步状态。
hasQueuedPredecessors方法
重入公平锁tryAcquire方法实例
AQS中等待队列的每个节点可以是独占模式的也可以是共享模式的(即独占模式和共享模式都共享自同一FIFO队列),但是一般情况下,等待队列中的所有节点的等待模式一般为同一种模式,比如ReentrantLock重入锁,仅需重写对应的获取和释放的方法。也可以所有节点不为同一等待模式,如ReadWriteLock读写锁,需要重写不同模式的获取和释放方法。
ConditionObject被定义为AQS内部公共类,用来向外暴露给调用者;ConditionObject实现了Condition接口,用于在独占模式下。使用ConditionObject必须重写isHeldExclusively方法,因为ConditionObject多处调用了这个方法。ConditionObject详解