JAVA 多线程与高并发学习笔记(十四)——AQS核心原理

基于 CAS自旋实现的轻量级锁在恶性自旋时会消费大量的CPU资源。解决这个问题有2种方案:分散操作热点和使用队列削峰。JUC并发包使用的是队列削峰的方案解决CAS性能问题,并提供了一个基于双向队列的削峰基类——抽象基础类AbstractQueuedSyncronizer(抽象同步器类,简称为AQS)。

AQS的内部队列

AQS是CLH队列的一个变种,原理和CLH类似。AQS队列内部维护的是一个FIFO双向链表,其中的每个节点其实是由线程封装的,当线程争抢锁失败后会封装成节点加入AQS队列中:当获取锁的线程释放锁以后,会从队列中唤醒一个阻塞的节点。

AQS的内部结构可以参考下图。

AQS内部结构.png

AQS核心成员

AQS基本模板模式实现。AQS为锁获取、锁释放的排队和出队过程提供了一系列的模板方法。由于JUC的显式锁种类丰富,因此AQS将不同锁的具体操作抽取为钩子方法,供各类锁去实现。

状态标志位

AQS 中维持了一个单一的 volatile 修饰的状态信息是 state,AQS使用int类型的state标志锁的状态,可以理解为锁的同步状态。

// 同步状态
private volatile int state;

state 因为使用了 volatile 保证了操作的可见性,AQS提供了 getState()setState() 来获取和设置同步状态。

// 获取同步状态
protected final int getState() {
    return state;
}

// 设置同步状态
protected final void setState(int newState) {
    state = neWState;
}

// 通过CAS设置同步状态
protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.comapreAndSwapInt(this, stateOffset, expect, update);
}

由于 setState() 无法保证原子性,因此AQS给我们提供了 compareAndSetState() 方法利用底层 Unsafe 的 CAS机制来实现原子性。

AbstractQueueSynchronizer 继承了 AbstarctOwnableSynchronizer,这个基类只有一个变量叫 exclusiveOwnerThread,表示当前占用该锁的线程,并且提供了相应的 get()set() 方法。具体如下:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {

    // 表示当前占用该锁的线程
    private transient Thread exlusiveOwnerThread;

    ...
}

队列节点类

AQS是一个虚拟队列,不存在队列实例,仅存在节点之间的前后关系。节点类型通过内部类 Node 定义。

static final class Node {

    // 节点等待状态值:取消状态
    static final int CANCELLED = 1;
    // 节点等待状态值:标识后继线程处于等待状态
    static final int SIGNAL = -1;
    // 节点等待状态值:标识当前线程正在进行条件等待
    static final int CONDITION = -2;
    // 节点等待状态值:标识下一次共享锁的acquireShared操作需要无条件传播
    static final int PROPAGATE = -3;
    // 节点状态
    // 普通的同步节点初始值为0,条件等待节点的初始值为CONDITION(-2)
    volatile int waitState;
    // 节点所对应的线程,为抢锁线程或者条件等待线程
    volatile Thread thread;
    // 前驱节点
    volatile Node prev;
    // 后继节点
    volatile Node next;
    // 此属性指向下一个条件等待节点
    Node nextWaiter;
}

waitStatus 属性

每个节点与等待线程关联,每个节点维护一个状态 waitStatus,它的各个常量值具体如下:

  • CANCELLED(1)。1表示该线程节点已经释放(超时、中断),已取消的节点不会再阻塞,不会参与竞争,会一直保持取消状态。
  • SIGNAL(-1)。表示其后继的节点处于等待状态,当前节点对应的线程如果释放了同步状态或者被取消,就会通知后继节点,使后继节点的线程得以运行。
  • CONDITION(-2)。表示该线程在条件队列中阻塞,表示节点在等待队列中。当持有锁的线程调用了CONDITION的signal() 方法之后,节点会从该CONDITION的等待队列转移到该锁的同步队列上,去竞争锁。
  • PROPAGATE(-3)。表示下一个线程获取共享锁后,自己的共享状态会被无条件地传播下去,因为共享锁可能出现同时有N个锁可以用。
  • 0。表示当前节点处于初始状态。

