作为Java核心内容之一【并发】,该部分的源码基本在java.util.concurrent这个包下面。本文的内容的源码版本是jdk1.8_11。
作为AQS的前言,可以看看我之前的文章:《Java同步框架AQS原文分析》。
1、认识AQS
使用过Java中的锁对象,一定会对一个锁很熟悉—ReentrantLock。这是一个可重入的锁。大部分情况是作为一些情况替换synchronized这个关键字的方案。synchronized这个关键字是Java内部实现同步机制的,那么ReentrantLock实现的方式是什么?
它背后的大佬就是大名鼎鼎的AQS(AbstractQueuedSynchronizer)
AQS——简单来说,就是提供一个实现阻塞式锁和相关同步器的框架。它的内部是依赖一个FIFO阻塞队列实现以上功能的。
1.1 、AQS的两种模式
AQS中有两种获取和释放资源的模式,两个方式都很容易理解。
- 独占式:只允许同时一个线程获取同一个许可。可以对应为一些锁ReetrantLock
- 共享式:允许同时多个线程获取同一个许可。可以对应一些同步器类似Semaphore
Exclusive vs Shared 模式
在此队列中,通过一个非常简单的方式区分两种模式。就是以下两个静态常量。
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
1.2、AQS的子类
除了ReentrantLock之外还有以下的类使用到了AQS。这个版本是jdk1.8
2 、AQS内部结构
AQS内部的结构不算太复杂,这里先将其中重点的几个字段列出来
public abstract class AbstractQueuedSynchronizer{
private volatile int state;
private volatile Node head; // 队列的头节点
private volatile Node tail; // 队列的尾节点
...
}
state字段代表的同步的状态,它有什么用?我们暂时留个疑问,只需要知道它是用来决定同步的一个状态。后面分析流程时,再来分析这个字段。
这里我们先重点分析AQS的内部类Node
2.1、锁队列节点Node
AQS的锁队列就是依靠此内部类实现的。它是一个双向队列,内部存储了线程的信息。
static final class Node {
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread; // 节点的线程信息
volatile int waitStatus; // 线程的等待状态
Node nextWaiter; // 用于条件队列使用
...
}
前三个字段其实很好理解,既然是双向队列,那就会有前驱和后继节点,而线程信息也是需要保存的。waitStatus这个字段是什么作用的?
waitStatus
首先我们看看这个状态的值有什么意义。它在内部中定义了waitStatus只能有以下四个值。
- CANCELLED:值为1。代表此节点中的线程因为超时或者中断而被取消了。这种线程不会被再次阻塞。
- SIGNAL:值为-1。代表此节点的后继结点马上要或者已经阻塞了。所以,当前节点必须在它释放或者取消的时候,取消它后继结点的阻塞状态。为了避免出现竞争,获取许可的时候先要表明需要一个信号signal。然后开始原子式的获取,失败后就阻塞。
- CONDITION:值为-2。队列在Condition Queue中,等待某一个条件。
- PROPAGATE:值为-3。代表后继节点会传播唤醒的操作,此值在共享模式下启用。
对于一个线程来说,判断它能否获得许可,就需要根据waitStatus的值。
2.2、条件队列ConditionObject
除了锁队列之外,AQS还有一个条件队列,本文暂不分析。
3 、独占式AQS解析
介绍上述内容后,依然对上面提到的变量还有模式很模糊,后面我们开始分析具体的AQS流程,在流程中,我们将逐一讲解上述的内容的实际意义。
3.1、acquire操作
在AQS中的入口即是acquire。AQS 获取独占式的许可,需要调用acquire方法。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这里方法传进来了一个arg参数,arg的意义AQS不做具体定义和处理。简单来说,它是可以跟AQS 中的state字段进行联合使用的,用来判断现在是否能完成同步操作,而怎么使用state这个字段应该是交给子类去实现的,
3.1.1、tryAcquire
tryAcquire方法是AQS交给子类处理state与传入的arg的方法,子类需要判断是否允许当前线程在独占模式下进行acquire。
tryAcquire是一个空方法,具体实现方式在子类中。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
这里我们以ReentrantLock内的公平锁作为例子,分析具体情况下,子类的tryAcquire应该做什么工作。
protected final boolean tryAcquire(int acquires) {
...
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
...
setState(nextc);
return true;
}
return false;
}
这里我们不展开分析方法的具体意义。在ReentrantLock的公平锁中,state代表的是可重入的次数,简单来说就是多少对lock和unlock方法。
如果state等于0代表没有使用过lock方法,那就会进行两个条件判断。
- AQS队列中是不是没有其他还在等待的线程;
- CAS更新State字段成功与否。
如果上面两个条件都是true,那么就认为当前线程可以持有这个锁。
如果state不等于0,这就代表此次调用的lock方法不是第一对。那么只要当前线程与存储的当前持有锁的线程为同一个线程,就会state+1。表示当前lock的对数为2对。以此来实现可重入的锁!
3.1.2、addWaiter
在tryAcquire获取许可权失败后,addWaiter就是用来将当前失败的线程,添加进等待队列。传入的参数也就是AQS的模式,在1.1小节中提到的final两个字段。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 当等待队列尾部不为空时,有在等待的线程
if (pred != null) {
// 将新增的节点的前驱节点链接到尾部节点。
node.prev = pred;
// 采用CAS的方式进行数据的更新,更新成功后,则将原尾节点的后继节点链接到新增的节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果队列为空或者当前的尾节点已经变化了,则直接向队列中添加这个新的节点
enq(node);
return node;
}
addWaiter的大致流程在代码中注释中已经说明了,需要解释的一点是为什么采用CAS更新并连接线程节点的前驱节点。Doug Lea在AQS的文章中提到过,目前并没有很好的原子方式去更新双向队列。所以采用了上述的择中方案。
Q1:为什么采用CAS更新前驱节点保证其原子性,而对后继节点采用普通的赋值方式?
AQS采用的CLH锁的变体.
CLH锁中每个节点都回去对自身的前驱节点判断是否可以访问然后进行自旋,而AQS将自旋改为了LockSupport的park阻塞过程(这个过程在后文会解释!)。因此需要保证前驱节点一定是原子性,在不断的访问前驱节点过程中一定是最新的。这样即使next域不是最新的,也并不影响在队列中阻塞的过程,因为他们都是访问前驱节点。
然后,我们重点关注的是enq过程。
// 将新的节点插入到等待队列的尾部。如果等待队列不存在,一定要对其初始化
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
// 队列为空情况下,CAS操作将头节点进行更新,将头节点设置为空节点。
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 尾节点存在的情况下,代表AQS等待队列已经初始化完成。
// 与addWaiter一样,先将新增节点的前驱节点链接到尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
// 当更新尾节点成功后,就将原尾节点的后继节点连接为新增的节点
t.next = node;
return t;
}
}
}
}
AQS中对锁队列的初始化过程并不是在类建立的时候初始化,而是第一次有线程开始获取同步资源时。它初始化时将头节点设定为一个空的Node节点,新增的节点都添加在头节点后的后继节点中。
Q2:为什么要初始化成一个空节点作为头节点?
这个与后面文章的内容acquireQueue方法中入队操作有关。Q1问题提到了AQS借用了CLH的思想,也就是不断对前驱节点的状态进行访问。那么在判断是否可以获取资源时,也需要再次访问前驱节点。如果初始化的时候不将头节点赋值为空节点,那么前驱节点为null,这个与不是初始化阶段的队列情况不同,势必会复杂后面的流程。这也是AQS的简单优化之一。
3.1.3、acquireQueued
在将新的线程节点入队后,这个方法将会执行阻塞,并根据当前线程信息更新队列。它采用一个循环去处理队列中的信息,只有队列中获取节点成功才能从循环中弹出。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取节点的前驱节点
final Node p = node.predecessor();
// 该节点的前驱节点是头节点,就尝试去获取一下许可
// 当获取许可成功后,就可以将头节点设置为当前的节点
// 并将后继节点置空,重置状态
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 当获取许可失败后,会判断需不需要对线程进行阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 如果执行过程中出现了了错误,或者中断后,就会对数据进行处理,完成销毁的过程。
if (failed)
cancelAcquire(node);
}
}
这里的核心在于两个if语句的内容:
- 第一个if语句主要实现对前驱节点的判定,只有是头节点才会再次尝试能否拿到资源的许可访问权。当拿到了许可权后,也就是持有独占权,那么就会将头节点更新为当前的线程节点。返回的状态为interrupted状态。
- 第二个if语句是前驱节点不是头节点或者在许可获取失败后,对线程节点的状态进行更新,并对合适的线程进行阻塞操作。
3.1.4、shouldParkAfterFailedAcquire
在线程不是头节点或者许可获取失败后,会调用此方法。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 只有SIGNAL状态才能阻塞线程
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
// 向队列的前驱节点找,一直到找到一个状态码小于0的,也就是非CANCEL状态的
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 把找到节点后继节点链接到当前节点
pred.next = node;
} else {
// 如果前驱节点的状态是小于0的,CAS方式将前驱节点的状态更新为SIGNAL。同时不对线程阻塞
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这个函数是整个循环中处理信号状态的唯一方法。信号是什么?也就是我们在Node章节提到的waitStatus状态。
waitStatus状态初始化的值是0。如代码中描述的,waitStatus只有前驱节点是SIGNAL状态才会将当前的线程阻塞住。其他情况下,waitStatus为非正数时,就会将前驱节点的状态更新为SIGNAL。而状态为正数时,代表节点被取消了,就会不断向前寻找,直到找到未被取消的节点。取消的节点通通会被垃圾回收机制处理掉。
【举个栗子!就是一个栗子】这样理解起来可能还是比较困难,我们可以试想一个情形,两个线程依次争夺独占权,线程A先获得了独占权,获得后的AQS等待队列的状态是如下图所示
在线程A释放独占权之前,线程B也发起了独占权的争夺。于是,按照之前的过程分析,在获取占有权失败后,会调用addWaiter方法,线程B的节点会被添加到等待队列中。添加后的结构就如下图所示
由于线程A一直没释放占有权,那么线程B就会进入到shouldParkAfterFailedAcquire方法内,而线程B的前驱节点就是线程A,而线程A节点的waitStatus是0,那就将它的状态更新为SIGNAL。它会返回到acquireQueue的死循环中再次进行占有权的获取,当然如果再次没有获取成功,也会进入此方法,此次它的前驱节点线程A的waitStatus已经更新为SIGNAL。返回true。
Q3:为什么需要SIGNAL信号才阻塞线程呢?
其实只是为了在线程park阻塞和解除阻塞的次数不那么频繁,减少了不必要的线程阻塞切换时间。SIGNAL设置后,只有确定SIGNAL信号的线程才会阻塞。
3.1.5、parkAndCheckInterrupt
这个方法很简单,就是使用LockSupport.park对线程进行了阻塞,返回线程是否被中断的状态。LockSupport的细节不做解释,简单来说,它主要是用来解决Thread.suspend这种方法会产生死锁的问题,同时能够对线程进行阻塞,核心是由native方法进行处理。有想深究的可以自行Google哈。
3.2、release操作
与acquire相对的方法也就是释放占有权的release方法。
3.2.1、release
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
相比之下,release方法涉及的步骤相对来说简单一些。与acquire方法类似,release也有一个tryRelease方法,是子类去实现具体功能。
具体步骤:
- 调用子类的tryRelease方法。
- 成功返回后,判断头节点是否存在和头节点的状态
- 若头节点存在,且头节点的waitStatus状态是非0,也就说明AQS不是初始化阶段
- 将后继节点的线程唤醒
【这里重点注意一下】:waitStatus状态是0,代表这个节点的状态还未更新,因此AQS没有将后继节点进行阻塞,这个时候就不能去做unpark的操作。
3.2.2、unparkSuccessor
在成功获取到子类的release认可后, 会进行唤醒过程,对该节点的后继节点进行唤醒。
unparkSuccessor方法不仅仅要处理release的解除阻塞操作,同时需要对cancel的进行区分。核心内容在于对Cancel状态的处理。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果当前节点的状态小于0,将状态更新为0,重置一下
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 后继节点不存在或者后继节点的waitStatus大于0(也就是被取消的状态)
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 置后继节点为null,
// 若当前节点发生了取消操作,重新从尾部开始遍历,找到没有标记为取消的节点进行阻塞操作
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
3.3、cancelAcquire
在整个acquire的过程中,如果发生超时,线程发起了中断操作,那么就会发起cancel方法,将线程的信息情况。具体见代码
private void cancelAcquire(Node node) {
...
node.thread = null;
// 跳过所有取消了的前驱节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
// 不采用 CAS方式进行写操作
node.waitStatus = Node.CANCELLED;
// 如果此节点已经是队尾,就将自己CAS删除。
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果后继节点需要信号SIGNAL,就自己前驱结点重新链接。
// 否则将线程唤醒,进行propagate过程
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node;
}
}
cancelAcquire的方法这里进行了一些不同情况下的取消操作。
- 取消的节点是队尾节点tail。那么直接将pred的next和当前节点的prev全部置null。重新设置一下tail节点。
- 取消的节点是队列中的一个节点,并且不是head头节点的直接后继节点。那么也很容易理解,就是将pred的next和当前节点的next进行一个链接。那么有一个疑问,代码中只将next节点链接了,prev的节点如何连接。这里的prev是由其它线程做的,由于当前节点已经是CANCEL状态,其他线程在进行竞争时,都会遍历前驱节点,重新链接前驱节点完成prev的链接。
- 取消的节点是队列头节点head的直接后继节点。那么如果取消的节点后继节点是存在的,可以直接唤醒后继节点。
小结
本文只是分析了独占式模式,第二篇将分析共享式的AQS的情况。
参考文章
如有问题,欢迎批评指正!