前言
AQS是大部分同步锁的核心框架,比如ReentrantLock,闭锁 (CountDownLock),读写锁 (ReentrantReadWriteLock)都是基于AQS实现的。我们将通过源码来彻底讲清楚AQS的基本原理。
AQS最核心的内容
AQS中有一个最核心的状态值state,通过赋予state不同的意义实现不同的功能。比如ReentrantLock定义状态值state的意义获取到排他锁的线程重入的次数,如果state为0,就说明当前没有线程获取到排他锁,如果state大于0,则说明当前已经有一个线程获取到了排他锁,其他线程想要获取排他锁就会阻塞。再不如ReentrantReadWriteLock将状态值state的高16定义为获取到共享锁的线程们重入的次数的总和,将状态值state的低16位定义位获取到排他锁的线程重入的次数。
AQS的另一个核心东西就是阻塞队列,在线程获取共享锁或排他锁失败后都要被加入到阻塞队列并等待被唤醒。拿最复杂的ReentrantReadWriteLock打个比方,当state状态值的低16位大于0时,说明当前已经有一个线程获取到了排他锁,这时其他线程不管想要获取什么锁都会被加入阻塞队列并被挂起。当state状态值的高16位大于 0,就说明有线程获取了读锁,并且阻塞队列为空,这时只有想要获取读锁的线程才能获取锁成功,想要获取写锁的线程会加入阻塞队列并被挂起。当state状态值的高16位大于 0,阻塞队列的第一个线程尝试获取排他锁,这时线程不管想要获取什么锁都会被加入阻塞队列并被挂起。当state状态值的高16位大于0,阻塞队列的第一个线程尝试获取共享锁,这时想要获取共享锁的线程可以尝试获取共享锁(非公平竞争)。AQS的阻塞队列通过它的静态内部类Node实现,Node类的源码如下所示。
/*AbstractQueuedSynchronizer$Node*/
static final class Node {
// 节点想要获取共享锁的标志
static final Node SHARED = new Node();
// 节点想要获取排他锁的标志
static final Node EXCLUSIVE = null;
/** waitStatus = 1,指示当前节点对应的线程已经被取消 */
static final int CANCELLED = 1;
// waitStatus = -1,指示后面的节点对应的线程需要释放
static final int SIGNAL = -1;
// waitStatus = -2,用于条件队列
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
// 指向当前节点的前一个节点
volatile Node prev;
// 指向当前节点的下一个节点
volatile Node next;
// 当前阻塞的线程
volatile Thread thread;
// 用于指示当前线程请求共享锁还是排他锁,EXCLUSIVE(null)表示请求排他锁,SHARED表示请求的是共享锁
Node nextWaiter;
// 返回当前节点的线程是否请求共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}
// 返回当前节点的前一个节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS核心方法源码解析
下面是AQS主要的核心参数
/*AbstractQueuedSynchronizer*/
// 指向阻塞队列的头节点
private transient volatile Node head;
// 指向阻塞队列的尾节点
private transient volatile Node tail;
// 最核心的状态值,可以被赋予不同的意义
private volatile int state;
// 用于初始化阻塞队列的方法
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 用于给阻塞队列加入新节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 用于唤醒传入节点对应的线程
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// 当前节点node对应的线程在尝试获取
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 用于挂起当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
// 用于唤醒阻塞队列中的线程
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
// 用于判断节点在尝试加锁失败后是否需要挂起,当前节点位node,pred是node的前一个节点,pred对应的线程在不被取消的前提下(pred.waitStatus!=1),如果pred.waitStatus == 0,则将pred.waitStatus 设置位-1,并返回false,让node对应的线程在尝试获取一次锁。如果pred.waitStatus == -1,则表示node中的线程需要被挂起。返回true,node对应的线程被挂起。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 用于插如想要获取共享锁的线程对应的节点,并让该线程在尝试获取两次共享锁(前提是当前的节点的前一个节点是head节点),如果还是失败则挂起线程
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 用于插如想要获取排他锁的线程对应的节点,并让该线程在尝试获取两次排他锁(前提是当前的节点的前一个节点是head节点),如果还是失败则挂起线程
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
总结
说到底AQS只提供了各种操作阻塞队列的方法,至于什么时候阻塞线程,什么时候唤醒线程,则是通过定义状态值state的意义来控制。控制状态值又掌握在具体实现的类的手上。即通过实现tryAcquire()和tryRelease()来实现排他锁的加锁和解锁的逻辑,通过实现tryAcquireShared()和tryReleaseShared()来实现共享锁的加锁和解锁的具体逻辑。加共享锁失败后,可以调用doAcquireShared(int arg)方法创建共享类型的阻塞节点并加入阻塞队列,并在两次尝试获取共享锁失败后挂起当前线程。共排他锁失败后,可以调用doAcquireInterruptibly(int arg)方法创建排他类型的阻塞节点并加入阻塞队列,并在两次尝试获取排他锁失败后挂起当前线程。