AbstractQueuedSynchronizer源码(上)--排他锁

AbstractQueuedSynchronizer简称为AQS,AQS是ReentrantLock、CountdownLatch、CycliBarrier等并发工具的原理/基础,所以了解AQS的原理对学习J.U.C包很重要,本篇博客主要学习排他锁的加锁和解锁过程,而共享锁的部分将会在下一篇博客中学习。

基本原理:

1.AQS中包含两种队列(FIFO),同步队列+条件队列,底层都是双向链表,也就是通过其内部的Node实现。

  1. AQS有排他锁和共享锁两种模式,子类可以实现内部类选择实现一种,当然也可以通过两个内部类定义两种锁,例如ReentrantReadWriteLock,一个读锁,一个写锁。

3.子类通过对volatile修饰的state字段赋值,判断当前是否能够获取锁。

4.通过new ConditionObject()获得条件队列。

5.AQS定义了锁的框架,但是如何获取锁,释放锁等需要子类实现,AQS中默认抛出UnsupportOperationException。

AQS

类定义:

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
}

AQS继承了AbstractQueuedSynchronizer类,这个类只包含一个属性,以及其get、set方法,主要用来记录当前获取锁的线程。

private transient Thread exclusiveOwnerThread;

基本属性:

//同步队列的头结点
private transient volatile Node head;

//同步队列的尾结点
private transient volatile Node tail;

//当前锁的状态,需要子类去实现,例如0表示可以获取锁,1表示当前锁被持有,无法获取(排他),共享锁模式下,state>0表示持有锁线程的数量
private volatile int state;

//旋转超时阈值,设置等待超时时间才生效
static final long spinForTimeoutThreshold = 1000L;

这几个属性中,最重要的就是state,子类通过CAS机制进行赋值,保证其原子性。

条件队列属性:

public class ConditionObject implements Condition, java.io.Serializable {
    //条件队列的头结点
    private transient Node firstWaiter;
    //条件队列的尾结点
    private transient Node lastWaiter;
}

条件队列就是通过ConditionObject得到,其实现了Condition接口,Condition内部定义了一些抽象方法,如await()、signal()、signalAll(),相当于Object中的wait、notify、notifyAll。我们看到这两种队列使用的Node,所以下面一起看看Node的定义。

Node属性:

static final class Node {

    //标记当前node为共享模式
    static final Node SHARED = new Node();

    //标记当前node为独占模式
    static final Node EXCLUSIVE = null;

    //当前节点的状态,通过waitStatus控制节点的行为,同步队列初始为0,而同步队列节点初始为-2,下面就是几种取值
    volatile int waitStatus;

    //当前节点被取消,属于无效节点
    static final int CANCELLED =  1;

    //当前节点的后继节点将要/已经被阻塞,在当前节点release的时候需要unpark后继节点
    static final int SIGNAL    = -1;

    //当前节点处于条件队列
    static final int CONDITION = -2;

    //共享模式下释放锁,应该传播到其他节点
    static final int PROPAGATE = -3;

    //当前节点的前一个节点
    volatile Node prev;

    //当前节点的后一个节点
    volatile Node next;

    //当前节点持有的线程,head不保存Thread,只是保存其后继节点的引用
    volatile Thread thread;

    //等待队列的中表示下一个节点,如果是同步队列,只是表示当前节点处于共享模式还是独占模式
    Node nextWaiter;
}

我们通过Lock.lock()获取锁的时候,根据定义决定调用acquire()还是tryAcquire(),这两个方法都是获取排他锁的,下面看看具体实现细节。

acquire()获取排他锁:

public final void acquire(int arg) {
    //tryAcquire:在AQS中默认抛出异常,需要子类去实现,子类一般选择非public的内部类去实现AQS,表示当前是否获取到锁,如果获取到锁,直接结束执行
    //addWaiter:当前没有获取到锁,将线程添加到同步队列尾部
    //acquireQueued:阻塞当前节点
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        //自我打断
        selfInterrupt();
}

上面注释中对acquire()中每一个方法的基本描述,我们可以简单实现tryAcquire()。

tryAcquire():

// 尝试获取锁,如果状态为0,获取到锁,更新state为1(1代表exclusive模式下锁已经被持有)
public boolean tryAcquire(int acquires) {
    //CAS实现赋值
    if (compareAndSetState(0, acquires)) {
        //将当前线程set到AbstractOwnableSynchronizer中的exclusiveOwnerThread,为了方便跟踪获得锁的线程
//                ,可以帮助监控工具识别哪些线程持有锁。
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

上面是一种tryAcquire的基本实现,通过CAS对state进行赋值,0表示当前可以获取锁,1表示当前锁被别的线程持有,无法获取。如果返回true,直接结束。如果返回false,会有后续流程。

addWaiter():

//acquire方法为EXCLUSIVE独占锁模式
addWaiter(Node.EXCLUSIVE);

private Node addWaiter(Node mode) {
    //将当前线程,封装成一个EXCLUSIVE模式的节点
    Node node = new Node(Thread.currentThread(), mode);
    //取出尾结点
    Node pred = tail;
    //
    if (pred != null) {
        //tail赋值给当前节点的prev
        node.prev = pred;
        //如果CAS把当前node变成tail节点,返回当前节点
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    //如果一次没有把当前节点放到队列中,进入自旋enq()
    enq(node);
    return node;
}

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
         //如果tail 为null,通过CAS进行head、tail的初始化
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {//和之前的操作一样,只是外部是for(;;)保证入队成功
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                //注意这里return 的是当前节点的前一个节点,和上面不一样。
                return t;
            }
        }
    }
}

