前言
ReentrantReadWriteLock可以说是最复杂的锁实现类,这篇文章带你弄懂ReentrantReadWriteLock实现读写锁的所有细节,在阅读本文之前,读者需要了解一些前置的知识点,比如AQS,公平锁以及非公平锁,CAS等等,了解了这些知识点后,可以更加轻松的理解ReentrantReadWriteLock,因为ReentrantReadWriteLock是在这些的基础上构建的。
ReentrantReadWriteLock总体逻辑
AQS中维护这一个阻塞线程队列,在该阻塞线程队列为空时,读线程可以同时获取读锁,在所有读线程释放读锁之前,如果有写线程想要获取写锁,则阻塞该线程,并加入阻塞队列中。后续的读写线程在第一个写线程之前的所有读线程释放锁之前都会通过尾插的方式加入阻塞队列。当第一个写线程之前的所有读线程释放锁后,才会陆续唤醒队列中的线程尝试加锁。
ReentrantReadWriteLock的基本内部单元
Sync继承AQS,读写锁的核心业务逻辑都在AQS中,不管是公平锁还是非公平锁,读锁还是写锁,他们都只是调用Sync中的方法实现公平竞争和非公平竞争,加锁和解锁的业务逻辑。写锁调用Sync#tryAcquire()方法加锁,调用Sync#tryRelease()方法解锁。读锁调用Sync#tryAcquireShared()加锁,调用Sync#tryReleaseShared()加锁。而这些方法的本质都是通过改变AQS#state的值来改变锁状态。
// 读锁
private final ReentrantReadWriteLock.ReadLock readerLock;
// 写锁
private final ReentrantReadWriteLock.WriteLock writerLock;
// 抽象静态内部类,继承AQS,读写锁的核心的业务逻辑都在Sync类中
final Sync sync;
// 非公平锁,继承Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false;
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
// 公平锁,继承Sync
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
// 读锁
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
// 构造函数
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 读锁的加锁方法
public void lock() {
sync.acquireShared(1);
}
// 读锁的解锁方法
public void unlock() {
sync.releaseShared(1);
}
}
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
// 构造函数
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
// 写锁加锁方法
public void lock() {
sync.acquire(1);
}
// 写锁解锁方法
public void unlock() {
sync.release(1);
}
}
下面来看下抽象内部类Sync的基本组成,Java中不管哪个并发类都是通过定义AQS中int类型的状态码state实现的。ReentrantReadWriteLock将状态码state分为高16位和低16位。状态码state的高16位用于存储并发读取的线程数(读线程重入的次数也加入计数),即状态码state的高16位=(线程1的重入次数+线程2的重入次数+...+线程n的重入次数)。状态码state的低16位用于存储单个写线程的重入次数(写锁为互斥锁,只有一个线程能获取写锁)。
abstract static class Sync extends AbstractQueuedSynchronizer {
static final int SHARED_SHIFT = 16;
// 用于高16加一操作(高16位用于存储并发读取的线程数)
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
// 最大并发读取的线程数(读线程的重入也加入计数) = 最大的单个写线程的重入次数 = 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
// 16位的二进制1,用于计算低16位的写单个写进程冲入次数
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
// 返回高16位并发读的线程数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
// 返回低16位单个写进程重入次数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 用于保存线程id和对应的重入次数
static final class HoldCounter {
int count = 0;
final long tid = getThreadId(Thread.currentThread());
}
// 继承自ThreadLocal,用于保存除了第一个读线程外,其他读取线程各自的重入次数
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
// 用于保存除了第一个读线程外,其他读取线程的重入次数
private transient ThreadLocalHoldCounter readHolds;
// 用于上一次读读线程的id和重入次数
private transient HoldCounter cachedHoldCounter;
// 用于报错第一个读线程对象
private transient Thread firstReader = null;
// 用于保存第一个读线程的重入次数
private transient int firstReaderHoldCount;
Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
abstract boolean readerShouldBlock();
abstract boolean writerShouldBlock();
// 写线程解锁方法
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
// 写线程加锁方法
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
// 读线程解锁方法
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
// 读线程加锁方法
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
if (firstReader == current) {
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh;
}
return 1;
}
}
}
}
ReentrantReadWriteLock初始化过程
下面先来看下ReentrantReadWriteLock的初始化过程的源码
public class LearnReadWriteLock {
public static void main(String[] args){
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
}
}
/*ReentrantReadWriteLock#ReentrantReadWriteLock()*/
public ReentrantReadWriteLock() {
this(false);
}
/*ReentrantReadWriteLock#ReentrantReadWriteLock(boolean fair)*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
/*ReentrantReadWriteLock$ReadLock#ReadLock(ReentrantReadWriteLock lock)*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/*ReentrantReadWriteLock$WriteLock#WriteLock(ReentrantReadWriteLock lock)*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
我们可以看到,默认初始化为非公平锁。我们看读写锁的初始化逻辑,他们都传入了当前ReentrantReadWriteLock对lock,并将sync参数设置为lock.sync。读写锁的业务逻辑都是通过调用sync中的方法实现,这里就可以看出sync才是实现读写锁的关键类。
ReentrantReadWriteLock读锁加锁过程
public class LearnReadWriteLock {
public static void main(String[] args){
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
readLock.lock();
}
}
/*ReentrantReadWriteLock$ReadLock#lock()*/
public void lock() {
sync.acquireShared(1);
}
/*AbstractQueuedSynchronizer#acquireShared()*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/*ReentrantReadWriteLock$Sync#tryAcquireShared()*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 获取AQS中的状态码state
int c = getState();
// 如果低16位的独占写线程的重入次数不为0,并且独占线程不等于当前线程则返回-1。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取高16位所有读线程的重入次数之和
int r = sharedCount(c);
// 如果阻塞队列为空或者阻塞队列第一个线程获取读锁&&高16位所有读线程的重入次数之和小于65535则当前线程允许获取读锁&&使用CAS将高16位所有读线程的重入次数之和加一成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果是第一个获取读锁的线程,则使用firstReader记录第一个获取读锁的线程对象,并设置第一个获取读锁的线程的重入次数firstReaderHoldCount为1。
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果当前线程是第一次获取锁的线程
} else if (firstReader == current) {
// 将第一次获取锁的线程的重入次数加1
firstReaderHoldCount++;
} else {
// 如果不是第一个获取读锁的线程
// 获取上一次获取读锁的线程的相关记录(上次获取读锁的线程的id和重入次数)
HoldCounter rh = cachedHoldCounter;
// 如果没有上一个获取获取读锁的线程的信息(第一个获取读锁的线程不算)||上一个获取读锁的线程id不等于当前的线程的id
if (rh == null || rh.tid != getThreadId(current))
// 从TreadLocal中获取当前线程的信息(重入信息),将上次获取读锁的线程信息设置为当前获取读锁的线程信息。
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
// 当前线程的重入次数加1
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
非公平获取读锁:在线程获取读锁的过程中,有一步很关键,就是通过NonfairSync#readerShouldBlock()判断阻塞队列的对头的等待线程是请求读锁还是写锁。如果对头的等待线程请求的是读锁,则当前线程不需要加入阻塞队列,可以非公平的竞争读锁。如果对头的等待线程获取的写锁,则当前线程加入阻塞队列并挂起。
/*ReentrantReadWriteLock$NonfairSync#readerShouldBlock()*/
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
/*AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive()*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
/*AbstractQueuedSynchronizer#isShared()*/
final boolean isShared() {
return nextWaiter == SHARED;
}
ReentrantReadWriteLock写锁加锁过程
public class LearnReadWriteLock {
public static void main(String[] args){
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock writeLock = readWriteLock.writeLock();
writeLock.lock();
}
}
/*ReentrantReadWriteLock$WriteLock#lock()*/
public void lock() {
sync.acquire(1);
}
/*AbstractQueuedSynchronizer#acquire()*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/*ReentrantReadWriteLock$Sync#tryAcquire()*/
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
// 获取AQS中的状态码state
int c = getState();
// 获取低16位独占线程的重入次数
int w = exclusiveCount(c);
if (c != 0) {
// 如果独占线程的重入次数w为0(状态码c不为0,独占线程的重入次数w为0说明当前有读锁还没释放,不能加写锁)|| 独占线程的重入次数w为不为零但是当前线程不是获得写锁得独占线程。
if (w == 0 || current != getExclusiveOwnerThread())
// 加锁失败
return false;
// 独占线程的重入次数w加1后如果大于最大得重入次数,则抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 独占线程的重入次数w加1,不需要CAS进行加一,因为是独占线程地重入(不存在并发)
setState(c + acquires);
// 加锁成功
return true;
}
// 在状态码c=0时,说明当前没有任何线程获取锁。写线程可以直接获取写锁,不需要阻塞,所以writerShouldBlock()直接返回false, 并使用CAS对写独占线程的重入次数加1
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设独占线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
/*ReentrantReadWriteLock$NonfairSync#writerShouldBlock*/
final boolean writerShouldBlock() {
return false; // writers can always barge
}
写锁的加锁过程比较简单,直接注释就能看懂,这里不在赘述。
ReentrantReadWriteLock读锁阻塞过程
线程在获取读锁时,有两种情况会导致线程阻塞:
- 当前已经有线程获取写锁,并且还没有释放写锁。
- 已经有线程获取了读锁同时没有释放读锁并且阻塞队列的第一个阻塞线程请求写锁。
第一种情况很好理解,当已经有线程获取了写锁时,任何其他线程尝试加锁都会失败并并加入阻塞队列。我们下面来看下第二种情况。
public class LearnReadWriteLock {
public static void main(String[] args){
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
Lock readLock = readWriteLock.readLock();
Lock writeLock = readWriteLock.writeLock();
// 线程1获取读锁
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
readLock.lock();
}
}, "Thread1-tryRead");
thread1.start();
// 线程2获取写锁
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
writeLock.lock();
}
}, "Thread2-tryWrite");
thread2.start();
// 线程3获取读锁
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
readLock.lock();
}
}, "Thread3-tryRead");
thread3.start();
}
}
加入阻塞队列。head指向阻塞队列的头节点(头节点不存具体阻塞线程的信息,只是用来指向阻塞队列的第一个阻塞线程对应的Node),head指向队列的节点,tail指向队列的尾节点。
我们先开看下thread请求写锁的失败后的操作,即调用acquireQueued(addWaiter(Node.EXCLUSIVE), arg)的源码,下面来看下阻塞队列的插入过程addWaiter(Node.EXCLUSIVE)。Node.EXCLUSIVE为空,表示当前阻塞线程请求的是写锁。
/*ReentrantReadWriteLock$WriteLock#lock()*/
public void lock() {
sync.acquire(1);
}
/*AbstractQueuedSynchronizer#acquire()*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/*AbstractQueuedSynchronizer#acquire()*/
private Node addWaiter(Node mode) {// Node.EXCLUSIVE(Node EXCLUSIVE = null);
// 为当前阻塞线程创建Node节点用于存储阻塞线程的基本信息
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果尾节点为空,则创建首尾节点,并使用尾插法在尾节点tail后面插入当前node
enq(node);
return node;
}
/*AbstractQueuedSynchronizer$Node#Node(Thread thread, Node mode)*/
Node(Thread thread, Node mode) {
// nextWaiter = null代表当前阻塞线程请求获取写锁
this.nextWaiter = mode;
// 用于thread用于存储阻塞线程对象
this.thread = thread;
}
/*AbstractQueuedSynchronizer$Node#enq(final Node node)*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
// 使用CAS使head指向新建的头节点
if (compareAndSetHead(new Node()))
// 由于当前阻塞队列只有一个头节点,因此头节点就是尾节点
tail = head;
} else {
// 新的阻塞节点的前一个节点指尾节点tail
node.prev = t;
// 使用CAS将尾节点指针tail指向当前节点
if (compareAndSetTail(t, node)) {
// 前任尾节点的后一个节点指向新的尾节点
t.next = node;
return t;
}
}
}
}
addWaiter(Node.EXCLUSIVE)执行完后,节点插入到阻塞队列,如下图所示,head指向阻塞队列的头节点(头节点不存具体阻塞线程的信息,只是用来指向阻塞队列的第一个阻塞线程对应的Node),head指向队列的节点,tail指向队列的尾节点。在线程2对应的阻塞节点插入阻塞队列后,阻塞队列如下图所示:
线程2对应的阻塞节点插入到阻塞队列后,线程2还未挂起。还要继续执行acquireQueued(node, arg)方法,下面来看下acquireQueued(final Node node, int arg)源码。
/*AbstractQueuedSynchronizer#acquireQueued(final Node node, int arg)*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前节点的前一个节点
final Node p = node.predecessor();
// 前一个节点如果是头节点则再次尝试获取写锁
if (p == head && tryAcquire(arg)) {
// 如果写锁获取成功,将当前线程对应的阻塞节点从阻塞队列中移除
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// shouldParkAfterFailedAcquire(p,node)会将当前节点的前一个节点的waitStatus设为-1,并再次进去for循环,如果还是不能获取写锁则挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued(final Node node, int arg)方法会进行两次for循环让线程2尝试获取写锁,当然尝试获取写锁的前提的线程2对应的阻塞节点的前一个节点是头节点。显然这里满足这个条件,于是线程2会两次尝试获取写锁,但是还是以失败告终,线程2最终调用parkAndCheckInterrupt()挂起。在线程2挂起后阻塞队列图下图所示。
最后我们来看看线程3获取读锁的过程,判断线程3是否需要阻塞的条件是队列的第一个阻塞线程请求的是否是读锁,如果请求的是读锁,线程3尝试获取读锁(非公平)。如果请求的是写锁,则线程3需要阻塞。
/*ReentrantReadWriteLock$ReadLock#lock()*/
public void lock() {
sync.acquireShared(1);
}
/*AbstractQueuedSynchronizer#acquireShared()*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
/*ReentrantReadWriteLock$Sync#tryAcquireShared()*/
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 获取AQS中的状态码state
int c = getState();
// 如果低16位的独占写线程的重入次数不为0,并且独占线程不等于当前线程则返回-1。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取高16位所有读线程的重入次数之和
int r = sharedCount(c);
// 如果阻塞队列为空或者阻塞队列第一个线程获取读锁&&高16位所有读线程的重入次数之和小于65535则当前线程允许获取读锁&&使用CAS将高16位所有读线程的重入次数之和加一成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
/*ReentrantReadWriteLock$NonfairSync#readerShouldBlock()*/
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
/*AbstractQueuedSynchronizer#apparentlyFirstQueuedIsExclusive()*/
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
/*AbstractQueuedSynchronizer#isShared()*/
final boolean isShared() {
return nextWaiter == SHARED;
}
显然,当前阻塞队列的第一个阻塞线程2尝试获取的是写锁,接下来运行阻塞方法doAcquireShared(arg); doAcquireShared(arg)会将线程3对应的阻塞节点插入到阻塞队列中,并两次尝试获取读锁,如果还是获取不到,则挂起当前线程。显然这里线程3最后还是会获取读锁失败。
/*AbstractQueuedSynchronizer#doAcquireShared()*/
private void doAcquireShared(int arg) {
// 插入当前线程对应的阻塞节点到阻塞队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果当前线程对应的阻塞节点的前一个节点是头节点,直接尝试获取读锁
if (p == head) {
// 获取读锁成功
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// shouldParkAfterFailedAcquire(p,node)会将当前节点的前一个节点的waitStatus设为-1,并再次进去for循环,如果还是不能获取读锁则挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/*AbstractQueuedSynchronizer#acquire()*/
private Node addWaiter(Node mode) {// Node.SHARED(Node SHARED = new Node();)
// 为当前阻塞线程创建Node节点用于存储阻塞线程的基本信息
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 在尾部插入线程3的阻塞节点
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
在线程3挂起后阻塞队列如下图所示。
总结
以上就是ReentrantReadWriteLock的获取读写锁的全部内容,关于解锁部分的原理,这里不在赘述。需要注意一下几点:
- 如果已经有线程获取了写锁,则其他任何线程如果尝试获取读写锁都会失败。
- 如果已经有线程获取了读锁,其他线程尝试获取读锁时,需要判断阻塞队列的第一个阻塞线程尝试获取的锁的类型,如果是读锁,则可以非公平的竞争读锁。如果是写锁,则加入阻塞队列,在两次尝试获取读锁失败后被挂起。