Java并发核心类——AbstractQueuedSynchronizer类
一、抽象排队同步器AQS简介
java.util.concurrent包下有多种常用的工具类可以帮助我们快速进行并发程序的开发,如:CountDownLatch、Semaphore、ReentrantLock等。阅读它们的源码可以发现,这些类的功能接口都是委托了AbstractQueuedSynchronizer类的子类进行实现的。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues.
上面这句话节选自Java1.8 API文档中关于AbstractQueuedSynchronizer类(下称AQS)的描述,作者是并发大神Doug Lea(他设计了java.util.concurrent包)。由此可知AQS类自成一派,为实现锁以及相关的的同步器提供了一个依赖FIFO队列的基础开发框架。它是JDK层面上实现同步的核心类。
既然依赖队列,所以AQS有三个主要的对象成员:指向队列头结点的引用head、指向队列尾节点的引用tail、同步器状态state。头尾节点是Node类型,它是AQS类的一个内部类,线程每一次对资源的请求(获取或释放,底层是对state的修改)若一开始不成功,则会被封装成一个Node节点,加入到FIFO队列尾部。
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile Node head;
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile Node tail;
/**
* The synchronization state.
*/
private volatile int state;
同步工具类主要是为了线程安全地更新某些状态而存在的,它们的实现则利用AQS来管理它们所依赖的状态。根据利用AQS实现的同步工具类不同,state变量代表的含义也不同。例如ReentrantLock中利用state表示持有锁线程重复获取该锁的次数,Semaphore使用state表示剩余的许可数量,ThreadPoolExecutor.Worker类用state=1表示锁定状态,state=0表示解锁状态。在“获取(acquire)”和“释放(release)”资源的一系列方法中,可以通过getState、setState和compareAndSetState方法来获取或者修改state。
基于AQS实现的同步工具类都有不同形式的“获取”和“释放”操作。比如ReentrantLock类的lock和unLock方法,Semaphore类的acquire和release方法。这些方法的实现都是调用了AQS类的acquire和release方法,这两者伪代码大致如下:
boolean acquire() throws InterruptedException {
while(获取锁) {
if (获取到) {
退出while循环
} else {
if(当前线程没有入队列) {
那么封装成Node类入队列
}
阻塞当前线程
}
}
}
boolean release() {
if (释放成功) {
删除头结点
激活后继节点
}
}
二、用于封装资源请求的容器——Node类简介
Node类是AQS的静态内部类,用于封装线程对资源的请求,并构建一个双向的等待队列(Wait Queue)。
The wait queue is a variant of a "CLH" (Craig, Landin, and Hagersten) lock queue. CLH locks are normally used for spinlocks. We instead use them for blocking synchronizers, but use the same basic tactic of holding some of the control information about a thread in the predecessor of its node.
这是java源码中关于Node类的一段描述:等待队列是CLH锁队列的一个变体。CLH锁通常被用作自旋锁。虽然我们将其用作阻塞同步器,但是却使用了同样的持有前驱节点中线程的某些控制信息的基本策略。有关CLH锁请参考这里:CLH锁 、MCS锁
Node的类成员如下:
Field Name | Description |
---|---|
int waitStatus | 表示节点的状态,一共有5种: 1. SIGNAL=-1,表示在当前节点释放或者取消时要唤醒后记节点。 2. CANCELLED=1,表示该节点被取消。 3. CONDITION=-2, 表示当前节点在某个条件上等待。 4. PROPAGATE=-3,常用作共享锁模式,表示当前节点的SHARED后继节点能够得以执行。 5. 0表示初始状态,节点等待获取资源。 注意:一般使用CAS操作对该状态进行更新。 |
Node prev | 前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接 |
Node next | 后继节点 |
Thread thread | 请求的线程引用 |
Node nextWaiter | 存储condition队列中的后继节点。 |
Node类的方法:
Method Name | Description |
---|---|
boolean isShared | 判断节点是否是共享模式(SHARED) |
Node predecessor | 返回前驱节点 |
多线程下并发的对同步器进行访问并请求资源,会产生一个个节点,前后相连地构成一个等待队列。同步器资源的获取从AQS的head域指向的头节点开始,依次向后转移。
三、构造一个简易同步工具类
使用AQS构建同步工具类,需要在其内部定义一个AQS的子类作为自定义同步器,并重写下列部分方法:
前三个方法用来实现以独占的(exclusive)形式获取资源(锁),后面两个则是以共享的(shared)形式获取资源(锁)。独占方式一次只能允许一个线程获取,而共享的形式则允许多个线程同时获得。大部分的同步器获取资源的方式非此即彼,此时只要实现这两类方法中的其中一类即可。但是也存在像ReentrantReadWriteLock类这样二者皆可的同步工具类。
下面的例子给出一个使用AQS实现的信号量MySemaphore。
public class MySemaphore implements Serializable{
private static final long serialVersionUID = -5515268704842028921L;
// 自定义内部同步器,共享方式获得资源
private static class Sync extends AbstractQueuedSynchronizer{
private static final long serialVersionUID = 294947484043077260L;
// 构造方法,用state表示可用许可数
Sync(int permit){
setState(permit);
}
@Override
protected int tryAcquireShared(int acquirePermit) {
// 使用CAS自旋修改状态
while (true) {
int remainPermit = getState();
int newPermit = remainPermit - acquirePermit;
if (newPermit < 0 || compareAndSetState(remainPermit, newPermit))
return newPermit;
}
}
@Override
protected boolean tryReleaseShared(int releasePermit) {
// 使用CAS自旋修改状态
while (true) {
int remainPermit = getState();
if (compareAndSetState(remainPermit, remainPermit + releasePermit))
return true;
}
}
// 获取剩余许可数量
public int getPermit(){
return getState();
}
}
private Sync sycn;
MySemaphore(int permit) {
sycn = new Sync(permit);
}
// 获取1个许可
public void acquire() { sycn.acquireShared(1); }
// 释放一个许可
public boolean release() { return sycn.releaseShared(1); }
public int getPermit() {
return sycn.getPermit();
}
// 测试代码
public static void main(String[] args) {
// 创建一个具备3个许可的信号量
MySemaphore ms = new MySemaphore(3);
Runnable task = new Runnable() {
@Override
public void run() {
// 获取许可
ms.acquire();
try {
// 每隔5秒打印
TimeUnit.SECONDS.sleep(5);
System.out.println("Thread Name: " + Thread.currentThread().getName()
+ ",Permit: " + ms.getPermit());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 释放许可
ms.release();
}
}
};
ExecutorService exec = Executors.newCachedThreadPool();
// 启动20个线程测试
for (int i = 0; i < 20; ++i) {
exec.execute(task);
}
}
}
以上代码构建了一个具备最基本功能的信号量同步工具,该类以共享的方式获取和释放资源。MySemaphore类的API接口并没有直接调用重写的tryAcquireShared和tryReleaseShared方法,而是调用了AQS的acquireShared和releaseShared方法来实现功能。由于信号量只有3个许可,因此后面测试代码中,控制台中每隔5秒输出3行内容。
当然,MySemaphore类可以以继承的方式扩展AQS来实现所需功能,而不是在其内部持有一个AQS子类实例来实现。但这样的做法并不推荐,因为外界与MySemaphore实例交互只需要3个方法,继承的方式破坏了该类的简洁性。同时AQS的许多public方法能够直接修改状态,如果通过继承AQS来实现功能,有可能导致其他开发者误用。
Implementations of these methods must be internally thread-safe, and should in general be short and not block. Defining these methods is the only supported means of using this class. All other methods are declared final because they cannot be independently varied.
注意:重写上述的两类5个方法是唯一允许的使用AQS的方法
java1.8关于AQS的源码199~284行有更多的例子可供参考。
四、实现原理分析
4.1 public final void acquire(int arg)
acquire方法以独占的方式获取资源,并且忽略中断。
public final void acquire(int arg) {
// 首先调用tryAcquire方法获取资源
if (!tryAcquire(arg) &&
// 将当前请求封装成Node并插入队列
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先调用tryAcquire方法获取资源,若失败则将请求以独占mode封装成Node节点并插入队列中。这里涉及addWaiter和acquireQueued两个方法。
private Node addWaiter(Node mode) {
// 创建一个mode(EXCLUSIVE/SHARED)节点,并封装当前线程
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
// 新建节点的前驱指向之前的尾节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
// 若CAS设置尾节点成功,则更新尾节点的后继为新建节点
pred.next = node;
// 返回新加入的节点
return node;
}
}
// 尾节点为空(队列未初始化)或者第一次CAS操作失败调用下面代码
// 将新建节点入队,若没有初始化则初始化队列
enq(node);
return node;
}
private Node enq(final Node node) {
// 自旋插入node到队尾,若没有初始化则初始化
for (;;) {
Node t = tail;
// 尾节点为空表示队列未初始化
if (t == null) { // Must initialize
// 创建一个新节点作为头结点,这样下次循环时队列就已经初始化了
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
// 返回新加入的节点
return t;
}
}
}
}
addWaiter将请求封装成node节点并插入到等待队列队尾,若队列没有初始化,则先初始化队列。新建的节点通过自旋+CAS操作来插入到队列尾部。成功后返回新加入的节点。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 判断前驱节点是否是头结点,并且尝试获取资源
if (p == head && tryAcquire(arg)) {
// 设置该节点为头结点,setHead方法中切断前驱引用
setHead(node);
// 切断后继引用
p.next = null; // help GC
failed = false;
return interrupted;
}
// 判断是否阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 根据failed值判断是否取消当前节点
if (failed)
cancelAcquire(node);
}
}
// 判断是否阻塞node节点引用的线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) { // 表示状态为CANCELLED
do {
// 因为前驱节点为取消状态,则将前驱节点从队列中移除,循环移除直到找到符合条件的前驱节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// CAS设置节点等待状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 阻塞并检查线程是否被中断
private final boolean parkAndCheckInterrupt() {
// 调用系统级代码阻塞当前线程
LockSupport.park(this);
// 检查线程是否中断
return Thread.interrupted();
acquireQueued方法能够令存在队列中的线程获取资源,总体思路是CAS+阻塞线程+自旋检查。返回值表示线程是否中断。
- 判断前驱节点是否是头结点并尝试获取资源,若成功则将当前节点设置为头结点并移除之前的头结点。
- 若操作失败,则调用shouldParkAfterFailedAcquire方法判断当前节点是否需要阻塞。该方法在前驱节点状态为SIGNAL时才返回True,方法内部具备移除队列中CANCELLED节点的功能。
- 调用parkAndCheckInterrupt方法阻塞线程,并检查线程是否中断。线程会阻塞在该方法中,当该方法返回时,无论是否中断,都会再次判断第一步的条件,完成自旋。
- finally中根据failed的值判断是否取消当前节点。
以上流程图描述了acquire方法的执行流程。考虑到很多时候线程对资源的请求并不存在竞争,因此同步器中的等待队列采用lazy方式初始化。
当线程在parkAndCheckInterrupt方法中阻塞时,需要有一个唤醒操作来唤醒阻塞的线程,重新判断条件。这个操作来自前驱节点的release方法。
4.2 boolean release(int)
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
// 若头结点为null,说明获取资源不存在竞争,没有初始化队列,不用唤醒后继节点
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 清除当前节点的等待状态,成功与否无关紧要,反正都会走下去
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// 如果后继节点的等待状态大于零,说明为CANCELLED状态
// 从尾部往前找到离当前节点最近的非CANCELLED状态节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点的线程
LockSupport.unpark(s.thread);
}
release方法调用tryRelease方法释放独占资源。
- 判断队列是否初始化,若没有初始化则直接返回true。
- 若队列初始化且头结点状态不为初始状态,则调用unparkSuccessor尝试唤醒后继节点。否则直接返回true。
- unparkSuccessor将当前节点的之后的第一个非CANCELLED状态的节点所持有的线程唤醒。
unparkSuccessor中之所以要从后往前查找非CANCELLED状态节点,是因为当前节点的后继节点引用有可能指向null,此时若从当前节点往后查找,则会报NullPointerException。
4.3 public final void acquireShared(int arg)
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// tryAcquireShared返负数表示失败
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 将请求以SHARED方式封装成Node节点加入队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 判断前驱节点是否是头结点
if (p == head) {
int r = tryAcquireShared(arg);
// 判断获取资源是否成功
if (r >= 0) {
// 将当前节点设置为头结点并且唤醒之后为SHARED模式的节点,直到遇到一个EXCLUSIVE节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
对比doAcquireShared和acquireQueued方法,可以发现二者的相似性。区别在于当前驱节点为头结点且共享方式获取资源成功后,会将当前节点设置为头结点并且不断唤醒之后为SHARED模式的后继节点,直到遇到一个EXCLUSIVE节点为止。
4.4 public final boolean releaseShared(int arg)
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
// 不断唤醒共享状态节点
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
releaseShared方法唤醒一个节点后,会同时唤醒之后mode为SHARED的节点。
4.5 public final void acquireInterruptibly(int arg)
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// 封装请求成Node,加入队列中
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 注意和acquireQueued对比,此处抛出中断异常。
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireInterruptibly与acquire方法功能相同,但是可以在获取资源的过程中响应中断。parkAndCheckInterrupt判断有中断时,随后即抛出中断异常。
4.6 public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
// 尝试在指定时间内获取资源
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 响应中断
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
// 计算deadline
final long deadline = System.nanoTime() + nanosTimeout;
// 封装请求
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 判断时间是否超过deadline
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
// 阻塞线程一段时间
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
tryAcquireNanos方法不像acquire方法,第一次获取资源失败后会阻塞线程直到前驱节点对其唤醒,而是在规定时间内若没有获取资源则返回失败,线程可以继续以不同策略向下执行。调用LockSupport.parkNanos(this, nanosTimeout)
方法令当前线程休眠nanosTimeout纳秒。线程一旦被唤醒,就会检查当前时间是否超时(超过deadline),若超时,则返回false。
若剩余的等待时间小于spinForTimeoutThreshold,则快速自旋操作的效率将大于休眠唤醒线程的操作。Doug Lea应该测算了在线程调度器上的切换造成的额外开销,因此在小于spinForTimeoutThreshold=1000ns内就让当前线程进入快速自旋状态,如果这时再休眠相反会让nanosTimeout的获取时间变得更加不精确。
该方法的功能可以用来实现定时锁。
五、总结
本文介绍了设计java.util.concurrent包的核心类AbstractQueuedSynchronizer。分析了其内部等待队列的节点类Node。并根据源码介绍了以独占或共享方式获取释放资源的实现原理。