队列同步器AbstractQueuedSynchronizer源码解析

AQS是用来构建锁或者其他同步组件的基础框架,能够实现大部分同步需求的基础,AQS基于volatile变量提供的锁的内存语义和CAS原子操作指令来实现多线程的同步机制。state同步状态量为整型,并发locks包中还提供了AbstractQueuedSynchronizer的镜像long类型state版本AbstractQueuedLongSynchronizer,支持更多的线程获取同步状态。

原理解析

等待队列节点

static final class Node {
    /**共享节点模式*/
    static final Node SHARED = new Node();
    /** 独占节点模式 */
    static final Node EXCLUSIVE = null;

    /** waitStatus 状态的常量表示 */
    static final int CANCELLED =  1;
    static final int SIGNAL    = -1;
    static final int CONDITION = -2;
    static final int PROPAGATE = -3;
    volatile int waitStatus;

    // 前继节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 入队的线程
    volatile Thread thread;

    // 等待在condition上的下一个节点(独占模式)或者是特定的值SHARED用来标示此节点是共享模式
    Node nextWaiter;

    // 如果当前节点等待在共享模式上则返回true
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 返回前继节点,如果前继节点为null则抛出异常
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    // 默认构造函数:用来初始化头节点或者SHARED共享模式标志
    Node() {
    }
    // 被addWaiter调用
    Node(Thread thread, Node mode) {
        this.nextWaiter = mode;
        this.thread = thread;
    }
    // 被Condition使用
    Node(Thread thread, int waitStatus) {
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

等待状态详解:

  1. CANCELLED 取消状态

由于在等待队列中等待的线程等待超时或者被中断,需要从等待队列中取消等待,之后这个状态的节点状态将不再发生变化。处于这种状态的节点会被踢出队列,被GC回收

  1. SIGNAL 通知状态

当前节点的线程释放了同步状态或者被取消,将会唤醒后继等待(阻塞)的节点,使得后继节点的线程得以运行。

  1. CONDITION 条件等待状态

当前节点等待(阻塞)在Condition上,当其他线程对Condition调用signal方法后,该节点将会从等待队列中转移到同步队列中,加入对同步状态的获取中。

  1. PROPAGATE 传播状态

表示在下一次共享模式同步状态获取将会无条件地被传播下去。使用共享模式的head节点有可能处于这种状态。

  1. INITIAL 初始状态

新建节点处于这种状态

分析思路:分析主要方法的调用流程,洞察实现原理

独占式同步状态获取与释放

主要方法是acquire和release

acquire方法流程
  1. acquire 独占式同步状态的获取,获取过程中忽略中断,但是最终会设置中断标志
    /**
     * 向外提供的独占式获取同步状态的方法
     *
     * 
     * 第一次尝试获取同步状态,成功则返回,
     * 失败则将当前线程构造成节点并添加到等待队列
     * 并死循环获取同步状态
     * 如果线程被中断则设置当前线程的中断状态
     */
    public final void acquire(long arg) {
        // 第一次尝试获取同步状态,成功则返回
        if (!tryAcquire(arg) && 
            // 失败则构造节点,入队,死循环获取
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // 设置中断标志
            selfInterrupt();
    }
  1. tryAcquire
    /**
    * 需要重写的自定义方法,该方法需要保证线程
    * 安全的获取同步状态(CAS,用AQS提供的方法实现)
    * @return 获取成功则返回true
    */
   protected boolean tryAcquire(long arg) {
       throw new UnsupportedOperationException();
   }
  1. addWaiter
    /**
     * 为当前线程以及给定模式创建新节点并入队
     *
     * @param mode 节点等待模式
     * @return node 新添加的节点
     */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // 进行一次入队尝试,失败则有enq的死循环CAS来保证最后能入队
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        // 循环CAS来保证最后能入队
        enq(node);
        return node; // 返回新添加的等待节点
    }
  1. enq
    /**
     * 线程安全地插入节点到等待队列中(死循环CAS)
     * 当尾节点为null时插入前需要初始化头结点
     * @param node 入队的节点
     * @return t 入队节点的前继节点
     */
    private Node enq(final Node node) {
        // 循环CAS将节点入队
        for (;;) { 
            Node t = tail;
            // 当尾节点为null时表明:则说明等待队列为空
            // 需要初始化一个头尾相等的等待队列。
            // head和tail的延迟初始化。
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))// 初始化需要 CAS 设置头节点
                    tail = head; // 头尾相等
            }
            // 尾节点不为null,说明等待队列不为空,可以插入 
            else {
                node.prev = t; // 插入节点和尾节点前向关联,防止断裂
                if (compareAndSetTail(t, node)) { // CAS 插入尾节点
                    t.next = node; // 后向关联
                    return t; // 返回插入节点的前继节点
                }
            }
        }
    }
  1. acquireQueued
 /**
  * 每一个加入等待队列的节点线程通过自旋获取同步状态
  * 判断自己的前继节点是否为头结点,如果是则尝试获取同步状态
  * 成功则返回,否则根据前继节点是否为SIGNAL来阻塞线程
  *
  * @param node 入队的节点
  * @return 等待过程中发生中断则返回true
  */
 final boolean acquireQueued(final Node node, long arg) {
     // 获取失败标志初始化为true(第一次确实失败了)
     boolean failed = true;
     try {
         // 线程中断标志
         boolean interrupted = false;
         // 自旋获取同步状态
         for (;;) {
             // 如果为null抛出异常
             final Node p = node.predecessor();

             // 判断前继节点是否为head节点,
             // 如果是则尝试获取同步状态
             if (p == head && tryAcquire(arg)) {

                 // 成功获取同步状态,则将当前节点设置为head结点
                 // 在独占模式下设置head节点,不需要考虑线程问题,
                 // 因为同一时刻只有一个线程能获取同步状态
                 setHead(node); 

                 // head节点的prev和thread引用为null,如果next引用也为null则将被GC
                 // 断开head节点的所有引用,帮助GC
                 p.next = null; 

                 failed = false;
                 return interrupted;
             }
             // 如果当前节点前继不是head节点,或者是前继节点head节点但是
             // 没有获取到同步状态

             // 而且如果需要阻塞线程,则阻塞线程,并将线程中断标志
             // 重置,并设置interrupted为重置前的中断状态
             if (shouldParkAfterFailedAcquire(p, node) &&
                 parkAndCheckInterrupt())
                 interrupted = true;
         }
     }
     // 前面死循环自旋,只有成功获取才能退出循环,而且还会设置failed为false
     // 所以我在想什么时候failed为true,并执行cancelAcquire操作
     // 后来发现在 node.predecessor() 方法中当前继节点为null时会抛出
     // NullPonitException异常,看来当前节点没有前继节点时需要取消获取的操作
     // 这是由于取消状态的节点从等待队列中移除,没有关联的前节点而导致,
     // 因为head节点一定是获取到同步状态的线程,在成功获取同步状态线程会
     // 立刻返回。
     finally {
         if (failed) 
             cancelAcquire(node);
     }
 }

