再看 AQS


title: 再看 AQS
date: 2021/06/09 09:29


ReentrantLock$NonfairSync

知识准备

AQS:AQS的全称是AbstractQueuedSynchronizer,这个类也是在java.util.concurrent.locks下面,提供了一个FIFO的队列,可以用于构建锁的基础框架,内部通过原子变量state来表示锁的状态,当state大于0的时候表示锁被占用,如果state等于0时表示没有占用锁,ReentrantLock是一个重入锁,表现在state上,如果持有锁的线程重复获取锁时,它会将state状态进行递增,也就是获得一个信号量,当释放锁时,同时也是释放了信号量,信号量跟随减少,如果上一个线程还没有完成任务,则会进行入队等待操作。

AQS 主要字段:

/**
 * 头节点指针,通过setHead进行修改
 */
private transient volatile Node head;

/**
 * 队列的尾指针
 */
private transient volatile Node tail;

/**
 * 同步器状态
 */
private volatile int state;

AQS需要子类实现的方法

AQS是提供了并发的框架,它内部提供一种机制,它是基于模板方法的实现,整个类中没有任何一个abstract的抽象方法,取而代之的是,需要子类去实现的那些方法通过一个方法体抛出UnsupportedOperationException异常来让子类知道,告知如果没有实现模板的方法,则直接抛出异常。

方法名 方法描述
tryAcquire 以独占模式尝试获取锁,独占模式下调用acquire,尝试去设置state的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒
tryRelease 尝试独占模式下释放状态
tryAcquireShared 尝试在共享模式获得锁,共享模式下调用acquire,尝试去设置state的值,如果设置成功则返回,如果设置失败则将当前线程加入到等待队列,直到其他线程唤醒
tryReleaseShared 尝试共享模式下释放状态
isHeldExclusively 是否是独占模式,表示是否被当前线程占用

AQS是基于FIFO队列实现的,那么队列的Node节点又是存放的什么呢?

Node结点:作为获取锁失败线程的包装类, 组合了Thread引用, 实现为FIFO双向队列。 下图为Node结点的属性描述

字段名 类型 默认值 描述
SHARED Node new Node() 一个标识,指示节点使用共享模式等待
EXCLUSIVE Nodel Null 一个标识,指示节点使用独占模式等待
CANCELLED int 1 节点因超时或被中断而取消时设置状态为取消状态
SIGNAL int -1 当前节点的后节点被park,当前节点释放时,必须调用unpark通知后面节点,当后面节点竞争时,会将前面节点更新为SIGNAL
CONDITION int -2 标识当前节点已经处于等待中,通过条件进行等待的状态
PROPAGATE int -3 共享模式下释放节点时设置的状态,被标记为当前状态是表示无限传播下去
0 int 不属于上面的任何一种状态
waitStatus int 0 等待状态,默认初始化为0,表示正常同步等待,
pre Node Null 队列中上一个节点
next Node Null 队列中下一个节点
thread Thread Null 当前Node操作的线程
nextWaiter Node Null 指向下一个处于阻塞的节点

通过上面的内容我们可以看到waitStatus其实是有5个状态的,虽然这里面0并不是什么字段,但是他是waitStatus状态的一种,表示不是任何一种类型的字段。

加锁过程

ReentrantLock$NonfairSync#lock() 加锁流程

💡为什么是非公平锁?

非公平锁原因

与公平锁区别

与公平锁区别

💡为什么 head 节点是一个 Dummy(哑元,不关联线程)节点

public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
      // 尝试获取锁,如果获取失败会走这段逻辑
      // 此处关注 addWaiter() 方法,第一个参数是新节点的模式,独占还是共享;
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}
addWaiter 方法
enq 方法

注:此时哑元对象的 waitStatus 值为 0。

addWaiter() 方法执行后的结构

💡Node 的 waitStatus 变化

