StampLock 之二

乐观读锁的获取

乐观读锁只需要获取,不需要释放。只要没有线程获取写锁,那么就能获取到乐观读锁。当然,当使用由乐观读锁读取的共享数据时,需要先调用validate方法验证数据是否过期,只需要判断从获取数据到现在这段时间内是否有线程获取读锁即可。从此可以看出,乐观读锁可以很好的提高吞吐量,但是它的使用也非常不稳定,如果对数据的了解与控制不清晰,那么很容易出现脏读问题。

> line: 629
public long tryOptimisticRead() {
    long s;
    // 只要没有线程获取写锁,那么就能成功获取乐观读锁
    return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
}

乐观读锁逻辑比较简单,就是判断写锁是否被占用(判断state第8位的值是否为1),如果写锁被占用则返回0,否则返回stamp位。
state初始值为256(1 0000 0000)

  01000 0000(WBIT)
& 10000 0000(state)
==============
  00000 0000

 1 0000 0000(state)
 1 1000 0000(SBITS)
 ==============
 1 0000 0000

检测乐观读锁

public boolean validate(long stamp) {

  // 通过UNSafe插入内存屏障,禁止重排序
  U.loadFence();
  return (stamp & SBITS) == (state & SBITS);
}
  • 返回true: 表示期间没有写锁,不关心读锁。

  • 返回false: 表示期间有写锁发生

  • SBITS为-128,用二进制表示是:1111 1111 1111 1000 0000

x xxxx xxxx(stamp)
1 1000 0000

1 yyyy yyyy(state)
1 1000 0000

SBITS后7位都是0,也就是不关心读锁,我们只关心stamp位和写位。

当我们获取乐观读时,如果此时已经有了写锁,那么返回stamp值为0,此时进行验证肯定为false。

获取普通读锁