值得注意的是,在获取同步状态的主循环中,仅仅是记录中断状态,然后将中断状态返回,在acquire中则根据这个中断状态,在获取成功返回前设置这个中断标志,经历如下过程:
**parkAndCheckInterrupt重置返回→acquireQueued记录并返回→acquire重新设置 **

成功获取同步状态的线程在这个方法里** 只是调用setHead设置head节点,但是并没有修改head的等待状态。,当释放同步状态时会将head节点状态CAS设置为初始状态,然后唤醒后继阻塞的线程,被唤醒的线程会检查前继节点以及其状态,当获取失败(隐含有线程成功获取同步状态,并设置了新的head)**会确保前继节点的状态为SIGNAL

什么时候会执行finally语句块的内容?

比如fullyRelease方式失败,会将节点状态设置为CANCEL,详解见上面代码注释

  1. setHead
    /**
     * 将给定的节点设置为头结点
     * 等待队列初始化后,只有获得同步状态的线程才能设置头结点
     * 所以不需要循环CAS来设置头结点
     * head节点只是获得同步状态的线程的通知阻塞线程的载体
     * @param node 新的head节点
     */
    private void setHead(Node node) {
        head = node;
        node.thread = null; // 并不保存获得同步状态线程信息
        node.prev = null; // 前继节点为null
    }

