类结构
简介
跟ReentrantLock原理一样,都是可以重入,公平/非公平的互斥锁。内部虽然有一个读锁和写锁,但是用的都是同一个Sync。
在原理上跟ReentantLock不同的是,当所有线程去获取读锁的时候,是不会阻塞的。只有当线程持有写锁/读锁,另一个线程再去获取读锁/写锁的时候,才会阻塞。总之,读读不会阻塞,其余都会阻塞。(不包括两次获取锁都是同一线程的情况->锁升级)
锁升级是指当前线程持有读锁的时候,再去获取写锁,反之也是升级。但是需要注意,==只有持有写锁,再去获取读锁的时候,才能升级成功。== 持有读锁,再去获取写锁,必须要先释放读锁,才能去获取写锁。
还有跟ReentantLock不同的是,state不在是0可以获取锁,1获取不到锁。而是用int 的state位标识当前锁的状态。state的高16位代表读锁的状态,0代表可以获取读锁,1代表不能获取读锁。同理低16位没有1就是没有人获取写锁。
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
加锁流程
场景一:多线程获取读锁
代码
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
....
return 1;
}
return fullTryAcquireShared(current);
}
如果没有人获取写锁,这时候获取读锁非常的简单,就是state的高位+1。
场景一:多线程获取读锁写锁
加锁
- 一开始t1去获取锁,因为没有竞争,获取锁成功,setOwner为自己,state的低八位加一。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
...
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
- 然后t2又来获取读锁。
- 先是获取锁,发现写锁已经被占了,且不是当前线程,获取锁失败了。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
- 获取锁失败后,去调用doAcquireShared(1)方法。这个跟之前ReenrantLock的阻塞然后排队方法一样,都是创建一个空的Node作为head,然后把当前阻塞的节点跟在head后面。且设置head的waitStatus为-1.然后调用LockSuport.park停住线程。唯一不同的是,t2获取的是读锁,他的节点类型是Node.SHARED。
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
- 同理t3,t4都以此挂在了head后面,根据获取锁的不同分为Node.SHARED和Node.EXCLUSIVE。到此加锁的流程结束了。
解锁
- 第一步是t1先释放锁,把state的低八位减一。这时候如果state的低八位为0代表没重入,释放成功,setOwner为null。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
- 第二步,检查head是否不为空,且head的waitstatus是否不等于零,如果是就去unpark其他排队的线程。
唤醒其他线程之前,我们先看下其他获取读锁的线程是在哪儿被park的
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//① 获取前驱节点
if (p == head) {
int r = tryAcquireShared(arg); //② 再次获取锁,state 的高八位+1
if (r >= 0) {
setHeadAndPropagate(node, r); ③ 设置自己节点为头节点,然后唤醒后继节点如果是shared的,结束。
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) //注意注意: t2,t3都是在这里被park的,unpark后,从这里唤醒,回到上面第一步。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以下是如何唤醒其他线程
- t2是在doAcquireShared(1)复活,这个时候又一次循环后,进入tryAcquireShared(1)方法获取锁,获取锁后,state的高八位+1。
- t2获取锁成功后,第一件事就是setHeadAndPropagate把自己设置成head,然后唤醒下一个如果是shared的节点。同理下个节点唤醒后第一件事就是把自己设置成head,然后唤醒下一个,直到下一个的下一个不是shared或者已经是tail了为止,程序中止。
==注意因为被唤醒的是读锁线程,那他是有责任唤醒他后面连续排队的获取读锁线程,即读锁的传播==
- t2获取锁成功后,第一件事就是setHeadAndPropagate把自己设置成head,然后唤醒下一个如果是shared的节点。同理下个节点唤醒后第一件事就是把自己设置成head,然后唤醒下一个,直到下一个的下一个不是shared或者已经是tail了为止,程序中止。
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}