AQS-ReentrantReadWriteLock

一、介绍

前面我们讲解过ReentrantLock,它是一个独占锁,也就是说它是无区别的处理任何操作
但是在实际情景中,假如我们对一个共享元素同时进行读操作时,是不会出现线程安全问题的,由此我们引申出一个读写分离的锁

共存关系

二、结构

ReentrantReadWriteLock

public class ReentrantReadWriteLock
        implements ReadWriteLock, java.io.Serializable {
    private static final long serialVersionUID = -6992448646407690164L;
 
    private final ReentrantReadWriteLock.ReadLock readerLock;
  
    private final ReentrantReadWriteLock.WriteLock writerLock;
    
    final Sync sync;
}

ReadLock

    public static class ReadLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -5992448646407690164L;
        private final Sync sync;
//征用一个ReentrantReadWriteLock的Sync对象
        protected ReadLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }
//读锁使用的是共享锁
        public void lock() {
            sync.acquireShared(1);
        }

        public void lockInterruptibly() throws InterruptedException {
            sync.acquireSharedInterruptibly(1);
        }
//重写tryLock()方法
        public boolean tryLock() {
            return sync.tryReadLock();
        }

//重写unLock()方法
        public void unlock() {
            sync.releaseShared(1);
        }
//读锁不能存在条件队列
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }
...

WriteLock

    public static class WriteLock implements Lock, java.io.Serializable {
        private static final long serialVersionUID = -4992448646407690164L;
        private final Sync sync;
//征用一个ReentrantReadWriteLock的Sync对象
        protected WriteLock(ReentrantReadWriteLock lock) {
            sync = lock.sync;
        }

//写锁使用的独占锁
        public void lock() {
            sync.acquire(1);
        }

//重写tryLock()方法
        public boolean tryLock( ) {
            return sync.tryWriteLock();
        }
//重写unLock()方法
        public void unlock() {
            sync.release(1);
        }
//写锁可以拥有条件等待队列
        public Condition newCondition() {
            return sync.newCondition();
        }
...

Sync

    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 6317671515068378041L;
//这里将为int型的state值按高低16位一分为二
//高位为读锁
//低位为写锁
        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

//右移16位获取高位值
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
//确保拿下的只有低位16位值
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
...
}

ReadLock.lock() 获取锁

    public void lock() {
        sync.acquireShared(1);
    }
    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

ReentrantReadWriteLock中重写的tryAcquireShared()尝试获取读锁
步骤:
1)通过同步状态低16位判断,如果存在写锁且当前线程不是获取写锁的线程,返回-1,获取读锁失败;否则执行步骤2)。
2)通过readerShouldBlock判断当前线程是否应该被阻塞,如果不应该阻塞则尝试CAS同步状态;否则执行3)。
3)第一次获取读锁失败,通过fullTryAcquireShared再次尝试获取读锁。

        protected final int tryAcquireShared(int unused) {
/**演练:
  1. 如果另一个线程持有写锁定,则失败。
  2. 否则,此线程有资格进入锁定wrt状态,因此请问是否由于队列策略而应阻塞。
     如果不是,请尝试按CASing状态授予许可并更新计数。
     请注意,此步骤不会检查可重入获取,这会推迟到完整版本
     避免在更典型的非重入情况下必须检查保留计数。
  3. 如果第2步失败,或者由于线程显然不合格,或者CAS失败或饱和,请使用完全重试循环链接到版本。
*/
//获取当前线程
            Thread current = Thread.currentThread();
            int c = getState();
//判断队列是否存在写锁,存在且当前线程不是拥有写锁的线程则尝试获取锁失败
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
//获取读状态值
            int r = sharedCount(c);
//判断当前线程是否应该阻塞
            if (!readerShouldBlock() &&
//读锁取的是高16位,所以必须小于MAX_COUNT,否则会有int溢出
                r < MAX_COUNT &&
//CAS操作加1
                compareAndSetState(c, c + SHARED_UNIT)) {
//获取锁成功
//如果r==0说明之前没有读锁
                if (r == 0) {
//第一个读锁节点为当前线程
                    firstReader = current;
//第一个读锁线程重入次数为1
                    firstReaderHoldCount = 1;
//第一个读锁线程等于当前线程
                } else if (firstReader == current) {
//重入次数加1
                    firstReaderHoldCount++;
//若当前获取读锁线程不是第一个读锁线程
                } else {
//读锁重入计数缓存器
                    HoldCounter rh = cachedHoldCounter;
//若缓存不属于当前线程
// 再从ThreadLocal中获取
// readHolds本身是一个ThreadLocal,里面存储的是HoldCounter
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
//rh的次数为0,则将它存入ThreadLocal
                    else if (rh.count == 0)
                        readHolds.set(rh);
//都不是则直接+1
                    rh.count++;
                }
//成功获取锁
                return 1;
            }
//继续尝试获取读锁
            return fullTryAcquireShared(current);
        }

