ReentrantReadWriteLock与ReentrantLock
说到ReentrantReadWriteLock, 首先要做的是与ReentrantLock划清界限. 它和后者都是单独的实现, 彼此之间没有继承或实现的关系.
ReentrantLock实现了标准的互斥操作, 也就是一次只能有一个线程持有锁, 也即所谓独占锁的概念. 显然这个特点在一定程度上面减低了吞吐量, 实际上独占锁是一种保守的锁策略, 在这种情况下任何"读/读", "写/读", "写/写"操作都不能同时发生.但是同样需要强调的一个概念是, 锁是有一定的开销的, 当并发比较大的时候, 锁的开销就比较客观了. 所以如果可能的话就尽量少用锁, 非要用锁的话就尝试看能否改造为读写锁.
ReadWriteLock描述的是: 一个资源能够被多个读线程访问, 或者被一个写线程访问, 但是不能同时存在读写线程. 也就是说读写锁使用的场合是一个共享资源被大量读取操作, 而只有少量的写操作(修改数据). ReadWriteLock接口的代码如下:
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
* @return the lock used for reading.
*/
Lock readLock();
/**
* Returns the lock used for writing.
* @return the lock used for writing.
*/
Lock writeLock();
}
ReentrantReadWriteLock的特性
ReentrantReadWriteLock有以下几个特性:
1.公平性
非公平锁(默认) 这个和独占锁的非公平性一样, 由于读线程之间没有锁竞争, 所以读操作没有公平性和非公平性, 写操作时, 由于写操作可能立即获取到锁, 所以会推迟一个或多个读操作或者写操作. 因此非公平锁的吞吐量要高于公平锁.
公平锁利用AQS的CLH队列, 释放当前保持的锁(读锁或者写锁)时, 优先为等待时间最长的那个写线程分配写入锁, 当前前提是写线程的等待时间要比所有读线程的等待时间要长. 同样一个线程持有写入锁或者有一个写线程已经在等待了, 那么试图获取公平锁的(非重入)所有线程(包括读写线程)都将被阻塞, 直到最先的写线程释放锁. 如果读线程的等待时间比写线程的等待时间还有长, 那么一旦上一个写线程释放锁, 这一组读线程将获取锁.
2.重入性
读写锁允许读线程和写线程按照请求锁的顺序重新获取读取锁或者写入锁. 当然了只有写线程释放了锁, 读线程才能获取重入锁.
写线程获取写入锁后可以再次获取读取锁, 但是读线程获取读取锁后却不能获取写入锁.
另外读写锁最多支持65535个递归写入锁和65535个递归读取锁.
3.锁降级
写线程获取写入锁后可以获取读取锁, 然后释放写入锁, 这样就从写入锁变成了读取锁, 从而实现锁降级的特性.
4.锁升级
读取锁是不能直接升级为写入锁的. 因为获取一个写入锁需要释放所有读取锁.
5.锁获取中断
读取锁和写入锁都支持获取锁期间被中断. 这个和独占锁一致.
5.条件变量
写入锁提供了条件变量(Condition)的支持, 这个和独占锁一致, 但是读取锁却不允许获取条件变量, 将得到一个UnsupportedOperationException异常.
ReentrantReadWriteLock中的state
在ReentrantLock中该字段用来描述有多少线程获持有锁. 在独占锁的时代这个值通常是0或者1(如果是重入的就是重入的次数), 在共享锁的时代就是持有锁的数量. ReadWriteLock的读、写锁是相关但是又不一致的, 所以需要两个数来描述读锁(共享锁)和写锁(独占锁)的数量. 显然现在一个state就不够用了. 于是在ReentrantReadWrilteLock里面将这个字段一分为二, 高位16位表示共享锁的数量, 低位16位表示独占锁的数量(或者重入数量). 2^16-1=65536, 这就是上节中提到的为什么共享锁和独占锁的数量最大只能是65535的原因了.
本文主要分析以下几个过程:
ReentrantReadWriteLock的创建(公平锁/非公平锁)
上锁: lock()(读取锁/写锁)
解锁: unlock()(读取锁/写锁)
ReentrantReadWriteLock的内部结构
ReentrantReadWriteLock的内部主要结构如下:
ReentrantReadWriteLock-->ReadWriteLock
NonfairSync/FairSync-->Sync-->AbstractQueuedSynchronizer-->AbstractOwnableSynchronizer
ReadLock/WriteLock-->Lock
注意 对应关系: "子类/实现"-->"父类/接口". 其中, NonfairSync, FairSync,Sync,ReadLock和WriteLock是ReentrantReadWriteLock的内部类.
ReentrantReadWriteLock的创建
ReentrantReadWriteLock的构造函数
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
可以看到与ReentrantLock一样, 锁的主体部分依然是Sync(FairSync/NonFaireSync), 但是增加了readerLock和writerLock两个实例. 参看ReadLock和WriterLock的构造函数可知两个实例共享一个sync实例.
/**
* ReadLock构造函数
*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/**
* WriteLock构造函数
*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
写入锁的获取
主要流程与ReentrantLock类似.
1.持有锁线程数非0(c=getState()不为0), 如果写线程数(w)为0(那么读线程数就不为0)或者独占锁线程(持有锁的线程)不是当前线程就返回失败, 或者写入锁的数量(其实是重入数)大于65535就抛出一个Error异常. 否则进行2.
2.持有锁线程数为0(c=0的情况)时, 如果当前线程需要阻塞(writerShouldBlock, 这是FairSync与NonFairSync的区别)那么就返回失败, 如果增加写线程数失败也返回失败. 否则进行3.
3.设置独占线程(写线程)为当前线程, 返回true.
涉及到的函数:
ReentrantReadWriteLock.WriteLock->lock()
public void lock() {
sync.acquire(1);
}
AbstractQueuedSynchronizer->acquire(int arg)
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantReadWriteLock.Sync->tryAcquire(int acquires)(主体函数)
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
接下来看writerShouldBlock在ReentrantReadWriteLock.FairSync与ReentrantReadWriteLock.NonFairSync中的实现.
ReentrantReadWriteLock.NonFairSync->writerShouldBlock()
final boolean writerShouldBlock() {
return false; // writers can always barge
}
即ReentrantReadWriteLock.NonFairSync在获取写入锁的过程中永远不会阻塞.
ReentrantReadWriteLock.FairSync->writerShouldBlock()
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
ReentrantReadWriteLock.FairSync的实现与ReentrantLock一致, 如果AQS队列不为空并且当前线程不是在AQS的队列头那么就阻塞线程, 直到队列前面的线程处理完锁逻辑.
读取锁的获取
读取锁的获取过程比写入锁稍显复杂,主要流程如下:
1.如果写线程持有锁(也就是独占锁数量不为0), 并且独占线程不是当前线程, 那么就返回失败. 因为允许写入线程获取锁的同时获取读取锁. 否则进行2.
2.如果读线程请求锁数量达到了65535(包括重入锁), 那么就跑出一个错误Error, 否则进行3
3.如果读线程不用等待(实际上是是否需要公平锁), 并且增加读取锁状态数成功, 那么就返回成功, 否则进行4.
4.fullTryAcquireShared(内部包含一个无限循环, 实际上是过程3的不断尝试直到CAS计数成功或者被写入线程占有锁)
涉及的代码如下:
ReentrantReadWriteLock.ReadLock->lock()
public void lock() {
sync.acquireShared(1);
}
AbstractQueuedSynchronizer->acquireShared(int arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
ReentrantReadWriteLock.Sync->tryAcquireShared(int unused)
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
ReentrantReadWriteLock.Sync->fullTryAcquireShared(Thread current)
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读取锁的释放
读取锁的释放其实就是一个不断尝试的CAS操作, 直到修改状态成功. 前面说过state的高16位描述的共享锁(读取锁)的数量, 所以每次都需要减去2^16, 这样就相当于读取锁数量减1. 实际上SHARED_UNIT=1<<16.
ReentrantReadWriteLock.ReaderLock->unlock()
public void unlock() {
sync.releaseShared(1);
}
AbstractQueuedSynchronizer->releaseShared(int arg)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
ReentrantReadWriteLock.Sync->tryReleaseShared(int unused)
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
HoldCounter
HoldCounter保存了当前线程持有共享锁(读取锁)的数量, 包括重入的数量. 那么这个数量就必须和线程绑定在一起.
在Java里面将一个对象和线程绑定在一起, 就只有ThreadLocal才能实现了. 所以毫无疑问HoldCounter就应该是绑定到线程上的一个计数器.
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = Thread.currentThread().getId();
}
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
小结
使用ReentrantReadWriteLock可以推广到大部分读, 少量写的场景, 因为读线程之间没有竞争, 所以比起sychronzied, 性能好很多.