1.3.1 AQS抽象队列同步器详解

什么是AQS

AQS(AbstractQueuedSynchronizer),AQS是JDK下提供的一套用于实现基于FIFO等待队列的阻塞锁和相关的同步器的一个同步框架。这个抽象类被设计为作为一些可用原子int值来表示状态的同步器的基类。如果你有看过类似 CountDownLatch 类的源码实现,会发现其内部有一个继承了 AbstractQueuedSynchronizer 的内部类 Sync 。可见 CountDownLatch 是基于AQS框架来实现的一个同步器.类似的同步器在JUC下还有不少。(eg. Semaphore )

AQS用法

如上所述,AQS管理一个关于状态信息的单一整数,该整数可以表现任何状态。比如, Semaphore 用它来表现剩余的许可数,ReentrantLock 用它来表现拥有它的线程已经请求了多少次锁;FutureTask 用它来表现任务的状态(尚未开始、运行、完成和取消)

如JDK的文档中所说,使用AQS来实现一个同步器需要覆盖实现如下几个方法,并且使用getState,setState,compareAndSetState这几个方法来设置获取状态

  1. boolean tryAcquire(int arg)
  2. boolean tryRelease(int arg)
  3. int tryAcquireShared(int arg)
  4. boolean tryReleaseShared(int arg)
  5. boolean isHeldExclusively()
    以上方法不需要全部实现,根据获取的锁的种类可以选择实现不同的方法

J.U.C是基于AQS(AbstractQueuedSynchronizer)实现的,AQS是一个同步器,设计模式是模板模式。
核心数据结构:双向链表 + state(锁状态)
底层操作:CAS

在这里插入图片描述

首先,我们根据API的方法功能,由我们前面阶段学习的知识进行一个自己定义的AQS,来加深印象。

// 抽象队列同步器
// state, owner, waiters
public class kfAqs {
    // acquire、 acquireShared : 定义了资源争用的逻辑,如果没拿到,则等待。
    // tryAcquire、 tryAcquireShared : 实际执行占用资源的操作,如何判定一个由使用者具体去实现。
    // release、 releaseShared : 定义释放资源的逻辑,释放之后,通知后续节点进行争抢。
    // tryRelease、 tryReleaseShared: 实际执行资源释放的操作,具体的AQS使用者去实现。

    // 1、 如何判断一个资源的拥有者
    public volatile AtomicReference<Thread> owner = new AtomicReference<>();
    // 2、 保存正在等待的线程
    public volatile LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();
    // 3、 记录资源的状态
    public volatile AtomicInteger state = new AtomicInteger(0);

    // 共享资源占用的逻辑,返回资源的占用情况
    public int tryAcquireShared(){
        throw new UnsupportedOperationException();
    }
    
    public void acquireShared(){
        boolean addQ = true;
        while(tryAcquireShared() < 0) {
            if (addQ) {
                // 没拿到锁,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起当前的线程,不要继续往下跑了
                LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把线程移除
    }

    // 共享资源释放的逻辑,返回资源是否已释放
    public boolean tryReleaseShared(){
        throw new UnsupportedOperationException();
    }

    public void releaseShared(){
        if (tryReleaseShared()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 唤醒
            }
        }
    }

    // 独占资源相关的代码

    public boolean tryAcquire() { // 交给使用者去实现。 模板方法设计模式
        throw new UnsupportedOperationException();
    }

    public void acquire() {
        boolean addQ = true;
        while (!tryAcquire()) {
            if (addQ) {
                // 没拿到锁,加入到等待集合
                waiters.offer(Thread.currentThread());
                addQ = false;
            } else {
                // 阻塞 挂起当前的线程,不要继续往下跑了
                LockSupport.park(); // 伪唤醒,就是非unpark唤醒的
            }
        }
        waiters.remove(Thread.currentThread()); // 把线程移除
    }

    public boolean tryRelease() {
        throw new UnsupportedOperationException();
    }

    public void release() { // 定义了 释放资源之后要做的操作
        if (tryRelease()) {
            // 通知等待者
            Iterator<Thread> iterator = waiters.iterator();
            while (iterator.hasNext()) {
                Thread next = iterator.next();
                LockSupport.unpark(next); // 唤醒
            }
        }
    }

    public AtomicInteger getState() {
        return state;
    }

    public void setState(AtomicInteger state) {
        this.state = state;
    }
}

资源占用流程图

在这里插入图片描述

源码解析

static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;// 等待超时或被中断
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;// 释放锁之后,是否通知后一个节点
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;// 处于等待队列中,结点的线程等待在Condition上
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;// 共享模式中使用,线程处于可运行状态
  //核心数据结构:双向链表 + state(锁状态)
//资源争用的逻辑
public final void acquire(int arg) {
        if (!tryAcquire(arg) && // 判断是否拿到锁
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}
static void selfInterrupt() {
        Thread.currentThread().interrupt();
}
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false; // 当前线程释放中断的标志位
            for (;;) {// 不断尝试
                final Node p = node.predecessor(); // 获取前一个节点
                if (p == head && tryAcquire(arg)) { // 如果前一个节点是head,尝试抢一次锁
                    setHead(node); // 更换head
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&// 检查状态,是否需要挂起线程
                    parkAndCheckInterrupt())// 如果需要挂起,则通过Park进入停车场挂起
                    interrupted = true; // 如果出现中断,则修改标记
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
}
//资源释放的逻辑
public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head; // 从头开始找
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h); // 唤醒下一个线程
            return true;
        }
        return false;
}
protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
}
/** 唤醒等待者
     * Wakes up node's successor, if one exists.
     *
     * @param node the node
     */
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus; // 正在释放锁的线程节点状态
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0); // 修改当前节点状态

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next; // 找下一个节点
        if (s == null || s.waitStatus > 0) { // 如果不存在或者被取消了,继续寻找合适的下一个节点
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null) // 如果找到了合适的节点,就唤醒它
            LockSupport.unpark(s.thread);
    }
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,591评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,448评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,823评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,204评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,228评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,190评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,078评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,923评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,334评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,550评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,727评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,428评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,022评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,672评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,826评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,734评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,619评论 2 354

推荐阅读更多精彩内容