1. 总体思路
维护一个volatile修饰的int(state)来表示状态(例如通过1表示当前锁已经被持有,0表示当前锁资源空闲)。在具体实现时,采用模板方法模式,子类只能通过几个有限的方法,如getState(),setState(),compareAndSetState()等来控制同步状态。
// 加锁:
while (!tryAcquire(arg)) {
// enqueue thread if it is not already queued;
// possibly block current thread;
}
// 解锁
if (tryRelease(arg))
// unblock the first queued thread;
线程enqueue的策略借鉴了CLH锁的思路,但是由于没有原子操作可以同时设置prev和next,因此队列中的节点,强指向prev,弱指向next,即从队列头部开始根据next向后遍历可能失败,此时只能从队列尾部根据prev向前遍历。
核心内部类包括Node和ConditionObject。
1.1 Node
Node是对Thread的一层封装,除了Thread类型的字段外,还包括等待状态waitStatus,指向上一个节点的prev,指向下一个节点的next。Node可以设置2种模式:EXCLUSIVE或者SHARED。
1.2 ConditionObject
2. 核心逻辑
2.1 acquire
先尝试加锁,如果失败,则将线程入队列。以上2个步骤是非阻塞的,通过while + CAS完成,入队列成功后,再从队列中取出node,该步骤是阻塞的,同时也可能会被中断。
对于已入队列的线程可以保证先进先出,但是存在某些线程在入队列前的tryAcquire就获得成功,因此不能保证避免锁饥饿。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
等效为以下代码
public final void acquire(int arg) {
// 尝试以独占模式获取资源,非阻塞(一般是对state变量的CAS操作)
// 如果成功,则直接返回
// 否则,将线程入队列(以独占模式)再从队列中取出
if (tryAcquire(arg)) {
return;
} else {
// 将新Node插入队列,非阻塞(只有for循环和原子操作)
// 借鉴了CLH锁的模式:Atomic Swap
Node node = addWaiter(Node.EXCLUSIVE);
// 从队列中获取之前已入队的Node,阻塞(有park和unpark)
if (acquireQueued(node, arg)) {
selfInterrupt();
}
}
}
2.2 tryAcquire
由子类自由实现,Doug Lea在AQS源码的注释里提供了一个实现的样例,基本思路是state变量为0代表资源闲置(未加锁),如果通过CAS设置为1成功,说明某个线程获得了资源的控制权(加锁成功)。代码如下:
public boolean tryAcquire(int acquires) {
assert acquires == 1; // Otherwise unused
if (compareAndSetState(0, 1)) {
// 设置该线程为锁的持有者
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
2.3 addWaiter
enq采用延迟初始化:只有首次当有线程互相抢占资源时(即taill为null),才需要初始化head和tail。换言之,如果没有线程互相抢占资源,则队列的头尾两个节点永远不会被初始化。fast path尝试对某个线程T0进行一次直接入队的操作,如果此时没有并发线程,则T0的compareAndSetTail操作肯定会成功,不需要进行后续的enq操作,如果此时还存在其他多个并发线程记为T1和T2,若T0成功,则必然T1和T2会失败,失败线程会进入到enq的循环中,互相竞争插入到队列尾端,直到成功。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
2.4 enq
入队列的操作跟CLH Lock完全相同:将Node的prev指向原先的tail节点,并且通过原子的CAS操作将新节点Node插入到队列尾端。另外在入队成功后,还会将原先tail节点的next指向新节点Node。
next指针的作用是方便通知successor,但是next节点为null时并不代表该节点已经在队列尾端了,Doug Lea曾提到过[1]:
But because there are no applicable techniques for lock-free atomic insertion of double-linked list nodes using compareAndSet, this link is not atomically set aspart of insertion; it is simply assigned, after the insertion.
The next link is treated only as an optimized path. If a node's successor does not appear to exist (or appears to be cancelled) via its next field, it is always possible to start at the tail of the list and traverse backwards using the pred field to accurately check if there really is one.
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
2.5 acquireQueued
使用了经典的await/notify编程范式:
while(condition) {
// 休眠
}
通过一个循环,并在循环中检查是否满足条件,当满足某些特定条件时线程休眠。因为可能存在“虚假唤醒”,所以必须使用while。
其中,parkAndCheckInterrupt是对休眠动作的封装,不仅包含了park动作,还在线程被唤醒时,校验了中断标识位。因为Thread.interrupted()方法会清空中断标识,因此调用方需要单独设置“interrupted”标志位。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
假设某个线程T1在入队之后调用该方法准备出队列,此时锁被T0持有(注意T0获得锁的时候并没有并发,因此在tryAcquire的时候就成功了,无需入队出队动作),此时AQS中各个字段的快照是这样的:
head = dummy node;tail = Node1(T1的封装);state = 1;
则 Node1的predecessor是head,但是tryAcquire一定会失败,则进入第二个if语句。此时dummy node的ws = 0(默认值),设置ws = Node.SIGNAL,则当前线程被park。此时for循环就阻塞住了,需要等待锁的持有者T0释放锁(release方法)来触发唤醒线程。
2.5.1 failed变量的作用
cancelAcquire表示放弃获得锁资源,一开始猜测可能应该是node.predecessor()抛出了异常(猜测某些操作会把node的prev置为null),但是并没有找到实际的证据。从代码来看,最有可能抛异常的地方在tryAcquire(arg),翻阅了一下concurrent包下AQS的各个子类,包括ReentrantLock和ReentrantReadWriteLock都有可能抛出异常。
另外比较有趣的是,最开始JDK源码中并没有failed这个变量,是某次bug之后打上的补丁,相比于
try { ... }
catch (RuntimeException e) { handle(); throw e; }
以下写法是更加健壮和清晰的编程范式:
boolean failed = true;
try { ... ; failed = false; }
finally { if (failed) handle(); }
更详细的可以参考Martin Buchholz提交的bug[2]。
2.5.2 interrupted变量的作用
false表示获得锁的过程中线程没有被中断,true表示获得锁的过程中线程被中断了。
2.6 shouldParkAfterFailedAcquire
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
当某个节点(或者说线程)尝试抢占资源失败时,需要判断是否阻塞该线程。将该节点的上一个节点(pred)的waitStatus字段记为ws。
如果ws为Node.SIGNAL,表示pred后一个节点(即该节点)需要被unpark。返回true,接下来线程就会被park。
-
如果ws大于0,会将node指向pred的pred,即跳过pred节点。如果新的pred节点的waitStatus仍然大于0,则继续跳过。如果连等号改成下面的写法,或许会更容易理解一点。
do { node.prev = pred.prev; // 将node的prev指向上一个节点的上一个节点 pred = node.prev; // 对pred重新赋值,否则下次循环时,node.prev无法再向前移动 } while (node.prev.waitStatus > 0) // 如果该节点的waitStatus还是大于0,继续循环
如果ws等于0,将ws标记成Node.SIGNAL
2.7 cancelAcquire
2.8 release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
把head的继任者(Node1)unpark唤醒。此时for循环继续,AQS中各个字段的快照是这样的:
head = dummy node;tail = Node1(T1的封装);state = 0;
则 Node1的predecessor是head,假设此时没有并发,则tryAcquire成功,把Node2设置为head节点,返回interrupted标识。
3. 以一个例子结尾
初始状态:
head = tail = null;state = 0;
T0,T1,T2并发调用acquire方法,假设获得锁的顺序是T0 -> T1 -> T2,则:
-
T0调用tryAcquire方法成功,此时:
head = tail = null;state = 1;
-
T1调用tryAcquire方法失败,先调用addWaiter方法成功,入队列,此时:
head = dummy node;tail = N1;N1.prev = dummy node;state = 1;
T1调用acquireQueued方法尝试取出N1(加锁)。
第一个if语句,N1的prev是head,但此时T0未释放锁,因此tryAcquire方法不会成功,进入第二个if语句。
N1的prev节点,即head节点,ws = 0,置为 -1,线程休眠
-
T2调用tryAcquire方法失败,后调用addWaiter方法成功,入队列,此时:
head = dummy node;tail = N2;N2.prev = N1;N1.prev = dummy node;state = 1;
T2调用acquireQueued方法尝试取出N2(加锁)。
第一个if语句,N1的prev不是head,进入第二个if语句。
N2的prev节点,即N1节点,ws = 0,置为 -1,线程休眠。
-
T0释放锁,此时:
head = dummy node;tail = N2;N2.prev = N1;N1.prev = dummy node;state = 0;
T1被唤醒,继续for循环中的内容。
-
第一个if语句,N1的prev是head,此时T0已释放锁,(假设此时没有新线程进入)因此tryAcquire方法成功,将N1设置为head,标识着T1线程拥有了锁。此时:
head = N1;tail = N2;N2.prev = N1;state = 1;
T1释放锁时,操作类似。