public final void acquire(int arg) {
  if (!tryAcquire(arg) &&
      // 尝试获取锁,如果获取失败会走这段逻辑
      // 此处关注 acquireQueued() 方法
      acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
    selfInterrupt();
}
acquireQueued 方法
image-20210609133658012

此时锁的状态:

image-20210609133755157

💡非公平锁的两次尝试获取锁?

  1. lock() 时通过 cas 尝试获取锁(所有加锁的线程都会尝试)
  2. acquireQueued() 时,判断前序节点是否是 head,如果是的话,则尝试获取。(只有前序节点是 head 时才会尝试)

💡为什么 Node 中的字段都使用 volatile 来修饰?

因为要通过 cas 来修改他的值,cas 即比较并交换,如果比较的是 cpu 缓存中的值那毫无意义,所以必须要保证比较的这个变量要具有可见性(即,用的时候从主存拿,写完刷入主存)

解锁过程

// Sync 继承自 AQS
static final class NonfairSync extends Sync {
    // 解锁实现
    public void unlock() {
        sync.release(1);
    }

    // AQS 继承过来的方法, 方便阅读, 放在此处
    public final boolean release(int arg) {
        // 尝试释放锁, 进入 (一)
        if (tryRelease(arg)) {
            // 队列头节点 unpark
            Node h = head;
            if (
                    // 队列不为 null
                    h != null &&
                            // waitStatus == Node.SIGNAL 才需要 unpark
                            h.waitStatus != 0
                    ) {
                unparkSuccessor(h);
            }
            return true;
        }
        return false;
    }

    // (一) Sync 继承过来的方法, 方便阅读, 放在此处
    protected final boolean tryRelease(int releases) {
        // state-- 针对锁重入
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        // 支持锁重入, 只有 state 减为 0, 才释放成功
        if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
        }
        setState(c);
        return free;
    }

    // (二) AQS 继承过来的方法, 方便阅读, 放在此处
    private void unparkSuccessor(Node node) {
        // 如果状态为 Node.SIGNAL 尝试重置状态为 0
        // 不成功也可以
        int ws = node.waitStatus;
        if (ws < 0) {
            compareAndSetWaitStatus(node, ws, 0);
        }
        // unpark AQS 中等待的线程, 进入 (二)
        // 找到需要 unpark 的节点, 但本节点从 AQS 队列中脱离, 是由唤醒节点完成的
        Node s = node.next;
        // 不考虑已取消的节点, 从 AQS 队列从后至前找到队列最前面需要 unpark 的节点
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0) s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
}

此部分源码分为两部分:

  1. 释放锁,如果 state 释放后值为 0,则返回 true。
  2. 如果返回 true,表示锁已经被完全释放了,则将 head 节点的 ws 从 -1 改为 0,唤醒 head.next 线程,让他重新去争抢锁(下图红色框部分)

此时锁状态:

image
image

但是由于不公平性,锁也有可能会被其他线程抢占,如果被抢占的话则会再次将前序节点 ws 改为 -1,然后中断线程,变为下面这种状态:

image

ReentrantReadWriteLock$NonfairSync

知识准备

读写锁与 ReentrantReadWriteLock#Sync 的关系

image

ReentrantReadWriteLock#Sync 继承于AQS实现读写锁与实现普通互斥锁的主要区别在于需要分别记录读锁状态及写锁状态,并且等待队列中需要区别处理两种加锁操作。
Sync使用state变量同时记录读锁与写锁状态,将int类型的state变量分为高16位与第16位,高16位记录读锁状态,低16位记录写锁状态,如下图所示:

image

写锁加锁 & 释放过程

写锁(独占锁)这部分与 ReentrantLock 大同小异,简单看下 tryAcquire 方法:

protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0) c!=0 w=0 代表有读锁 || 非重入 -> 返回 false;中断当前线程
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)   // 当重入超过最大限制(65535 2^16)则报错
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire 重入
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||  // 如果需要阻塞,公平锁的实现会检查队列中是否有其他线程,非公平锁直接返回 false 不需要阻塞
        !compareAndSetState(c, c + acquires))   // 或者 cas 失败,则返回 false;之后就和 ReentrantLock 一样了,加入队列然后 for(;;) 中断
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

读锁加锁过程

AQS.acquireShared(arg)

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

ReentrantReadWriteLock$Sync.tryAcquireShared(arg)

