java AQS框架

概述

AbstractQueuedSynchronizer,即队列同步器。它是构建锁或者其他同步组件的基础框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。它是JUC并发包中的核心基础组件。

设计

AQS的基本设计思想并不复杂:
获取锁:

while (synchronization state does not allow acquire) {
  enqueue current thread if not already queued;
  possibly block current thread;
}
dequeue current thread if it was queued;

释放锁:

update synchronization state;
if (state may permit a blocked thread to acquire)
  unblock one or more queued threads;

这两段代码中有三个基本组件:

  • state
    一个同步操作的变量,用以表示锁的状态,加锁之前需要去修改state,修改成功了说明获取锁成功。当然,解锁时也需要更新锁的状态
  • queue
    等待队列,获取锁失败的线程需要进入这个队列,等待着被唤醒,再次尝试获取锁。获取到锁的线程,需要出队列(如果线程在等待队列中的话)
  • block and unblock
    在线程进入等待队列后,需要阻塞自己。释放锁时,需要唤醒等待队列中的一个或多个线程,以让它获取锁。

Synchronization State

state是整个机制中竞争访问最激烈的对象了,它的操作应该具备以下三个条件:

  • 原子性
    state的更新必定得具备原子性,以保证更新动作的完整,i++这种错误在state的更新中是绝对不允许的。
  • 可见性
    state的更新对其他线程应当是立即可见的。
  • 有序性
    鉴于Java的指令重排序优化,使得有序性成为同步操作的一个必要条件。考虑一下单例模式的双锁检测实现方式,正是因为乱序执行引发的错误(java 1.5后用volatile可修复)

在AQS的实现中,state是一个volatile修饰的int变量。volatile变量可以满足state操作内存可见性、有序性(内存屏障)的要求。但是volatile并不保证原子性,所以需要Unsale包的CAS操作来保证更新的原子性。

比较并交换(compare and swap, CAS),是原子操作的一种,可用于在多线程编程中实现不被打断的数据交换操作,从而避免多线程同时改写某一数据时由于执行顺序不确定性以及中断的不可预知性产生的数据不一致问题。 该操作通过将内存中的值与指定数据进行比较,当数值一样时将内存中的数据替换为新的值。
---- wikipedia

CLH Queue

CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态。

CLH队列的优势在于它的入队和出队都是快速的无锁无障碍操作(CAS乐观锁机制),即使是在线程争用的情况下,也能保证有一个线程能快速入队出队。检测是否有线程在等待也是快速的,只需要判断head==tail就可以了,无需加锁。

在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread)、状态(waitStatus)、前驱节点(prev)、后继节点(next),其定义如下:

static final class Node {

       // 标记当前结点是共享模式
        static final Node SHARED = new Node();
       // 标记当前结点是独占模式 
       static final Node EXCLUSIVE = null;

        // 代表线程已经被取消
        static final int CANCELLED =  1;
        // 代表后续节点需要唤醒
        static final int SIGNAL    = -1;
        // 代表线程在condition queue中,等待某一条件
        static final int CONDITION = -2;
        // 代表后续结点会传播唤醒的操作,共享模式下起作用
        static final int PROPAGATE = -3;
        
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;

        // 指向条件队列的下一个结点,或者特殊的SHARED结点
        Node nextWaiter;
}

入队:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

可以看到,入队是用了CAS乐观锁来控制并发的。这里有一个小优化,先快速尝试入队,失败了再进入自旋循环反复尝试。
需要注意的是,compareAndSetTail(t, node)t.next = node这两步操作并非原子操作,也就是说在通过t.next并不一定能找到下一个结点。原始的CLH队列中仅仅是一个单向的队列(从tail到head),next是AQS的一个小优化,可以迅速找到下一个结点,找不到的时候,需要从tail结点回溯,因为t.next = node可能还没来得及执行。

出队:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

出队只需要设置新的head就可以了。

Block and Unblock

LockSupport类通过native方法提供了线程的阻塞(park)和唤醒(unpark)方法。并且,park方法还可以设置时间或者设置许可(blocker,等待对象),很明显设置超时时间可以用于解决锁的超时问题,设置许可可以解决锁的条件等待问题。

使用

AQS采用模板方法设计模式,AQS提供了大量的方法帮助我们实现同步,子类只需要继承它并根据需求选择实现它的部分抽象方法来管理抽象状态就可以了。

AQS提供了两种模式的同步——独占式和共享式,以及这两种模式下的同步状态获取方法(包括 acquire、acquireInterruptibly、tryAcquireNanos)和释放(release方法)。

