超详细!AQS(AbstractQueuedSynchronizer)源码解析

Doug Lea 在 java.util.concurrent(JUC)中提供一套基础工具用于帮助开发者更加方便的开发并发程序,包括 LockSemaphoreCountDownLatchCyclicBarrier等等,而实现这些类的实现都借助了一个能够控制多个线程的并发访问的工具,那就是 AbstractQueuedSynchronizer(AQS)。

AQS 的数据结构形式如下图所示,其维护了一个 FIFO 的双向队列,尝试获取锁的线程都以节点的形式存在于队列中


aqs.png

源码分析

在对源码分析之前,首先需要了解一些基础的内容。

首先,锁分为两种,独占锁和共享锁,顾名思义,独占锁是指最多同时只能有一个线程获取到锁,而共享锁则允许最多 n 个线程同时获取到锁。根据在获取锁的过程中是否响应中断请求,可分为响应中断和不响应中断的请求。

其次,每个节点都有其对应的状态,初始状态为0。

// 等待超时或被中断,取消获取锁
static final int CANCELLED =  1;
// 说明该节点的后续被挂起了,当释放锁或取消时,需要唤醒后继节点
static final int SIGNAL    = -1;
// 表示节点处于Condition队列中
static final int CONDITION = -2;
// 用于共享式锁,表示下一次尝试获取共享锁时,需要无条件传播下去
static final int PROPAGATE = -3;

为了更好的理解源码,我会通过在源码的基础上增加注释的方式对源码进行解释(英文注释为源码本来的注释)。对于有方法调用的地方,可以直接跳到对应方法的讲解,按流程一步步理解,也可以通过注释了解整个方法的步骤,再细看之前调用的每个方法。

独占锁

不响应中断的独占锁获取

/**
* 不响应中断的独占锁获取入口
* 其中 tryAcuquire() 方法为获取锁的抽象方法,返回 true 表示获取锁成功,需要实现类根据获取锁的方式自己定义
*/
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // 如果 tryAcquire() 获取锁失败,则通过 addWaiter() 加入到同步队列中,再通过 acquireQueued() 不断尝试获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //由于不响应中断,如果检测到中断,acquireQueued() 会返回 true,进入方法体
        // 由于检测时使用了 Thread.interrupted(),中断标志被重置,需要恢复中断标志
        selfInterrupt();
}

/**
* 将线程信息包装成一个 Node 加入到同步队列的队尾中
*/
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 尝试通过一次 CAS 将节点加入到队尾
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 走到这里说明要么有竞争 CAS 失败,要么同步器队列还没初始化即 pred == null
    enq(node);
    return node;
}

private Node enq(final Node node) {
    // 无限循环 CAS 直到将节点加入到队尾中
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
* 因获取锁失败而加入同步队列中的线程在这里不断尝试获取锁
* 返回中断状态交由上层函数处理
*/
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            // 由于这是一个先进先出的队列,只有当自己的前驱是头结点(头结点表示已经获取到锁)时,才能轮到自己来争夺锁
            if (p == head && tryAcquire(arg)) {
                // tryAcquire() 返回 true 说明获取锁成功
                // 将 nod e节点设置为 head,此外 setHead() 是不需要 CAS 的,因为不会有竞争
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 获取失败后,查看是否需要被挂起,如果需要挂起,检查是否有中断信息
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    // 复习一下,SIGNAL 说明该节点的后续被挂起了,当释放锁或取消时,需要唤醒后继节点
    // 如果前驱节点已经是 SIGNAL 状态了 说明当前线程可以安心被挂起了,等待前驱来唤醒自己
    if (ws == Node.SIGNAL)
        return true;
    // ws > 0 说明前驱节点被取消了(CANCELLED == 1),需要跳过被取消的节点
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 将前驱节点通过CAS改为 SIGNAL 状态,但最后还是会返回 false 
        // 如果在下一次循环中如果还是没拿到锁,则会进入该方法第一个判断,返回true,
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

private final boolean parkAndCheckInterrupt() {
    // 挂起线程
    LockSupport.park(this);
    return Thread.interrupted();
}

响应中断的独占锁获取

/**
* 方法入口
*/
public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

/**
* 和不响应中断的获取方法唯一不同的是,在检测到中断后是抛出中断异常而不是返回true,其他没有区别
*/
private void doAcquireInterruptibly(int arg)
    //...
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
    // ...
}

带超时的响应中断的独占锁获取

/**
* 方法入口
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}

/**
 * 基本上和之前的差不多,如果超时了就直接返回 false,挂起线程时也使用了带计时的 parkNanos
 */
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    // 如果超时了 返回false
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            // 注意这里 nanosTimeout > spinForTimeoutThreshold(默认1000纳秒)时才挂起,小于这个阈值时直接自旋,不再挂起
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

独占锁释放

/**
 * 和加锁一样,这里的 tryRelease() 也是抽象方法,需要子类自己实现
 * 实际工作就是唤醒后继节点而已,出队的操作也是在获取锁的时候由后继结点完成的
 */
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        // 如果 h.waitStatus == 0 ,说明不是 SIGNAL 状态,没有需要唤醒的节点,直接返回
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
        
    // 如果后继节点已经取消了,那么重新调整后继直到没有取消的为止
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 如果有未取消的后继,唤醒他
    if (s != null)
        LockSupport.unpark(s.thread);
}