thread成员

Node 的 thread 成员用来存放进入AQS队列中的线程引用,Node的nextWaiter成员用来执行自己的后继等待节点。

抢占类型常量标识

Node 节点还定义了两个抢占类型常量标识,SHARED和EXCLUSIVE,具体如下:

// 标识节点在抢占共享锁
static final Node SHARED = new Node();

// 表示节点在抢占独占锁
static final Node EXCLUSIVE = null;

FIFO双向同步队列

AQS通过内置的FIFO双向队列来完成线程的排队工作,内部通过节点 head 和 tail 记录队首和队尾元素,元素的节点类型为 Node 类型,具体如下:

/* 首节点的引用 */
private transient volatile Node head;

/* 尾节点的引用 */
private transient volatile Node tail;

AQS 的首尾节点都是懒加载的,需要的时候才真正创建。只有在线程竞争失败的情况下,有新线程加入同步队列时,AQS才创建一个head节点。尾节点只在有新线程阻塞时才被创建。

JUC显式锁和AQS的关系

AQS是一个同步器,它实现了锁的基本抽象功能,支持独占锁与共享锁两种方式,该类是使用模板模式来实现的,成为构建锁和同步器的框架,使用该类可以简单且高效地构造出应用广泛的同步器。

JUC中的显式锁、线程同步工具等,内部都使用了AQS作为等待队列。

AQS中的模板模式

模板模式

模板模式是类的行为模式。准备一个抽象类,将部分的逻辑以具体方法的形式实现,然后声明一些抽象方法来迫使子类实现剩余的逻辑。不同的子类提供不同的方式实现这些抽象方法,从而对剩余的逻辑有不同的实现。

模板模式的关键在于:父类提供框架性的公共逻辑,子类提供个性化的定制逻辑。

模板方法(Template Method)也被称为骨架方法,主要定义了整个方法需要实现的业务操作的算法框架。

钩子方法(Hook Method)是被模板方法的算法框架所调用的,由子类提供具体的实现方法。

AQS的模板流程

AQS定义了两种资源共享方式:

  • Exclusive(独占锁):只有一个线程能占有锁资源。
  • Share(共享锁):多个线程可以同时占有锁资源。

AQS 为不同的资源共享方式提供了不同的模板流程。

AQS中的钩子方法

自定义同步器时,AQS中需要重写的钩子方法大致如下:

  • tryAcquire(int):独占锁钩子,尝试获取资源,若成功则返回true。
  • tryRelease(int):独占锁钩子,尝试释放资源,若成功则返回true。
  • tryAcquireShared(int):共享锁钩子,尝试获取资源,负数表示失败,0表示成功,但没有剩余可用资源,正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享锁钩子,尝试释放资源,若成功则返回true。
  • isHeldExclusively():独占锁钩子,判断该线程是否正在独占资源。

以上钩子的默认实现会抛出 UnsupportOperationException 异常。

通过AQS实现一把简单的独占锁

本部分模拟 ReentrantLock 实现一个简单的独占锁,真实的 ReentrantLock 要复杂很多。


public class SimpleMockLock implements Lock {
    private final Sync sync = new Sync();
    @Override
    public void lock() {
        sync.acquire(1);
    }

    @Override
    public void unlock() {
        sync.release(1);
    }
    

    private static class Sync extends AbstractQueuedSynchronizer {