独占式

独占式:同一时刻仅有一个线程持有同步状态。

acquire

acquire是最最常用的一种方式,基于AQS实现的锁一般实现lock方法都是直接调用这个方法。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
  • tryAcquire是由子类实现的,尝试获取锁,如果成功了返回true。该方法必须必须保证线程安全地对state进行操作。
  • addWaiter在上文中已经分析过了,将当前线程加入等待队列。
  • acquireQueued方法比较复杂,我们来看看源码:
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;

        // 自旋
        for (;;) {
            final Node p = node.predecessor();
            // 当前线程是head时,尝试获取,成功则返回
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 判断是否需要park,需要则park
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                // 检测到中断,记录在案,接着干我的(中断就中断,关我啥事)
                interrupted = true;
        }
    } finally {
        // 失败了,取消获取
        if (failed)
            cancelAcquire(node);
    }
}
  • selfInterrupt 获取失败,自行了断(中断)
acquireInterruptibly

acquire方法对中断不响应,对线程进行中断操作后,该线程会依然位于CLH同步队列中等待着获取同步状态。为了响应中断,AQS提供了acquireInterruptibly(int arg)方法,该方法在等待获取同步状态时,如果当前线程被中断了,会立刻响应中断抛出异常InterruptedException。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    // 检测中断
    if (Thread.interrupted())
        throw new InterruptedException();
    // 照例先尝试获取
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}
  • doAcquireInterruptibly
private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                // 跟acquireQueued唯一的区别在于,检测到中断,立刻抛出异常
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
tryAcquireNanos

tryAcquireNanostryAcquire的增强版,在响应中断的同时,增加了超时机制

public final boolean tryAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
}
  • doAcquireNanos
private boolean doAcquireNanos(int arg, long nanosTimeout)
        throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    
    // 计算deadline
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            
            // 判断是否超时
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                // 使用LockSupport.parkNanos方法,超时自动唤醒
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

release

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
  • tryRelease 由子类实现,同步设置state
  • unparkSuccessor唤醒等待队列的head线程

共享式

共享式:在同一时刻可以有多个线程获取同步状态

acquireShared

acquireShared对应于acquire

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}
  • tryAcquireShared 也是由子类实现的,同步尝试修改state值
  • doAcquireShared
private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 跟doAcquire的不同在于此
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
  • setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * 尝试唤醒后继的结点:<br />
     * propagate > 0说明许可还有能够继续被线程acquire;<br />
     * 或者 之前的head被设置为PROPAGATE(PROPAGATE可以被转换为SIGNAL)说明需要往后传递;<br />
     * 或者为null,我们还不确定什么情况。 <br />
     * 并且 后继结点是共享模式或者为如上为null。
     * <p>
     * 上面的检查有点保守,在有多个线程竞争获取/释放的时候可能会导致不必要的唤醒。<br />
     * 
     */
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        // 后继结是共享模式或者s == null(不知道什么情况)
        // 如果后继是独占模式,那么即使剩下的许可大于0也不会继续往后传递唤醒操作
        // 即使后面有结点是共享模式。
        if (s == null || s.isShared())
            // 唤醒后继结点
            doReleaseShared();
    }
} 

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 队列不为空且有后继结点
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 不管是共享还是独占只有结点状态为SIGNAL才尝试唤醒后继结点
            if (ws == Node.SIGNAL) {
                // 将waitStatus设置为0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);// 唤醒后继结点
                // 如果状态为0则更新状态为PROPAGATE,更新失败则重试
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        // 如果过程中head被修改了则重试。
        if (h == head) // loop if head changed
            break;
    }
} 

tryAcquireSharedInterruptiblytryAcquireSharedNanostryAcquireInterruptiblytryAcquireNanos类似,都是在tryAcquireShared的基础上加了中断响应和超时机制,这里不做分析。

releaseShared
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
  • tryReleaseShared 同样的由子类实现
  • doReleaseShared 见上文分析

结论

AQS基于volatile关键字、cas操作、自旋锁、park和unpark支持等简单高效的操作,为java构建了一套高效、通用的同步框架。AQS的设计思想简单易懂,然而基于AQS的同步类却复杂高效,且用途各异,实在牛逼。
本文尚有不少欠缺,未能涉及Condition部分,也没有关于ReentrantLockSemaphore等基于AQS框架实现的同步器的分析,后续当继续分析。

参考

Java 源码
The java.util.concurrent Synchronizer Framework
java 7 doc
J.U.C之AQS
Java Concurrency In Practice

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容