Java AQS源码分析

在Java中,如果要对某一共享资源进行互斥访问,一般可以通过使用synchronized关键字或者是java.util.concurrent.locks包下面的某些类来实现,比如ReentrantLock。这个包下的所有类几乎都是以AbstractQueuedSynchronizer(后文中简称AQS)为基础来构建的,AQS为锁的实现提供了一个基本机制,包括挂起和唤醒某一线程、自动管理同步状态以及将获取不到锁的线程入队等,在这些基础机制之上,可以实现各种特性不一样的锁,比如公平锁,非公平锁,可被中断的锁等等

在基本用法上,synchronized和ReentrantLock很相似,基本用法都差不多,语义也差不多,都具备可重入等特性。synchronized是在字节码层面上通过monitorenter和monitorexit来实现的,线程的阻塞(获取不到锁时)和唤醒(待获取的锁被另一拥有它的线程释放时)是内核自己调度的,而基于AQS的锁实现可以让实现者自己决定线程获取锁的规则(当某一线程释放锁,n多之前因为没有获取到锁而阻塞的线程去获取这个已经释放的锁时),比如公平锁和非公平锁,相比于synchronized,ReentrantLock有比较多的高级特性,有业务需求的话也可以通过实现AQS来进行扩展,可扩展性较好

分析AQS之前需要知道CAS(即compare-and-swap,比较并交换)。某些硬件的指令集提供了CAS指令,CAS指令需要三个操作数,分别是内存地址Loc,旧的预期值oldValue,新的更新值updateValue,CAS指令执行时,当且仅当oldValue等于Loc位置处的值时,将Loc处的值更新为updateValue,否则不更新。这个CAS操作是一个原子操作,且比synchronized更轻量。

如果不想使用synchronized来更新共享变量的值(考虑到性能),可以用CAS+volatile的方式来保证线程安全,CAS操作保证了原子性,而volatile保证了可见性

AQS里面会用到一个比较重要的类,sun.misc.Unsafe,这个类的大部分方法都是native方法,可以执行一些比较“另类”的方法,例如向操作系统申请内存,也可以用Unsafe来执行CAS操作,挂起和唤醒线程也是通过Unsafe的park和unpark来实现的

public abstract class AbstractQueuedSynchronizer
        extends AbstractOwnableSynchronizer
        implements java.io.Serializable {

    private transient volatile Node head;           //链表的头指针
    private transient volatile Node tail;           //链表的尾指针
    private volatile int state;                     ///同步状态

    static final class Node {
        static final Node SHARED = new Node();        //当前等待的锁为共享模式
        static final Node EXCLUSIVE = null;           //当前等待的锁为排他模式

        static final int CANCELLED = 1;
        static final int SIGNAL = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;

        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;

    }
}

可以看到AQS里面有一个内部类Node,Node里面有prev和next引用,因此Node可以构成一个双向链表,并且Node里面还有一个Thread的引用,可以猜想,每一个打算去获取锁的线程在AQS里面都会先包装成一个Node,假如获取不到锁的话,会将当前的Node加入这个链表,当某一个线程释放锁的时候,可以从链表里面取一个正在等待的Node,并将锁给他

首先来看看获取锁的方法

public final void acquire(int arg){
        if(!tryAcquire(arg)&&
        acquireQueued(addWaiter(Node.EXCLUSIVE),arg))
        selfInterrupt();
        }

整个方法的逻辑为:首先尝试去获取锁,如果由于竞争失败导致获取锁失败,则将自身包装成一个node对象加入到链表末尾,并尝试去获取锁

其中tryAcquire(arg)为一个protected方法,需要子类去实现,这个方法试图去获取锁,如果成功获取到锁,则返回true,否则返回false。等下次分析某个AQS的实现类时,可以看到tryAcquire(arg)的一些具体实现(会涉及到同步状态state),但是现在可以不用管它,只需要知道,如果获取到锁了就返回true,没获取到锁就返回false

如果没有获取到锁,则进入addWaiter(Node mode)方法
本文中,说到步骤的时候都是以 “方法名.步骤n” 这样一种形式来描述,以避免冲突,假如方法是当前正在分析的方法,则省略方法名,只说步骤n

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode):                      //1
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {                                                      //2
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {                                 //3
                pred.next = node;
                return node;
            }
        }
        enq(node);                                                               //4
        return node;                                                             //5
    }