独占模式下获取到同步状态的线程的节点的状态不会在setHead之后发生变化,也不会影响后继节点。但是在共享模式下会将发生变化,见共享模式

  1. shouldParkAfterFailedAcquire
   /**
     * 检查并设置获取失败的节点的等待状态
     * 新加入的节点要么获取同步状态要么阻塞,
     * 如果当前节点是head结点,则tryAcquire获取同步状态
     * 如果当前节点不是head结点,则需要判断是否需要阻塞
     * 这个方法是整个acquire循环中主要的信号控制
     * @param pred node的前继节点
     * @param node 
     */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前继节点的等待状态
        int ws = pred.waitStatus;

        // 如果前继节点为通知状态
        // 表示后继阻塞的节点能够被前继节点唤醒(释放或取消),所以大胆的阻塞当前节点
        // 则返回true,表示应该阻塞当前节点
        if (ws == Node.SIGNAL)
            return true;

        // 如果为取消状态,则将当前节点的前继节点设置为
        // 向前遍历不为取消状态的第一个节点,并关联彼此。
        // 其实相当于在等待队列中删除取消状态的节点。
        if (ws > 0) {
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        }

        // 如果前继节点不为取消状态,则表明需要一个通知的信号
        // 设置前继节点为通知状态,但是不会阻塞线程。
        // 并由调用者在下一次尝试保证
        // 在阻塞之前不能进行获取同步状态的操作
        else {
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

对当前节点的前继节点进行判断:

  1. 当为状态为SIGNAL时则返回true,
  2. 当为CANCEL时则删除当前节点的状态为CANCEL的前继节点,直到第一个非CANCEL节点并关联彼此。
  3. 状态为INITIAL、PROPAGATE和CONDITION时,则将前继节点设置为SIGNAL,确保前继节点能够通知到当前节点,并返回false,不阻塞,进行下一次循环获取。

前继节点为SIGNAL状态的线程都需要阻塞,当前继节点不为SIGNAL,要设置为SIGNAL,即始终保持前继节点的SIGNAL状态。

  1. parkAndCheckInterrupt
    /**
     * 阻塞当前线程
     * @return 如果当前线程被中断则返回true
     */
    private final boolean parkAndCheckInterrupt() {
        // 参数this为阻塞对象,用来标识当前线程在等待的对象,
        // 该对象用来问题排查和系统监控
        LockSupport.park(this);

        // 清除中断标志,并返回清除之前的中断状态
        return Thread.interrupted();
    }
  1. selfInterrupt
    /**
     * 设置当前线程的中断标志
     */
    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }
  1. cancelAcquire
    /**
     * 取消CANCEL状态的节点尝试的获取操作
     *
     * @param node CANCEL状态的节点
     */
    private void cancelAcquire(Node node) {
        // 节点为null则返回
        if (node == null)
            return;

        // 清除线程引用,帮助GC
        node.thread = null;

        // 跳过取消的前继节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // 保存前继节点的后继节点引用
        Node predNext = pred.next;

        // 设置当前节点的状态为取消状态
        node.waitStatus = Node.CANCELLED;

        // 如果当前节点为尾节点,则将前继节点设置为尾节点
        // 并将尾节点的后继节点设置为null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } 
        // 如果当前节点不为尾节点,或者为尾节点,但是设置尾节点时失败,
        // 此时当前节点可能存在后继节点,替代它成为尾节点
        else {
            // 如果后继节点能够被通知,则将当前节点的前继节点的后继节点设置
            // 为当前节点后继节点,否则直接唤醒当前节点的后继节点。
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

将取消获取的节点的状态设置为CANCEL,并剔除,然后确保调整后的节点的前继节点为SIGNAL状态。

release方法流程
  1. release
    /**
     * 释放同步状态
     * @return tryRelease返回true则返回true
     */
    public final boolean release(long arg) {
        // 修改同步状态成功
        if (tryRelease(arg)) { 
            Node h = head;
            // 如果head节点不为null,而且状态不为初始状态
            // 则唤醒head后继节点
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

独占模式下,只有获取到同步状态的线程才能释放,所以不存在线程安全的问题

  1. tryRelease
    /**
     * 独占模式释放同步状态,不需要保证线程并发
     * 需要实现此方法,为释放同步状态修改state值
     */
    protected boolean tryRelease(long arg) {
        throw new UnsupportedOperationException();
    }
  1. unparkSuccessor
    /**
     * 唤醒node节点存在的一个后继节点
     * 注意:唤醒的节点并不一定能获取到同步状态,
     * 所以setHead方法并在这里调用,而是在acquireQueued
     * 中成功获取同步状态后调用
     */
    private void unparkSuccessor(Node node) {
        /*
         * 当节点的等待状态waitStatus为负值时,比如可能通知后继阻塞节点执行等,
         * 则需要重置状态值为初始值,这里没有采用循环CAS设置,即使这一次尝试由于
         * CAS失败或者等待线程修改状态而导致失败都是允许的。
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * 如果后继节点为null或者为CANCELLED取消状态则从尾节点开始向前遍历直到
         * 找到一个为非CANCELLED状态的节点。
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从尾节点向前遍历到node节点之前找到非CANCELLED节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果遍历后找到非CANCELLED状态的节点则唤醒节点的线程
        if (s != null)
            LockSupport.unpark(s.thread);// 唤醒非CANCELLED状态的线程
    }

唤醒head后继的有效节点,如果head后继节点为null或者为取消状态,则从后向前遍历找到第一个有效的节点,被唤醒的节点会删除前继为取消状态的节点(shouldParkAfterFailedAcquire()方法中),唤醒的节点加入对同步状态的获取中
注意:被唤醒的线程并不一定能获得同步状态

独占式超时同步状态获取

队列同步器AbstractQueuedSynchronizer源码解析-续1

共享式同步状态获取与释放

队列同步器AbstractQueuedSynchronizer源码解析-续2

同步队列的使用方式

队列同步器AbstractQueuedSynchronizer源码解析-AQS使用

总结
  1. 从唤醒的线程竞争获取同步状态来看,AQS的获取并非绝对公平的,因为每一个新增的节点获取立马竞争同步状态,如果此时恰巧获取成功则这种获取是非公平的,失败才会添加到等待队列当中,等待队列内部竞争才是FIFO的,是公平的。

  2. 如何做到公平获取:需要tryAcquire和tryAcquireShared自定义方法中需要调用hasQueuedPredecessors方法来判断等待队列中是否含有不为当前线程的head的后继节点,如果有,则说明当前线程并不能立马去获取同步状态,而是加入等待队列中以FIFO来公平获取同步状态。

hasQueuedPredecessors方法
hasQueuedPredecessors方法

重入公平锁tryAcquire方法实例
image.png
  1. AQS中等待队列的每个节点可以是独占模式的也可以是共享模式的(即独占模式和共享模式都共享自同一FIFO队列),但是一般情况下,等待队列中的所有节点的等待模式一般为同一种模式,比如ReentrantLock重入锁,仅需重写对应的获取和释放的方法。也可以所有节点不为同一等待模式,如ReadWriteLock读写锁,需要重写不同模式的获取和释放方法

  2. ConditionObject被定义为AQS内部公共类,用来向外暴露给调用者;ConditionObject实现了Condition接口,用于在独占模式下。使用ConditionObject必须重写isHeldExclusively方法,因为ConditionObject多处调用了这个方法。ConditionObject详解

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容