特点
- 读读共享
- 读写互斥
- 写写互斥
结构
锁的状态表示
state
继承AQS的类都需要使用state变量代表锁的资源占用情况
高16位 表示读锁的上锁次数(由于读读共享,可能由多个线程累加),
低16位 表示写锁的重入次数, 由于写写互斥,读写互斥, 只能有一个线程重入累加而得
读锁 : ReadLock
lock() :加锁
/**
* Acquires the read lock.
*
* <p>Acquires the read lock if the write lock is not held by
* another thread and returns immediately.
*
* <p>If the write lock is held by another thread then
* the current thread becomes disabled for thread scheduling
* purposes and lies dormant until the read lock has been acquired.
*/
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
Sync.tryAcquireShared():
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
// 锁状态
int c = getState();
/**
* exclusiveCount(c)
*
* 写锁的重入的次数 state & 11111 1111 1111 1111
* static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
*
**/
// 如果此时有写锁,写锁重入次数应该 > 0 ,并且不是当前线程,由于读写互斥,写写互斥,不管当前方法是读写还是写锁,这种情况都会加锁失败
// 如果是当前线程持有的写锁则 把写锁降级为读锁
// 如果当前线程并没有写锁, 则直接 获取读锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取读锁上锁次数
// state 右移 16位,把 用于 表示写锁的加锁次数 的低16位 给 抵消掉,剩下的就是 高16位
int r = sharedCount(c);
// 如果队列中没有线程在排队
// 这里会有公平非公平之分
// 公平锁 会判断 队列是否有线程在排队
// 非公平锁 只判断队头 是不是 写锁,是写锁 才会 等待
if (!readerShouldBlock() &&
r < MAX_COUNT &&
// cas
// 读锁上锁,直接 state + 1 0000 0000 0000 0000
// SHARED_UNIT 是 1 左移 16位
// 这样就可以对表示读锁上锁次数的高16位 + 1
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果上锁次数为0,当前锁没有线程持有其读锁
if (r == 0) {
// 第一次上读锁的线程设置为当前线程
firstReader = current;
firstReaderHoldCount = 1;
// 第一次上读锁的线程重入
} else if (firstReader == current) {
// 首次上读锁 线程重入次数 + 1
firstReaderHoldCount++;
} else {
// HoldCounter 主要有线程id和这个线程加锁次数count, 用于统计某个线程 加锁的次数
// 从缓存拿HoldCounter对象
HoldCounter rh = cachedHoldCounter;
// 如果缓存等于空 或者 缓存的线程id和当前线程id不相等
if (rh == null || rh.tid != getThreadId(current))
// 从线程的threadLocal里HoldCounter拿对象,如果没有,会初始化HoldCounter
// 里面的count = 0,并且赋值给cachedHoldCounter
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 对当前线程的HoldCounter的加锁次数进行 ++
rh.count++;
}
return 1;
}
// 感觉跟上面代码逻辑差不多 : cas 失败,再来一遍
return fullTryAcquireShared(current);
}
doAcquireShared :
自旋 ,自旋失败 入队,阻塞
private void doAcquireShared(int arg) {
// 创建Node节点,节点类型为SHARED,持有当前线程
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 1. 如果当前节点的前驱节点时头结点
// 2. 如果阻塞的线程被唤醒,由于是无限循环,会继续走到这里,头结点永远是空Node,这时会继续去拿锁。由于是读锁,可以接着上锁
if (p == head) {
// 1. 自旋一次
int r = tryAcquireShared(arg);
if (r >= 0) {
// 1. 自旋成功,设置新的Node为head,并唤醒后一个线程
// 2. 第二次循环进来会接着设置新的Node为head,并唤醒下一个等待读锁的线程
// 直到Node的状态不是Shared为止
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 1. 把前驱节点设置为Signal状态,阻塞自身节点
// 2. 如果线程被叫醒,会继续循环
// 3. 以此类推,一直唤醒等待读锁的线程,直到遇到不是SHARED状态的节点。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
读锁 Node 结构 :
{thread:线程id,nextWaiter = new Node() },没有waitStatus
setHeadAndPropagate
设置新的头节点,并释放后面的SHARED状态的节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 如果当前节点的后面一个节点是Shared状态
if (s == null || s.isShared())
// 释放后 一个节点
doReleaseShared();
}
}
doReleaseShared : 释放下一个线程(head.next)
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 叫醒下一个线程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
unparkSuccessor(h) :唤醒下一个线程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
// 找到最近一个非CANCELLED(1)状态的线程
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒它
LockSupport.unpark(s.thread);
}
总结
- 读锁加锁时,如果有写锁持有锁则上锁失败,进入队列排队, 如果是读锁则可以共享这把锁,不用排队,只是多了一个计数的步骤。
- 首次加锁失败,入队 后,如果自己是head, 会抢锁一次,抢锁成功,会唤醒队列后面所有SHARED节点
unlock() : 解锁
读锁状态 - 1,并判断读锁是否完全释放
public final boolean releaseShared(int arg) {
// 读锁状态 - 1。并判断是否完全释放
if (tryReleaseShared(arg)) {
// 如果读锁完全释放,那么唤醒下一个 等待写锁的线程 醒来抢写锁
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared(int unused)
state读锁状态-1,其他读锁上锁次数 -1
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 如果当前线程是第一个上读锁的线程
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
// 如果第一次上读锁的线程只上过一次读锁
if (firstReaderHoldCount == 1)
// 清空第一次上读锁的线程
firstReader = null;
else
// 第一次上读锁的线程上锁次数 --
firstReaderHoldCount--;
} else {
// 否则,从ThreadLocal中取出上读锁次数-1
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
// 高16读锁状态位 - 1 = (- 1 << 16 : 1 0000 0000 0000 0000)
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 返回 是不是 读锁完全释放。 读锁状态为 = 0
return nextc == 0;
}
}
doReleaseShared() : 释放下一个线程(head.next)
这个方法和读锁获取到锁之后,唤醒队列中下一个等待读锁的线程是一样的。如果下一个是SHARED状态的话,会调 doReleaseShared()。
而这里,如果读锁全部释放,就会唤醒 队列中 下一个等待写锁的 线程来获取写锁。
总结
读锁被唤醒时, 如果当前被持有锁是读锁,则自己也可以共享这把锁,并且唤醒队列中自己后面的一个状态SHARED的节点来抢锁。 以此类推,队列中最近的的所有等待读锁的线程都会被唤醒,并共享这把锁。
比如 W 表示写锁,R表示 读锁 队列中R1,R2,R3,R4,W1,R5 :
R1被唤醒, 抢到读锁时,会唤醒 R2,R3,R4 来共享锁,但是 R5不会,因为中间隔着一把写锁W1
写锁 : WriteLock
lock() :加锁
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
// 尝试加锁
if (!tryAcquire(arg) &&
// 尝试加锁失败,入队等待
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire(arg) :尝试加锁
这个方法和ReentrantLock实现的方法有一点点不一样 :
只要有读锁或者写锁(不是自己重入)就加锁失败
cas更改state失败表示加锁失败
如果是公平锁队列中有线程在排队,则加锁失败
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
// 获取到state
int c = getState();
// 获取写锁上锁次数
int w = exclusiveCount(c);
// 如果有读锁或者写锁
if (c != 0) {
// w==0表示没有写锁,但是c!=0,所以只有读锁
// 或者 : current != getExclusiveOwnerThread() 有写锁,但是别的线程上的,不是自己重入的情况
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 第一个判断 没return false的情况下,那么就是 没有读锁,并且 有写锁,但是是 当前线程重入
// 自己重入, 写状态 + 1
setState(c + acquires);
return true;
}
// 能走到这,就说明 c==0,因为 c!=0 在上面这个判断中 是肯定会返回的
// writerShouldBlock() 判断是否应该排队,公平锁返回的是 队列中是否有线程,非公平锁恒返回false
if (writerShouldBlock() ||
// cas失败
!compareAndSetState(c, c + acquires))
return false;
// 否则加锁成功
setExclusiveOwnerThread(current);
return true;
}
加锁失败,则入队睡眠
这个和 ReentrantLock是一样的
写锁 Node 结构 :和ReentrantLock的锁结构一样
总结
当(读锁 不存在 并且 没有写锁 ) 或者 写锁 只有 自己重入才能加锁成功
unlock():解锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 尝试解锁,返回是否释放锁的结果
if (tryRelease(arg)) {
Node h = head;
// 如果写锁已经完全释放,唤醒队列中的头节点.next
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(int releases)
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 低16位写锁状态位 减 1
int nextc = getState() - releases;
// 是否完全释放,低16位 == 0 : 重入锁需要解锁多次
boolean free = exclusiveCount(nextc) == 0;
if (free)
// 完全释放,设置为无锁状态
setExclusiveOwnerThread(null);
// 重新设置写锁状态位
setState(nextc);
return free;
}
总结
写锁解锁 减 低16位写状态就行,重入锁需多次解锁,直至完全释放,才会唤醒队列中等待的线程。
锁降级
在ReentrantReadWriteLock 是支持锁降级的。但不支持锁的升级。
锁降级
从写锁 降级到读锁 :
为什么可以 ?
因为是从写锁降级到读锁,写锁只能由一个线程持有,此时降级为读锁,其他线程上读锁或者写锁,因为此时写锁的状态还没释放。所以自始至终只可能存在同一个线程在同时持有读锁和者写锁。
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
// 是否有写锁
if (exclusiveCount(c) != 0 &&
// 有写锁,但是 写锁持有线程就是当前线程,锁降级到读锁
getExclusiveOwnerThread() != current)
return -1;
// 下面就是上读锁 逻辑了
int r = sharedCount(c);
// ................ 省略部分代码
return fullTryAcquireShared(current);
}
为什么不支持锁升级
可能会造成死锁。
锁升级 : 是读锁升级到写锁
假设有 A,B 和 C 三个线程,它们都已持有读锁。假设线程 A 尝试从读锁升级到写锁。那么它必须等待 B 和 C 释放掉已经获取到的读锁。如果随着时间推移,B 和 C 逐渐释放了它们的读锁,此时线程 A 确实是可以成功升级并获取写锁。
但是我们考虑一种特殊情况。假设线程 A 和 B 都想升级到写锁,那么对于线程 A 而言,它需要等待其他所有线程,包括线程 B 在内释放读锁。而线程 B 也需要等待所有的线程,包括线程 A 释放读锁。这就是一种非常典型的死锁的情况。谁都愿不愿意率先释放掉自己手中的锁。