概述
AQS 全称是 Abstract Queued Synchronizer,即抽象队列同步器
,AQS内部基于CAS、LockSupport、自旋和双端等待队列实现的多线程同步工具
,AQS它是一套实现多线程同步功能的框架。
AQS 在JDK源码中被大量使用到,尤其是在 JUC(Java Util Concurrent)中,比如ReentrantLock、CountDownLatch、ThreadPoolExecutor
。理解 AQS 对我们理解 JUC 中其他组件至关重要,并且在实际开发中也可以通过自定义 AQS 来实现各种需求场景。
ReentrantLock 和 AQS 的关系
这次我们主要通过大家都熟悉的ReentrantLock(重入锁)
来理解 AQS 内部的工作机制。首先从 ReentrantLock
的lock
方法开始:
public void lock() {
sync.lock();
}
ReentrantLock
的lock
方法就一行代码很简单,直接调用了一个 Sync
对象的lock
方法,这个Sync
是什么呢?
public class ReentrantLock implements Lock, java.io.Serializable {
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer {
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { }// 尝试获取锁(非公平锁)
protected final boolean tryRelease(int releases) { }// 尝试释放锁
protected final boolean isHeldExclusively() { }
final ConditionObject newCondition() { }
final Thread getOwner() { }
final int getHoldCount() { }
final boolean isLocked() { }
private void readObject(java.io.ObjectInputStream s) {}
}
}
可以看出,Sync
类是 ReentrantLock
中的一个静态抽象内部类
,抽象方法是lock
。
这里ReentrantLock
并没有直接继承 AQS
,而是通过定义抽象内部类Sync
来扩展 AQS 的功能,然后在ReentrantLock
中定义Sync
的成员变量引用。
Sync
在ReentrantLock
有两种实现分别是NonfairSync
和FairSync
,分别对应非公平锁
和公平锁
,两种实现如下实现源码如下:
非公平锁的同步对象类
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1)) // 通过CAS 修改 state字段,即锁状态,表示争抢锁的操作。CAS 是CPU对变量进行的原子操作,绝对线程安全
setExclusiveOwnerThread(Thread.currentThread());// 设置为线程独占锁
else
acquire(1);// 通过CAS修改state(同步状态)失败,说明争抢锁失败,这里面是要加入双端等待队列中,然后轮询tryAcquire(子类实现AQS)方法,根据队列Node的waitStatus的不同可能会被挂起,当然挂起需要等待唤醒然后进行再次进入轮询获取锁
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
上面代码有几点:
- 通过CAS操作修改 state字段(锁同步状态),表示争抢锁的操作,如果争抢成功,那么将当前锁设置为线程独占锁。CAS操作是CPU对变量进行的原子操作,绝对线程安全。
- 如果CAS操作修改state字段(锁同步状态)失败,说明争抢锁失败可能有其他线程持有锁了,那么是要把线程加入双端等待队列中,然后轮询tryAcquire(子类实现AQS)方法,根据队列Node的waitStatus的不同可能会被挂起,当然挂起需要等待唤醒然后进行再次进入轮询获取锁。
公平锁的同步对象类
/***公平锁的同步对象*/
static final class FairSync extends Sync {
final void lock() {
acquire(1);// 通过CAS修改state(同步状态)失败,说明争抢锁失败,这里面是要加入双端等待队列中,然后轮询tryAcquire(子类实现AQS)方法,根据队列Node的waitStatus的不同可能会被挂起,当然挂起需要等待唤醒然后进行再次进入轮询获取锁
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&// 1
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
公平锁和非公平锁的区别: 公平锁是直接调用acquire()
方法 ,然后通过hasQueuedPredecessors()
方法判断队列是有等待的前驱节点线程,有的话就直接取出队列首节点的线程尝试去获得锁,而非公平锁是直接通过CAS争抢锁资源 。
理解AQS我们以非公平锁为例,实现源码如下:
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1)) // 通过CAS 修改 state字段,即锁状态,表示争抢锁的操作。CAS 是CPU对变量进行的原子操作,绝对线程安全
setExclusiveOwnerThread(Thread.currentThread());// 设置为线程独占锁
else
acquire(1);// 通过CAS修改state(同步状态)失败,说明争抢锁失败,这里面是要加入双端等待队列中,然后轮询tryAcquire(子类实现AQS)方法,根据队列Node的waitStatus的不同可能会被挂起,当然挂起需要等待唤醒然后进行再次进入轮询获取锁
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
可以看出在非公平锁中的 lock
方法中,主要做了如下操作:
- 如果通过
CAS
操作设置变量state
(同步状态)成功,表示当前线程获取锁成功,则将当前线程设置为锁独占线程。 - 如果通过
CAS
操作设置变量state
(同步状态)失败,表示当前锁正在被其他线程持有,则进入acquire
方法进行后续处理。
acruire
方法定义在 AQS 中,具体如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
acquire
方法是一个比较重要的方法,主要分为 3 个步骤:
-
tryAcquire
方法主要目的是尝试获取锁,tryAcquire
方法是子类实现的; -
addWaiter
如果tryAcquire
尝试获取锁失败,则调用addWaiter
方法将当前线程添加到一个等待队列
中,等待后续处理; -
acquireQueued
方法处理加入到队列中的节点(Node),通过自旋
去尝试获取锁,会根据前驱节点的waitStatus的情况将线程挂起
或者取消
。
以上 3 个方法都被定义在 AQS 中,但其中tryAcquire
有点特殊,其实现如下:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
默认情况下直接抛异常,因此必须要在子类中复写,也就是说真正的获取锁的逻辑由子类同步器自己实现
。
看ReentrantLock
中tryAcquire
的实现(非公平锁)如下:
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();// 获取当前线程
int c = getState();// 获取state(同步状态)的值
if (c == 0) {// state =0 说明当前是无锁状态
if (compareAndSetState(0, acquires)) {// 通过CAS操作替换state的值为acquires
setExclusiveOwnerThread(current);// 设置当前线程独占锁
return true;
}
}else if (current == getExclusiveOwnerThread()) {// 如果有线程持有该锁并且是同一线程获取锁,则直接增加重入次数,即:实现重入锁,不然会死锁
int nextc = c + acquires;// 增加重入次数
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
- 获取当前线程,判断当前的锁的状态,
state =0
说明当前是无锁状态; - 如果
state=0
表示当前是无锁状态,通过cas
操作更新state
状态的值,返回 true; - 如果当前锁属于重入锁并且是同一线程,则增加重入次数,返回 true;
- 上述情况都不满足,则获取锁失败返回 false。
最后用一张图表示ReentrantLock.lock
过程:
从图中我们可以看到:
- 如果
CAS
操作修改state(锁状态)失败说明有线程占用该锁,那么将调用tryAquire
方法再次尝试获取锁。 - 如果
tryAcquire
尝试获取锁失败,则调用addWaiter
将当前线程添加到一个等待队列中。 - 后续将调用
acquireQueued
方法处理加入到队列中的节点,通过自旋去尝试获取锁
,根据前驱节点的waitStatus的情况将线程挂起或者取消。 - 当持有锁的线程释放锁会将指定线程唤醒,那么线程依然会重新调用tryAcquire获取锁,即:
自旋去尝试获取锁
。
在ReentrantLock
执行lock
的过程中,大部分同步机制的核心逻辑都已经在 AQS 中实现,ReentrantLock
自身只要实现某些特定步骤下的方法即可,这种设计模式叫作模板模式
。比如Activity 的生命周期的执行流程都已经在 framework 中定义好了,子类 Activity 只要在相应的 onCreate、onPause 等生命周期方法中提供相应的实现即可。
AQS 核心功能原理分析
首先看下 AQS 中几个关键的属性,如下所示:
static final class Node {}
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
在AQS 中有两个非常重要的属性 Node(队列节点)和 state(锁同步状态)。
state 锁状态或同步状态
state
表示当前锁状态或同步状态。当 state = 0 时表示无锁状态;当 ·state>0·时,表示已经有线程持有了锁,也就是state=1
,如果同一个线程多次获得同步锁的时候,state
会递增,比如重入 5 次,那么 state=5
。 而在释放锁的时候,同样需要释放 5 次直到state=0
,其他线程才有资格获得锁。
state
还有一个功能是实现锁的独占模式或者共享模式。
独占模式:只有一个线程能够持有同步锁。
比如在独占模式
下,我们可以把state
的初始值设置成 0,当某个线程申请锁对象时,需要判断 state 的值是否为 0,如果不是 0 的话意味着其他线程已经持有该锁,则本线程需要轮询获取锁
或阻塞等待
。共享模式:可以有多个线程持有同步锁。
在共享模式下的道理也差不多,比如说某项操作我们允许 10 个线程同时进行,超过这个数量的线程就需要阻塞等待。那么只需要在线程申请对象时判断 state 的值是否小于 10。如果小于 10,就将 state 加 1 后继续同步语句的执行;如果等于 10,说明已经有 10 个线程在同时执行该操作,本线程需要阻塞等待。
Node 双端队列节点
Node 是一个先进先出的双端队列,并且是等待队列,当多线程争用资源被阻塞时会进入此队列。这个队列是 AQS 实现多线程同步的核心。
从之前 ReentrantLock 图中可以看到,在 AQS 中有两个 Node 的指针,分别指向队列的 head 和 tail。
Node 的主要结构如下:
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
volatile int waitStatus;// Node中线程的状态,和state作用是不一样的
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile Node prev; // 前驱节点
volatile Node next; // 后继节点
volatile Thread thread;
}
默认情况下,AQS 中的head和tail是为null的。
获取锁失败后续流程分析
其实AQS是在竞争锁失败后才会起到很大的作用。
锁的意义就是使竞争到锁对象的线程执行同步代码,多个线程竞争锁时,竞争失败的线程需要被阻塞等待后续唤醒。那么ReentrantLock
是如何实现让线程等待并唤醒的呢?
前面提到在ReentrantLock.lock
阶段,在 acquire() 方法中会先后调用tryAcquire、addWaiter、acquireQueued
这 3 个方法来处理。
tryAcquire
在ReentrantLock
中被复写并实现了,如果返回 true 说明成功获取锁,就继续执行同步代码语句。可是如果tryAcquire
返回 false,也就是当前锁对象被其他线程所持有,那么当前线程会被 AQS 如何处理呢?
addWaiter 方法
首先当前获取锁失败的线程会被添加到一个等待队列的末端tail,具体源码如下:
/*** 将线程以Node方法是添加队列末端*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);// 把当前线程封装到一个新的Node中
Node pred = tail;
if (pred != null) {// 将Node插入队列中
node.prev = pred;
if (compareAndSetTail(pred, node)) {//CAS替换当前AQS的tail属性
pred.next = node;// 修改队列节点
return node;
}
}
enq(node);// 插入队列失败,进入enq自旋重试插入队列
return node;
}
/*** 把节点插入队列中,如果队列未初始化则初始化,然后在插入新的Node*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))// 创建一个新的Node,通过CAS操作设置给队列头部head
tail = head;// 因为是只有一个线程所以头尾相同 tail = head
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
有两种情况会致使插入队列失败:
1)tail 为空:说明队列从未初始化,因此需要调用 enq 方法在队列中插入一个空的 Node;
2)compareAndSetTail
失败:说明插入过程中有线程修改了此队列,因此需要调用 enq 将当前 node 重新插入到队列末端。
经过 addWaiter 方法之后,此时线程以 Node 的方式被加入到队列的末端,但是线程还没有被执行阻塞操作,真正的阻塞操作是在下面的acquireQueued
方法中判断执行。
acquireQueued 方法
在acquireQueued
方法中并不会立即挂起该节点中的线程,因此在插入节点的过程中,之前持有锁的线程可能已经执行完毕并释放锁,所以这里使用自旋再次去尝试获取锁
。如果自旋操作还是没有获取到锁!那么就将该线程挂起(阻塞),该方法的源码如下:
/*** 在一队列的Node节点通过此方法获得锁*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;// 标记是否成功拿到资源
try {
boolean interrupted = false;// 标记等待过程中是否中断过
for (; ; ) {// 开始自旋,要么获取锁,要么中断
final Node p = node.predecessor(); // 得到当前节点的前驱节点
/*
* 检测当前节点的前驱节点是不是head节点,这是获取锁的资格,双端队列第二个节点才是真实数据
* 如果是的话,调用tryAcquire尝试获取锁,成功 ,那么将当前节点设置为head节点,也就是说当前节点拿到锁了,将其设置为虚节点
*/
if (p == head && tryAcquire(arg)) {
setHead(node);// 拿到锁,则将当前节点设置为队列的head节点,从而出队(head虚拟节点),还清空了未使用的字段。
p.next = null; // help GC
failed = false;
return interrupted;
}
/**
* 如果为成功获取锁,则根据前驱节点判断是否需要阻塞。
*
* shouldParkAfterFailedAcquire方法在前驱节点状态不为SIGNAL的情况下会循环重试获取锁
*/
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//若在等待队列中的Node拿到锁了,则将当前节点设置为队列的head节点,从而出队(head虚拟节点),head是虚拟节点不存储任何数据信息,还清空了未使用的字段。
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
可以看出在 shouldParkAfterFailedAcquire 方法中会判读当前线程是否应该被挂起,其代码如下:
/**
* 根据前驱节点的 waitStatus 来判断是否需要阻塞当前线程
* 返回true 代表需要阻塞当前线程
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)//-1 如果是SIGNAL,返回true 将当前线程挂起。
return true;
if (ws > 0) {//CANCELD
do {
// 前驱节点为CANCELD,向前遍历,更新当前节点的前驱节点为往前第一个不为CANCELD的节点
// 并且之后会回到循环再次重试获取锁
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {//0 -3 -2
// 等待状态为 0 -3 -2 ,设置前驱节点 的状态为 SIGNAL
// 并且之后会回到循环再次重试获取锁
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
首先获取前驱节点的 waitStatus
值,Node 中的 waitStatus
一共有 5 种取值,分别代表的意义如下:
- CANCELLED (1): 当前节点线程因为超时或者中断被取消。这是一个终结态,也就是状态到此为止
- SIGNAL (-1): 当前节点线程的后继线程被阻塞或者即将被阻塞,当前线程释放锁或者取消后需要唤醒后继线程。这个状态一般都是后继线程来设置前驱节点的。
- CONDITION (-2): 当前线程在 condition 队列中
- PROPAGATE (-3): 用于将唤醒后继线程传递下去,这个状态的引入是为了完善和增强共享锁的唤醒机制。在一个节点成为头节点之前,是不会跃迁为此状态的。
接下来根据 waitStatus 不同的值进行不同的操作,主要有以下几种情况:
- 如果前驱节点waitStatus 等于 SIGNAL,返回 true 将当前线程挂起,等待后续唤醒操作即可。
- 如果 前驱节点waitStatus 大于 0 也就是 CANCLE 状态,会将此前驱节点从队列中删除,并在循环中逐步寻找下一个不是“CANCEL”状态的节点作为当前节点的前驱节点。
- 如果前驱节点 waitStatus 既不是 SIGNAL 也不是 CANCEL,则将当前节点的前驱节点状态设置为 SIGNAL,这样做的好处是下一次执行 shouldParkAfterFailedAcquire 时可以直接返回 true,挂起当前线程。
代码再回到acquireQueued
中,如果shouldParkAfterFailedAcquire
返回 true 表示线程需要被挂起,那么会继续调用parkAndCheckInterrupt
方法执行真正的阻塞线程代码,具体如下:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
//LockSupport.java
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
这个方法比较简单,只是调用了 LockSupport 中的 park 方法。在 LockSupport.park方法中调用了 Unsafe API 来执行底层 native 方法将线程挂起。
获取锁失败的的大体流程总结如下:
- AQS 的模板方法
acquire
通过调用子类自定义实现的tryAcquire
获取锁; - 如果获取锁失败,通过
addWaiter
方法将线程构造成 Node 节点插入到同步队列队尾; - 在
acquirQueued
方法中以自旋的方法尝试获取锁,如果失败则判断是否需要将当前线程阻塞,如果需要阻塞则最终执行 LockSupport(Unsafe) 中的 native API 来实现线程阻塞。当被唤醒时又通过自旋tryAcquire
再次获取锁。
释放锁流程分析
在上面加锁阶段被阻塞的线程需要被唤醒过后才可以重新执行。那具体 AQS 是何时尝试唤醒等待队列中被阻塞的线程呢?
同加锁过程一样,释放锁需要从 ReentrantLock.unlock
方法开始:
public void unlock() {
sync.release(1);
}
//AbstractQueueSynchronizer.java
public final boolean release(int arg) {
if (tryRelease(arg)) {// 上边自定义的tryRelease如果返回true,说明该锁没有被任何线程持有
Node h = head;
// 头结点不为空并且头结点的waitStatus不是初始化节点情况,解除线程挂起状态
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
// Sync.java
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
这里的判断条件为什么是h != null && h.waitStatus != 0?
- h == null Head还没初始化。初始情况下,head == null,第一个节点入队,Head会被初始化一个虚拟节点。所以说,这里如果还没来得及入队,就会出现head == null 的情况。
- h != null && waitStatus == 0 表明后继节点对应的线程仍在运行中,不需要唤醒。
- h != null && waitStatus < 0 表明后继节点可能被阻塞了,需要唤醒。
可以看出,首先调用tryRelease
方法尝试释放锁,tryRelease
方法是子类同步器实现的,如果成功最终会调用 AQS 中的 unparkSuccessor 方法来实现释放锁的操作。unparkSuccessor 的具体实现如下:
/**
* 双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。
*
* 若在等待队列中的Node拿到锁了,则将当前节点设置为队列的head节点,从而出队(head虚拟节点),head是虚拟节点不存储任何数据信息,还清空了未使用的字段。
*
* @param head 实际上传入的是 head 节点 ,也就是当前节点 拿到锁的节点
*/
private void unparkSuccessor(Node head) {
//获取head的waitStatus
int ws = head.waitStatus;
// 双向链表中,第一个节点为虚节点,其实并不存储任何信息,只是占位。真正的第一个有数据的节点,是在第二个节点开始的。
if (ws < 0)// 不是CANCELD状态
compareAndSetWaitStatus(head, ws, 0);//将head的waitStatus重置为原始状态0
Node s = head.next;//获取当前head节点的下一节点,head是虚拟节点不存储任何数据信息
if (s == null || s.waitStatus > 0) {//如果当前节点的next节点s为null 或者 next节点状态是CANCELD
s = null;
// 就行队列尾部开始找到队列首部,找到队列第一个waitStatus<0的节点
for (Node t = tail; t != null && t != head; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果当前节点的下个节点不为null,而且waitStatus<0,就把当前节点的next节点unpark,唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}
首先获取当前节点(实际上传入的是 head 节点)的状态,若head waitStatus<0则将waitStatus重置为0,若 head 节点的下一个节点是 null,或者下一个节点的状态为 CANCEL,则从等待队列的尾部开始遍历,直到寻找到第一个 waitStatus<0 的节点。
如果最终遍历到的节点不为 null,再调用 LockSupport.unpark 方法,调用底层方法唤醒线程。 至此,线程被唤醒的时机也分析完毕。
最后看一下AQS机制的工作流程图
理解CAS
在AQS中不管是在加锁还是释放锁阶段,多次提到了一种通用的操作:compareAndSetXXX。这种操作最终会调用 Unsafe 中的 API 进行 CAS 操作。
CAS 全称是 Compare And Swap,译为比较和替换,是一种通过硬件实现并发安全的常用技术,底层通过利用 CPU 的 CAS 指令对缓存加锁
或总线加锁
的方式来实现多处理器
之间的原子操作
。
CAS的实现过程主要有 3 个操作数:内存值 V,旧的预期值 E,要修改的新值 U,当且仅当预期值 E和内存值 V 相同时,才将内存值 V 修改为 U,否则什么都不做。
CAS 底层会根据操作系统和处理器的不同来选择对应的调用代码,以 Windows 和 X86 处理器为例,如果是多处理器,通过带 lock 前缀的 cmpxchg 指令对缓存加锁或总线加锁的方式来实现多处理器之间的原子操作;如果是单处理器,通过 cmpxchg 指令完成原子操作。
CAS是存在以下缺点:
1)ABA 问题。一般通过加版本戳,如JDK提供的 AtomicStampedReference
2)只能保证共享变量原子性操作。通过将多个变量封装成对象,使用AtomicReference
修改即可。
自定义 AQS
理解了 AQS 的设计思路,接下来我们就可以通过自定义 AQS 来实现自己的同步实现机制。
public class CustomAQSLock {
private final AQSLock aqsLock;
public CustomAQSLock() {
this.aqsLock = new NonFairAQSLock();
}
static abstract class AQSLock extends AbstractQueuedSynchronizer {
public abstract void lock();
@Override
protected boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
@Override
protected boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
static class NonFairAQSLock extends AQSLock {
@Override
public void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
public void lock() {
aqsLock.lock();
}
public void unlock() {
aqsLock.release(1);
}
}
代码中的CustomAQSLock是一个非公平独占锁,通过使用 CustomAQSLock 也能实现同 synchronized 和 ReentrantLock 相同的功能。比如如下代码:
public class Main_CustomSync {
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
CusThread cusThread = new CusThread();
cusThread.setName("Thread-" + 1);
cusThread.start();
}
}
private static int count;
private static CustomAQSLock lock = new CustomAQSLock();
static class CusThread extends Thread {
@Override
public void run() {
setCount();
}
private void setCount() {
try {
lock.lock();
count++;
System.out.println(Thread.currentThread().getName() + " CusThread count = " + count);
} finally {
lock.unlock();
}
}
}
}
最终打印的 count 值为 1 2 3 4 5 6 7 8 9 10,说明两个线程之间是线程安全的同步操作。
总结
总体来说,AQS 是一套框架,在框架内部已经封装好了大部分同步需要的逻辑,在 AQS
内部维护了一个状态指示器 state
和一个等待队列 Node
,而通过 state 的操作又分为两种:独占式和共享式,这就导致 AQS 有两种不同的实现:独占锁(ReentrantLock 等)和分享锁(CountDownLatch、读写锁等)。
通俗讲: AQS
内部维护了一个state状态指示器
,state
表示线程是否已经得到锁,和一个Node等待双端队列
,Node封装了线程
和waitStatus
,当队列中某个线程获得了锁,那么就把该Node设置成head,双端队列的head不保存数据的。
几个有可能需要子类同步器实现的方法如下。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回 true,失败则返回 false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回 true,失败则返回 false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0 表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回 true,否则返回 false。