1、AQS简介
AQS(java.util.concurrent.locks.AbstractQueuedSynchronizer)是用来构建锁或者其他同步组件(信号量、事件等)的基础框架类。JDK中许多并发工具类的内部实现都依赖于AQS,如ReentrantLock, Semaphore, CountDownLatch等等。
AQS的主要使用方式是继承它作为一个内部辅助类实现同步原语,它可以简化你的并发工具的内部实现,屏蔽同步状态管理、线程的排队、等待与唤醒等底层操作。
在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,“获取”操作的含义就很直观,即获取的是锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getstate,setState以及compareAndSetState等 protected类型方法来进行操作。这个整数可以用于表示任意状态。
AQS主要做三件事:
- 同步状态的管理
- 线程的阻塞和唤醒
- 同步队列的维护
状态管理API:
- int getState(): 获取同步状态
- void setState(): 设置同步状态
- boolean compareAndSetState(int expect, int update):基于CAS,原子设置状态
AQS模板方法:
方法 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,该方法将会调用重写的tryAcquire(intarg)方法 |
void acquireInterruptibly(int arg) | 与acquire(int arg)相同,但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException并返回 |
boolean tryAcquireNanos(int arg,long nanos) | 在acquireInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么就会返回false,如果获取到就返回true |
void acquireShared(int arg) | 共享式的获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式获取的主要区别是在同一时刻可以有多个线程获取到同步状态。 |
void acquireSharedInterruptibly(int arg) | 与acquireShared(int arg)相同,该方法响应中断。 |
boolean tryAcquireSharedNanos(int arg,long nanos) | 在acquireSharedInterruptibly(int arg)基础上增加了超时限制。 |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。 |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
Collection getQueuedThreads() | 获取等待在同步队列上的线程集合 |
同步器可重写的方法:
方法 | 描述 |
---|---|
boolean tryAcquire(int arg) | 独占获取同步状态,实现该方法需要查询当前状态,并判断同步状态是否符合预期状态,然后再进行CAS设置同步状态。 |
boolean tryRelease(int arg) | 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态 |
int tryAcquireShared(int arg) | 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之失败 |
boolean tryReleaseShared(int arg) | 共享式释放同步状态 |
boolean isHeldExclusively() | 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占 |
AQS对外暴露的方法:
2、AQS数据结构
因为获取锁是有条件的,没有获取锁的线程就要阻塞等待,那么就要存储这些等待的线程。在AQS中使用CLH队列储存这些等待的线程,但它并不是直接储存线程,而是储存拥有线程的node节点。
2.1、Node数据结构:
static final class Node {
//共享模式的标记,标识一个节点在共享模式下等待
static final Node SHARED = new Node();
//独占模式的标记,标识一个节点在独占模式下等待
static final Node EXCLUSIVE = null;
// waitStatus变量的值,标志着线程被取消,后续将不会获取到锁
static final int CANCELLED = 1;
// waitStatus变量的值,标志着后继线程(即队列中此节点之后的节点)需要被阻塞.(用于独占锁)
static final int SIGNAL = -1;
// waitStatus变量的值,标志着线程在Condition条件上等待阻塞.(用于Condition的await等待)
static final int CONDITION = -2;
// waitStatus变量的值,标志着下一个acquireShared方法线程应该被无条件传播。(用于共享锁)
static final int PROPAGATE = -3;
// 标记着当前节点的状态,默认状态是0, 小于0的状态都是有特殊作用,大于0的状态表示已取消
//SIGNAL:此节点的继任节点被阻塞,故当此节点释放锁或中断时需要换继任节点的线程。
//CANCELLED:当获取锁超时或被中断时节点状态会被设置成此状态,此状态的节点不会被再次阻塞;
//CONDITION:标识此节点在条件队列中,在同步队列的节点不会出现此状态,
//当节点从条件队列移到同步队列时此状态会被设置为0;
//PROPAGATE:一个releaseShared应该被传播到其他节点,此状态在doReleaseShared()中调用,
//以确保传播传播在其他插入时保持继续。
//总结:状态小于0表示节点无需被通知唤醒;状态为0表示普通同步节点;CONDITION表示节点在
//等待队列中,状态通过CAS进行原子更新。
volatile int waitStatus;
/**
* 前驱节点,在enqueue时设置,在dequeue或前驱节点取消时清除;
*/
volatile Node prev;
/**
* 后继节点,在enqueue时设置,在dequeue或前驱节点取消时清除;
* 入队操作不会设置前驱节点的后继节点,直到节点连接到队列;
* 故next节点为null不一定表示此节点为队列尾部,当next节点为null时,
* 可遍历prev节点进行双重检查;已经取消的节点的next指向自己而不是null
*/
volatile Node next;
//该节点拥有的线程
volatile Thread thread;
/**
* 1、值为null或非SHARED;为null时表示独占模式;非SHARED时表示在Condition中等待的队列;
* 2、值为SHARED,表示共享模式;
*/
Node nextWaiter;
//是否为共享模式
final boolean isShared() {
return nextWaiter == SHARED;
}
//当前节点的前驱节点,如果前驱节点为null,则抛出NPE异常
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
//用于在addWaiter()中使用,创建同步队列中的节点
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//在Condition中使用,创建等待队列的节点
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
说明:
- waitStatus:表示当前节点的状态,会有如下五中状态:
CANCELLED(1):当获取锁超时或被中断时节点状态会被设置成此状态,此状态的 节点会被unpark,不会参与锁的获取,不会被再次阻塞;
0:表示普通节点,当节点初始插入到同步队列时的状态;
SIGNAL(-1):此节点的继任节点被阻塞,故当此节点释放锁或中断时需要换继任节点的线程。
CONDITION(-2):标识此节点在条件队列中,在同步队列的节点不会出现此状态,当节点从条件队列移到同步队列时此状态会被设置为0;
PROPAGATE(-3):一个releaseShared应该被传播到其他节点,此状态在doReleaseShared()中调用,以确保传播传播在其他插入时保持继续。
总结:状态小于0表示节点无需被通知唤醒;状态为0表示普通同步节点;CONDITION表示节点在等待队列中,状态通过CAS进行原子更新。
- prev:前驱节点,在enqueue时设置,在dequeue或前驱节点取消时清除;
- next:后继节点,在enqueue时设置,在dequeue或前驱节点取消时清除; 入队操作不会设置前驱节点的后继节点,直到节点连接到队列; 故next节点为null不一定表示此节点为队列尾部,当next节点为null时, 可遍历prev节点进行双重检查;已经取消的节点的next指向自己而不是null
- **nextWaiter: **值为SHARED,表示共享模式;值为null或非SHARED,为null时表示独占模式,非SHARED时表示在Condition队列中等待的节点;
waitStatus状态状态:
2.2、AQS数据结构
/**
* 等待队列的头节点,通过setHead方法进行更新;头结点的状态不可能是CANCELLED
*/
private transient volatile Node head;
/**
* 等待队列的尾节点
*/
private transient volatile Node tail;
/**
* 同步状态
*/
private volatile int state;
3、CLH队列相关操作
3.1、相关属性的CAS操作
/**
* 通过CAS设置AQS的head值
*/
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* 通过CAS设置AQS的tail值
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* 通过CAS设置Node节点的waitStatue值
*/
private static final boolean compareAndSetWaitStatus(Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
/**
* 通过CAS设置Node节点的next值
*/
private static final boolean compareAndSetNext(Node node,
Node expect,
Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
3.2、将节点添加到CLH队尾
//通过空转及CAS方式,将一个node插入同步队列的队尾
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
//尾节点为空,则直接CAS初始化头结点,并将尾节点设置为头节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//尾节点不为空,则CAS设置当前节点为尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
3.3、将当前线程添加到CLH队尾
//创建一个给定模式的节点,此节点线程为当前线程,并将节点加入队尾
private Node addWaiter(Node mode) {
//创建线程
Node node = new Node(Thread.currentThread(), mode);
//快速检测尾节点不为空,则CAS将当前节点替换为尾节点
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//快速替换失败?则通过enq()的空转方式将当前节点插入队尾
enq(node);
return node;
}
4、独占锁
独占锁主要包括两方面功能:
- 获取锁的功能:既当多个线程一起获取锁的时候,只有一个线程能获取到锁,其他线程必须在当前位置阻塞等待。
- 释放锁的功能:获取锁的线程释放锁资源,而且还必须能唤醒正在等待锁资源的一个线程。
4.1、独占锁获取流程
4.2、获取独占锁相关方法
//获取独占锁,忽略中断;直到成功获取锁,此方法经常被lock.lock调用
public final void acquire(int arg) {
//tryAcquire:先CAS尝试获取锁,当返回true表示获取成功,此方法由子类实现;
//当返回false,则调用
//acquireQueued进行获取及入队阻塞处理,当有其他线程释放锁会唤醒改线程。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//调用tryAcquire尝试获取锁,当获取失败就将线程节点入队并阻塞节点线程;
//直到线程被中断或被唤醒,会再次尝试获取锁;
final boolean acquireQueued(final Node node, int arg) {
//标识获取锁失败
boolean failed = true;
try {
//标识锁被中断
boolean interrupted = false;
for (; ; ) {
//获取当前节点的前驱节点,若前驱节点为头节点,则尝试获取锁;
//若获取成功则将当前节点设为头节点,否则尝试阻塞节点线程
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//将当前节点设为头节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//判断当前是否满足阻塞条件,满足则阻塞当前线程;
//并等待被中断或被唤醒
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//获取锁失败,则进行取消获取操作
if (failed)
cancelAcquire(node);
}
}
//判断当前节点线程是否需要阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驱节点的状态
int ws = pred.waitStatus;
//前驱节点状态为SIGNAL,表示前驱节点在等待获取锁的信号
//故本节点可以安全的阻塞
if (ws == Node.SIGNAL)
return true;
//前驱节点waitStatus>0,即为waitStatus=CANCELLED;
//表示前驱节点已经被取消,需需要前向遍历前驱节点,直到状态
//不为CANCELLED的节点,并将此节点设为node节点的前驱节点;
//返回false,让上层调用继续尝试获取锁
if (ws > 0) {
//循环遍历前驱节点,寻找状态不为CANCELLED的节点,并设为当前节点的
//前驱节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//当前驱节点状态为0或PROPAGATE时,通过CAS设置前驱节点状态为SIGNAL
//并返回fase,等待下个循环阻塞当前节点线程;
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//通过LockSupport.park()阻塞当前线程,直到线程被unpark或被中断
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
4.3、释放独占锁相关方法
//释放独占锁
public final boolean release(int arg) {
//调用tryRelease方式通过CAS尝试释放锁,tryRelease由子类实现
if (tryRelease(arg)) {
Node h = head;
//头结点不为空且头节点状态不为0,应该为SIGNAL
//表示队列中有需要唤醒的节点,调用unparkSuccessor进行头节点线程
//唤醒操作
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//唤醒节点线程处理
private void unparkSuccessor(Node node) {
//获取当前节点状态
int ws = node.waitStatus;
//状态小于零,则将状态重置为0,表示节点处理已经完成
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//获取后继节点,当后继节点为空或后继节点状态为CANCELLED时;
//由tail前向遍历队列,找到当前节点的下个有效节点,即waitStatus <= 0
//的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//下个节点非空,表示为等待信号的节点,执行unpark唤醒节点线程
if (s != null)
LockSupport.unpark(s.thread);
}
5、共享锁
5.1、共享锁获取流程
5.2、共享锁获取相关方法
//获取共享锁,忽略中断;
public final void acquireShared(int arg) {
//子类实现,CAS方式获取共享锁,若获取失败,调用doAcquireShared继续获取共享锁
if (tryAcquireShared(arg) < 0)
//尝试获取共享锁,获取失败则将当前线程节点入队,直到被通知或被中断
doAcquireShared(arg);
}
//获取共享锁,若CAS获取失败,则将当前节点入队并阻塞当前线程,直到获取锁
private void doAcquireShared(int arg) {
//将当前节点插入队尾
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (; ; ) {
//获取当前节点的前驱节点,若前驱节点为头节点,则尝试获取锁;
//若获取失败,则检查节点状态;当节点状态为SIGNAL时将节点线程阻塞
final Node p = node.predecessor();
if (p == head) {
//CAS获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//成功则设置当前节点为头节点并将其他节点状态设为PROPAGAE
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//检查当前节点是否应该阻塞,是则进行阻塞处理,直到被中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//若获取锁失败,则进行取消获取处理
if (failed)
cancelAcquire(node);
}
}
//设置当前节点为头节点并唤醒共享模式下的线程
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
//设为头节点
setHead(node);
//若propagate > 0或头结点为空且头节点状态为 < 0
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
//若头节点的后继节点为共享模式,则获取头结点
if (s == null || s.isShared())
doReleaseShared();
}
}
5.3、共享锁释放相关方法
//释放共享锁
public final boolean releaseShared(int arg) {
//cas方式释放锁,若失败则doReleaseShared释放锁
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (; ; ) {
//获取头结点,头结点不为空且状态为SIGNAL时,CAS设置状态为0并唤醒线程
//否则将头结点状态设置为PROPAGATE,然后循环检查头结点状态并试图唤醒
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
6、condition条件
6.1、condition的实现
首先内部存在一个Condition队列,存储着所有在此Condition条件等待的线程。
await系列方法:让当前持有锁的线程释放锁,并唤醒一个在CLH队列上等待锁的线程,再为当前线程创建一个node节点,插入到Condition队列(注意不是插入到CLH队列中)
signal系列方法:其实这里没有唤醒任何线程,而是将Condition队列上的等待节点插入到CLH队列中,所以当持有锁的线程执行完毕释放锁时,就会唤醒CLH队列中的一个线程,这个时候才会唤醒线程。
6.2、await及signal处理流程
6.3、await相关方法
//让当前持有锁的线程阻塞等待,并释放锁。如果有中断请求,则抛出InterruptedException异常
public final void await() throws InterruptedException {
//若当前线程已被中断,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
// 为当前线程创建新的Node节点,并且将这个节点插入到Condition队列中
Node node = addConditionWaiter();
//释放当前线程持有的锁,并唤醒同步队列中的头结点
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果当前节点补足同步队列中;
//阻塞当前线程,当前当前线程被signal信号唤醒后,将当前节点加入同步队列中;
//等待获取获取锁
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//检查是否被中断并入队等待锁
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 如果节点node已经在同步队列中了,获取同步锁,只有得到锁才能继续执行,否则线程继续阻塞等待
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 清除Condition队列中状态不是Node.CONDITION的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
// 是否要抛出异常,或者发出中断请求
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
//为当前线程创建新的Node节点,并且将这个节点插入到Condition队列中
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果等待队列尾节点状态不是CONDITION,则进行清除操作;
// 清除队列中状态不是CONDITION的节点
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//为当前线程创建一个状态为CONDITION的节点,并将节点插入队尾
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
//从头到尾部遍历等待队列,去除状态不是CONDITION的节点
private void unlinkCancelledWaiters() {
//记录下一个待处理的节点
Node t = firstWaiter;
//记录上一个状态为CONDITION的节点
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
} else
trail = t;
t = next;
}
}
//释放当前线程占有的锁,并唤醒同步队列一个等待线程
//如果失败就抛出异常,设置node节点的状态是Node.CANCELLED
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//释放当前线程占有的锁
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
//判断节点释放在同步队列中
final boolean isOnSyncQueue(Node node) {
// 如果node的状态是Node.CONDITION,或者node没有前一个节点prev,
// 那么返回false,节点node不在同步队列中
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果node有下个节点,则其一定在同步队列中
if (node.next != null) // If has successor, it must be on queue
return true;
//从同步队列中查找node节点
return findNodeFromTail(node);
}
//根据当前的模式,判断是否抛出异常或重新中断等
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
6.4、signal相关方法
//如果等待队列不为空,则将队列头节点插入同步队列中
public final void signal() {
//如果当前线程不是独占锁,则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
//将等待队列中的头结点插入同步队列中
if (first != null)
doSignal(first);
}
//将等待队列总的头结点插入同步队列中
private void doSignal(Node first) {
do {
// 原先的Condition队列头节点取消,所以重新赋值Condition队列头节点
// 如果新的Condition队列头节点为null,表示Condition队列为空了
// ,所以也要设置Condition队列尾lastWaiter为null
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
// 返回true表示节点node插入到同步队列中,返回false表示节点node没有插入到同步队列中
final boolean transferForSignal(Node node) {
//如果无法将节点状态由CONDITION修改为0,表示节点已在同步队列中,直接返回false
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//将节点node插入到同步队列中,p是原先同步队列尾节点,也是node节点的前一个节点
Node p = enq(node);
int ws = p.waitStatus;
// 如果前一个节点是已取消状态,或者不能将它设置成Node.SIGNAL状态。
// 就说明节点p之后也不会发起唤醒下一个node节点线程的操作,
// 所以这里直接调用 LockSupport.unpark(node.thread)方法,唤醒节点node所在线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
//唤醒所有节点
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
//循环唤醒所有等待中的节点
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}