ReentrantLock是独占锁,只允许一个线程执行;CountDownLatch,Semaphore等是共享锁;它们分别利用了AQS的独占与共享功能;那么如果在读操作远多于写操作的情况下该如何选择?读写锁,之前的文章中介绍了如何自己实现一个读写锁,还实现了重入功能,读读,写写,读写,写读四种重入。现在来看看JUC包下的ReentrantReadWriteLock的实现。
先来大致了解下ReentrantReadWriteLock:
- 读锁是个共享锁,写锁是个独占锁。读锁同时被多个线程获取,写锁只能被一个线程获取。读锁与写锁不能同时存在。
- 一个线程可以多次重复获取读锁和写锁
- 锁降级:获取写锁的线程又获取了读锁,之后释放写锁,就完成了一次锁降级。
- 锁升级:不支持升级。获取读锁的线程去获取写锁的化会造成死锁。
- 重入数:读锁和写锁的最大重入数为65535
- 公平与非公平两种模式
AQS维护了一个int值,表示同步状态;对于ReentrantLock,state会在0与1之间变化,1表示已被占有后续线程入队列等待,0表示free。对于CountDownLatch,会先将state赋予个大于0的值,在该值变为0后唤醒等待队列中的线程。那么如何用它来即表示读锁又表示写锁呢?读锁我们是允许多个线程同步运行的,我们还允许重入,那么拿什么来记录每个线程读锁的重入数?
针对上面两个问题,对于同步状态status,高16位表示所有线程持有的读锁总数,低16位为一个线程的写锁总数,包括重入。
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
采用ThreadLocal来记录每个线程锁持有的读锁数目。
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
private transient ThreadLocalHoldCounter readHolds;
private transient HoldCounter cachedHoldCounter;
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
- HoldCounter 静态内部类用来记录一个线程的读锁重入数,以及id;
- ThreadLocalHoldCounter继承了ThreadLocal,实现了initialValue方法,作用是在没有set前调用get的话initialValue会被调用,HoldCounter对象会被存储到Entry里,并返回它。变量名为readHolds,它用来存储/获取线程的读锁数量。因为读锁是共享的,我们利用同步状态的高16位来记录总数,用threadlocal来记录每个线程所持有的读锁数目。对于写锁来说它是独占锁,低16位代表的就是当前线程持有的写锁数目。
- cachedHoldCounter:它是一种优化的手段,为了避免频繁的调用ThreadLocalHoldCounter的读取,更改甚至删除操作,于是缓存最新一个成功获取锁的线程的HoldCounter,意思是当一个线程需要记录值的时候会先检查自己是否是cachedHoldCounter中缓存的那个线程,是的话就不用再从readHolds中获取了,减少对ThreadLocal的操作。
- firstReader 与firstReaderHoldCount:代表首个获取读锁的线程与其所持有的读锁数,该读锁数不会存储进readHolds,这是种优化,针对只有一个线程的情况,避免频繁操作readHolds。
ReentrantReadWriteLock继承了ReadWriteLock,这两个方法分别返回读锁与写锁。ReentrantReadWriteLock内部实现了两个类:ReadLock&WriteLock分别实现读锁与写锁。
public interface ReadWriteLock {
Lock readLock();
Lock writeLock();
}
同ReentrantLock一样内部实现了非公平与公平两种同步器:NonfairSync &FairSync ,继承自同一同步器Sync。
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}
只定义了writerShouldBlock & readerShouldBlock两种方法,它们作用在获取锁的过程中,决定当前线程是否该阻塞。
一 读锁
1.获取读锁
public void lock() {
sync.acquireShared(1);
}
定位到AQS的acquireShared,该方法之前介绍过。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
来看看Sync重写的tryAcquireShared方法
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
写锁不为零且持有写锁的并非本线程,则返回-1,之后在acquireShared中将线程的节点放入到等待队列中。写锁不为零但是正是本线程持有的,则代表写读重入。之后在readerShouldBlock返回false与CAS操作成功后,更新HoldCounter 的值,这里会对之前提到的firstReader ,firstReaderHoldCount 或cachedHoldCounter进行相应的操作。
如果CAS失败或者readerShouldBlock返回true,则会调用fullTryAcquireShared,该方法会继续尝试获取读锁,可以看成是tryAcquireShared的升级版。
先来看看readerShouldBlock()方法:
公平模式下,根据队列中当前线程之前有没有等待的线程来决定。
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
非公平模式
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
调用apparentlyFirstQueuedIsExclusive()
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
- 返回true代表等待队列head.next节点是等待写锁的线程,该方法的目的是不让写锁一直等待下去;比如在上一篇自己实现的读写锁中,通过增加一个写请求变量来防止写饥饿,让写锁的优先级高于读锁。这里有相似的目的。
- 这个方法是不可靠的,因为在检测过程中队列结构是在变化的,;但是我们并不依赖于它的准确表达,它更多是一种探测,一种优化,我们希望它来防止写锁的饥饿;而且并不是该方法返回了true,线程就会被放入阻塞队列退出竞争,来看fullTryAcquireShared的逻辑
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
之前说fullTryAcquireShared是tryAcquireShared的升级版,它处理了CAS失败和readerShouldBlock返回true的情况;
- 先检查写锁是否被其他线程占用;
- 调用readerShouldBlock检查当前线程是否应该进入等待队列,返回true也不代表该线程要进入等待队列,我们看它的处理逻辑:如果firstReader == current代表当前只有你一个读线程,那么不用等待可以获取读锁;只有在rh.count == 0(意味着该线程没有持有读锁)的情况下返回-1代表线程要进入等待队列。为什么?持有读锁的线程不能进入同步队列?
- 之后便是CAS,成功便更改HoldCounter值返回1,代表获取读锁成功,否则循环再次检查,总之不能轻易的将线程放入等待队列,容易造成死锁。
上面问题的解答:一个线程是不能随便放入队列中等待的,容易造成死锁,看下面两种情况:
- 如果一个线程持有读锁,重入失败被放入等待队列,若等待队列中排在它前面的线程里有等待写锁的线程,那么就会造成死锁,因为读锁与写锁是互斥的。
- 假设一个线程持有写锁进行“锁降级申请”,被放入同步队列,那么不仅之后的读写线程都会被放入队列,队列中之前有等待线程,无论等待的是读或写锁都将造成死锁。
回到fullTryAcquireShared
- 它先判断是否有写锁,如果有且就是本线程就不会进行readerShouldBlock判断,直接CAS,这样便解决了情况2的问题;
- 针对情况1线程持有读锁的情况,即使readerShouldBlock返回true,rh.count == 0不符合不会返回-1,也就不会将线程放入队列。
总结就是fullTryAcquireShared,采用for循环方式让线程不断判断与尝试,且只有在一种情况下才会将线程放入队列:readerShouldBlock返回true(原因可能是公平模式或者第一个等待线程(head.next)在等待写锁),当前线程不是第一个读线程且没有持有读锁。
2.释放读锁
public void unlock() {
sync.releaseShared(1);
}
定位到AQS的releaseShared,之前介绍过。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
来看看同步器Syn实现的tryReleaseShared
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
1.如果是firstReader ,就对它及firstReader进行修改;2.如果不是,就对readHolds进行修改;3. 自旋CAS修改status
返回true代表status == 0,表示既没有读锁也没有写锁。
3. tryLock
public boolean tryLock() {
return sync.tryReadLock();
}
调用了同步器Syn的tryReadLock
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
感觉与tryAcquireShared很像,不同在于tryReadLock只尝试获取读锁一次,成功就返回true,否则false;这是由于方法用途不同,所以设计自然不同。
二, 写锁
1,获取写锁
public void lock() {
sync.acquire(1);
}
定位到AQS的acquire中,AQS文章里介绍过。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
来看看同步器Syn重写的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
一 :c != 0下分两种 1. w == 0 代表有读锁;2. w != 0 && current != getExclusiveOwnerThread() 代表有其他写锁;以上两种返回false,线程要进入等待队列。否则就设置status值,返回true,获取写锁成功。
二 :c == 0情况下要看writerShouldBlock的情况,返回false就会去CAS更改同步状态,成功就将AOS里的exclusiveOwnerThread设置位当前线程,最后返回true;
注:情况一用setState更改同步状态,情况二用compareAndSetState?情况一执行到setState这步说明当前线程已持有写锁,是在重入,其他线程都会被排斥不同担心线程安全问题,所以setState就可以,同步状态status是volatile的。情况二里当前即无读锁也无写锁,当前线程始于其他线程在竞争,所以要利用CAS来保证原子性。
持有读锁线程不能申请写锁,即不能升级,从tryAcquire可以看出持有读锁线程一定会返回false,也就是会被放入队列中等待写锁,但是它持有的读锁将不会被释放,那么写锁就不肯能获取到,虽然不影响读锁的获取,但所有写锁都将不能被获取到。
来看看writerShouldBlock方法
非公平模式:直接返回false,也就是写请求可以插队,即写优先级高;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
公平模式:考虑的是当前队列是否有等待的线程
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
2,释放写锁
public void unlock() {
sync.release(1);
}
定位到AQS中的release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
来看看同步器Syn重写的tryRelease
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
注意返回值:返回true代表写锁个数为0,也就是写锁可用;返回false表示写锁仍被当前线程占着,可能是因为当前线程重入了写锁。
3,tryLock
public boolean tryLock( ) {
return sync.tryWriteLock();
}
定位到同步器Syn
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
tryWriteLock方法看上去跟tryAcquire方法真的很像。唯一的区别在于,tryWriteLock忽略的writerShouldBlock方法;该方法的调用就是去抢写锁,抢不到返回false就行了。