ReentrantReadWriteLock可重入读写锁,先从功能以及具体实现有一个简单的了解
一、两把锁
writerLock,readerLock分为读锁跟写锁,他们之间的共存与互斥进行一个简单罗列
写写互斥、读写互斥、写读可降级,读写不可升级、读读可并行
这是我的总结,另外竞争状态同样是使用state变量进行的,通过高16进行共享锁也就是读读线程之间的共享模式调度,低16位用于写锁的阻塞模式。
也就是说写锁重入次数是int最大取值范围的差不多一半,不过这也够用了。
二、同步器代码
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
这几个成员属性主要是做state值的切分与转换的,
SHARED_SHIFT =16 等于是将int的state 4字节 32位进行切分,16位
SHARED_UNIT 左移位16位也就是实际share模式(读锁时候state的)存储时候是以65536为底,
MAX_COUNT 读锁或者写锁最大的资源数,65535
EXCLUSIVE_MASK 字面意思独占模式的掩码,其实就是为了方便进行独占模式进行位&运算
sharedCount(int c) 通过无符号右移操作得出目前share资源的剩余数量
exclusiveCount(int c) 通过掩码的与运算,"抹掉"高位的读锁占用的变量值,不得不说你大爷还是你大爷,如果是我也就是加加减减那么用。
用于存储读锁的计数器在重入时候使用,代码如下
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}
读锁的重入计数是通过线程本地变量ThreadLocal里面存储了HoldCounter实现的
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
接下来对几个变量做一个简单说明就开始分析几个重要的方法
当前线程对自身重入计数器的一个引用
private transient ThreadLocalHoldCounter readHolds;
sync对象内部缓存的最后一个获得读锁的线程计数器对象引用目的提升查询比较时候的性能
private transient HoldCounter cachedHoldCounter;
sync对象内部缓存的第一个获取读锁的线程
private transient Thread firstReader = null;
sync对象内部缓存的第一个获取读锁的线程重入次数
private transient int firstReaderHoldCount;
两个抽象方法定义要求公平锁与非公平锁去实现在尝试获得锁时候是否进行当前线程的阻塞
判断读锁是否要阻塞
abstract boolean readerShouldBlock();
判断写锁是否需要阻塞
abstract boolean readerShouldBlock()
公平锁实现
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
判断当前阻塞队列是否有等待节点
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
判断当前阻塞队列是否有等待节点
return hasQueuedPredecessors();
}
}
非公平锁实现
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
不阻塞尝试进行加锁
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
判断当前是否有写锁,如果没有写锁就不阻塞
return apparentlyFirstQueuedIsExclusive();
}
}
下面仔细分解一下读锁跟写锁的加锁与释放锁的相关代码
从命名就可以看出来哪些是用于写锁的
protected final boolean tryRelease(int releases) {
//因为写锁是独占模式,如果当前线程不是锁持有者则属于异常状态
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//因为支持重入所以不只是0跟1的关系
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
//当state减到0的时候进行竞争状态的释放,并修改当前写锁线程持有者为空
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
这部分是写锁的获取,写锁的获取稍微有些复杂
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
取出当前state的数值
int c = getState();
通过与运算算出当前独占锁下数值
int w = exclusiveCount(c);
如果是state不为0有可能有线程获得了读锁,也有可能是有现成获得写锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
这表示两种情况下都失败
1.有读锁存在
2.有写锁,但是当前写锁持有线程不是当前线程
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
state不为0并且当前线程又为锁持有线程
锁重入增加state的值
setState(c + acquires);
return true;
}
如果当前state为0走入这个分支,判断是否需要等待,
或者尝试修改竞争状态失败返回加锁失败否则设置当前
线程为锁持有线程
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
下面是读锁的获取与释放逻辑相关的代码
个人认为读锁的获取相对值得自信分解,因为里面会涉及一个锁降级过程,并且还有一个令我想了挺长没get到李大爷的点的地方,就是一个死循环去获取读锁。前提条件是目前无写锁未被持有
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
当前如果有写锁,并且写锁线程不是自己就进入读锁获取逻辑
这里面是有含义的包含了锁降级的支持,
假设一个线程获得了写锁,那么第一个条件是成立的,他会判断持有线程是不是当前线程,如果是当前线程那么还是会执行下面读锁获取代码。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
这是那个困扰我地方,为什么跟上面方法一样还要再来这么个方法,截止到目前我的理解是这样的,之所以可以这么任性的死循环其实是因为读锁数量限制造成的,以往比如信号量,他虽然会通过共享模式提高并发线程数,但是并发量会被严格限制,读锁比较特殊在源码一开始就有一段话
This lock supports a maximum of 65535 recursive write locks
and 65535 read locks. Attempts to exceed these limits result in
*{@link Error} throws from locking methods.
正是因为这个近乎不限制并发读数量的做法才允许这么任性的死循环,因为这个并发获得读锁其实是很快的,所以不需要进入阻塞队列进行排队,但是这里里面有一个前提是当前没有被写锁阻塞,因为如果写锁阻塞大量的循环是无效的依然会浪费性能,还是要去排队的。代码如下,就是为了在一次获得
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
另外还有两个尝试获得锁的方法一个是尝试获得写锁,一个是尝试获得读锁,基础逻辑为不进行是否应该阻塞当前线程判断,而是直接进行尝试,加锁成功则返回true否则返回false。这里面还是要特别注意读锁的获取依然采用的循环方式,还是因为不限制读锁数量的原因
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
if (!compareAndSetState(c, c + 1))
return false;
setExclusiveOwnerThread(current);
return true;
}
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
当当前没有写锁时候就一直进行循环添加当前线程读锁进行尝试
int c = getState();
锁降级
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
最后一个注意点就是condition的支持,写锁支持condition,读锁不支持
可以通过代码就看的出来
读锁中获得conditon方法
public Condition newCondition() {
throw new UnsupportedOperationException();
}
写锁中获得condition方法
public Condition newCondition() {
return sync.newCondition();
}
其他方法就不进行说明了,都是一些统计或者判断的方法,避免没有重点。
最后做一个总结吧
1、可重入读写锁利用state变量标记竞争状态具体是高16位用于读锁分配计数低16位用于写锁重入计数
2、读锁的最大并发量是65535个读并发,写锁支持65535次重入。
3、写锁的获取与重入跟ReetrantLock区别不算大,都是通过state进行重入计数的
4、锁关系总结如下,读锁被持有则写锁获取失败、写锁被持有则其他线程获取读锁或者写锁失败,但是本线程可以支持获取读锁(锁降级)。读锁与读锁是可以并发获取的。
5、读锁的重入计数是通过ThreadLocal里面的HoldCounter进行存储的。为了提升对比效率做了一些对象的缓存firsterReader,firstReaderHoldCount、cachedHoldCounter