共享锁

不响应中断的共享锁获取

在实现上,共享锁和独占锁在实现上的核心区别在于:

队列中的线程节点尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点。

/**
* 方法入口,tryAcquireShared为抽象方法:
* 返回小于0表示获取失败
* 等于0表示当前线程获取到锁,但后续线程获取不到,即不需要传播后续节点
* 大于0表示后续线程也能获取到,需要传播后续节点
*/
public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                // >=0表示获取锁成功
                if (r >= 0) {
                    // 这里和独占模型不同,除了设置头结点后还需要向后传播
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head;
    setHead(node);
    
    // 如果propagate > 0 或者 h.waitStatus < 0(PROPAGATE) 需要唤醒后继节点
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        // 如果后继结点是独占结点,就不唤醒了
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 队列里至少有2个节点,否则没有传播必要
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                // 和共享锁不同的是,这个方法可以在setHeadAndPropagate和releaseShared两个方法中被调用
                // 存在一个线程正获取完锁向后传播,另一个线程释放锁的情况,所以需要CAS控制
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            // ws == 0 表明是队列的最后一个节点,那么CAS为PROPAGATE,表明下一次tryShared时,需要传播
            // 如果失败说明有新后继节点将其改为了SIGNAL后挂起了,那么继续循环传播
            else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        // 如果head改变了,说明有新的排队的线程获取到了锁,再次检查
        if (h == head)                   // loop if head changed
            break;
    }
}

共享锁释放

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 释放成功后,往后传播
        doReleaseShared();
        return true;
    }
    return false;
}

Condition

Condition 提供了线程之间的通信机制,和 synchronize 中的 wait() 和 notify() 的作用是一样的,并且同一个锁可以有多个 condition。

Condition是一个接口,实际上 lock.newCondition() 返回的是 AQS 的内部类 ConditionObject。其核心的两个方法就是 await() 和 signal()。

当调用await()时,线程加入到等待队列中等待,和同步队列相似,也是一个FIFO的队列,但虽然用的数据结构相同,等待队列只用了单向的功能。其维护的数据结构图如下所示:

condition.png
/**
* 将当前线程信息包装加入等待队列中并挂起线程等待唤醒
* 由于能调用 await() 的线程一定是获取到锁的,所以下面的操作都不需要额外的CAS操作来处理线程竞争
*/
public final void await() throws InterruptedException {
     if (Thread.interrupted())
        throw new InterruptedException();
     // 加入到等待队列中
     Node node = addConditionWaiter();
    // 释放锁,fullyRelease() 调用的是独占锁的释放方法realse(state),即一次释放所有的重入锁, state记录了重入的次数
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 只要还没有被 signal() 给加入到同步队列,就挂起,除非被中断
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        // 如果被中断了 跳出循环,返回 0 或 THROW_IE 或 REINTERRUPT
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    // 到这里为止,不管怎么出的循环,都已经被加入同步队列了(要么被 signal() 加入,要么在中断检测方法中加入)
    // ----------------------------------------------------------
    // 别忘了 acquireQueued() 返回的获取锁的过程中是否被中断了
    // 如果在获取锁的过程中被中断了,并且之前的 interruptMode != THROW_IE,那么也视为在 signal() 之后被中断,设为REINTERRUPT
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    // 只有在 signal() 前中断的线程还会在等待队列中留有节点,才会满足这个条件
    if (node.nextWaiter != null)
        // 将状态不是CONDITION的节点从队列中删除
        unlinkCancelledWaiters(); 
    if (interruptMode != 0)
        // 抛出异常 或 重置中断标识位
        reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果最后一个等待队列被取消了,清除出去
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 这个方法就是从头到尾遍历一遍链表将状态不为 CONDITION 的节点删除等待队列,代码略
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

/**
* 如果没有被中断,返回 0
* 如果在被signal之前中断了,返回 THROW_IE,表示需要抛出异常
* 如果在signal之后中断了,返回 REINTERRUPT,表示不抛出,只恢复中断位
*/
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

final boolean transferAfterCancelledWait(Node node) {
    // 如果CAS成功了,说明还没有被signal加入同步队列
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 由于没有signal,这里需要加入同步队列,才能之后争夺锁
        enq(node);
        return true;
    }
    /*
     * If we lost out to a signal(), then we can't proceed
     * until it finishes its enq().  Cancelling during an
     * incomplete transfer is both rare and transient, so just
     * spin.
     */
     // 说明已经被signal了,防止还没被加入到同步队列的情况
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

/**
* 从等待队列中找到第一个线程唤醒
*/
public final void signal() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        // 出队
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

/**
* 清空等待队列, 将等待的节点按顺序加入到同步队列中
*/
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}


final boolean transferForSignal(Node node) {
    // 如果CAS失败,说明被cancell了
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    // 将节点加入到同步队列中,注意enq()会返回node的前驱节点p
    Node p = enq(node);
    int ws = p.waitStatus;
    // 因为此时节点还是挂起的,按照同步队列的结构,需要将前驱结点的状态改为SIGNAL
    // 如果前驱被取消了,或者CAS前驱状态为SIGNAL失败了,那么就唤醒线程,让其自己走去获取锁的步骤,虽然线程可能会被再次挂起,但这是无害的操作
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

文章中的图片来源于《Java并发编程的艺术》

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352

推荐阅读更多精彩内容