AbstractQueuedSynchronizer 之 源码 逐行解析:
acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这段代码是 ReentrantLock 类中的 acquire(int arg) 方法的实现。
以下是对这段代码的逐行解析:
-
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)): 首先调用tryAcquire(arg)方法尝试非阻塞地获取同步状态,如果返回false,表示无法获取同步状态,则执行以下操作:-
addWaiter(Node.EXCLUSIVE): 创建一个新的Node节点,并将其作为独占模式的等待者加入到同步队列中。 -
acquireQueued(...): 将当前线程加入到同步队列中,并以阻塞的方式等待获取同步状态。 - 如果成功获取同步状态,则继续执行下一步操作;否则,当前线程会被阻塞在同步队列中。
-
selfInterrupt(): 在成功获取同步状态后,调用selfInterrupt()方法来检查当前线程是否被中断,并在需要时进行自我中断。
这段代码实现了 acquire(int arg) 方法,用于获取同步状态。它首先尝试非阻塞地获取同步状态,如果失败则将当前线程加入到同步队列中,并以阻塞的方式等待获取同步状态。在成功获取同步状态后,会进行自我中断的检查。
nonfairTryAcquire(int acquires)
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
nonfairTryAcquire(int acquires) 是 ReentrantLock 类中的一个方法,用于非公平地尝试获取同步状态。
以下是对 nonfairTryAcquire(int acquires) 方法的逐行解析:
final Thread current = Thread.currentThread();: 获取当前线程。int c = getState();: 获取当前同步状态的计数。-
if (c == 0) { ... }: 如果当前同步状态的计数为 0,表示锁当前是未被持有的状态,执行以下操作:-
if (compareAndSetState(0, acquires)) { ... }: 使用原子操作将同步状态从 0 更新为acquires,如果更新成功,则执行以下操作:-
setExclusiveOwnerThread(current);: 将当前线程设置为独占锁的拥有者线程。 - 返回
true,表示成功获取同步状态。
-
-
-
else if (current == getExclusiveOwnerThread()) { ... }: 如果当前线程是独占锁的拥有者线程,执行以下操作:-
int nextc = c + acquires;: 计算新的同步状态计数。 -
if (nextc < 0) throw new Error("Maximum lock count exceeded");: 如果新的同步状态计数溢出了,抛出错误。 -
setState(nextc);: 更新同步状态为新的计数。 - 返回
true,表示成功获取同步状态。
-
返回
false,表示无法获取同步状态。
这个方法的作用是尝试获取同步状态。首先,它检查当前同步状态的计数。如果计数为 0,表示锁当前是未被持有的状态,那么它会通过原子操作将同步状态更新为 acquires,并将当前线程设置为独占锁的拥有者线程,然后返回 true 表示成功获取同步状态。
如果当前同步状态的计数不为 0,那么它会检查当前线程是否已经是独占锁的拥有者线程。如果是,它会增加同步状态计数,并更新同步状态,然后返回 true 表示成功获取同步状态。
如果无法获取同步状态,即当前同步状态的计数不为 0,且当前线程不是独占锁的拥有者线程,那么它会返回 false 表示无法获取同步状态。
需要注意的是,这是在非公平模式下尝试获取同步状态的方法。非公平模式允许新到来的线程插队获取锁,而不关心等待队列中是否有其他等待线程。
addWaiter()
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter() 是 acquireQueued() 方法中的另一个辅助方法。它用于在等待队列的尾部添加一个新的节点,该节点表示正在等待获取同步状态的线程。
让我们逐行解析这段代码的逻辑:
Node node = new Node(Thread.currentThread(), mode);: 创建一个新的节点,使用当前线程和指定的模式(共享或独占)作为参数。Node pred = tail;: 获取等待队列的尾节点。-
if (pred != null) { ... }: 如果尾节点不为null,表示等待队列不为空,执行以下操作:-
node.prev = pred;: 将新节点的前驱节点设置为尾节点pred。 -
if (compareAndSetTail(pred, node)) { ... }: 使用原子操作将尾节点更新为新节点node,如果更新成功,则执行以下操作:-
pred.next = node;: 将尾节点的后继节点设置为新节点。 - 返回新节点。
-
-
enq(node);: 如果快速路径失败(尾节点为null或者更新尾节点失败),则调用enq(node)进入慢路径。返回新节点。
总体来说,addWaiter() 方法用于在等待队列的尾部添加一个新的节点,表示正在等待获取同步状态的线程。它首先尝试使用快速路径将新节点直接添加到等待队列的尾部,如果快速路径失败,则调用 enq(node) 方法进入慢路径。无论是快速路径还是慢路径,最终都会返回新节点。
enq(node)
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq(node) 是 addWaiter() 方法中的慢路径逻辑,用于将新节点添加到等待队列的尾部。
让我们逐行解析这段代码的逻辑:
Node t = tail;: 获取等待队列的尾节点。-
if (t == null) { ... }: 如果尾节点为null,表示等待队列为空,执行以下操作:-
if (compareAndSetHead(new Node())) { ... }: 使用原子操作创建一个虚拟的头节点,并将其设置为头节点,如果设置成功,则执行以下操作:-
tail = head;: 将尾节点设置为头节点。
-
- 继续下一次循环。
-
-
else { ... }: 如果尾节点不为null,执行以下操作:-
node.prev = t;: 将新节点的前驱节点设置为尾节点t。 -
if (compareAndSetTail(t, node)) { ... }: 使用原子操作将尾节点更新为新节点node,如果更新成功,则执行以下操作:-
t.next = node;: 将尾节点的后继节点设置为新节点。 - 返回尾节点
t。
-
-
继续下一次循环。
总体来说,enq(node) 方法用于将新节点添加到等待队列的尾部。它通过循环尝试将新节点添加到尾部,如果等待队列为空,则先创建一个虚拟的头节点,并将尾节点设置为头节点。如果尾节点不为空,则将新节点的前驱节点设置为尾节点,并使用原子操作将尾节点更新为新节点。无论是创建虚拟头节点还是更新尾节点,都使用原子操作来保证线程安全性。最终,返回尾节点。
acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
这是 AbstractQueuedSynchronizer 中的 acquireQueued(final Node node, int arg) 方法的源代码。让我们逐行解析这段代码的逻辑:
boolean failed = true;: 初始化一个标志位,表示是否获取同步状态失败。try { ... } finally { ... }: 使用 try-finally 块来确保在方法执行结束后进行必要的清理操作。boolean interrupted = false;: 初始化一个标志位,表示是否在等待过程中被中断过。for (;;) { ... }: 这是一个无限循环,用于尝试获取同步状态。final Node p = node.predecessor();: 获取当前节点的前驱节点。if (p == head && tryAcquire(arg)) { ... }: 如果当前节点的前驱节点是头节点,并且可以成功获取同步状态(通过调用tryAcquire(arg)方法),则将当前节点设置为头节点,并断开前驱节点的后继引用,以便帮助垃圾回收。然后将失败标志位设置为 false,并返回中断标志位。if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()): 如果无法获取同步状态,根据一定的条件判断是否需要进行线程阻塞(通过调用shouldParkAfterFailedAcquire(p, node)方法)。如果需要阻塞,并且在阻塞过程中发生了中断,则将中断标志位设置为 true。if (failed) cancelAcquire(node);: 如果获取同步状态失败(即循环结束),则调用cancelAcquire(node)方法取消获取同步状态的操作。
总体来说,这段代码的逻辑是,在一个无限循环中,首先尝试获取同步状态。如果成功获取同步状态,则将当前节点设置为头节点,并返回中断标志位。如果无法获取同步状态,则根据一定的条件判断是否需要进行线程阻塞。如果需要阻塞,并且在阻塞过程中发生了中断,则将中断标志位设置为 true。在循环结束后,如果获取同步状态失败,则进行清理操作。该方法用于在等待队列中获取同步状态,并进行相应的处理。
shouldParkAfterFailedAcquire(p, node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire(p, node) 是 acquireQueued() 方法中的一个辅助方法。它用于判断在获取同步状态失败后,当前线程是否应该被阻塞。
让我们逐行解析这段代码的逻辑:
int ws = pred.waitStatus;: 获取前驱节点的等待状态。if (ws == Node.SIGNAL) { ... }: 如果前驱节点的等待状态是SIGNAL,表示前驱节点已经设置了等待状态,要求释放同步状态时发出信号,因此当前线程可以安全地进行阻塞。if (ws > 0) { ... }: 如果前驱节点的等待状态大于 0,表示前驱节点已被取消。在这种情况下,需要跳过被取消的前驱节点,并将当前节点的前驱节点设置为前驱节点的前驱节点,以便重试获取同步状态。然后将前驱节点的后继节点设置为当前节点。else { ... }: 如果前驱节点的等待状态为 0 或PROPAGATE,表示需要等待信号。但是,在此阶段不会立即进行阻塞。调用者需要重试以确保在阻塞之前不能获取同步状态。在这种情况下,将前驱节点的等待状态原子地设置为SIGNAL。return false;: 返回 false,表示当前线程不应该被阻塞。
总体来说,shouldParkAfterFailedAcquire() 方法根据前驱节点的等待状态来判断当前线程是否应该被阻塞。如果前驱节点的等待状态是 SIGNAL,表示前驱节点已经设置了等待状态,要求释放同步状态时发出信号,当前线程可以安全地进行阻塞。如果前驱节点的等待状态大于 0,表示前驱节点已被取消,需要跳过被取消的前驱节点,并将当前节点的前驱节点设置为前驱节点的前驱节点,以便重试获取同步状态。如果前驱节点的等待状态为 0 或 PROPAGATE,表示需要等待信号,但是在此阶段不会立即进行阻塞,需要重试以确保在阻塞之前不能获取同步状态。方法返回 false,表示当前线程不应该被阻塞。
parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
parkAndCheckInterrupt() 是 acquireQueued() 方法中的另一个辅助方法。它用于将当前线程进行阻塞,并在阻塞过程中检查是否发生了中断。
让我们解析这段代码的逻辑:
LockSupport.park(this);: 调用LockSupport类的park()方法,将当前线程进行阻塞。park()方法会阻塞当前线程,直到被其他线程调用unpark()方法或被中断。return Thread.interrupted();: 在线程被唤醒后,调用Thread.interrupted()方法来检查当前线程是否被中断。Thread.interrupted()方法会检查当前线程的中断状态,并清除中断状态。如果当前线程在阻塞过程中被中断,那么调用Thread.interrupted()方法会返回true,否则返回false。
总体来说,parkAndCheckInterrupt() 方法通过调用 LockSupport.park() 将当前线程进行阻塞,直到被其他线程调用 unpark() 方法或被中断。在线程被唤醒后,它调用 Thread.interrupted() 方法来检查当前线程是否被中断,并返回相应的结果。这个方法用于在阻塞过程中检查中断状态。
cancelAcquire(Node node)
cancelAcquire(Node node) 是 acquireQueued() 方法中的另一个辅助方法。它用于取消正在等待获取同步状态的节点。
这是 cancelAcquire() 方法的源代码:
private void cancelAcquire(Node node) {
// If node is cancelled already, do nothing
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// Find the valid predecessor to unlink from
Node predNext = pred.next;
// Mark the node as cancelled
node.waitStatus = Node.CANCELLED;
// If the node is the tail, update the tail to the valid predecessor
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If the node is not the tail, unlink it from the list
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
让我们逐行解析这段代码的逻辑:
if (node == null) { ... }: 首先检查传入的节点是否为null,如果是null,则直接返回,不执行取消操作。node.thread = null;: 将节点的线程引用置为null,表示该节点不再持有线程。Node pred = node.prev;: 获取节点的前驱节点。while (pred.waitStatus > 0) { ... }: 跳过已被取消的前驱节点,将节点的前驱节点设置为前驱节点的前驱节点。Node predNext = pred.next;: 获取前驱节点的后继节点。node.waitStatus = Node.CANCELLED;: 将节点的等待状态设置为CANCELLED,表示节点已被取消。-
if (node == tail && compareAndSetTail(node, pred)) { ... }: 如果节点是尾节点,并且成功将尾节点更新为前驱节点pred,则执行以下操作:-
compareAndSetNext(pred, predNext, null);: 将前驱节点pred的后继节点设置为null,断开与当前节点的连接。
-
-
else { ... }: 如果节点不是尾节点,则执行以下操作:- 检查前驱节点是否是头节点,并且前驱节点的等待状态是
SIGNAL或者非正数(表示需要信号),并且前驱节点持有线程不为null。- 如果满足条件,将当前节点的后继节点
node.next设置为前驱节点的后继节点predNext。
- 如果满足条件,将当前节点的后继节点
- 否则,调用
unparkSuccessor(node)方法唤醒当前节点的后继节点。
- 检查前驱节点是否是头节点,并且前驱节点的等待状态是
node.next = node;: 将节点的后继节点设置为节点自身,帮助垃圾回收。
总体来说,cancelAcquire() 方法用于取消正在等待获取同步状态的节点。它将节点的线程引用置为 null,标记节点为取消状态,并根据节点的位置和前驱节点的状态进行相应的操作。如果节点是尾节点,则将尾节点更新为前驱节点,并断开与当前节点的连接。如果节点不是尾节点,则检查前驱节点的状态,并根据条件进行相应的操作,包括设置当前节点的后继节点或唤醒当前节点的后继节点。
release(int arg)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
这段代码是一个ReentrantLock类中的release方法。让我们逐行解析它的功能:
tryRelease(arg):调用tryRelease方法,尝试释放锁并返回一个布尔值,指示是否成功释放锁。if (tryRelease(arg)):如果成功释放锁,则执行下面的代码块。Node h = head;:将头节点赋值给变量h,头节点是等待队列中的第一个节点。if (h != null && h.waitStatus != 0):检查头节点是否存在且其等待状态不为0。等待状态是用于控制线程等待和唤醒的标志。unparkSuccessor(h):调用unparkSuccessor方法,唤醒后继节点,使其能够竞争获取锁。return true;:返回true,表示成功释放锁。return false;:如果未成功释放锁,则返回false。
总体而言,这段代码的作用是释放锁并唤醒等待队列中的后继节点,以便它们可以竞争获取锁。
unparkSuccessor(Node node)
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
这段代码是ReentrantLock类中的unparkSuccessor方法。让我们逐行解析它的功能:
int ws = node.waitStatus;:获取节点的等待状态。if (ws < 0):如果等待状态小于0,则表示可能需要唤醒后继节点。compareAndSetWaitStatus(node, ws, 0);:尝试将节点的等待状态从负数改为0,以清除可能需要唤醒的标志。即使这个操作失败或者等待状态被等待线程修改,也没有问题。Node s = node.next;:将后继节点赋值给变量s,通常情况下,后继节点就是当前节点的下一个节点。if (s == null || s.waitStatus > 0):检查后继节点是否为null或者其等待状态大于0。如果是,则需要从尾节点向前遍历,找到实际未取消的后继节点。s = null;:将s置为null,准备进行后续的遍历查找。for (Node t = tail; t != null && t != node; t = t.prev):从尾节点开始向前遍历,直到找到当前节点为止。if (t.waitStatus <= 0):如果遍历到的节点的等待状态小于等于0,则将其赋值给s。if (s != null):如果找到了实际未取消的后继节点。LockSupport.unpark(s.thread);:使用LockSupport.unpark方法唤醒后继节点的线程。
总体而言,这段代码的作用是唤醒后继节点的线程。它首先尝试清除当前节点的等待状态,然后根据当前节点的后继节点是否为null或者其等待状态大于0来确定实际未取消的后继节点,并最终使用LockSupport.unpark方法唤醒后继节点的线程。