2025-06-24-ReentrantReadWriteLock

ReentrantReadWriteLock

有互斥锁ReentrantLock还要增加一个ReentrantReadWriteLock的原因是在大量读的情况下,ReentrantLock比较慢。读是不存在资源互斥的,所以增加读写锁提高效率

  • 读读操作是共享锁
  • 读写操作是互斥的
  • 写读操作是互斥的
  • 写写操作是互斥的
  • 当前线程拿到写锁后,可以重新获得读锁(写读可重入)
  • 当前线程拿到读锁后,不可以拿写锁(读写不可重入)

读写锁也是AQS实现,控制锁的字段也是state。不同的是将state的高16位做为读锁的标识,state的低16位作为写锁。

锁重入的方式

  • 写锁:跟互斥锁类似,直接通过state的低16直接累加就是写锁重入次数。由于采用低16位控制,所以重入次数减少
  • 读锁:由于全部线程都能对读锁从入,那么就出现一个问题,如果只在AQS里面记录读锁重入次数,读锁就不知道每个线程自己从入的次数。比如线程A重入2次,线程B重入2次,state高16记录着4,但是不知道这个4该有谁来释放。所以需要由每个线程自己记录读锁重入次数。这个实现就是ThreadLocal

AQS的作者对读锁重入做了一些优化

  • 在AQS里面如果你是第一个拿到读锁的线程,那么AQS里面有个firstReader和firstReaderHoldCount记录据说可以优化一些性能。
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
  • 在AQS里面如果你是最后一个读锁也有一个地方保存,这个在后面分析源代码的时候再看

写锁操作

通过读Doug Lee的锁代码发现这个人很喜欢用if里面的&&呀||呀来判断是否需要执行。这就导致代码可读性变差。对于逻辑思维不严谨的人来说很容易写错。对于想学习代码的人其实也不友好。虽然显得代码简洁,但是可读性不强

// 高16位读,低16位写
static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/**
00000000 00000000 00000000 00000000
SHARED_UNIT:将1向左移动16位
00000000 00000001 00000000 00000000
MAX_COUNT:将上面的数减1,就变成下面这个
00000000 00000000 11111111 11111111
EXCLUSIVE_MASK:
00000000 00000000 11111111 11111111
 */
// 写锁开始 arg=1
public final void acquire(int arg) {
    // tryAcquire返回true,表示已经抢到写锁
    // tryAcquire返回false,表示没有抢到写锁,那么就要执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
    // acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 的过程和互斥锁逻辑类似
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

/**
 * 按位与运算,假设目前state的二进制是:
 *   00000000 00001111 00000000 00000101  (有写锁)
 * & 00000000 00000000 11111111 11111111
 * = 00000000 00000000 00000000 00000101  (w != 0那这就能够得出目前正是持有写锁的个数)
 * 下面是只有读锁的情况,由于读锁和写锁互斥,所以如果存在读锁,w=0,不能上写锁
 *   00000000 00001111 00000000 00000000  (只有读锁)
 * & 00000000 00000000 11111111 11111111
 * = 00000000 00000000 00000000 00000000  (w == 0表示只有读锁)
 * 
 */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

// 返回true表示抢到锁
protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    // 这个方法就是获取state低16位,然后计算当前锁的次数
    int w = exclusiveCount(c);
    // c!=0 表示已经有人拿到写锁,或者是有人已经拿到读锁
    if (c != 0) {
        // w==0 表示只有读锁,有读锁不能上写锁,所以直接放回
        // w!= 0 表示只有写锁,所以判断是不是重入,如果不是重入,直接拜拜
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        // 如果上面没有返回,那么表示w!=0,并且当前线程是重入的,所以增加重入次数。然后判断重入次数是否超过最大重入次数
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // 设置state,并且返回true,当前线程拿到写锁
        setState(c + acquires);
        return true;
    }
    // 能够走到这里表示c==0,既没有写锁,也没有读锁
    // 非公平锁直接返回false,根据||的逻辑需要继续走compareAndSetState。所以非公平锁直接就开始抢锁。如果抢失败了,直接返回false
    // 公平锁就要看一下是否有队列中排队的数据:hasQueuedPredecessors
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    // 能够下来表示抢锁成功!!!
    setExclusiveOwnerThread(current);
    return true;
}
// 公平锁:writerShouldBlock实现
public final boolean hasQueuedPredecessors() {
    
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    // 如果 h == t 表示队列里面是空的,所以(h != t) = false, false后面就不执行了,整个返回false
    // 如果 h != t 是true,那么继续执行后面的。s=h.next == null说现在也没有
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
// 非公平锁实现writerShouldBlock
final boolean writerShouldBlock() {
    return false; // writers can always barge
}

写锁释放

释放锁按照理论来说就是将state自减,然后在队列中唤醒一个节点

// 1.释放锁
// 2.唤醒队列里面的节点
public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        // 释放锁成功。唤醒队列里面的节点
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}


protected final boolean tryRelease(int releases) {
    // 如果当前线程不是持有锁的线程,直接抛异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    // 释放锁
    int nextc = getState() - releases;
    boolean free = exclusiveCount(nextc) == 0;
    if (free)
        setExclusiveOwnerThread(null);
    setState(nextc);
    return free;
}
// 判断当前线程是不是持有锁的线程
protected final boolean isHeldExclusively() {
    return getExclusiveOwnerThread() == Thread.currentThread();
}

// node是head节点
private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    // 唤醒的head的下一个节点。这个说明head是一个third为null的节点    
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

读锁

protected final int tryAcquireShared(int unused) {

    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;

    // 读锁次数r    
    int r = sharedCount(c);
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        // 如果r=0表示当前线程是一个持有读锁的线程,所以直接将firstReader设置为当前线程,这就是前面说的优化点
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } 
        // 第一个持有锁的人重入,直接增加firstReaderHoldCount的次数
        else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            // 处理最后一个
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);
}
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容