title: 再看 AQS
date: 2021/06/09 09:29
ReentrantLock$NonfairSync
知识准备
AQS:AQS的全称是AbstractQueuedSynchronizer
,这个类也是在java.util.concurrent.locks
下面,提供了一个FIFO的队列,可以用于构建锁的基础框架,内部通过原子变量state
来表示锁的状态,当state
大于0的时候表示锁被占用,如果state等于0时表示没有占用锁,ReentrantLock
是一个重入锁,表现在state
上,如果持有锁的线程重复获取锁时,它会将state
状态进行递增,也就是获得一个信号量,当释放锁时,同时也是释放了信号量,信号量跟随减少,如果上一个线程还没有完成任务,则会进行入队等待操作。
AQS 主要字段:
/**
* 头节点指针,通过setHead进行修改
*/
private transient volatile Node head;
/**
* 队列的尾指针
*/
private transient volatile Node tail;
/**
* 同步器状态
*/
private volatile int state;
AQS需要子类实现的方法
AQS是提供了并发的框架,它内部提供一种机制,它是基于模板方法的实现,整个类中没有任何一个abstract的抽象方法,取而代之的是,需要子类去实现的那些方法通过一个方法体抛出UnsupportedOperationException异常来让子类知道,告知如果没有实现模板的方法,则直接抛出异常。
方法名 | 方法描述 |
---|---|
tryAcquire | 以独占模式尝试获取锁,独占模式下调用acquire,尝试去设置state的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒 |
tryRelease | 尝试独占模式下释放状态 |
tryAcquireShared | 尝试在共享模式获得锁,共享模式下调用acquire,尝试去设置state的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒 |
tryReleaseShared | 尝试共享模式下释放状态 |
isHeldExclusively | 是否是独占模式,表示是否被当前线程占用 |
AQS是基于FIFO队列实现的,那么队列的Node节点又是存放的什么呢?
Node结点:作为获取锁失败线程的包装类, 组合了Thread引用, 实现为FIFO双向队列。 下图为Node结点的属性描述
字段名 | 类型 | 默认值 | 描述 |
---|---|---|---|
SHARED | Node | new Node() | 一个标识,指示节点使用共享模式等待 |
EXCLUSIVE | Nodel | Null | 一个标识,指示节点使用独占模式等待 |
CANCELLED |
int | 1 | 节点因超时或被中断而取消时设置状态为取消状态 |
SIGNAL |
int | -1 | 当前节点的后节点被park,当前节点释放时,必须调用unpark通知后面节点,当后面节点竞争时,会将前面节点更新为SIGNAL
|
CONDITION |
int | -2 | 标识当前节点已经处于等待中,通过条件进行等待的状态 |
PROPAGATE |
int | -3 | 共享模式下释放节点时设置的状态,被标记为当前状态是表示无限传播下去 |
0 |
int | 不属于上面的任何一种状态 | |
waitStatus | int | 0 | 等待状态,默认初始化为0,表示正常同步等待, |
pre | Node | Null | 队列中上一个节点 |
next | Node | Null | 队列中下一个节点 |
thread | Thread | Null | 当前Node操作的线程 |
nextWaiter | Node | Null | 指向下一个处于阻塞的节点 |
通过上面的内容我们可以看到waitStatus其实是有5个状态的,虽然这里面0并不是什么字段,但是他是waitStatus状态的一种,表示不是任何一种类型的字段。
加锁过程
💡为什么是非公平锁?
与公平锁区别
💡为什么 head 节点是一个 Dummy(哑元,不关联线程)节点
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 尝试获取锁,如果获取失败会走这段逻辑
// 此处关注 addWaiter() 方法,第一个参数是新节点的模式,独占还是共享;
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
注:此时哑元对象的 waitStatus 值为 0。
💡Node 的 waitStatus 变化
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 尝试获取锁,如果获取失败会走这段逻辑
// 此处关注 acquireQueued() 方法
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
此时锁的状态:
💡非公平锁的两次尝试获取锁?
- lock() 时通过 cas 尝试获取锁(所有加锁的线程都会尝试)
- acquireQueued() 时,判断前序节点是否是 head,如果是的话,则尝试获取。(只有前序节点是 head 时才会尝试)
💡为什么 Node 中的字段都使用 volatile 来修饰?
因为要通过 cas 来修改他的值,cas 即比较并交换,如果比较的是 cpu 缓存中的值那毫无意义,所以必须要保证比较的这个变量要具有可见性(即,用的时候从主存拿,写完刷入主存)
解锁过程
// Sync 继承自 AQS
static final class NonfairSync extends Sync {
// 解锁实现
public void unlock() {
sync.release(1);
}
// AQS 继承过来的方法, 方便阅读, 放在此处
public final boolean release(int arg) {
// 尝试释放锁, 进入 (一)
if (tryRelease(arg)) {
// 队列头节点 unpark
Node h = head;
if (
// 队列不为 null
h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0
) {
unparkSuccessor(h);
}
return true;
}
return false;
}
// (一) Sync 继承过来的方法, 方便阅读, 放在此处
protected final boolean tryRelease(int releases) {
// state-- 针对锁重入
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// (二) AQS 继承过来的方法, 方便阅读, 放在此处
private void unparkSuccessor(Node node) {
// 如果状态为 Node.SIGNAL 尝试重置状态为 0
// 不成功也可以
int ws = node.waitStatus;
if (ws < 0) {
compareAndSetWaitStatus(node, ws, 0);
}
// unpark AQS 中等待的线程, 进入 (二)
// 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
Node s = node.next;
// 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0) s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
}
此部分源码分为两部分:
- 释放锁,如果 state 释放后值为 0,则返回 true。
- 如果返回 true,表示锁已经被完全释放了,则将 head 节点的 ws 从 -1 改为 0,唤醒 head.next 线程,让他重新去争抢锁(下图红色框部分)
此时锁状态:
但是由于不公平性,锁也有可能会被其他线程抢占,如果被抢占的话则会再次将前序节点 ws 改为 -1,然后中断线程,变为下面这种状态:
ReentrantReadWriteLock$NonfairSync
知识准备
读写锁与 ReentrantReadWriteLock#Sync 的关系
ReentrantReadWriteLock#Sync 继承于AQS
,实现读写锁与实现普通互斥锁的主要区别在于需要分别记录读锁状态及写锁状态,并且等待队列中需要区别处理两种加锁操作。
Sync
使用state
变量同时记录读锁与写锁状态,将int
类型的state
变量分为高16位与第16位,高16位记录读锁状态,低16位记录写锁状态,如下图所示:
写锁加锁 & 释放过程
写锁(独占锁)这部分与 ReentrantLock 大同小异,简单看下 tryAcquire
方法:
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0) c!=0 w=0 代表有读锁 || 非重入 -> 返回 false;中断当前线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 当重入超过最大限制(65535 2^16)则报错
throw new Error("Maximum lock count exceeded");
// Reentrant acquire 重入
setState(c + acquires);
return true;
}
if (writerShouldBlock() || // 如果需要阻塞,公平锁的实现会检查队列中是否有其他线程,非公平锁直接返回 false 不需要阻塞
!compareAndSetState(c, c + acquires)) // 或者 cas 失败,则返回 false;之后就和 ReentrantLock 一样了,加入队列然后 for(;;) 中断
return false;
setExclusiveOwnerThread(current);
return true;
}
读锁加锁过程
AQS.acquireShared(arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
ReentrantReadWriteLock$Sync.tryAcquireShared(arg)
protected final int tryAcquireShared(int unused) {
/*1. 如果另一个线程持有写锁,则失败。 2. 否则,该线程有资格获得锁写入状态,因此询问它是否应该因为队列策略而阻塞。如果没有,请尝试通过 CASing 状态和更新计数来授予。请注意,步骤不检查可重入获取,它被推迟到完整版本以避免在更典型的非可重入情况下检查保持计数。 3. 如果第 2 步由于线程显然不符合条件或 CAS 失败或计数饱和而失败,则链接到具有完整重试循环的版本。
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current) // 如果有写锁,并且不是当前线程,则获取读锁失败,返回 -1 进入后续(排队)流程
return -1;
int r = sharedCount(c); // 获取写锁的 state 值
if (!readerShouldBlock() && // 当前获取读锁线程是否需要阻塞;公平锁会查看队列是否具有节点,非公平锁会查看队列中第一个等待元素是否是独占锁,为了避免饥饿
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 如果不需要阻塞 && 小于 65535 && cas 成功,下面的代码和读锁重入相关
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current); // 这个是完整的获取共享锁的代码,相较于这部分代码基本上只多了 for(;;) 重试 cas 操作
}
AQS.doAcquireShared(arg)
注:红框部分同样属于释放锁流程。
此时锁状态:
此时有线程获取写锁的状态:
💡如何体现锁降级?
💡readerShouldBlock() 所要解决的饥饿何时会发生?
写锁在队列里排队,但是一直有线程来获取读锁
💡读锁的重入是怎么做的呢?
if (!readerShouldBlock() && // 当前获取读锁线程是否需要阻塞;公平锁会查看队列是否具有节点,非公平锁会查看队列中第一个等待元素是否是独占锁,为了避免饥饿
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) { // 如果不需要阻塞 && 小于 65535 && cas 成功,下面的代码和读锁重入相关
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
firstReader
指向在无锁状态下第一个获取读锁的线程,firstReaderHoldCount
记录第一个获取读锁的线程持有当前锁的计数(主要用于重入)。
读锁释放锁过程
AQS.releaseShared(arg)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { // 如果读锁全都释放
doReleaseShared(); // 唤醒后续节点
return true;
}
return false;
}
ReentrantReadWriteLock$Sync.tryReleaseShared(arg)
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) { // 如果第一个 read 是当前线程,并且重入计数为 1,则将 firstReader 置为 null,否则重入计数--
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else { // 操作 HoldCounter 进行重入计数的修改
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove(); // 如果计数为 0,则从 ThreadLocal 移除
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) { // 自旋 + cas,修改 state 值
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0; // 如果 == 0 表示读锁完全释放
}
}
此时状态为:
AQS.doReleaseShared()
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // cas 修改前序节点的 ws,目的是避免多个获取读锁的线程来释放锁,导致重复 unpark
continue; // loop to recheck cases
unparkSuccessor(h); // 如果后续节点是 SHARED 的,则会修改 head 节点,如果他的后续节点也是 SHARED,则会再次调用当前方法
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 这个和信号量的一个 bug 相关 https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6801020
continue; // loop on failed CAS
}
if (h == head) // loop if head changed,可能是因为上面 unpark 修改了 head 节点,不知道
break;
}
}
唤醒之后则会继续走下面红框的流程:
Semaphore
他是采用共享锁实现,节点类型为 SHARED
初始化时定义了 state 的值,调用 acquire() 方法对 state - 1,判断这个值是否小于 0,如果小于 0 则进入 FIFO 队列,否则 cas 修改 state 值(失败自旋)。
调用 release() 方法时对 state + 1,并唤醒后续节点
CountDownLatch
他是采用共享锁实现,节点类型为 SHARED
初始化时定义了 state 的值,await() 方法是尝试获取锁(只有 state 值为 0 才会获取成功),countdown()方法是释放锁,只有所有都释放完成才会返回 true 唤醒队列中的线程。
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
try {
System.out.println("t1 await");
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("t1");
}).start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
latch.countDown();
System.out.println("t2 countdown");
}).start();
System.out.println("main await");
latch.await();
System.out.println("main");
}
>>
t1 await
main await
t2 countdown
main
t1