Java并发之ReentrantReadWriteLock

ReentrantReadWriteLock可重入读写锁,先从功能以及具体实现有一个简单的了解

一、两把锁

writerLock,readerLock分为读锁跟写锁,他们之间的共存与互斥进行一个简单罗列

写写互斥、读写互斥、写读可降级,读写不可升级、读读可并行

这是我的总结,另外竞争状态同样是使用state变量进行的,通过高16进行共享锁也就是读读线程之间的共享模式调度,低16位用于写锁的阻塞模式。
也就是说写锁重入次数是int最大取值范围的差不多一半,不过这也够用了。

二、同步器代码

        static final int SHARED_SHIFT   = 16;
        static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
        static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
        static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

        /** Returns the number of shared holds represented in count  */
        static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
        /** Returns the number of exclusive holds represented in count  */
        static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

这几个成员属性主要是做state值的切分与转换的,

SHARED_SHIFT =16 等于是将int的state 4字节 32位进行切分,16位
SHARED_UNIT 左移位16位也就是实际share模式(读锁时候state的)存储时候是以65536为底,
MAX_COUNT 读锁或者写锁最大的资源数,65535
EXCLUSIVE_MASK 字面意思独占模式的掩码,其实就是为了方便进行独占模式进行位&运算
sharedCount(int c) 通过无符号右移操作得出目前share资源的剩余数量
exclusiveCount(int c) 通过掩码的与运算,"抹掉"高位的读锁占用的变量值,不得不说你大爷还是你大爷,如果是我也就是加加减减那么用。

用于存储读锁的计数器在重入时候使用,代码如下

      static final class HoldCounter {
            int count = 0;
            // Use id, not reference, to avoid garbage retention
            final long tid = getThreadId(Thread.currentThread());
        }

读锁的重入计数是通过线程本地变量ThreadLocal里面存储了HoldCounter实现的

    static final class ThreadLocalHoldCounter
            extends ThreadLocal<HoldCounter> {
            public HoldCounter initialValue() {
                return new HoldCounter();
            }
        }

接下来对几个变量做一个简单说明就开始分析几个重要的方法

        当前线程对自身重入计数器的一个引用
        private transient ThreadLocalHoldCounter readHolds;
        sync对象内部缓存的最后一个获得读锁的线程计数器对象引用目的提升查询比较时候的性能
        private transient HoldCounter cachedHoldCounter;
        sync对象内部缓存的第一个获取读锁的线程
        private transient Thread firstReader = null;
        sync对象内部缓存的第一个获取读锁的线程重入次数
        private transient int firstReaderHoldCount;
       
        

两个抽象方法定义要求公平锁与非公平锁去实现在尝试获得锁时候是否进行当前线程的阻塞

        判断读锁是否要阻塞
        abstract boolean readerShouldBlock();
        判断写锁是否需要阻塞
        abstract boolean readerShouldBlock()
        公平锁实现
        static final class FairSync extends Sync {
        private static final long serialVersionUID = -2274990926593161451L;
        final boolean writerShouldBlock() {
            判断当前阻塞队列是否有等待节点
            return hasQueuedPredecessors();
        }
        final boolean readerShouldBlock() {
            判断当前阻塞队列是否有等待节点
            return hasQueuedPredecessors();
        }
    }
        非公平锁实现
        static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -8159625535654395037L;
        final boolean writerShouldBlock() {
            不阻塞尝试进行加锁
            return false; // writers can always barge
        }
        final boolean readerShouldBlock() {
            /* As a heuristic to avoid indefinite writer starvation,
             * block if the thread that momentarily appears to be head
             * of queue, if one exists, is a waiting writer.  This is
             * only a probabilistic effect since a new reader will not
             * block if there is a waiting writer behind other enabled
             * readers that have not yet drained from the queue.
             */
            判断当前是否有写锁,如果没有写锁就不阻塞
            return apparentlyFirstQueuedIsExclusive();
        }
    }