public long readLock() {
  long s = state, next;
  // whead == wtail时,队列为空,表示当前没有线程在排队
  return ((whead == wtail && (s & ABITS) < RFULL &&
           U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
           // acquireRead的第一个参数为false,标识不处理线程中断
          next : acquireRead(false, 0L));
}

当获取读锁成功时,会将state值加1。当条件不满足时(队列不为空,CAS失败,或者读锁的个数已经大于等于126),都会进入到acquireRead()方法,这个方法主要分为两个大的for循环,代码比较长我就不贴出来了。

它的主要逻辑如下:

  1. 如果当前队列没有排队的线程,该线程是第一个准备排队的元素,就有很大机会获取到锁,所以会先自旋尝试获取到锁(自旋次数为64),如果获取到了将state值加1,如果读锁次数已经超出了126,则使用readerOverflow记录超出的读锁数目。如果未获取到锁,则进入第2步
 private long tryIncReaderOverflow(long s) {
 
  // 如果读锁的记录数等于126,
  if ((s & ABITS) == RFULL) {
      // 将state的值加1
      // CAS之后state的后七位的值是127,state的整个值是383(1 0111 1111)。
      if (U.compareAndSwapLong(this, STATE, s, s | RBITS)) {
          // 将readOverflow的值加1
          ++readerOverflow;
          state = s;
          return s;
      }
  }
  else if ((LockSupport.nextSecondarySeed() &
            OVERFLOW_YIELD_RATE) == 0)
      Thread.yield();
  return 0L;
}
  1. 初始化入队排队节点,形成链表关系。和AQS一样的是head节点被当成哨兵节点或者正持有锁正在运行的线程(虚拟)。所以head节点的thread为null,mode的值为WMODE。这样会形成
    head --> node <-- tail这样的节点关系

  2. 如果又有新的线程获取读锁失败,会将新的线程加入到cowait栈中

  3. 对于不是第一个获取读锁的线程,它会先加入到cowait这个栈中,然后再通过CAS获取锁,如果获取失败就被阻塞,等待被唤醒或者由于超时中断被取消。

  4. 会对第一个获取读锁的线程进行优待,因为它前面没有在排队的线程,所以这一层还是在自旋获取锁,只是自旋次数再增加,首先会自旋1024次获得锁,如果还未获取到,在自旋2048次。
    如果还未等到锁释放,就阻塞当前线程,等待被唤醒,直到获得锁或者超时中断被取消。

第1-4步属于一个大的循环,第5步骤属于另外一个大的循环。

同时当获取读锁线程被唤醒获取到锁后,它同时也会唤醒挂在它身上的cowait栈中的线程。

从分析中可以看到StampedLock中通过大量的自旋操作可以一定程度避免线程阻塞,只要线程执行操作够快,释放锁比较及时,可以说几乎不会存在阻塞。

释放读锁

释放读锁的代码比较简单,主要操作如下

首先判断stamp是否合法,如果不合法则抛出IllegalMonitorStateException异常
如果读锁个数小于126,则通过CAS将state值减去1,如果释放的是最后一个读锁,则需要唤醒队列中的节点。
如果读锁个数已经溢出了,则将readerOverflow减去1

public void unlockRead(long stamp) {
  long s, m; WNode h;
  // 自旋
  for (;;) {
      // 判断stamp是否合法
      if (((s = state) & SBITS) != (stamp & SBITS) ||
          (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
          throw new IllegalMonitorStateException();
      // 读锁个数小于126
      if (m < RFULL) {
          if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
              // 释放最后一个读锁时,需要唤醒下一个节点
              if (m == RUNIT && (h = whead) != null && h.status != 0)
                  release(h);
              break;
          }
      }
      // 读锁个数饱和溢出,需要减少readerOverflow
      else if (tryDecReaderOverflow(s) != 0L)
          break;
  }
}

获取写锁

public long writeLock() {
  long s, next;
  return ((((s = state) & ABITS) == 0L &&
           U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
          next : acquireWrite(false, 0L));
}

如果state中的第8位等于0并且CAS设置state值成功,则获取锁成功,否则进入到acquireWrite方法。

acquireWrite的逻辑也分为两部分,分别是两个for循环。

第一个for循环逻辑如下

 for (int spins = -1;;) { // spin while enqueuing
    long m, s, ns;

    // 如果当前写锁标志位仍然为0,即没有其他线程获取到写锁
    if ((m = (s = state) & ABITS) == 0L) {
        // CAS将state的值加128,实质是将写锁状态位加1
        if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
            return ns;
    }
    // 其他线程获取到了写锁,则设定自旋次数为64次
    else if (spins < 0)
        spins = (m == WBIT && wtail == whead) ? SPINS : 0;
    else if (spins > 0) {
        // 随机减少自旋次数
        if (LockSupport.nextSecondarySeed() >= 0)
            --spins;
    }
    // 自旋仍未获取到写锁则执行下面的逻辑
    // 初始化CLH队列(主要是whead,wtail节点)
    // p指向了wtail节点
    else if ((p = wtail) == null) {
        WNode hd = new WNode(WMODE, null);
        if (U.compareAndSwapObject(this, WHEAD, null, hd))
            wtail = hd;
    }
    // 初始化节点
    else if (node == null)
        node = new WNode(WMODE, p);

    // 最终形成 whead --> node <-- wtail
    else if (node.prev != p)
        node.prev = p;
    else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
        p.next = node;
        // 形成链表关系之后自旋结束
        break;
    }
}

第二个for循环代码也比较长,我就不贴出来了。它的核心逻辑如下

  • 如果仍然只有当前线程在等待锁,则会先自旋1024次去获取写锁,如果获取失败,则在自旋2048次再次去获取。如果都获取失败,则进入第二步。

  • 阻塞当前线程等待被唤醒

释放写锁

public void unlockWrite(long stamp) {
  WNode h;
  // 验证stamp是否合法
  if (state != stamp || (stamp & WBIT) == 0L)
    throw new IllegalMonitorStateException();

  // 释放写锁,stamp位(版本号)加1
  // stamp+WBIT会将state的第8位置为0,就相当于释放了写锁
  state = (stamp += WBIT) == 0L ? ORIGIN : stamp;

  // 如果头结点不为空,并且状态不为0,,则调用release方法唤醒下一个节点
  if ((h = whead) != null && h.status != 0)
      release(h);
}
private void release(WNode h) {
  if (h != null) {
    WNode q; Thread w;
    U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
    // 如果头结点的下一个节点为空或者其状态为已取消
    if ((q = h.next) == null || q.status == CANCELLED) {
        // 从尾结点向前遍历找到可用节点
        for (WNode t = wtail; t != null && t != h; t = t.prev)
            if (t.status <= 0)
                q = t;
    }
    // 唤醒q节点所在的线程
    if (q != null && (w = q.thread) != null)
        U.unpark(w);
  }
}

释放写锁过程总结如下

  • 将state的第8位置为0(释放写锁),并将stamp位(version)加1
  • 唤醒下一个节点

读写锁之间的转换

除了提供了乐观读锁外,StampedLock相比ReentrantReadWriteLock还提供了读写锁之间的转换,而ReentrantReadWriteLock只提供了写锁到读锁的锁降级特性。

尝试转换为写锁

此时有三种情况,乐观读锁转换为读锁,悲观读锁转换为读锁,写锁转换为写锁。

  • 乐观读锁的stamp & ABITS一定是0,可通过乐观读锁的获取方法发现这个特点。当尝试转换时,会通过tryConvertToWriteLock方法的第一个if语句进行判断能否成功转换。其中的(a!=0)的作用是:现在state&ABITS == 0,即没有线程获取了写锁或悲观读锁,但是提供的stamp不符合这个结论,所以stamp检验失败。
  • 虽然不会有人想要在获取写锁时依然调用此方法,但是为了预防这种情况,必须要是当前获取了写锁的线程调用此方法才能成功。
  • 当悲观读锁想要转换为写锁时,必须此时只有自己一个线程获取了悲观读锁才可以,否则此线程在修改数据时,其他的读线程是无法感知到数据的更新的。
>line: 739
public long tryConvertToWriteLock(long stamp) {
    long a = stamp & ABITS, m, s, next;
    // 验证stamp
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        // 当前没有悲观读锁或写锁
        if ((m = s & ABITS) == 0L) {
            // 但是提供的stamp记录不相符,失败
            if (a != 0L)
                break;
            // 尝试获取写锁,成功则返回
            if ((next = tryWriteLock(s)) != 0L)
                return next;
        }
        // 如果当前线程是已经获取了写锁的线程
        else if (m == WBIT) {
            if (a != m)
                break;
            return stamp;
        }
        // 如果此时只有自己一个读线程获取了悲观读锁
        else if (m == RUNIT && a != 0L) {
            if (casState(s, next = s - RUNIT + WBIT)) {
                VarHandle.storeStoreFence();
                return next;
            }
        }
        else
            break;
    }
    return 0L;
}

尝试转换为悲观读锁

尝试转换为悲观读锁依然有三种情况。

  • 写锁转换为悲观读锁。写锁有资格转换为读锁,这并不会引发问题,只要提供的stamp符合当前的写锁state即可(注意:写锁每次释放时都会更新版本,所以每个获取写锁的state都不会相同)。
  • 乐观读锁转换为悲观读锁。只要当前没有其他线程获取了写锁,那么就能安全转换。
  • 悲观读锁转换为悲观读锁。此种情况和上面的写锁转写锁一样,已经是读锁了那就无需转换,只需验证stamp合法性然后直接返回即可。
public long tryConvertToReadLock(long stamp) {
    long a, s, next; WNode h;
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        if ((a = stamp & ABITS) >= WBIT) {
            // write stamp
            if (s != stamp)
                break;
            STATE.setVolatile(this, next = unlockWriteState(s) + RUNIT);
            if ((h = whead) != null && h.status != 0)
                release(h);
            return next;
        }
        else if (a == 0L) {
            // optimistic read stamp
            if ((s & ABITS) < RFULL) {
                if (casState(s, next = s + RUNIT))
                    return next;
            }
            else if ((next = tryIncReaderOverflow(s)) != 0L)
                return next;
        }
        else {
            // already a read stamp
            if ((s & ABITS) == 0L)
                break;
            return stamp;
        }
    }
    return 0L;
}

