更多 Java 并发编程方面的文章,请参见文集《Java 并发编程》
AbstractQueuedSynchronizer (AQS)
AQS 的功能可以分为两类,独占功能 和 共享功能。
它的所有子类中:
- 要么实现并使用了它独占功能的 API,例如
ReentrantLock
RenntrantReadWriteLock
- 要么使用了共享锁的功能,例如
CountDownLatch
- 而不会同时使用两套 API,即便是它最有名的子类 ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套API来实现的
AQS 的基本思想
AQS 使用标志位 + 队列的方式,记录获取锁、竞争锁、释放锁等一系列锁的状态。
-
获取锁:判断当前状态
waitStatus
是否允许获取锁- 如果是就获取锁,修改当前状态,并且如果进了队列就从队列中移除
- 否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败
- 另外如果是阻塞线程,那么线程就需要进入阻塞队列
- 释放锁:修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程。
要支持上面两个操作就必须有下面的条件:
-
原子性操作同步器的状态位:
volatile int waitStatus;
- 它是一个 volatile 变量,确保了可见性
- 使用 CAS 操作来更新 waitStatus,确保了原子性
- waitStatus 可能的状态包括:
- CANCELLED = 1: 节点操作因为超时或者对应的线程被 interrupt。节点不应该留在此状态,一旦达到此状态将从CHL队列中踢出。
-
SIGNAL = -1: 节点的继任节点是(或者将要成为)BLOCKED状态(例如通过 LockSupport.park() 操作),因此一个节点一旦被释放(解锁)或者取消就需要唤醒(LockSupport.unpack())它的继任节点。
只有当前节点的前一个节点为 SIGNAL 时,才能当前节点才能被挂起。 - CONDITION = -2:表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
- 0: 正常状态,新生的非CONDITION节点都是此状态。
- 非负值标识节点不需要被通知(唤醒)。
阻塞和唤醒线程:在 JDK 5.0 以后利用 JNI 在 LockSupport 类中实现了此特性。
LockSupport.park()
LockSupport.park(Object)
LockSupport.parkNanos(Object, long)
LockSupport.parkNanos(long)
LockSupport.parkUntil(Object, long)
LockSupport.parkUntil(long)
LockSupport.unpark(Thread)
上面的API中 park() 是在当前线程中调用,导致线程阻塞,带参数的 Object 是挂起的对象,这样监视的时候就能够知道此线程是因为什么资源而阻塞的。-
一个有序的队列:采用 CHL 列表来解决有序的 FIFO 队列的问题
- 对于入队列(enqueue):采用 CAS 操作,每次比较尾结点是否一致,然后插入的到尾结点中。
- 对于出队列(dequeue):由于每一个节点也缓存了一个状态,决定是否出队列,因此当不满足条件时就需要自旋等待,一旦满足条件就将头结点设置为下一个节点。
AQS 核心字段
-
private volatile int state;
:描述的有多少个线程取得了锁,对于互斥锁来说state<=1。 -
private transient volatile Node head;
:等待队列的头 -
private transient volatile Node tail;
:等待队列的尾,head 与 tail 构成了一个 FIFO 队列
Node 节点的属性包括:
-
volatile Node prev;
:此节点的前一个节点。节点的 waitStatus 依赖于前一个节点的状态。 -
volatile Node next;
:此节点的后一个节点。后一个节点是否被唤醒依赖于当前节点是否被释放。 -
volatile Thread thread;
:节点绑定的线程。 -
volatile int waitStatus;
:描述节点的状态 -
Node nextWaiter;
:下一个等待条件(Condition)的节点,由于 Condition 是独占模式,因此这里有一个简单的队列来描述 Condition 上的线程节点。
AQS 独占锁
有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。
例如 ReentrantLock
RenntrantReadWriteLock
就是独占锁。
比如 ReentrantLock
中锁的实现 Sync
继承了 AbstractQueuedSynchronizer
,同时包括了公平锁 FairSync
和非公平锁NonfairSync
:
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
/**
* Base of synchronization control for this lock. Subclassed
* into fair and nonfair versions below. Uses AQS state to
* represent the number of holds on the lock.
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
......
static final class NonfairSync extends Sync {
......
static final class FairSync extends Sync {
......
AQS 共享锁
例如 CountDownLatch
就是共享锁。
在 Java CyclicBarrier VS CountDownLatch 中关于 CountDownLatch
的示例中,使用了 private static CountDownLatch countDownLatch = new CountDownLatch(5);
就是表示该锁可能被 5 个线程共享。
和 ReentrantLock
类似,CountDownLatch
内部也有一个叫做 Sync
的内部类,同样也是用它继承了AbstractQueuedSynchronizer
。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
- 其中的构造方法
setState(count)
就是设置private volatile int state;
,描述的有多少个线程取得了锁` - 其中的
tryAcquireShared()
方法判断了state
是否为 0,即计数器是否为 0。
引用:
深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的实现分析(上)
深度解析Java 8:AbstractQueuedSynchronizer的实现分析(下)
深入浅出 Java Concurrency (7): 锁机制 part 2 AQS