AbstractQueuedSynchronizer 队列同步器源码分析

AbstractQueuedSynchronizer 队列同步器(AQS)

队列同步器 (AQS), 是用来构建锁或其他同步组件的基础框架,它通过使用 int 变量表示同步状态,通过内置的 FIFO 的队列完成资源获取的排队工作。(摘自《Java并发编程的艺术》)

我们知道获取同步状态有独占和共享两种模式,本文先针对独占模式进行分析。

变量定义

private transient volatile Node head;

head 同步队列头节点

private transient volatile Node tail;

tail 同步队列尾节点

private volatile int state;

state 同步状态值

Node - 同步队列节点定义

volatile int waitStatus;

waitStatus 节点的等待状态,可取值如下 :

  • 0 : 初始状态
  • -1 : SIGNAL 处于该状态的节点,说明其后置节点处于等待状态; 若当前节点释放了锁可唤醒后置节点
  • -2 : CONDITION 该状态与 Condition 操作有关后续在说明
  • -3 : PROPAGATE 该状态与共享式获取同步状态操作有关后续在说明
  • 1 : CANCELLED 处于该状态的节点会取消等待,从队列中移除
volatile Node prev;

prev 指向当前节点的前置节点

volatile Node next;

next 指向当前节点的后置节点

volatile Thread thread;

thread 节点对应的线程也是指当前获取锁失败的线程

Node nextWaiter;

acquire()

独占模式下获取同步状态, 既是当前只允许一个线程获取到同步状态

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

从 acquire 方法中我们可以大概猜测下,获取锁的过程如下:

  • tryAcquire 尝试获取同步状态, 具体如何判定获取到同步状态由子类实现
  • 当获取同步状态失败时,执行 addWaiter 创建独占模式下的 Node 并将其添加到同步队列尾部
  • 加入同步队列之后,再次尝试获取同步状态,当达到某种条件的时候将当前线程挂起等待唤醒

下面具体看下各个阶段如何实现:

private Node addWaiter(Node mode) {
    // 绑定当前线程 创建 Node 节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 判断同步队列尾节点是否为空
    if (pred != null) {
        // node 的前置节点指向队列尾部
        node.prev = pred;
        // 将同步队列的 tail 移动指向 node
        if (compareAndSetTail(pred, node)) {
            // 将原同步队列的尾部后置节点指向 node
            pred.next = node;
            return node;
        }
    }
    // tail 为空说明同步队列还未初始化
    // 此时调用 enq 完成队列的初始化及 node 入队
    enq(node);
    return node;
}
private Node enq(final Node node) {
    // 轮询的方式执行
    // 成功入队后退出
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 创建 Node, 并将 head 指向该节点
            // 同时将 tail 指向该节点
            // 完成队列的初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // node 的前置节点指向队列尾部
            node.prev = t;
            // 将同步队列的 tail 移动指向 node
            if (compareAndSetTail(t, node)) {
                // 将原同步队列的尾部后置节点指向 node
                t.next = node;
                return t;
            }
        }
    }
}

从代码中可以看出通过 CAS 操作保证节点入队的有序安全,其入队过程中如下图所示:

AQS节点入队过程
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 
        for (;;) {
            // 获取当前节点的前置节点
            final Node p = node.predecessor();
            // 判断前置节点是否为 head 头节点
            // 若前置节点为 head 节点,则再次尝试获取同步状态
            if (p == head && tryAcquire(arg)) {
                // 若获取同步状态成功
                // 则将队列的 head 移动指向当前节点
                setHead(node);
                // 将原头部节点的 next 指向为空,便于对象回收
                p.next = null; // help GC
                failed = false;
                // 退出轮询过程
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * This node has already set status asking a release
         * to signal it, so it can safely park.
         */
        // 若前置节点状态为 -1 ,则说明后置节点 node 可以安全挂起了
        return true;
    if (ws > 0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        do {
            // ws > 0 说明前置节点状态为 CANCELLED , 也就是说前置节点为无效节点
            // 此时从前置节点开始向队列头节点方向寻找有效的前置节点
            // 此操作也即是将 CANCELLED 节点从队列中移除
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        // 若前置节点状态为初始状态 则将其状态设为 -1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
private final boolean parkAndCheckInterrupt() {
    // 将当前线程挂起
    LockSupport.park(this);
    // 被唤醒后检查当前线程是否被挂起
    return Thread.interrupted();
}

从 acquireQueued 的实现可以看出,节点在入队后会采用轮询的方式(自旋)重复执行以下过程:

  • 判断前置节点是否为 head, 若为 head 节点则尝试获取同步状态; 若获取同步状态成功则移动 head 指向当前节点并退出循环
  • 若前置节点非 head 节点或者获取同步状态失败,则将前置节点状态修改为 -1, 并挂起当前线程,等待被唤醒重复执行以上过程

如下图所示:

AQS-节点自旋活动图

接下来我们看看同步状态释放的实现。

release

释放同步状态

public final boolean release(int arg) {
    // 尝试释放同步状态
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 唤醒后置节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        // 将 head 节点状态改为 0
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 获取后置节点
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 唤醒后置节点上所阻塞的线程
        LockSupport.unpark(s.thread);
}

从上述代码,我们可以明白释放同步状态的过程如下:

  • 调用 tryRelease 尝试释放同步状态,同样其具体的实现由子类控制
  • 成功释放同步状态后,将 head 节点状态改为 0
  • 唤醒后置节点上阻塞的线程

如下图所示(红色曲线表示节点自旋过程) :

AQS-释放锁

acquireInterruptibly()

独占模式下获取同步状态,不同于 acquire 方法,该方法对中断操作敏感; 也就是说当前线程在获取同步状态的过程中,若被中断则会抛出中断异常

public final void acquireInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        // 检查线程是否被中断
        // 中断则抛出中断异常由调用方处理
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 不同于 acquire 的操作,此处在唤醒后检查是否中断,若被中断直接抛出中断异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            // 抛出中断异常后最终执行 cancelAcquire
            cancelAcquire(node);
    }
}
private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
    if (node == null)
        return;

    node.thread = null;

    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0)
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.
    Node predNext = pred.next;

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    node.waitStatus = Node.CANCELLED;

    // If we are the tail, remove ourselves.
    // 若当前节点为 tail 节点,则将 tail 移动指向 node 的前置节点
    if (node == tail && compareAndSetTail(node, pred)) {
        // 同时将node 前置节点的 next 指向 null
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            // 当前节点位于队列中部    
            Node next = node.next;
            if (next != null && next.waitStatus <= 0)
                // 将前置节点的 next 指向 node 的后置节点
                compareAndSetNext(pred, predNext, next);
        } else {
            // 若 node 的前置节点为 head 节点则唤醒 node 节点的后置节点
            unparkSuccessor(node);
        }

        node.next = node; // help GC
    }
}

从 acquireInterruptibly 的实现可以看出,若线程在获取同步状态的过程中出现中断操作,则会将当前线程对应的同步队列等待节点从队列中移除并唤醒可获取同步状态的线程。

tryAcquireNanos()

独占模式超时获取同步状态,该操作与acquireInterruptibly一样对中断操作敏感,不同在于超过等待时间若未获取到同步状态将会返回

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
        doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    // 计算等待到期时间
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                // 超时时间到期直接返回
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                // 按指定时间挂起s
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

节点的状态

同步队列中的节点在自旋获取同步状态的过程中,会将前置节点的状态由 0 初始状态改为 -1 SIGNAL, 若是中断敏感的操作则会将状态由 0 改为 1

同步队列中的节点在释放同步状态的过程中会将同步队列的 head 节点的状态改为 0, 也即是由 -1 变为 0;

小结

本文主要分析了独占模式获取同步状态的操作,其大概流程如下:

  • 在获取同步状态时,AQS 内部维护了一个同步队列,获取状态失败的线程会被构造一个节点加入到队列中并进行一系列自旋操作
  • 在释放同步状态时,唤醒 head 的后置节点去获取同步状态
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 220,884评论 6 513
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 94,212评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 167,351评论 0 360
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,412评论 1 294
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,438评论 6 397
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 52,127评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,714评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,636评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 46,173评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,264评论 3 339
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,402评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 36,073评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,763评论 3 332
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,253评论 0 23
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,382评论 1 271
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,749评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,403评论 2 358

推荐阅读更多精彩内容