AbstractQueuedSynchronizer
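ReentrantLock implements its locking mainly through its inner Sync class, and Sync in turn relies on the AQS synchronizer to do the work. Let's take a look at the AQS abstract class.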
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic {@code int} value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released.
In other words, it provides a base framework for locks and related synchronizers built on a FIFO wait queue. The class uses a single int value to represent the synchronization state. Subclasses must define the methods that change this value and specify what it means in terms of the object being acquired or released.
This class supports either or both a default exclusive mode and a shared mode. When acquired in exclusive mode, attempted acquires by other threads cannot succeed. Shared mode acquires by multiple threads may (but need not) succeed. This class does not "understand" these differences except in the mechanical sense that when a shared mode acquire succeeds, the next waiting thread (if one exists) must also determine whether it can acquire as well. Threads waiting in the different modes share the same FIFO queue. Usually, implementation subclasses support only one of these modes, but both can come into play for example in a {@link ReadWriteLock}. Subclasses that support only exclusive or only shared modes need not define the methods supporting the unused mode.
The class supports a default exclusive mode, a shared mode, or both. Once a thread has acquired in exclusive mode, no other thread can acquire successfully. In shared mode, multiple threads may acquire, and after a shared acquire succeeds the next waiting thread must determine whether it can acquire as well. Threads waiting in different modes share the same FIFO queue. Subclasses usually support only one of the modes, but both can be used together, for example in ReadWriteLock. A subclass that supports only one mode does not need to define the methods for the mode it does not use.
This class defines a nested {@link ConditionObject} class that can be used as a {@link Condition} implementation.
The class defines a nested ConditionObject that serves as a Condition implementation. Condition variables will be covered in a separate article.
- java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
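Wait queue node class. The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node. A "status" field in each node keeps track of whether a thread should block. A node is signalled when its predecessor releases. Each node of the queue otherwise serves as a specific-notification-style monitor holding a single waiting thread. The status field does NOT control whether threads are granted locks etc though. A thread may try to acquire if it is first in the queue. But being first does not guarantee success; it only gives the right to contend. So the currently released contender thread may need to rewait.
This is the node class of the wait queue. The wait queue is a variant of the CLH lock queue. CLH locks are normally used for spinlocks; here they are used for blocking synchronizers, keeping the same basic tactic of holding part of a thread's control information in the predecessor of its node. The "status" field of each node tracks whether a thread should block. A node is signalled when its predecessor releases. Each node in the queue otherwise acts as a specific-notification-style monitor holding a single waiting thread. The status field does not, however, control whether a thread is granted the lock. A thread may try to acquire when it is first in the queue, but being first only gives it the right to contend, not a guarantee of success, so it may fail and have to wait again.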
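To make the CLH idea concrete, here is a minimal sketch of a classic CLH spin lock, the structure that AQS adapts for blocking. It is not AQS code: the class name ClhSpinLock, the QNode type, and the demo helper are all made up for illustration. What it shows is the tactic described above, where each thread watches a flag stored in its predecessor's node.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative CLH spin lock (hypothetical class, not part of the JDK). */
final class ClhSpinLock {
    private static final class QNode {
        volatile boolean locked; // true while the owner holds or is waiting for the lock
    }

    private final AtomicReference<QNode> tail = new AtomicReference<>(new QNode());
    private final ThreadLocal<QNode> myNode = ThreadLocal.withInitial(QNode::new);
    private final ThreadLocal<QNode> myPred = new ThreadLocal<>();

    void lock() {
        QNode node = myNode.get();
        node.locked = true;
        QNode pred = tail.getAndSet(node); // atomically append this node at the tail
        myPred.set(pred);
        while (pred.locked) {              // the control information lives in the predecessor
            Thread.yield();
        }
    }

    void unlock() {
        QNode node = myNode.get();
        node.locked = false;               // lets the successor stop spinning
        myNode.set(myPred.get());          // recycle the predecessor's node for the next lock()
    }

    /** Several threads increment a plain counter under the lock; returns the final count. */
    static int demo(int threads, int perThread) {
        ClhSpinLock lock = new ClhSpinLock();
        int[] counter = {0};
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    lock.lock();
                    try { counter[0]++; } finally { lock.unlock(); }
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return counter[0];
    }
}
```

AQS keeps the predecessor-based bookkeeping but replaces the spin loop with parking, and adds explicit prev/next links so that cancelled nodes can be skipped.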
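To sum up, the synchronizer is built on a FIFO queue and uses an int value to represent its state. Subclasses extend it and manipulate this state through acquire- and release-style operations. Because the state is accessed by multiple threads, updates to it must be thread-safe, so it is manipulated mainly through the following three methods provided by the synchronizer: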
/**
* The synchronization state.
*/
private volatile int state;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
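As a rough illustration of how a subclass might combine these three methods, the sketch below uses a CAS retry loop to update state atomically. StateCounter and its methods are hypothetical names chosen for this example; only getState, setState, and compareAndSetState come from AQS.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical subclass that exposes the three state accessors for illustration. */
final class StateCounter extends AbstractQueuedSynchronizer {
    /** Adds delta to state with a CAS retry loop and returns the new value. */
    int addToState(int delta) {
        for (;;) {
            int current = getState();                // volatile read
            int next = current + delta;
            if (compareAndSetState(current, next)) { // succeeds only if no other thread raced us
                return next;
            }
        }
    }

    int read() { return getState(); }

    void reset(int value) { setState(value); }       // volatile write, no compare step

    /** Runs several threads that each call addToState(1) perThread times. */
    static int demo(int threads, int perThread) {
        StateCounter counter = new StateCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) counter.addToState(1);
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return counter.read();
    }
}
```

setState is appropriate when only one thread can be writing (for example, the lock owner), while compareAndSetState is needed whenever several threads may race on the update.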
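Node is the building block of the AQS FIFO sync queue. AQS maintains the head and tail nodes of this queue together with the state value. When a thread fails to acquire the lock, it is wrapped in a Node and appended at the tail, while releases always start from the head. As multiple threads contend for the state maintained by the synchronizer, they form a linked structure. Here is the structure of Node: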
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
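First, the waitStatus field:
- SIGNAL: the successor of this node is (or will soon be) blocked, so this node must unpark its successor when it releases or cancels. To avoid races, acquire methods must first indicate that they need a signal, then retry the acquire, and block only if that fails.
- CANCELLED: the node was cancelled because of a timeout or an interrupt.
- CONDITION: the node is currently on a condition queue.
- PROPAGATE: a releaseShared should be propagated to other nodes. This status is set only on the head node, in doReleaseShared, to ensure propagation continues even if other operations have intervened.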
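The javadoc notes that the values are arranged so that most code only needs to check the sign. The sketch below mirrors the constants in a standalone class to show that convention. Node itself is not accessible outside the package, so WaitStatusDemo and its helper methods are purely illustrative and not part of AQS.

```java
/** Illustrative mirror of the waitStatus constants (hypothetical class). */
final class WaitStatusDemo {
    static final int CANCELLED = 1;
    static final int SIGNAL = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;

    /** Positive means cancelled; this is the only positive value. */
    static boolean isCancelled(int ws) { return ws > 0; }

    /** Negative means the node may need to signal or is in a special state. */
    static boolean mayNeedSignal(int ws) { return ws < 0; }

    static String describe(int ws) {
        switch (ws) {
            case CANCELLED: return "CANCELLED";
            case SIGNAL:    return "SIGNAL";
            case CONDITION: return "CONDITION";
            case PROPAGATE: return "PROPAGATE";
            case 0:         return "INITIAL";
            default: throw new IllegalArgumentException("unknown waitStatus: " + ws);
        }
    }
}
```

This matches how the quoted source tests the field: shouldParkAfterFailedAcquire checks ws > 0 to detect a cancelled predecessor, and unparkSuccessor checks ws < 0 before clearing the status.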
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev; // predecessor node
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next; // successor node
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread; // the thread that enqueued this node
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter; // next node on the condition wait queue, or the SHARED marker
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
That covers the structure and API of Node. Next, let's look at the API of AQS.
- AbstractQueuedSynchronizer#tryAcquire(int arg) : boolean
/**
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
*
* <p>The default
* implementation throws {@link UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return {@code true} if successful. Upon success, this object has
* been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
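Used to attempt an acquire in exclusive mode. The method should first check whether the state permits acquisition and, if so, try to acquire, returning true on success. The default implementation throws UnsupportedOperationException, which is what callers see when exclusive mode is not supported.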
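A minimal sketch of what an override might look like, assuming a non-reentrant lock where state 0 means free and 1 means held. TryOnlyMutex, tryLock, and defaultThrows are hypothetical names for this example; the AQS calls themselves are real.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical non-reentrant mutex exposing only a tryLock-style operation. */
final class TryOnlyMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // Assumption for this sketch: 0 = free, 1 = held.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
    }

    private final Sync sync = new Sync();

    /** Never blocks: reports whether the lock was obtained. */
    boolean tryLock() { return sync.tryAcquire(1); }

    /** Shows that a synchronizer without an override rejects exclusive acquires. */
    static boolean defaultThrows() {
        AbstractQueuedSynchronizer bare = new AbstractQueuedSynchronizer() { };
        try {
            bare.acquire(1);
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }
}
```

Calling tryAcquire directly, as tryLock does here, skips the queue entirely, which is why the javadoc mentions it as a way to implement Lock#tryLock().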
- AbstractQueuedSynchronizer#tryRelease(int arg) : boolean
/**
* Attempts to set the state to reflect a release in exclusive
* mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this object is now in a fully released
* state, so that any waiting threads may attempt to acquire;
* and {@code false} otherwise.
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
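Updates state to reflect a release in exclusive mode. It returns true only when the object is fully released (for example, state == 0), at which point waiting threads may attempt to acquire.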
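The "fully released" rule is easiest to see with a reentrant lock, where state counts the number of holds. The following sketch is loosely modelled on that idea; CountingLock is a hypothetical class, and overflow handling is deliberately left out.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical reentrant lock: state holds the current hold count. */
final class CountingLock {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            } else if (current == getExclusiveOwnerThread()) {
                setState(c + acquires); // only the owner gets here, so a plain write is enough
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            int c = getState() - releases;
            boolean free = (c == 0);    // true only when the last hold is released
            if (free) {
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }

    /** Returns true only when this call fully released the lock. */
    boolean unlock() { return sync.release(1); }
}
```

Because release only wakes a successor when tryRelease returns true, waiting threads are not woken while the owner still holds the lock reentrantly.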
- AbstractQueuedSynchronizer#tryAcquireShared(int arg) : int
/**
* Attempts to acquire in shared mode. This method should query if
* the state of the object permits it to be acquired in the shared
* mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread.
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return a negative value on failure; zero if acquisition in shared
* mode succeeded but no subsequent shared-mode acquire can
* succeed; and a positive value if acquisition in shared
* mode succeeded and subsequent shared-mode acquires might
* also succeed, in which case a subsequent waiting thread
* must check availability. (Support for three different
* return values enables this method to be used in contexts
* where acquires only sometimes act exclusively.) Upon
* success, this object has been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
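Attempts to acquire in shared mode. As before, it should first check whether the current state permits acquisition. A negative return value means failure, and the thread is queued to wait. Zero means the acquire succeeded but no subsequent shared-mode acquire can succeed. A positive value means the acquire succeeded and subsequent shared-mode acquires might also succeed, so the next waiting thread must check availability.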
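The three-way return value maps naturally onto a permit counter: the number of permits left after the acquire is exactly the value to return. The sketch below assumes a semaphore-like synchronizer; PermitPool, tryTake, and give are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical permit pool: state is the number of available permits. */
final class PermitPool {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                // Negative: not enough permits, report failure without changing state.
                // Otherwise: CAS the new count in and report how many are left.
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true;
                }
            }
        }
    }

    private final Sync sync;

    PermitPool(int permits) { sync = new Sync(permits); }

    /** Non-blocking attempt; returns the raw tryAcquireShared result. */
    int tryTake(int n) { return sync.tryAcquireShared(n); }

    void give(int n) { sync.releaseShared(n); }
}
```

With two permits, the first take returns 1 (others may still succeed), the second returns 0 (success, but nothing left), and a third returns a negative value (failure).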
- AbstractQueuedSynchronizer#tryReleaseShared(int arg) : boolean
/**
* Attempts to set the state to reflect a release in shared mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this release of shared mode may permit a
* waiting acquire (shared or exclusive) to succeed; and
* {@code false} otherwise
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if shared mode is not supported
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
In shared mode, attempts to set state to reflect a release. Returns true if this release may allow a waiting thread to acquire, and false otherwise.
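A count-down gate is one case where most releases should return false: waiters can only proceed once the count reaches zero. The sketch below assumes that design; CountDownGate and its methods are hypothetical names used for illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical count-down gate: state is the number of releases still required. */
final class CountDownGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) { setState(count); }

        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 0 ? 1 : -1; // open only when the count has reached zero
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            for (;;) {
                int c = getState();
                if (c == 0) {
                    return false;            // already open, nothing new to signal
                }
                int next = c - 1;
                if (compareAndSetState(c, next)) {
                    return next == 0;        // only the final decrement may wake waiters
                }
            }
        }
    }

    private final Sync sync;

    CountDownGate(int count) { sync = new Sync(count); }

    /** Returns true only for the call that opened the gate. */
    boolean countDown() { return sync.releaseShared(1); }

    boolean isOpen() { return sync.tryAcquireShared(1) > 0; }
}
```

Returning false for intermediate decrements avoids waking threads that would immediately fail tryAcquireShared and park again.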
- AbstractQueuedSynchronizer#isHeldExclusively() : boolean
/**
* Returns {@code true} if synchronization is held exclusively with
* respect to the current (calling) thread. This method is invoked
* upon each call to a non-waiting {@link ConditionObject} method.
* (Waiting methods instead invoke {@link #release}.)
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}. This method is invoked
* internally only within {@link ConditionObject} methods, so need
* not be defined if conditions are not used.
*
* @return {@code true} if synchronization is held exclusively;
* {@code false} otherwise
* @throws UnsupportedOperationException if conditions are not supported
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
Checks whether synchronization is held exclusively by the current (calling) thread.
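Since this method is only called from ConditionObject, its effect is easiest to observe through a Condition. The sketch below assumes a simple owner-tracking mutex; ConditionMutex and the two static helpers are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

/** Hypothetical mutex with Condition support, to show where isHeldExclusively matters. */
final class ConditionMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (!isHeldExclusively()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        Condition newCondition() { return new ConditionObject(); }
    }

    private final Sync sync = new Sync();

    void lock() { sync.acquire(1); }
    void unlock() { sync.release(1); }
    Condition newCondition() { return sync.newCondition(); }

    /** signal() consults isHeldExclusively() and rejects callers that do not hold the lock. */
    static boolean signalThrowsWithoutLock() {
        Condition c = new ConditionMutex().newCondition();
        try {
            c.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    static boolean signalWorksWithLock() {
        ConditionMutex m = new ConditionMutex();
        Condition c = m.newCondition();
        m.lock();
        try {
            c.signal(); // no waiters, so this is a no-op, but it must not throw
            return true;
        } finally {
            m.unlock();
        }
    }
}
```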
- AbstractQueuedSynchronizer#acquire(int arg) : void
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
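Acquires in exclusive mode, ignoring interrupts.
- Try to acquire (!tryAcquire(arg)).
- If that fails, join the wait queue (acquireQueued(addWaiter(Node.EXCLUSIVE), arg)). addWaiter is called first to wrap the thread in a Node and append it at the tail: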
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
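If the fast path succeeds in appending to the tail, addWaiter returns the node right away. Otherwise enq(node) retries in a loop until the insertion succeeds, and addWaiter then returns the node. acquireQueued runs next with this node.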
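The queue itself is private, but AQS exposes public inspection methods such as getQueueLength() and hasQueuedThreads() that make the enqueueing visible. The sketch below holds a lock on the calling thread and counts how many contenders end up queued. QueueInspection and its nested Sync are hypothetical; getQueueLength() is documented as an estimate, although here the count is stable because the lock stays held while it is read.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical demo: observe threads being enqueued after a failed tryAcquire. */
final class QueueInspection {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }

        @Override
        protected boolean tryRelease(int arg) { setState(0); return true; }
    }

    /** Returns the queue length observed while the caller holds the lock. */
    static int queuedWhileHeld(int waiters) {
        Sync sync = new Sync();
        sync.acquire(1);                             // the caller holds the lock
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                sync.acquire(1);                     // fails tryAcquire, so addWaiter enqueues a node
                sync.release(1);
            });
            threads[i].start();
        }
        while (sync.getQueueLength() < waiters) {    // wait until every contender is queued
            Thread.yield();
        }
        int observed = sync.getQueueLength();
        sync.release(1);                             // wakes the first waiter; the rest follow in turn
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return observed;
    }
}
```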
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
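final Node p = node.predecessor(); // get the predecessor first
if (p == head && tryAcquire(arg)) { // predecessor is head and the retry succeeded: this thread now holds the lock
setHead(node); // make this node the new head
p.next = null; // help GC: the old head leaves the queue
failed = false;
return interrupted; // report whether the thread was interrupted while waiting
}
if (shouldParkAfterFailedAcquire(p, node) && // check the predecessor's status to decide whether to park
parkAndCheckInterrupt()) // park the thread; returns true if it was interrupted
interrupted = true; // remember that an interrupt occurred
}
} finally {
if (failed)
cancelAcquire(node); // exited through an exception without acquiring: cancel the node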
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
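- Once the lock has been acquired, if the thread was interrupted while waiting (acquireQueued returned true), selfInterrupt() is called to restore the interrupt status.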
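The "ignoring interrupts" behaviour can be checked from the outside: a thread interrupted while waiting in acquire does not get an exception, but its interrupt flag is set when acquire returns. The sketch below assumes a trivial mutex; InterruptDemo and its nested Sync are hypothetical names.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical demo: acquire() swallows the interrupt while waiting, then restores the flag. */
final class InterruptDemo {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) { return compareAndSetState(0, 1); }

        @Override
        protected boolean tryRelease(int arg) { setState(0); return true; }
    }

    static boolean interruptIsRestored() {
        Sync sync = new Sync();
        sync.acquire(1);                              // keep the lock so the worker must queue
        boolean[] sawInterrupt = {false};
        Thread worker = new Thread(() -> {
            sync.acquire(1);                          // does not throw when interrupted
            sawInterrupt[0] = Thread.currentThread().isInterrupted();
            sync.release(1);
        });
        worker.start();
        while (!sync.hasQueuedThreads()) {            // wait until the worker is in the queue
            Thread.yield();
        }
        worker.interrupt();                           // interrupt it before the lock is available
        sync.release(1);                              // now let it acquire
        try {
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return sawInterrupt[0];
    }
}
```

Callers that need to abandon the wait on interrupt would use acquireInterruptibly instead, which throws InterruptedException rather than deferring the interrupt.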
- AbstractQueuedSynchronizer#release(int arg) : boolean
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
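Releases in exclusive mode. It returns the value returned by tryRelease, so true means the lock was released and a waiting thread, if any, was unblocked.
- The current thread tries to release the lock via tryRelease.
- If the release succeeds and the head node's waitStatus is non-zero, unparkSuccessor is called to wake the thread of the successor node.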
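Underneath parkAndCheckInterrupt and unparkSuccessor sit LockSupport.park and LockSupport.unpark. The sketch below shows that primitive in isolation; ParkDemo is a hypothetical class. Because park may return spuriously, the waiter rechecks its condition in a loop, much as acquireQueued retries tryAcquire after waking.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical demo of the park/unpark handshake used by AQS. */
final class ParkDemo {
    static boolean parkUntilSignalled() {
        AtomicBoolean ready = new AtomicBoolean(false);
        AtomicBoolean woke = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            while (!ready.get()) {
                LockSupport.park(ParkDemo.class); // may return spuriously, so recheck the condition
            }
            woke.set(true);
        });
        waiter.start();
        ready.set(true);            // publish the condition first
        LockSupport.unpark(waiter); // then wake the waiter (safe even if it has not parked yet)
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return woke.get();
    }
}
```

An unpark issued before the matching park is not lost: it leaves a permit that makes the next park return immediately, which is part of why the signal-then-retry protocol in AQS is race-free.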
- AbstractQueuedSynchronizer#acquireShared(int arg) : void
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
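Acquires in shared mode, ignoring interrupts.
- If tryAcquireShared succeeds, return immediately.
- If it fails, the current thread is wrapped in a Node.SHARED node and appended at the tail.
- The thread then loops: if its predecessor is the head, it tries to acquire again. A failure means the lock is still unavailable, so it parks if needed and retries. Once it acquires, it calls setHeadAndPropagate to make itself the head and propagate the wake-up to the shared waiters behind it, re-asserts the interrupt if one was seen, and returns.
- If the loop exits through an exception without acquiring, the node is cancelled.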
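The steps above can be observed with a permit counter that starts at zero: the waiter fails tryAcquireShared, is queued, and proceeds only after a permit is released. BlockingPermits and its nested Sync are hypothetical names for this sketch.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical demo: a shared acquire blocks until a permit is released. */
final class BlockingPermits {
    private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int permits) { setState(permits); }

        @Override
        protected int tryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 || compareAndSetState(available, remaining)) {
                    return remaining;
                }
            }
        }

        @Override
        protected boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                if (compareAndSetState(current, current + releases)) {
                    return true;
                }
            }
        }
    }

    static boolean waiterProceedsAfterRelease() {
        Sync sync = new Sync(0);                  // no permits yet
        AtomicBoolean acquired = new AtomicBoolean(false);
        Thread waiter = new Thread(() -> {
            sync.acquireShared(1);                // tryAcquireShared returns -1, so the thread is queued
            acquired.set(true);
        });
        waiter.start();
        while (!sync.hasQueuedThreads()) {        // wait until the waiter is in the queue
            Thread.yield();
        }
        boolean blockedBefore = !acquired.get();  // still waiting: no permit has been released
        sync.releaseShared(1);                    // hand over one permit and wake the waiter
        try {
            waiter.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return blockedBefore && acquired.get();
    }
}
```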
- AbstractQueuedSynchronizer#releaseShared(int arg) : boolean
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
/**
* Release action for shared mode -- signals successor and ensures
* propagation. (Note: For exclusive mode, release just amounts
* to calling unparkSuccessor of head if it needs signal.)
*/
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
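Releases in shared mode; this is what a shared unlock uses. It returns the value returned by tryReleaseShared.
- Try to release via tryReleaseShared.
- On success, doReleaseShared loops over the head node to wake successors.
- If the head's status is SIGNAL, its successor needs to be woken: the status is reset to 0 with a CAS and unparkSuccessor is called.
- If the head's status is 0, a CAS tries to set it to PROPAGATE.
- The loop ends once the head is unchanged at the end of an iteration; if the head changed in the meantime, it goes around again.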
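Propagation is what lets a single releaseShared wake a whole line of shared waiters. The sketch below assumes a one-shot gate where any release opens it for everyone; OpenGate and its nested Sync are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

/** Hypothetical one-shot gate: a single releaseShared lets every waiter through. */
final class OpenGate {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() != 0 ? 1 : -1; // positive: later shared acquires may succeed too
        }

        @Override
        protected boolean tryReleaseShared(int ignored) {
            setState(1);                     // open the gate
            return true;
        }
    }

    /** Starts the given number of waiters, opens the gate once, and counts how many passed. */
    static int releaseAll(int waiters) {
        Sync sync = new Sync();
        AtomicInteger passed = new AtomicInteger();
        Thread[] threads = new Thread[waiters];
        for (int i = 0; i < waiters; i++) {
            threads[i] = new Thread(() -> {
                sync.acquireShared(1);       // blocks while the gate is closed
                passed.incrementAndGet();
            });
            threads[i].start();
        }
        while (sync.getQueueLength() < waiters) { // wait until all waiters are queued
            Thread.yield();
        }
        sync.releaseShared(1);               // one release; wake-ups propagate down the queue
        for (Thread t : threads) {
            try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
        }
        return passed.get();
    }
}
```

Only the first waiter is unparked directly by the releasing thread. Each waiter that acquires with a positive result then continues the chain through setHeadAndPropagate, which is why tryAcquireShared returns 1 rather than 0 here.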