Java AQS AbstractQueuedSynchronizer

更多 Java 并发编程方面的文章,请参见文集《Java 并发编程》


AbstractQueuedSynchronizer (AQS)

AQS 的功能可以分为两类,独占功能共享功能
它的所有子类中:

  • 要么实现并使用了它独占功能的 API,例如 ReentrantLock RenntrantReadWriteLock
  • 要么使用了共享锁的功能,例如 CountDownLatch
  • 而不会同时使用两套 API,即便是它最有名的子类 ReentrantReadWriteLock,也是通过两个内部类:读锁和写锁,分别实现的两套API来实现的

AQS 的基本思想

AQS 使用标志位 + 队列的方式,记录获取锁、竞争锁、释放锁等一系列锁的状态。

  • 获取锁:判断当前状态 waitStatus 是否允许获取锁
    • 如果是就获取锁,修改当前状态,并且如果进了队列就从队列中移除
    • 否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败
    • 另外如果是阻塞线程,那么线程就需要进入阻塞队列
  • 释放锁:修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程。

要支持上面两个操作就必须有下面的条件:

  • 原子性操作同步器的状态位volatile int waitStatus;

    • 它是一个 volatile 变量,确保了可见性
    • 使用 CAS 操作来更新 waitStatus,确保了原子性
    • waitStatus 可能的状态包括:
      • CANCELLED = 1: 节点操作因为超时或者对应的线程被 interrupt。节点不应该留在此状态,一旦达到此状态将从CHL队列中踢出。
      • SIGNAL = -1: 节点的继任节点是(或者将要成为)BLOCKED状态(例如通过 LockSupport.park() 操作),因此一个节点一旦被释放(解锁)或者取消就需要唤醒(LockSupport.unpack())它的继任节点。
        只有当前节点的前一个节点为 SIGNAL 时,才能当前节点才能被挂起。
      • CONDITION = -2:表明节点对应的线程因为不满足一个条件(Condition)而被阻塞。
      • 0: 正常状态,新生的非CONDITION节点都是此状态。
      • 非负值标识节点不需要被通知(唤醒)。
  • 阻塞和唤醒线程:在 JDK 5.0 以后利用 JNI 在 LockSupport 类中实现了此特性。
    LockSupport.park()
    LockSupport.park(Object)
    LockSupport.parkNanos(Object, long)
    LockSupport.parkNanos(long)
    LockSupport.parkUntil(Object, long)
    LockSupport.parkUntil(long)
    LockSupport.unpark(Thread)
    上面的API中 park() 是在当前线程中调用,导致线程阻塞,带参数的 Object 是挂起的对象,这样监视的时候就能够知道此线程是因为什么资源而阻塞的。

  • 一个有序的队列:采用 CHL 列表来解决有序的 FIFO 队列的问题

    • 对于入队列(enqueue):采用 CAS 操作,每次比较尾结点是否一致,然后插入的到尾结点中。
    • 对于出队列(dequeue):由于每一个节点也缓存了一个状态,决定是否出队列,因此当不满足条件时就需要自旋等待,一旦满足条件就将头结点设置为下一个节点。

AQS 核心字段

  • private volatile int state;:描述的有多少个线程取得了锁,对于互斥锁来说state<=1。
  • private transient volatile Node head;:等待队列的头
  • private transient volatile Node tail;:等待队列的尾,head 与 tail 构成了一个 FIFO 队列

Node 节点的属性包括:

  • volatile Node prev;:此节点的前一个节点。节点的 waitStatus 依赖于前一个节点的状态。
  • volatile Node next;:此节点的后一个节点。后一个节点是否被唤醒依赖于当前节点是否被释放。
  • volatile Thread thread;:节点绑定的线程。
  • volatile int waitStatus;:描述节点的状态
  • Node nextWaiter;:下一个等待条件(Condition)的节点,由于 Condition 是独占模式,因此这里有一个简单的队列来描述 Condition 上的线程节点。

AQS 独占锁

有且只有一个线程获取到锁,其余线程全部挂起,直到该拥有锁的线程释放锁,被挂起的线程被唤醒重新开始竞争锁。
例如 ReentrantLock RenntrantReadWriteLock就是独占锁。

比如 ReentrantLock 中锁的实现 Sync 继承了 AbstractQueuedSynchronizer,同时包括了公平锁 FairSync 和非公平锁NonfairSync

public class ReentrantLock implements Lock, java.io.Serializable {
    private static final long serialVersionUID = 7373984872572414699L;
    /** Synchronizer providing all implementation mechanics */
    private final Sync sync;

    /**
     * Base of synchronization control for this lock. Subclassed
     * into fair and nonfair versions below. Uses AQS state to
     * represent the number of holds on the lock.
     */
    abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();
    ......

    static final class NonfairSync extends Sync {
    ......

    static final class FairSync extends Sync {
    ......

AQS 共享锁

例如 CountDownLatch 就是共享锁。
Java CyclicBarrier VS CountDownLatch 中关于 CountDownLatch 的示例中,使用了 private static CountDownLatch countDownLatch = new CountDownLatch(5); 就是表示该锁可能被 5 个线程共享。

ReentrantLock 类似,CountDownLatch 内部也有一个叫做 Sync 的内部类,同样也是用它继承了AbstractQueuedSynchronizer

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
  • 其中的构造方法 setState(count) 就是设置 private volatile int state;,描述的有多少个线程取得了锁`
  • 其中的 tryAcquireShared() 方法判断了 state 是否为 0,即计数器是否为 0。

引用:
深度解析Java 8:JDK1.8 AbstractQueuedSynchronizer的实现分析(上)
深度解析Java 8:AbstractQueuedSynchronizer的实现分析(下)
深入浅出 Java Concurrency (7): 锁机制 part 2 AQS

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容