Java并发编程 AQS

1.为什么需要AQS

如果没有AQS,就需要每个协作工具自己实现:

  • 同步状态的原子性管理
  • 线程的阻塞与解除阻塞
  • 队列的管理
    在并发场景下,自己正确且高效实现这些内容,是相当有难度的,所以我们用AQS来帮我们把这些事情搞定,我们只关注业务逻辑就够了。

2.AQS的作用

AQS是一个用于构建锁、同步器、协作工具类的工具类(框架)。有了AQS以后,更多的协作工具类都可以很方便的被写出来。
有了AQS,构建线程协作类就容易多了。

比如:
Semaphore和AQS的关系
Semaphore内部有一个Sync类,Sync类继承了AQS

3.AQS的重要性、地位

AbstractQueuedSynchronizer是Doug Lea写的,从JDK1.5加入的一个基于FIFO等待队列实现的一个用于实现同步器的基础框架,我们用IDE看AQS的实现类,可以发现实现类有以下这些:


image.png

4.AQS内部原理解析

AQS最核心的就是三大部分:

  • state(在不同实现子类中,有不同的含义)
  • 控制线程抢锁和配合的FIFO队列
  • 期望协作工具类去实现的获取/释放等重要方法
4.1state状态
  • 这里的state的具体含义,会根据具体实现类的不同而不同,比如在Semaphore里,它表示“剩余的许可证的数量”,而在CountDownLatch里,他表示“还需要倒数的数量”
  • state是volatile修饰的,会被并发地修改,所以所有修改state的方法都需要保证线程安全,比如getState、setState以及compareAndSetState操作来读取和更新这个状态。这些方法依赖于j.u.c.atomic包的支持
   protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }
4.1 state状态
  • 在ReentrantLock中
  • state用来表示“锁”的占用情况,包括可重入计数
  • 当state的值为0的时候,标识改Lock不被任何线程所占用
4.2 控制线程抢锁和配合的FIFO队列
  • 这个队列用来存放“等待的线程”,AQS就是“排队管理器”,当多个线程争用同一把锁时,必须由排队机制将那些没能拿到锁的线程串在一起。当锁释放时,锁管理器就会挑选一个合适的线程占有这个刚刚释放的锁。
  • AQS会维护一个等待的线程队列,把线程都放到这个队列里
  • 这是一个双向形式的队列
image.png
4.3 期望协作工具类实现的获取/释放等重要方法

这里的获取和释放方法,是利用AQS的协作工具类里最重要的方法,是由协作类自己去实现的,并且含义各不相同

获取方法
  • 获取操作会依赖state变量,经常会阻塞(比如获取不到锁的时候)
  • 在Semaphore中,获取就是acquire方法,作用是获取一个许可证
  • 在CountDownLatch里面,获取就是await方法,作用是“等待,直到倒数结束”
释放方法
  • 释放操作不会阻塞
  • 在Semaphore中,释放就是release方法·,作用是释放一个许可证
  • CountDownLatch里面,获取就是countDown方法,作用是“倒数1个数”
还需要重写tryAcquire方法和tryRelease等方法

5.AQS应用实例、源码解析

AQS在ReentrantLock的应用

第一步:写一个来,想好协作的逻辑,实现获取/释放方法
第二步:内部写一个Sync类继承AbstractQueuedSynchronizer
第三步:根据是否独占重写tryAcquire/tryRelease或tryAcquireShared() 和tryReleaseShared()等方法,在之前写的获取/释放方法中调用AQS的acquire/release或者Shared方法

AQS在CountDownLatch的应用
* 构造函数
   public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
   Sync(int count) {
            setState(count);
        }

调用AbstractQueuedSynchronizer

    protected final void setState(int newState) {
        state = newState;
    }
* getCount
    public long getCount() {
        return sync.getCount();
    }

调用AbstractQueuedSynchronizer

    protected final int getState() {
        return state;
    }
* countDown
    public void countDown() {
        sync.releaseShared(1);
    }

CountDownLatch内部Sync类实现tryReleaseShared

       protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }

调用AbstractQueuedSynchronizer

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

把所有等待的线程去唤醒

 private void doReleaseShared() {

        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }
* await
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

CountDownLatch内部Sync类实现tryAcquireShared

       protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

调用AbstractQueuedSynchronizer

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

doAcquireSharedInterruptibly把当前线程放在阻塞队列中进行等待

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

parkAndCheckInterrupt方法

  private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

把当前线程挂起 进入阻塞状态

AQS在CountDownLatch的总结

调用CountDownLatch的await方法时,便会尝试获取“共享锁”,不过一开始是获取不到该锁的,于是线程被阻塞。
“共享锁”可获取到的条件,就是“锁计数器”的值为0
“锁计数器”的初始值为count,每当一个线程调用该CountDownLatch对象的countDown()方法时,才将“锁计数器”-1

AQS在Semaphore的应用
* acquire
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

调用AbstractQueuedSynchronizer

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

Semaphore中内部类

  /**
     * NonFair version
     */
    static final class NonfairSync extends Sync {
        private static final long serialVersionUID = -2694183684443567898L;

        NonfairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
    }

    /**
     * Fair version
     */
    static final class FairSync extends Sync {
        private static final long serialVersionUID = 2014338818796000944L;

        FairSync(int permits) {
            super(permits);
        }

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }

nonfairTryAcquireShared 检查剩余许可证数量够不够

      final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