        // 钩子方法
        protected boolean tryAcquire(int arg) {
            // CAS更新状态值为1
            if(compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 钩子方法
        protected boolean tryRelease(int arg) {
            if(Thread.currentThread() != getExclusiveOwnerThread()) {
                throw new IllegalMonitorStateException();
            }
            
            if(getState() == 0) {
                throw new IllegalMonitorStateException();
            }
            
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
    }
}

AQS抢占锁的原理

下面通过AQS中的方法讲解一下AQS抢占锁的原理。

AQS模板方法:acquire

acquire 是AQS 封装好的获取资源的公共入口,它是AQS提供的利用独占的方式获取资源的方法,编码实现如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

acquire 至少执行一次 tryAcquire 钩子方法。若调用成功,则 acquire 直接返回,表示已经抢到锁,若不成功,则将线程加入等待队列。

钩子实现: tryAcquire

tryAcquire 是需要实现的钩子方法,可以参照我们前面实现的 SimpleMockLock。

直接入队:addWaiter

在 acquire 模板方法中,如果钩子方法 tryAcquire 尝试获取同步状态失败的话,就构造同步节点,通过 addWaiter 方法将该节点加入同步队列的队尾。

private Node addWaiter(Node mode) {
    // 创建新节点
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    // 加入队列尾部,将目前的队列tail作为自己的前驱节点pred
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // 现场时通过AQS方式修改尾节点为最新的节点
        // 如果修改陈宫,将节点加入队列的尾部
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 第一次尝试添加尾部失败,意味着有并发抢锁发生,需要进行自旋
    enq(node);
    return node;
}

addWaiter 方法中,首先需要构造一个 Node 对象,其中有两个参数,第一个是当前线程,第二个是Node节点,可能值为 SHARED 或 EXCLUSIVE。

自旋入队:enq

addWaiter 第一次尝试在尾部加节点失败,意味着有并发抢锁发生,需要进行自旋,enq 方法通过 CAS 自旋将节点添加到队列尾部。

/**
 * 这里进行了循环,如果此时存在tail,就执行添加新队尾的操作
 * 如果依然不存在,就把当前线程作为 head 节点
 * 插入节点后,调用acquireQueued() 进行阻塞
 */
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            // 队列为空,初始化尾节点和头节点为新节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

/**
 * CAS 操作head指针,仅仅被enq()调用
 */
private final boolean compareAndSetHead(Node update) {
    return unsafe.compareAndSwapObject(this, headOffset, null, update);
}

/**
 * CAS 操作tail指针,仅仅被enq()调用
 */
private final boolean compareAndSetTail(Node expect, Node update) {
    return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}

自旋抢占:acquireQueued

在节点入队之后,启动自旋抢锁的流程,aquireQueued方法的主要逻辑:当前Node节点线程在死循环中不断获取同步状态,并且不断在前驱节点上自旋,只有当前驱节点是头节点时才能尝试获取锁,原因是:

  1. 头结点是成功获取同步状态(锁)的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头节点。
  2. 维护同步队列的FIFO原则,节点进入同步队列之后,就进入了自旋的过程,每个节点都在不断地执行for死循环。
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            // 自选检查当前节点的前驱节点是否为头节点,才能获取锁
            for (;;) {
                // 获取节点的前驱节点
                final Node p = node.predecessor();
                // 节点中的线程循环地检查自己的前驱节点是否为head节点
                // 前驱节点是head时,进一步调用子类的 tryAcquire 实现
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 检查前一个节点的状态,预判当前获取锁失败的线程是否要挂起
                // 如果需要挂起,调用 parkAndCheckInterrupt 方法挂起当前线程,直到被唤醒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true; // 若两个操作都是true,则置true
            }
        } finally {
            // 如果等待过程中没有成功获取资源,
            // 那么取消节点在队列中的等待
            if (failed)
                // 取消等待,将当前节点从队列中移除
                cancelAcquire(node);
        }
    }

为了不浪费资源,acquireQueued自旋过程中会阻塞线程,等待被前驱节点唤醒后才启动循环。如果成功就返回,否则执行 shouldParkAfterFailedAcquire、parkAndCheckInterrupt来达到阻塞的效果。

挂起预判:shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire 方法的主要功能是,将当前节点的有效前驱节点(不是CANCELLED类型的节点)找到,并且将有效前驱节点的状态设置为SIGNAL,之后返回true代表当前线程可以马上被阻塞了。

具体分为三种情况:

  1. 如果前驱节点的状态为-1(SIGNAL),说明前驱的等待标志已设好,返回true表示设置完毕。
  2. 如果前驱节点的状态为1(CANCELLED),说明前驱节点本身不再等待了,需要跨越这些节点,然后找到一个有效节点,再把当前节点和这个有效节点的唤醒关系建立好,调整前驱节点的next指针为自己。
  3. 如果其他情况:-3、-2、0,那么通过CAS尝试设置前驱节点为SIGNAL,表示只要前驱节点释放锁,当前节点就可以抢占锁了。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus; // 获取前驱节点的状态
    if (ws == Node.SIGNAL)
        /*
            * This node has already set status asking a release
            * to signal it, so it can safely park.
            */
        return true;
    if (ws > 0) {
        /*
            * Predecessor was cancelled. Skip over predecessors and
            * indicate retry.
            */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
            * waitStatus must be 0 or PROPAGATE.  Indicate that we
            * need a signal, but don't park yet.  Caller will need to
            * retry to make sure it cannot acquire before parking.
            */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

线程挂起:parkAndCheckInterrupt

parkAndCheckInterrupt 的主要任务是暂停当前线程,具体如下:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 调用park()使线程进入waiting状态
    return Thread.interrupted(); // 如果被唤醒,查看自己是否已经被中断
}

AQS 会把所有等待的线程构成一个阻塞等待队列,当一个线程执行完 lock.unlock(),会激活其后继节点,通过 LockSupport.unpark(postThread) 完成后继线程的唤醒。

AQS 的两个关键点:节点的入队和出队

理解AQS的一个关键点是掌握节点的入队和出队。

节点的自旋入队

节点在第一次入队失败后,就会开始自旋入队,分为以下两种情况:

  1. 如果AQS的队列非空,新节点入队的插入位置在队列的尾部,并且通过CAS方式插入,插入之后AQS的tail将指向新的尾节点。