通过上面的代码将新线程封装成一个新的节点,进行入队操作,但是不知道为啥返回当前节点或者其前一个节点,我也不知道为啥这样,麻烦大佬留言告知。

acquireQueued():

final boolean acquireQueued(final Node node, int arg) {

    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //p为当前节点的前一个节点
            final Node p = node.predecessor();
            //如果p为head,也就是当前节点为head的后继节点,就会尝试获取锁,如果成功,将node设置为head,直接返回
            if (p == head && tryAcquire(arg)) {
                //把head节点的thread赋值为null,head节点永远是空节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果当前node不是head的后继节点,将前一个节点的waitStatus设置为signal,并且阻塞自己
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        //如果上面失败,取消正在进行中的acquire
        if (failed)
            cancelAcquire(node);
    }
}

acquireQueued方法从命名上看到,排队获取,主要逻辑就是讲当前节点的前一个节点waitStatus设置为signal,然后阻塞自己。通过parkAndCheckInterrupt()将自身阻塞的。

shouldParkAfterFailedAcquire():

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //ws为当前节点的前驱节点waitStatus
    int ws = pred.waitStatus;
    //如果ws已经是signal,直接返回
    if (ws == Node.SIGNAL)
        return true;
    //如果ws>0,也就是canceld状态,往前面遍历,知道找到前面不是canceld状态的节点
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //通过CAS将ws设置为signal状态
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
//通过LockSupport.park挂起线程,直到被唤醒,返回是否interrupt
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

parkAndCheckInterrupt()将自己阻塞,当被唤醒的时候,仍然在for(;;)内部自旋尝试获取锁。独占模式获取锁的流程,大概是这样。

release()释放排它锁:

public final boolean release(int arg) {
    //尝试去释放锁,方法由子类实现
    if (tryRelease(arg)) {
        Node h = head;
        //当前队列head不为null,外套Status不是初始状态
        if (h != null && h.waitStatus != 0)
            //唤醒head的后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

tryRelease()由子类实现,我们看一下ReentrantLock的乐观锁实现。

//unlock方法调用,参数为1
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    //如果当前节点和AbstractOwnableSynchronizer保存的线程不相同,直接抛出异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //如果当前state状态就是1
    if (c == 0) {
        free = true;
        //将独占线程设置为null,返回true
        setExclusiveOwnerThread(null);
    }
    //将c赋值state
    setState(c);
    return free;
}

对于release来说,参数为1,只有当前state为1的状态,返回true,执行后续唤醒操作。

unparkSuccessor():

private void unparkSuccessor(Node node) {

    //此时node节点就是head
    int ws = node.waitStatus;
    //ws<0,如果>0就是被取消,不用管,所以判断<0的时候,将head初始化
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    
    Node s = node.next;
    
    //①
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //②
    if (s != null)
        LockSupport.unpark(s.thread);
}

unparkSuccessor()就是唤醒其后继节点,这里标注①的位置是一个注意点,为什么是从tail往前面遍历,一直到②,将其唤醒。

原因:

之前节点都在队列中阻塞,通过acquireQueued()阻塞的,逻辑是判断node的前驱节点是否为head,并且尝试获取锁(想不起来的话,回头看看代码)。而在unparkSuccessor()中当前释放的节点是从head的后驱节点开始的,判断==null || waitStatus > 0,说明这个节点是不符合可被唤醒的条件,如果是从前往后开始遍历,找到第一个waitStatus符合的node,然后唤醒,这个node是从acquireQueued()中苏醒的,需要判断前驱节点的,而此时所有前驱节点都不满足,还是直接被阻塞,那就完蛋了。所以在==null || waitStatus > 0前提下,从tail往前遍历,找到节点能够直接过滤掉无效的前驱节点,不然的话,被唤醒,然后直接阻塞。

总结:

关于AQS排他锁加锁和解锁的过程,已经做了基本了解,由于水平实在有限,如果博客中有错误请指出。

加锁:

  1. 通过自己对status的CAS赋值去尝试获取锁,如果成功就直接结束,失败进入2。

  2. 将当前thread封装成节点,通过CAS将node添加到同步队列的尾部。

  3. 调用acquireQueued(),如果当前节点的前驱节点为node,再次尝试获取锁,只有head的后驱节点才能去获取锁,成功的话,将自身设置为head,head节点是没有线程的dummy节点,如果不成功,进入4.

  4. 将当前节点的waitStatus != canceld的前驱节点设置为signal状态,然后通过LockSupport.park(this)直接阻塞自身。

  5. 如果有节点释放锁了,也是在acquireQueued()中被唤醒,继续获取锁。

  6. 如果获取锁失败,会取消尝试获取锁的线程。

解锁:

  1. 排他锁解锁内容比较简单,首先尝试释放锁,判断当前线程和持有锁的线程是否一致,判断status是否为1,如果符合将持有锁的线程设置为null,status设置为0。

  2. 从head节点开始,通过LockSupport.unpark()唤醒其后驱节点。

  3. 但是如果head后驱节点s == null || s.waitStatus > 0的情况是从tail开始向前唤醒,知道找到符合的节点,原因上面已经说了。

  4. 被唤醒的节点还是处于自旋尝试获取锁。

我们将在下篇博客去学习共享锁的基本原理。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,110评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,443评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,474评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,881评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,902评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,698评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,418评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,332评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,796评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,968评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,110评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,792评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,455评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,003评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,130评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,348评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,047评论 2 355