Java中AQS基本实现原理

一、AQS概述

AQS全名AbstractQueuedSynchronizer,意为抽象队列同步器,JUC(java.util.concurrent包)下面的Lock和其他一些并发工具类都是基于它来实现的。AQS维护了一个volatile的state和一个CLH(FIFO)双向队列。

二、分析

state

state是一个由volatile修饰的int变量,它的访问方式有三种:

  • getState()
  • setState(int newState)
  • compareAndSetState(int expect, int update)
    /**
     * 由volatile修饰的state
     */
    private volatile int state;

    /**
     * 基于内存可见性的读
     */
    protected final int getState() {
        return state;
    }

    /**
     * 基于内存可见性的写
     */
    protected final void setState(int newState) {
        state = newState;
    }

    /**
     * 使用CAS+volatile,基于原子性与可见性的对state进行设值
     */
    protected final boolean compareAndSetState(int expect, int update) {
        // 使用Unsafe类,调用JNI方法
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

资源获取主要有两种形式:

  • 独占式(EXCLUSIVE)

仅有一个线程能在同一时刻获取到资源并处理,如ReentrantLock的实现。

  • 共享式(SHARED)

多个线程可以同时获取到资源并处理,如Semaphore/CountDownLatch等。

AQS中大部分逻辑已经被实现,集成类只需要重写state的获取(acquire)与释放(release)方法,因为在AQS中,这些方法默认定义的实现方式都是抛出不支持操作异常,所以按需实现即可。

其中需要继承类重写的方法有:

  • tryAcquire(int arg)

此方法是独占式的获取资源方法,成功则返回true,失败返回false。

  • tryRelease(int arg)

此方法是独占式的释放资源方法,成功则返回true,失败返回false。

  • tryAcquireShared(int arg)

此方法是共享式的获取资源方法,返回负数表示失败,0表示获取成功,但是没有可用资源,正数表示获取成功,且有可用资源。

  • tryReleaseShared(int arg)

此方法是共享式的释放资源方法,如果允许唤醒后续等待线程则返回true,不允许则返回false。

  • isHeldExclusively()

判断当前线程是否正在独享资源,是则返回true,否则返回false。

CLH(FIFO)队列

AQS中是通过内部类Node来维护一个CLH队列的。源码如下:

 static final class Node {
        /** 标记共享式访问 */
        static final Node SHARED = new Node();
        /** 标记独占式访问 */
        static final Node EXCLUSIVE = null;

        /** 字段waitStatus的值,表示当前节点已取消等待 */
        static final int CANCELLED =  1;
        /**字段waitStatus的值,表示当前节点取消或释放资源后,通知下一个节点 */
        static final int SIGNAL    = -1;
        /** 表示正在等待触发条件 */
        static final int CONDITION = -2;
        /**
         * 表示下一个共享获取应无条件传播
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     表示当前节点取消或释放资源后,通知下一个节点
         *   CANCELLED:  表示当前节点已取消等待
         *   CONDITION:  表示正在等待触发条件
         *   PROPAGATE:  表示下一个共享获取应无条件传播
         */
        volatile int waitStatus;

        /**
         * 前节点
         */
        volatile Node prev;

        /**
         * 下一个节点
         */
        volatile Node next;

        /**
         * 节点对应线程
         */
        volatile Thread thread;

        /**
         * 下一个等待的节点
         */
        Node nextWaiter;

        /**
         * 是否是共享式访问
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * 返回前节点
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // 共享式访问的构造函数
        }

        Node(Thread thread, Node mode) {     // 用于被添加等待者使用
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // 用于Condition使用
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

独占模式-获取资源

使用AQS中的acquire(int arg)方法

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

该方法分为4个部分:

  • tryAcquire()

需要自己实现的方法,如果获取到资源使用权,则返回true,反之fasle。如果获取到资源,返回true,!true为false,根据&&的短路性,则不会执行后续方法,直接跳过程序。如果未获取到资源,返回false,!false为true,则进入后续方法。

  • addWaiter()

如果未获取到资源使用权,则首先会调用此方法。上源码:

    private Node addWaiter(Node mode) {
        // 封装当前线程和独占模式
        Node node = new Node(Thread.currentThread(), mode);
        // 获取尾部节点
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // CAS设置尾部节点
            if (compareAndSetTail(pred, node)) {
                // 将为节点的下一节点指向当前node
                pred.next = node;
                return node;
            }
        }
        // 如果尾结点为空或者设置尾结点失败
        enq(node);
        return node;
    }
 private Node enq(final Node node) {
        // 如果CAS设置未成功则死循环
        for (;;) {
            // 获得尾结点
            Node t = tail;
            // 如果尾节点为空,说明CLH队列为空,需要初始化
            if (t == null) { 
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 设置当前节点的前驱节点
                node.prev = t;
                // CAS设置当前节点为尾结点
                if (compareAndSetTail(t, node)) {
                    // 设置后驱节点
                    t.next = node;
                    return t;
                }
            }
        }
    }
  • acquiredQueued()
    final boolean acquireQueued(final Node node, int arg) {
        // 标识资源获取是否失败
        boolean failed = true;
        try {
            // 标识线程是否中断
            boolean interrupted = false;
            for (;;) {
                // 获得当前节点的前驱节点
                final Node p = node.predecessor();
                // 如果前驱节点为头结点,说明快到当前节点了,尝试获取资源
                if (p == head && tryAcquire(arg)) {
                    // 获取资源成功
                    // 设置当前节点为头结点
                    setHead(node);
                    // 取消前驱节点(以前的头部)的后节点,方便GC回收
                    p.next = null; // help GC
                    // 标识未失败
                    failed = false;
                    // 返回中断标志
                    return interrupted;
                }
                // 如果当前节点的前驱节点不是头结点或获取资源失败
                // 需要用shouldParkAfterFailedAcquire函数判断是否需要阻塞该节点持有的线程
                // 如果需要阻塞,则执行parkAndCheckInterrupt方法,并设置被中断
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            // 如果最终获取资源失败
            if (failed)
                // 当前节点取消获取资源
                cancelAcquire(node);
        }
    }
  • selfInterrupt()

中断当前线程

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

独占模式-释放资源

release() 释放资源并唤醒后继线程

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            // 获取头结点
            Node h = head;
            // 头结点不为空且等待状态值不为0
            if (h != null && h.waitStatus != 0)
                // 唤醒后续等待线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        // 如果等待状态值小于0
        if (ws < 0)
            // 使用CAS将waitStatus设置为0
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        // 如果当前节点没有后继节点或者后继节点放弃竞争资源
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 从队列尾部循环直到当前节点,找到最近的且等待状态值小于0的节点
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        // 如果找到的后继节点不为空,则唤醒其持有的线程
        if (s != null)
            LockSupport.unpark(s.thread);
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,509评论 6 504
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,806评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,875评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,441评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,488评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,365评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,190评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,062评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,500评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,706评论 3 335
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,834评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,559评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,167评论 3 328
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,779评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,912评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,958评论 2 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,779评论 2 354