ReentrantReadWriteLock
有互斥锁ReentrantLock还要增加一个ReentrantReadWriteLock的原因是在大量读的情况下,ReentrantLock比较慢。读是不存在资源互斥的,所以增加读写锁提高效率
- 读读操作是共享锁
- 读写操作是互斥的
- 写读操作是互斥的
- 写写操作是互斥的
- 当前线程拿到写锁后,可以重新获得读锁(写读可重入)
- 当前线程拿到读锁后,不可以拿写锁(读写不可重入)
读写锁也是AQS实现,控制锁的字段也是state。不同的是将state的高16位做为读锁的标识,state的低16位作为写锁。
锁重入的方式
- 写锁:跟互斥锁类似,直接通过state的低16直接累加就是写锁重入次数。由于采用低16位控制,所以重入次数减少
- 读锁:由于全部线程都能对读锁从入,那么就出现一个问题,如果只在AQS里面记录读锁重入次数,读锁就不知道每个线程自己从入的次数。比如线程A重入2次,线程B重入2次,state高16记录着4,但是不知道这个4该有谁来释放。所以需要由每个线程自己记录读锁重入次数。这个实现就是ThreadLocal
AQS的作者对读锁重入做了一些优化
- 在AQS里面如果你是第一个拿到读锁的线程,那么AQS里面有个firstReader和firstReaderHoldCount记录据说可以优化一些性能。
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
- 在AQS里面如果你是最后一个读锁也有一个地方保存,这个在后面分析源代码的时候再看
写锁操作
通过读Doug Lee的锁代码发现这个人很喜欢用if里面的&&呀||呀来判断是否需要执行。这就导致代码可读性变差。对于逻辑思维不严谨的人来说很容易写错。对于想学习代码的人其实也不友好。虽然显得代码简洁,但是可读性不强
// 高16位读,低16位写
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/**
00000000 00000000 00000000 00000000
SHARED_UNIT:将1向左移动16位
00000000 00000001 00000000 00000000
MAX_COUNT:将上面的数减1,就变成下面这个
00000000 00000000 11111111 11111111
EXCLUSIVE_MASK:
00000000 00000000 11111111 11111111
*/
// 写锁开始 arg=1
public final void acquire(int arg) {
// tryAcquire返回true,表示已经抢到写锁
// tryAcquire返回false,表示没有抢到写锁,那么就要执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
// acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 的过程和互斥锁逻辑类似
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* 按位与运算,假设目前state的二进制是:
* 00000000 00001111 00000000 00000101 (有写锁)
* & 00000000 00000000 11111111 11111111
* = 00000000 00000000 00000000 00000101 (w != 0那这就能够得出目前正是持有写锁的个数)
* 下面是只有读锁的情况,由于读锁和写锁互斥,所以如果存在读锁,w=0,不能上写锁
* 00000000 00001111 00000000 00000000 (只有读锁)
* & 00000000 00000000 11111111 11111111
* = 00000000 00000000 00000000 00000000 (w == 0表示只有读锁)
*
*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
// 返回true表示抢到锁
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
// 这个方法就是获取state低16位,然后计算当前锁的次数
int w = exclusiveCount(c);
// c!=0 表示已经有人拿到写锁,或者是有人已经拿到读锁
if (c != 0) {
// w==0 表示只有读锁,有读锁不能上写锁,所以直接放回
// w!= 0 表示只有写锁,所以判断是不是重入,如果不是重入,直接拜拜
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果上面没有返回,那么表示w!=0,并且当前线程是重入的,所以增加重入次数。然后判断重入次数是否超过最大重入次数
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 设置state,并且返回true,当前线程拿到写锁
setState(c + acquires);
return true;
}
// 能够走到这里表示c==0,既没有写锁,也没有读锁
// 非公平锁直接返回false,根据||的逻辑需要继续走compareAndSetState。所以非公平锁直接就开始抢锁。如果抢失败了,直接返回false
// 公平锁就要看一下是否有队列中排队的数据:hasQueuedPredecessors
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 能够下来表示抢锁成功!!!
setExclusiveOwnerThread(current);
return true;
}
// 公平锁:writerShouldBlock实现
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
// 如果 h == t 表示队列里面是空的,所以(h != t) = false, false后面就不执行了,整个返回false
// 如果 h != t 是true,那么继续执行后面的。s=h.next == null说现在也没有
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
// 非公平锁实现writerShouldBlock
final boolean writerShouldBlock() {
return false; // writers can always barge
}
写锁释放
释放锁按照理论来说就是将state自减,然后在队列中唤醒一个节点
// 1.释放锁
// 2.唤醒队列里面的节点
public final boolean release(int arg) {
// 尝试释放锁
if (tryRelease(arg)) {
// 释放锁成功。唤醒队列里面的节点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
// 如果当前线程不是持有锁的线程,直接抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 释放锁
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}
// 判断当前线程是不是持有锁的线程
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// node是head节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 唤醒的head的下一个节点。这个说明head是一个third为null的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
读锁
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 读锁次数r
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果r=0表示当前线程是一个持有读锁的线程,所以直接将firstReader设置为当前线程,这就是前面说的优化点
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
// 第一个持有锁的人重入,直接增加firstReaderHoldCount的次数
else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 处理最后一个
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}