ReadLock.unlock()

 public void unlock() {
        sync.releaseShared(1);
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

ReentrantReadWriteLock中重写的tryAcquireShared()尝试释放读锁
步骤:
1)如果当前线程是第一个读线程,则直接进行释放锁或者重入次数-1操作
2)如果当前线程不是第一个读线程,则找到对饮的重入次数,进行释放或者-1操作
3)自旋更改state值

        protected final boolean tryReleaseShared(int unused) {
//获取当前线程
            Thread current = Thread.currentThread();
//第一线程是当前线程的情况下
            if (firstReader == current) {
                // assert firstReaderHoldCount > 0;
//重入次数为1直接释放锁
                if (firstReaderHoldCount == 1)
                    firstReader = null;
//不为1重入次数-1
                else
                    firstReaderHoldCount--;
//第一线程不是当前线程
            } else {
//找到当前线程的缓存
                HoldCounter rh = cachedHoldCounter;
                if (rh == null || rh.tid != getThreadId(current))
                    rh = readHolds.get();
                int count = rh.count;
//移除缓存
                if (count <= 1) {
                    readHolds.remove();
                    if (count <= 0)
                        throw unmatchedUnlockException();
                }
//重入次数减1
                --rh.count;
            }
//自旋减1
            for (;;) {
//CAS高位减1
                int c = getState();
                int nextc = c - SHARED_UNIT;
                if (compareAndSetState(c, nextc))
                    // Releasing the read lock has no effect on readers,
                    // but it may allow waiting writers to proceed if
                    // both read and write locks are now free.
                    return nextc == 0;
            }
        }

WriteLock.lock()

    public void lock() {
        sync.acquire(1);
    }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

ReentrantReadWriteLock中重写的tryAcquire()尝试获取写锁
步骤:
1)判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行2);否则执行5)。
2)判断同步状态state的低16位(w)是否为0。如果w=0,说明其他线程获取了读锁,返回false;如果w!=0,说明其他线程获取了写锁,执行步骤3)。
3)判断获取了写锁是否是当前线程,若不是返回false,否则执行4);
4)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程以获取写锁,更新是线程安全的),返回true。
5)此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,否则返回false。如果需要阻塞则返回false。

        protected final boolean tryAcquire(int acquires) {
            /*
             * Walkthrough:
             * 1. If read count nonzero or write count nonzero
             *    and owner is a different thread, fail.
             * 2. If count would saturate, fail. (This can only
             *    happen if count is already nonzero.)
             * 3. Otherwise, this thread is eligible for lock if
             *    it is either a reentrant acquire or
             *    queue policy allows it. If so, update state
             *    and set owner.
             */
            Thread current = Thread.currentThread();
            int c = getState();
//独占锁的获取次数
            int w = exclusiveCount(c);
//状态不为0,说明有锁
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
//如果独占锁没有被获取过,说明存在读锁。
//或者当前线程不是获取写锁的线程
//获取锁失败
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
//如果溢出,抛错误
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                // Reentrant acquire
// 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可
                setState(c + acquires);
                return true;
            }
//c==0
//如果该线程需要阻塞,或者CAS失败,则获取锁失败
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
//获取锁成功,将当前线程设为占有锁的线程
            setExclusiveOwnerThread(current);
            return true;
        }

WriteLock.unlock()

    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

ReentrantReadWriteLock中重写的tryRelease()尝试获取写锁

protected final boolean tryRelease(int releases) {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }

# Java并发编程--ReentrantReadWriteLock

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容