尝试转换为乐观读锁。

  • 写锁转换为乐观读锁。只需要验证stamp合法性即可成功转换。
  • 乐观读锁转换为乐观读锁。直接返回。
  • 悲观读锁转换为乐观读锁。第一个if语句(m = s & ABITS) == 0L验证stamp的合法性,此时stamp只会是悲观读锁返回的,但是s & ABITS==0表示当前没有线程获取了悲观读锁,所以stamp不合法。后两个if语句针对当前reader是否溢出作出不同的举措。
public long tryConvertToOptimisticRead(long stamp) {
    long a, m, s, next; WNode h;
    VarHandle.acquireFence();
    while (((s = state) & SBITS) == (stamp & SBITS)) {
        if ((a = stamp & ABITS) >= WBIT) {
            // write stamp
            if (s != stamp)
                break;
            return unlockWriteInternal(s);
        }
        else if (a == 0L)
            // already an optimistic read stamp
            return stamp;
        else if ((m = s & ABITS) == 0L) // invalid read stamp
            break;
        else if (m < RFULL) {
            if (casState(s, next = s - RUNIT)) {
                if (m == RUNIT && (h = whead) != null && h.status != 0)
                    release(h);
                return next & SBITS;
            }
        }
        else if ((next = tryDecReaderOverflow(s)) != 0L)
            return next & SBITS;
    }
    return 0L;
}

总结以及注意

  • StampedLock没有使用AQS,而是依靠自己实现的同步状态(long state占64位)和变异的CLH队列
  • StampedLock使用state的低7位标识读锁数量(超出126的使用readerOverflow字段记录),第8位标识写锁,高56位记录锁的版本,每次释放/获取写锁版本号都会加1
  • StampedLock中读锁和读锁不阻塞,读锁写锁相互阻塞,写锁和写锁也相互阻塞
  • StampedLock的连续多个读锁线程,只有第一个是在CLH队列中,后面的会挂在第一个线程的cowait栈中
  • StampedLock唤醒第一个读线程后,读线程会唤醒它cowait栈的所有读线程(acquireRead()方法中)
  • StampedLock不支持公平锁,也不支持Condition
  • StampedLock支持锁的降级和锁的升级
  • StampedLock中的乐观读操作是无锁的
  • StampedLock中使用了大量的自旋+CAS操作,适合持有锁时间比较短的任务,持有锁时间长的话不仅会浪- 费CPU而且仍然会阻塞自己。
  • 线程阻塞在 StampedLock 的 readLock() 或者 writeLock() 上时,此时调用该阻塞线程的 interrupt() 方法,会导致 CPU 飙升。所以,使用 StampedLock 一定不要调用中断操作,如果需要支持中断功能,一定使用可中断的悲观读锁 readLockInterruptibly() 和写锁 writeLockInterruptibly()
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,544评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,430评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,764评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,193评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,216评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,182评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,063评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,917评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,329评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,543评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,722评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,425评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,019评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,671评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,825评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,729评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,614评论 2 353

推荐阅读更多精彩内容