下面仔细分解一下读锁跟写锁的加锁与释放锁的相关代码
从命名就可以看出来哪些是用于写锁的

 protected final boolean tryRelease(int releases) {
            //因为写锁是独占模式,如果当前线程不是锁持有者则属于异常状态
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //因为支持重入所以不只是0跟1的关系
            int nextc = getState() - releases;
            boolean free = exclusiveCount(nextc) == 0;
            //当state减到0的时候进行竞争状态的释放,并修改当前写锁线程持有者为空
            if (free)
                setExclusiveOwnerThread(null);
            setState(nextc);
            return free;
        }
        这部分是写锁的获取,写锁的获取稍微有些复杂
        protected final boolean tryAcquire(int acquires) {
          
            Thread current = Thread.currentThread();
            取出当前state的数值
            int c = getState();
            通过与运算算出当前独占锁下数值
            int w = exclusiveCount(c);
            如果是state不为0有可能有线程获得了读锁,也有可能是有现成获得写锁
            if (c != 0) {
                // (Note: if c != 0 and w == 0 then shared count != 0)
                if (w == 0 || current != getExclusiveOwnerThread())
                    这表示两种情况下都失败
                    1.有读锁存在
                    2.有写锁,但是当前写锁持有线程不是当前线程
                    return false;
                if (w + exclusiveCount(acquires) > MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                state不为0并且当前线程又为锁持有线程
                锁重入增加state的值
                setState(c + acquires);
                return true;
            }
            如果当前state为0走入这个分支,判断是否需要等待,
            或者尝试修改竞争状态失败返回加锁失败否则设置当前
            线程为锁持有线程
            if (writerShouldBlock() ||
                !compareAndSetState(c, c + acquires))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }

下面是读锁的获取与释放逻辑相关的代码
个人认为读锁的获取相对值得自信分解,因为里面会涉及一个锁降级过程,并且还有一个令我想了挺长没get到李大爷的点的地方,就是一个死循环去获取读锁。前提条件是目前无写锁未被持有

 protected final int tryAcquireShared(int unused) {
           
            Thread current = Thread.currentThread();
            int c = getState();
            当前如果有写锁,并且写锁线程不是自己就进入读锁获取逻辑
            这里面是有含义的包含了锁降级的支持,
            假设一个线程获得了写锁,那么第一个条件是成立的,他会判断持有线程是不是当前线程,如果是当前线程那么还是会执行下面读锁获取代码。
            
            if (exclusiveCount(c) != 0 &&
                getExclusiveOwnerThread() != current)
                return -1;
            int r = sharedCount(c);
            if (!readerShouldBlock() &&
                r < MAX_COUNT &&
                compareAndSetState(c, c + SHARED_UNIT)) {
                if (r == 0) {
                    firstReader = current;
                    firstReaderHoldCount = 1;
                } else if (firstReader == current) {
                    firstReaderHoldCount++;
                } else {
                    HoldCounter rh = cachedHoldCounter;
                    if (rh == null || rh.tid != getThreadId(current))
                        cachedHoldCounter = rh = readHolds.get();
                    else if (rh.count == 0)
                        readHolds.set(rh);
                    rh.count++;
                }
                return 1;
            }
            return fullTryAcquireShared(current);
        }

这是那个困扰我地方,为什么跟上面方法一样还要再来这么个方法,截止到目前我的理解是这样的,之所以可以这么任性的死循环其实是因为读锁数量限制造成的,以往比如信号量,他虽然会通过共享模式提高并发线程数,但是并发量会被严格限制,读锁比较特殊在源码一开始就有一段话

This lock supports a maximum of 65535 recursive write locks
and 65535 read locks. Attempts to exceed these limits result in
*{@link Error} throws from locking methods.

正是因为这个近乎不限制并发读数量的做法才允许这么任性的死循环,因为这个并发获得读锁其实是很快的,所以不需要进入阻塞队列进行排队,但是这里里面有一个前提是当前没有被写锁阻塞,因为如果写锁阻塞大量的循环是无效的依然会浪费性能,还是要去排队的。代码如下,就是为了在一次获得


final int fullTryAcquireShared(Thread current) {
           /*
            * This code is in part redundant with that in
            * tryAcquireShared but is simpler overall by not
            * complicating tryAcquireShared with interactions between
            * retries and lazily reading hold counts.
            */
           HoldCounter rh = null;
           for (;;) {
               int c = getState();
               if (exclusiveCount(c) != 0) {
                   if (getExclusiveOwnerThread() != current)
                       return -1;
                   // else we hold the exclusive lock; blocking here
                   // would cause deadlock.
               } else if (readerShouldBlock()) {
                   // Make sure we're not acquiring read lock reentrantly
                   if (firstReader == current) {
                       // assert firstReaderHoldCount > 0;
                   } else {
                       if (rh == null) {
                           rh = cachedHoldCounter;
                           if (rh == null || rh.tid != getThreadId(current)) {
                               rh = readHolds.get();
                               if (rh.count == 0)
                                   readHolds.remove();
                           }
                       }
                       if (rh.count == 0)
                           return -1;
                   }
               }
               if (sharedCount(c) == MAX_COUNT)
                   throw new Error("Maximum lock count exceeded");
               if (compareAndSetState(c, c + SHARED_UNIT)) {
                   if (sharedCount(c) == 0) {
                       firstReader = current;
                       firstReaderHoldCount = 1;
                   } else if (firstReader == current) {
                       firstReaderHoldCount++;
                   } else {
                       if (rh == null)
                           rh = cachedHoldCounter;
                       if (rh == null || rh.tid != getThreadId(current))
                           rh = readHolds.get();
                       else if (rh.count == 0)
                           readHolds.set(rh);
                       rh.count++;
                       cachedHoldCounter = rh; // cache for release
                   }
                   return 1;
               }
           }
       }

另外还有两个尝试获得锁的方法一个是尝试获得写锁,一个是尝试获得读锁,基础逻辑为不进行是否应该阻塞当前线程判断,而是直接进行尝试,加锁成功则返回true否则返回false。这里面还是要特别注意读锁的获取依然采用的循环方式,还是因为不限制读锁数量的原因

 final boolean tryWriteLock() {
            Thread current = Thread.currentThread();
            int c = getState();
            if (c != 0) {
                int w = exclusiveCount(c);
                if (w == 0 || current != getExclusiveOwnerThread())
                    return false;
                if (w == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
            }
            if (!compareAndSetState(c, c + 1))
                return false;
            setExclusiveOwnerThread(current);
            return true;
        }
 final boolean tryReadLock() {
            Thread current = Thread.currentThread();
            for (;;) {
                当当前没有写锁时候就一直进行循环添加当前线程读锁进行尝试
                int c = getState();
                锁降级
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)
                    return false;
                int r = sharedCount(c);
                if (r == MAX_COUNT)
                    throw new Error("Maximum lock count exceeded");
                if (compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null || rh.tid != getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return true;
                }
            }
        }

最后一个注意点就是condition的支持,写锁支持condition,读锁不支持
可以通过代码就看的出来
读锁中获得conditon方法

        
        public Condition newCondition() {
            throw new UnsupportedOperationException();
        }

写锁中获得condition方法

      public Condition newCondition() {
            return sync.newCondition();
        }

其他方法就不进行说明了,都是一些统计或者判断的方法,避免没有重点。
最后做一个总结吧

1、可重入读写锁利用state变量标记竞争状态具体是高16位用于读锁分配计数低16位用于写锁重入计数
2、读锁的最大并发量是65535个读并发,写锁支持65535次重入。
3、写锁的获取与重入跟ReetrantLock区别不算大,都是通过state进行重入计数的
4、锁关系总结如下,读锁被持有则写锁获取失败、写锁被持有则其他线程获取读锁或者写锁失败,但是本线程可以支持获取读锁(锁降级)。读锁与读锁是可以并发获取的。
5、读锁的重入计数是通过ThreadLocal里面的HoldCounter进行存储的。为了提升对比效率做了一些对象的缓存firsterReader,firstReaderHoldCount、cachedHoldCounter

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351