protected final int tryAcquireShared(int unused) {
    /*1. 如果另一个线程持有写锁,则失败。 2. 否则,该线程有资格获得锁写入状态,因此询问它是否应该因为队列策略而阻塞。如果没有,请尝试通过 CASing 状态和更新计数来授予。请注意,步骤不检查可重入获取,它被推迟到完整版本以避免在更典型的非可重入情况下检查保持计数。 3. 如果第 2 步由于线程显然不符合条件或 CAS 失败或计数饱和而失败,则链接到具有完整重试循环的版本。
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();
    int c = getState();
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)   // 如果有写锁,并且不是当前线程,则获取读锁失败,返回 -1 进入后续(排队)流程
        return -1;
    int r = sharedCount(c); // 获取写锁的 state 值
    if (!readerShouldBlock() && // 当前获取读锁线程是否需要阻塞;公平锁会查看队列是否具有节点,非公平锁会查看队列中第一个等待元素是否是独占锁,为了避免饥饿
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {   // 如果不需要阻塞 && 小于 65535 && cas 成功,下面的代码和读锁重入相关
        if (r == 0) {
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) {
            firstReaderHoldCount++;
        } else {
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        return 1;
    }
    return fullTryAcquireShared(current);   // 这个是完整的获取共享锁的代码,相较于这部分代码基本上只多了 for(;;) 重试 cas 操作
}

AQS.doAcquireShared(arg)

image-20210611103946920

注:红框部分同样属于释放锁流程。

此时锁状态:

image

此时有线程获取写锁的状态:

image

💡如何体现锁降级?

image-20210611105002384

💡readerShouldBlock() 所要解决的饥饿何时会发生?

写锁在队列里排队,但是一直有线程来获取读锁

💡读锁的重入是怎么做的呢?

if (!readerShouldBlock() && // 当前获取读锁线程是否需要阻塞;公平锁会查看队列是否具有节点,非公平锁会查看队列中第一个等待元素是否是独占锁,为了避免饥饿
    r < MAX_COUNT &&
    compareAndSetState(c, c + SHARED_UNIT)) {   // 如果不需要阻塞 && 小于 65535 && cas 成功,下面的代码和读锁重入相关
    if (r == 0) {
        firstReader = current;
        firstReaderHoldCount = 1;
    } else if (firstReader == current) {
        firstReaderHoldCount++;
    } else {
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            cachedHoldCounter = rh = readHolds.get();
        else if (rh.count == 0)
            readHolds.set(rh);
        rh.count++;
    }
    return 1;
}

firstReader指向在无锁状态下第一个获取读锁的线程,firstReaderHoldCount记录第一个获取读锁的线程持有当前锁的计数(主要用于重入)。

image-20210611112532488

读锁释放锁过程

AQS.releaseShared(arg)

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {    // 如果读锁全都释放
        doReleaseShared();  // 唤醒后续节点
        return true;
    }
    return false;
}

ReentrantReadWriteLock$Sync.tryReleaseShared(arg)

protected final boolean tryReleaseShared(int unused) {
    Thread current = Thread.currentThread();
    if (firstReader == current) {   // 如果第一个 read 是当前线程,并且重入计数为 1,则将 firstReader 置为 null,否则重入计数--
        // assert firstReaderHoldCount > 0;
        if (firstReaderHoldCount == 1)
            firstReader = null;
        else
            firstReaderHoldCount--;
    } else {    // 操作 HoldCounter 进行重入计数的修改
        HoldCounter rh = cachedHoldCounter;
        if (rh == null || rh.tid != getThreadId(current))
            rh = readHolds.get();
        int count = rh.count;
        if (count <= 1) {
            readHolds.remove(); // 如果计数为 0,则从 ThreadLocal 移除
            if (count <= 0)
                throw unmatchedUnlockException();
        }
        --rh.count;
    }
    for (;;) {  // 自旋 + cas,修改 state 值
        int c = getState();
        int nextc = c - SHARED_UNIT;
        if (compareAndSetState(c, nextc))
            // Releasing the read lock has no effect on readers,
            // but it may allow waiting writers to proceed if
            // both read and write locks are now free.
            return nextc == 0;  // 如果 == 0 表示读锁完全释放
    }
}

此时状态为:

image

AQS.doReleaseShared()

private void doReleaseShared() {
    /*
     * Ensure that a release propagates, even if there are other
     * in-progress acquires/releases.  This proceeds in the usual
     * way of trying to unparkSuccessor of head if it needs
     * signal. But if it does not, status is set to PROPAGATE to
     * ensure that upon release, propagation continues.
     * Additionally, we must loop in case a new node is added
     * while we are doing this. Also, unlike other uses of
     * unparkSuccessor, we need to know if CAS to reset status
     * fails, if so rechecking.
     */
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))    // cas 修改前序节点的 ws,目的是避免多个获取读锁的线程来释放锁,导致重复 unpark
                    continue;            // loop to recheck cases
                unparkSuccessor(h); // 如果后续节点是 SHARED 的,则会修改 head 节点,如果他的后续节点也是 SHARED,则会再次调用当前方法
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))   // 这个和信号量的一个 bug 相关 https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6801020
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed,可能是因为上面 unpark 修改了 head 节点,不知道
            break;
    }
}

唤醒之后则会继续走下面红框的流程:

image-20210611103946920
image

Semaphore

他是采用共享锁实现,节点类型为 SHARED

初始化时定义了 state 的值,调用 acquire() 方法对 state - 1,判断这个值是否小于 0,如果小于 0 则进入 FIFO 队列,否则 cas 修改 state 值(失败自旋)。

调用 release() 方法时对 state + 1,并唤醒后续节点

CountDownLatch

他是采用共享锁实现,节点类型为 SHARED

初始化时定义了 state 的值,await() 方法是尝试获取锁(只有 state 值为 0 才会获取成功),countdown()方法是释放锁,只有所有都释放完成才会返回 true 唤醒队列中的线程。

public static void main(String[] args) throws InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);

    new Thread(() -> {
        try {
            System.out.println("t1 await");
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("t1");
    }).start();

    new Thread(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        latch.countDown();
        System.out.println("t2 countdown");
    }).start();

    System.out.println("main await");
    latch.await();
    System.out.println("main");
}

>> 
t1 await
main await
t2 countdown
main
t1
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,125评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,293评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,054评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,077评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,096评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,062评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,988评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,817评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,266评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,486评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,646评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,375评论 5 342
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,974评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,621评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,642评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,538评论 2 352

推荐阅读更多精彩内容