  2. 如果AQS的队列为空,新节点入队时,AQS 通过 CAS 方法将新节点设置为头节点 head,并将 tail 指针指向新节点。

private Node enq(final Node node) {
    for (;;) { // 自旋入队
        Node t = tail;
        if (t == null) { 
            // 队列为空,初始化尾节点和头节点为新节点
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 如果队列不为空,将新节点插入队列尾部
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

节点的出队

节点的出队算法在 acquireQueued() 方法中,这是一个模板方法,acquireQueued() 方法不断在前驱节点上自旋(for 循环),如果前驱节点是头节点并且当前线程使用钩子方法 tryAcquire() 获得了锁,就移除头节点,将当前节点设置为头节点。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // 中断标记
        boolean interrupted = false;
        for (;;) {
            // 获取当前线程节点的前驱节点
            final Node p = node.predecessor();
            // 如果前驱节点是头节点,则使当前线程尝试获取锁资源(tryAcquire方法忘了回头看第五步)
            if (p == head && tryAcquire(arg)) {
                // 如果当前程线程获取锁资源成功,则将当前线程节点设置为头节点
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 根据前驱节点p的等待状态判断是否要将当前线程阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        // 生成CANCELLED状态节点,并唤醒节点
        if (failed)
            cancelAcquire(node);
    }
}

节点加入队列尾部后,如果其前驱节点不是头节点,通常情况下,该新节点所绑定的线程会被无限期阻塞,而不会去执行无效循环,从而导致CPU资源的浪费。

对于公平锁,头节点一般是占用锁的节点,在释放锁时,会唤醒其后稷街店所绑定的线程,后继节点的线程被唤醒后会重新执行自旋抢锁逻辑。

AQS 释放锁的原理

AQS 释放锁分成 3 个部分:

release 模板方法

首先会调用 AQS 的 release 模板方法,代码如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

可以看到其中主要有 2 个方法,一个是 tryRelease 钩子方法,该方法会尝试释放当前线程持有的资源,由子类根据具体业务提供具体实现,执行成功后返回 true

另一个方法是 unparkSucessor(),用来唤醒后继节点,代码如下。

private void unparkSuccessor(Node node) {
    // 获取节点状态,释放锁的节点,也就是头节点
    int ws = node.waitStatus;
    // 若头节点状态小于0,则将其置为0,表示初始状态
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    Node s = node.next; // 找到后头的一个节点
    if (s == null || s.waitStatus > 0) {
        // 如果新节点已经被取消
        s = null;
        // 从队列尾部开始,往前去找醉千年的一个 waitStatus 小于0的节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒后继节点对应的线程
    if (s != null)
        LockSupport.unpark(s.thread);
}

ReentrantLock 抢锁流程

下面分别对 ReentrantLock 的公平锁和非公平锁的实现进行讲述。

ReentrantLock 非公平锁的抢占流程

总体流程图如下:

reentrantlock1.png

非公平锁的同步器子类

ReentrantLock 为非公平锁实现了一个内部的同步器—— NofairSync,器显式锁获取方法 lock() 的源码如下:

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

首先用一个CAS操作判断 state 是不是0(表示当前锁未被占用),如果是0就把它置为1,并且设置当前线程为该锁的独占线程,表示获取锁成功。当多个线程同时尝试占用一个锁时,CAS操作只能保证一个线程操作成功,剩下的要排队。

非公平体现在:如果占用锁的线程刚刚释放锁,state 置为0,而排队等待锁 的线程还未唤醒,新来的线程就直接抢占了该锁,那么久“插队”了。

非公平抢占的钩子方法:tryAcquire

如果非公平抢占没有成功,非公平锁的 lock 汇之星模板方法 acquire,首先会调用钩子方法 tryAcquire,非公平抢占的钩子方法实现如下:

static final class NonfairSync extends Sync {
    ...

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}
 abstract static class Sync extends AbstractQueuedSynchronizer {
   
    final boolean nonfairTryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        // 先直接获取锁的状态
        int c = getState();
        if (c == 0) {
            // 如果内部队列首节点的线程执行晚了,它会将锁的state置为0
            // 当前抢锁线程的下一步就是直接进行抢占
            // 发现state是空的,就直接拿来加锁使用
            if (compareAndSetState(0, acquires)) {
                // 1.利用CAS自旋方式确认当前state确实为0,然后设置acquire(1)
                setExclusiveOwnerThread(current);
                // 设置当前执行的线程,直接返回true
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            // 2.当前线程和执行中的线程是同一个,也就意味着可重入操作
            int nextc = c + acquires;
            if (nextc < 0) // overflow
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            // 表示当前锁被1个线程重复获取了nextc次
            return true;
        }
        // 否则就返回false,表示没有成功获取当前锁,进入排队过程
        return false;
    }
    ...
}

其核心思想是当前线程尝试获取锁的时候,如果发现锁的状态为0,则直接尝试将锁拿过来,而不会考虑其他排队节点。

ReentrantLock 公平锁的抢占流程

ReentrantLock 公平锁的抢占流程如下:

reetrantlock2.png

公平锁的同步器子类

ReentrantLock 为公平锁实现了一个内部的同步器——FairSync,其显式锁获取方法 lock 的代码如下:

static final class FairSync extends Sync {
        
    final void lock() {
        acquire(1);
    }
    ...
}

其核心思想是通过 AQS 模板方法进入队列入队操作。

公平抢占的钩子方法:tryAcquire

公平锁的 lock 会执行模板方法 acquire,该方法首先会调用钩子方法 tryAcquire,其实现如下:

static final class FairSync extends Sync {
    ...

    protected final boolean tryAcquire(int acquires) {
        final Thread current = Thread.currentThread();
        int c = getState(); // 锁状态
        if (c == 0) {
            if (!hasQueuedPredecessors() &&
                compareAndSetState(0, acquires)) {
                setExclusiveOwnerThread(current);
                return true;
            }
        }
        else if (current == getExclusiveOwnerThread()) {
            int nextc = c + acquires;
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc);
            return true;
        }
        return false;
    }
}

公平锁的钩子方法中,首先判断是否有后继节点,如果有,并且当前线程不是锁的占有线程,钩子方法就返回 false,模板方法会进入排队的执行流程。

是否有后继节点的判断

FairSync 是否有后继节点的判断代码如下:

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

执行场景大致如下:

  1. h != t 不成立的时候,说明 h 头节点,t 尾节点要么是同一节点。要么都是 null,此时返回 false,表示没有后继节点。

  2. h != t 成立的时候,进一步检查 head.next 是否为 null,如果为 null,返回 true,这种场景在有其他线程第一次正在入队时可能会出现。

  3. 如果 h != t 成立,head.next != null,判断 head.next 是不是当前线程,如果是就返回 false,不是就返回true

AQS 条件队列

Condition 是 JUC 用来代替传统 Objectwait/notify 线程间通信与写作机制的新组件,它更加的高效。

Condition 基本原理

ConditionObject 类是实现条件队列的关键,每个 ConditionObject 对象都维护一个单独的条件等待队列。每个 ConditionObject 对应一个条件队列,它记录该队列的头结点和尾节点。

public class ConditionObject implements Condition, java.io.Serializable {
        
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;
}

一个 Condition 对象是一个单条件的等待队列.

condtionObject.png

在一个显式锁上,我们可以创建多个等待任务队列,这点和内置锁不同,Java 内置锁上只有唯一的一个等待队列。比如,我们可以使用 newCondition 创建两个等待队列,具体如下:

private Lock lock = new ReentrantLock();
//创建第一个等待队列
private Condition firstCond = lock.newCondition();
//创建第二个等待队列
private Condition secondCond = lock.newCondition();
condition.png

await等待方法原理

当线程调用 await 方法时,说明当前线程的节点为当前AQS队列的头节点,正好处于占有锁的状态,await 方法需要把该线程从 AQS 队列挪到 Condition 等待队列里。

在 await 方法将当前线程挪动到 Condition 等待队列后,还会唤醒 AQS 同步队列中的 head 节点的下一个节点,方法代码如下:

 public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); // 1
    int savedState = fullyRelease(node); // 2
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 3
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) &&  interruptMode != THROW_IE) // 4
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // 5
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

整体流程如下:

  1. 执行 await 时,会新创建一个节点并放入 Condition 队列尾部。
  2. 然后释放锁,并唤醒AQS同步队列中的头节点的后一个节点。
  3. 然后执行 while 循环,将该节点的线程阻塞,直到该节点离开等待队列,重新回到同步队列成为同步节点后,线程才退出 while 循环。
  4. 退出循环后,开始调用 acquireQueued() 不断尝试拿锁。
  5. 拿到锁后,会清空 Condition 队列中被取消的节点。

创建一个新节点并放入 Condition 队列尾部的工作由 addCondtionWaiter 方法完成。

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // 如果尾节点取消,重新定位尾节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    // 创建一个新Node,作为等待节点
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    // 将新Node加入等待队列
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

signal唤醒方法原理

线程在某个 ConditionObject 对象上调用 signal 方法后,等待队列中的 firstWaiter 会被加入同步队列中,等待节点被唤醒,流程如下:

public final void signal() {
    // 如果当前线程不是持有该锁的线程,就抛出异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first); // 唤醒节点
}

// 执行唤醒
private void doSignal(Node first) {
    do {
        // 出队的代码写的很巧妙
        // first出队,firstWaiter头部指向下一个节点,自己的nextWaiter
        if ( (firstWaiter = first.nextWaiter) == null)
            // 如果第二个节点为空,则尾部也为空
            lastWaiter = null;
        // 将原来头部first的后继置空,help for GC
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
}

// 将被唤醒的节点转移到同步队列
final boolean transferForSignal(Node node) {

    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

signal 方法的整体流程如下:

  1. 通过 enq() 方法自旋讲条件队列中的头节点放入 AQS 同步队列尾部,并获取它在 AQS 队列中的前驱节点。

  2. 如果前驱节点的状态是取消状态,或者设置前驱节点为 Signal 状态失败,就唤醒当前节点的线程,否则节点在同步队列的尾部,参与排队。

  3. 同步队列中的线程被唤醒后,表示重新获取了显式锁,然后继续执行 condition.wait() 语句后面的临界区代码。

AQS实际应用

JUC 的同一架构如下图所示。

juc总体架构.png

AQS 建立在 CAS 原子操作和 volatile 可见性变量的基础之上,为上层的显式锁、同步工具类、阻塞队列、线程池、并发容器、Future异步工具提供线程之间同步的基础设施。所以,AQS在JUC框架使用很广泛。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容