这个方法的步骤为:
1:首先利用当前线程创建node节点,并取出尾节点tail
2:判断之前的尾节点tail是否为null。如果不为null,则将新创建节点node的prev引用指向尾节点tail,并进入步骤3。如果尾节点为null,则进入步骤4
3:这时候利用CAS操作试图将尾节点修改为新创建的node节点。如果CAS操作成功,则新创建的节点成为尾节点,并且将之前尾节点的next引用指向新创建的节点node,实现一个链表中两个节点的连接行为,并返回这个新创建的节点node。如果CAS操作失败,则说明已经有其它线程抢先一步将它自己加入到了链表中,此时,进入步骤4
4:如果尾节点为null或者步骤3中的CAS操作失败,则会进入此步骤,注意步骤4中传入的参数为新创建的node节点

接下来看下步骤4中的enq(Node node)方法,注意,enq方法是用final修饰的

private Node enq(final Node node) {
        for (; ; ) {
            Node t = tail;                                  //1
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))          //2
                    tail = head;
            } else {
                node.prev = t;                              //3
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

此方法中,for是一个无线循环,出口只有else分支里面的一个return语句,此方法的步骤为:
1:首先取出尾节点tail,并判断尾节点是否为null。如果尾节点为null,则进入步骤2,否则进入步骤3
2:新创建一个节点(这个节点并没有绑定某一个线程,可以认为是一个“空”节点),并通过CAS操作尝试将其设置为头结点,如果设置成功,将尾节点也指向刚创建的节点,并进入下一次循环,此时tail节点已经不为null。需要注意的是这里并没有采用CAS的方式将新创建的节点设置为tail,这是不必要的,因为return语句在else分支里面,而进入else分支的条件是尾节点不为null,假如我们在tail = head处打一个断点,并且让当前线程跑到这个断点处,这时候让其它的线程调用acquire方法,当其他的线程进入enq方法后,由于头结点已经被设置了,所以其他线程在//2处的CAS操作将失败,此时会进行下一次循环,从这可以看到,初始化的时候,假如A线程设置了head节点,那就只有A线程有权利更新尾节点tail,之后方法才能正常进行下去。而只有设置了tail节点之后,代码才有可能走到else分支从而返回。
3:如果尾节点tail存在,则将传入参数node的prev指针指向tail,并利用CAS操作尝试将传入的节点node设置为尾节点,如果CAS操作成功,则将之前尾节点的next指针指向传入的参数node,并返回之前的尾节点tail,如果CAS操作失败(其它线程抢先一步将其自己设置成了尾节点),则进入下一次循环并继续尝试将传入的参数节点node设置为尾节点,直到CAS操作在某次重试中成功

当enq(Node node)方法成功返回后,进入步骤addWaiter.5,返回新创建的node

所以addWaiter()要做的就是,在多线程的环境下,竞争锁失败的时候,将当前线程包装成一个node节点,并将其安全地插入到链表的末尾。
需要注意的是,假如之前一个线程已经获得了某一个锁且未释放,也没有其它线程请求去获取锁,这时候链表中并没有任何节点元素,这时候假如有另一个线程去请求获取这个锁,则会将当前线程封装的节点node插入链表末尾,这个节点在链表中的位置并不是头结点,而是链表中的第二个节点。初始化链表的时候会插入一个不绑定线程的“空”节点
接下来调用acquireQueued方法

acquireQueued(final Node node, int arg)方法如下所示,可以看到,只有在当前节点的前一个节点是头结点head的时候才有可能获取到锁(即当前节点的前一个节点为head头结点是当前节点能获取到锁的前提条件,考虑到非公平锁,这么说其实有点不太正确)

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;                                   //1
        try {
            boolean interrupted = false;
            for (; ; ) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {              //2
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&     //3
                        parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {                                              //4
            if (failed)
                cancelAcquire(node);
        }
    }

该方法的步骤为:
1:首先将failed初始化为true,表示获取锁是否失败,失败了会在步骤4中做一些后续工作,至于interrupted的含义,会在后文看到。然后取出当前节点node的前一个节点p 。如果p为头结点head且当前线程能获取到锁,则进入步骤2。否则,如果不是头结点或者获取不到锁,进入步骤3
2:当当前节点node的prev为head头结点,且能获取到锁时,进入该步骤。将当前节点设为头结点,并且将之前的head头结点从链表中分离,这时候将failed设置为false,表示成功获取到了锁。并且返回interrupted。注意,这里设置头结点head的时候并不需要通过CAS操作来进行,因为没有竞争
3:如果当前节点node的前一个节点不为head头节点,或者没有获取到锁,会进入该步骤。该步骤会调用shouldParkAfterFailedAcquire(Node pred, Node node)方法,这个方法的功能已经被方法名描述的很形象了,就是判断锁获取失败之后需不需要将当前线程挂起(AQS中通过park和unpark方法来挂起和唤醒线程),假如shouldParkAfterFailedAcquire返回true,则调用parkAndCheckInterrupt方法,如果parkAndCheckInterrupt也返回true,则将interrupted赋值为true

首先看看shouldParkAfterFailedAcquire方法

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)                                   //1
            return true;
        if (ws > 0) {                                            //2
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {                                                 //3
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

先说一下内部类Node的waitStatus字段以及CANCELLED、SIGNAL、CONDITION、PROPAGATE等字段,在源码里有关于这几个字段的简短说明

/** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;

大致意思就是说,waitStatus表示了当前节点所代表线程所在的状态,有可能为CANCELLED、SIGNAL、CONDITION、PROPAGATE、0中其中的一种
CANCELLED表示当前线程由于超时或者被中断
SIGNAL表示当前节点的next指向的节点正在阻塞或即将被阻塞,所以当前节点释放锁或者取消时要唤醒其后一个节点(也就意味着假如当前节点的waitStatus为SIGNAL,当它释放锁或者取消时会去唤醒它的下一个节点)
CONDITION:表示当前节点处于条件队列中
PROPAGATE:表示唤醒当前节点的动作需要向后传播(不仅仅限于当前节点的后一个节点),在某些共享锁里面会用到这个,排它锁不会用到这个值

1:如果当前节点node的前一个节点pred的waitStatus为SIGNAL,则返回true(如果pred节点的waitStatus为SIGNAL,则意味着当它释放锁或者取消时会去唤醒它的下一个节点node,所以这时候我们只需要将node所代表的线程挂起就行了,所以返回true)
2:如果当前节点node的前一个节点的waitStatus大于0,会将node之前的waitStatus大于0的所有节点从链表中删除掉,直到某一个节点的waitStatus小于等于0,并且将这个节点和node互相关联起来
3:如果当前节点node的前一个节点pred的waitStatus等于0,则尝试用CAS操作将pred节点的waitStatus值改为Node.SIGNAL
在分支2和分支3中,由于其前面一个节点的waitStatus不为SIGNAL,所以没办法保证会有一个线程去唤醒node,则只能返回false且进入下一次循环(即只有在前一个节点的waitStatus为SIGNAL的情况下,后面的线程才有可能被唤醒,即挂起node所代表的线程是安全的)
如果shouldParkAfterFailedAcquire方法返回true,则进入parkAndCheckInterrupt()方法

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

可以看到parkAndCheckInterrupt()方法很简单,只是挂起当前线程并返回当前线程的中断标记(如果有其他线程唤醒它)
如果当前线程被设置过中断标记(注意,interrupted方法会清除掉线程的中断状态)
则步骤acquireQueued.3中会将interrupted置为true并返回,以标志当前线程在等待锁的过程中是否被中断过
当当前线程被唤醒后,又会进入下一次循环,继续下一次获取锁的尝试,直到其获取到锁或者抛出异常
当acquireQueued返回true时,acquire方法的selfInterrupt方法会设置当前线程的中断标记

整个acquire方法的精髓就在这儿了,一个节点node的前一个节点假如是head节点,他才会去尝试获取锁,需要注意的是,这里不一定能够获取到锁(非公平锁),假如获取不到锁,线程会尝试将自己挂起,等待被中断或者被其它的线程唤醒,当被唤醒后,会继续尝试去获取锁,直到获取到锁或者抛出异常,又或者被取消等等。。

接下来说下线程释放锁的过程:

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {                                       //1
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

在unparkSuccessor方法中,如果node的waitStatus为0,首先会尝试将node的waitStatus设置为0,就算CAS操作失败了也不要紧,这意味着node的waitStatus已经被改变了。
在步骤1中,如果当前节点node的后一个节点s不为null且s的waitStatus小于等于0,则可以直接唤醒s对应的线程,如果s为null或者s的waitStatus大于0,则会从链表的末尾开始向前找,直到找到一个距离节点node最近的且waitStatus小于等于0的节点,并唤醒它

Doug Lea大师把CAS指令玩的飞起。只能膜拜!!

待续...

key words:AQS CAS Unsafe

关于AQS,多线程大师Doug Lea发表了一篇论文,《The java.util.concurrent Synchronizer Framework》,感兴趣的可以看看

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容