* release
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

调用AbstractQueuedSynchronizer 中release方法

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

CountDownLatch内部Sync类实现tryReleaseShared

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

cas设置state值

AQS在Semaphore的总结
  • 在Semaphore中,state表示许可证的剩余数量
  • 看tryAcquire方法,判断nonfairTryAcquireShared大于等于0的话,代表成功
  • 这里会先检查剩余许可证数量够不够这次需要的,用减法来计算,如果直接不够,那就返回负数,表示失败,如果够了,就用自旋加compareAndSetState来改变state状态,直到改变成功就返回正数;或者是期间如果被其他人修改了导致剩余数量不够了,那也返回负数代表获取失败。
AQS在ReentrantLock的应用
  • 分析释放锁的方法tryRelease

ReentrantLock

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

调用AbstractQueuedSynchronizer 中release方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

ReentrantLock 中内部类Sync类

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

unparkSuccessor方法表示唤醒其他线程

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

getState 代表重入的次数
free = true; 表示锁是自由状态

  • 分析释放锁的方法tryAcquire
   public void lock() {
        sync.lock();
    }
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

setExclusiveOwnerThread 表示设置互斥独有线程

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

添加等待节点

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

acquireQueued 获取的线程处于互斥的不可中断模式队列。用于条件等待方法以及获取。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
ReentrantLock释放锁分析

由于是可重入的,所以state代表重入的次数,每次释放锁,先判断是不是当前持有的锁的线程释放的,如果不是就抛异常,如果是的话,重入次数就减1,如果见到了0,就说明完全释放了,于是free就是true,并且把state设置为0.

ReentrantLock加锁分析

第一步:调用子类实现的获取锁的方法 tryAcquire 成功则结束if
Reentrantlock中两个子类的尝试修改锁状态方法
非公平 不判断当前是否还有其他节点在队列中 hasQueuedPredecessors
第二步:失败 进去 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
addWaiter 将当前线程节点入队 先cas快速入队 失败进入循环cas入队
第三步 acquireQueued 尝试将线程阻塞 需要循环向前查找非取消的节点 然后才能进行阻塞 即 前一个节点非cancel 然后设置成SIGNAL 状态 这样前节点的线程如果释放了同步状态或者被取消时候会通知本线程

调用lock方法,存在竞争的时候,T2会去入队,首先会初始化一个空节点,t2节点实际上存放的是第二个位置,t3进来的时候继续在后面排队,
t2和t3都是调用park方法进行阻塞。入队的时候会将前面的节点的waitstatus状态由0改为-1。在调用unlock的时候会将waitstatus不等于0的释放。

6.AQS实现一个自己的Latch门闩

public class OneShotLatch {

    private final Sync sync = new Sync();

    public void signal() {
        sync.releaseShared(0);
    }
    public void await() {
        sync.acquireShared(0);
    }

    private class Sync extends AbstractQueuedSynchronizer {

        @Override
        protected int tryAcquireShared(int arg) {
            return (getState() == 1) ? 1 : -1;
        }

        @Override
        protected boolean tryReleaseShared(int arg) {
           setState(1);

           return true;
        }
    }


    public static void main(String[] args) throws InterruptedException {
        OneShotLatch oneShotLatch = new OneShotLatch();
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
                    oneShotLatch.await();
                    System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
                }
            }).start();
        }
        Thread.sleep(5000);
        oneShotLatch.signal();

        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+"尝试获取latch,获取失败那就等待");
                oneShotLatch.await();
                System.out.println("开闸放行"+Thread.currentThread().getName()+"继续运行");
            }
        }).start();
    }
}
Thread-2尝试获取latch,获取失败那就等待
Thread-3尝试获取latch,获取失败那就等待
Thread-0尝试获取latch,获取失败那就等待
Thread-1尝试获取latch,获取失败那就等待
Thread-4尝试获取latch,获取失败那就等待
开闸放行Thread-2继续运行
开闸放行Thread-0继续运行
开闸放行Thread-3继续运行
开闸放行Thread-4继续运行
开闸放行Thread-1继续运行
Thread-5尝试获取latch,获取失败那就等待
开闸放行Thread-5继续运行
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,904评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,581评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,527评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,463评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,546评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,572评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,582评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,330评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,776评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,087评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,257评论 1 344
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,923评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,571评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,192评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,436评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,145评论 2 366
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,127评论 2 352

推荐阅读更多精彩内容

  • Java并发编程——AQS源码解析 什么是AQS AQS有什么用 AQS实现方式 一、AQS是什么? AQS是一个...
    Ant_way阅读 597评论 0 4
  • 为什么需要AQS 锁和协作类有共同点:闸门像ReentrantLock和Semaphore有一些共同点,并且很相似...
    小波同学阅读 1,423评论 0 10
  • AbstractQueuedSynchronizer是为实现依赖于先进先出 (FIFO) 等待队列的阻塞锁和相关同...
    FantJ阅读 906评论 0 1
  • 概述 在前面我们初步认识了一下lock与AQS的基础内容,在同步组件的实现中,AQS是核心部分,我们通过AQS提供...
    长大后简单很幸福_f63e阅读 476评论 1 3
  • 激动啊,太激动了,收到我当年梦想的机器,2003年神器啊,当年卖七千多啊,那时候工资才千元多啊,蔡司镜头啊,2.0...
    小庄_94e5阅读 163评论 0 1