什么是队列同步器?
AQS队列同步器作为构建锁或者其他同步组件的基础框架。使用int型的成员变量来表示同步的状态,如果线程成功获取了锁,那么同步状态值为1。如果线程释放了锁,那么同步状态值变为0。如果获取了同步状态线程以及等待状态等信息被封装成了Node节点,而这些Node节点采用先进先出的队列来进行排队管理。
继承与实现关系
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable
同步队列节点Node的数据结构
双端队列-数据结构图:

解释:对于这个链式队列结构,没有成功获取同步状态的线程将会成为节点加入到队列的尾部,而队列中的节点获取锁资源是从头部开始的。
节点Node的等待状态waitStatus

static final class Node {
/*
*指示节点在共享模式中等待的标记.
*/
static final Node SHARED = new Node();
/*
*指示节点在独占模式中等待的标记.
*/
static final Node EXCLUSIVE = null;
/*
*由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化
*/
static final int CANCELLED = 1;
/*
*后继结点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,
*将会通知后继节点,使后继节点的线程得以运行
*/
static final int SIGNAL = -1;
/*
*节点在等待队列中,节点线程等待在Condition上,
*当其他线程对Condition调用了Signal()方法后,该节点将会从等待队列转移到同步队列中,加入到对同步状态的获取中
*/
static final int CONDITION = -2;
/*
*表示下一次共享式同步状态获取将会无条件地被传播下去
*/
static final int PROPAGATE = -3;
//线程的等待状态
volatile int waitStatus;
/**
* 前驱节点,当节点加入到队列尾部时被设置(尾部添加)
*/
volatile Node prev;
/**
* 后继节点
*/
volatile Node next;
/**
* 获取同步状态的线程
*/
volatile Thread thread;
/**
* 等待队列中的后继结点。
* 如果当前节点是共享的,那么这个字段将是一个SHARED常量,也就是节点类型(独占和共享)和等待队列中的后继结点共用同一个字段
*/
Node nextWaiter;
/**
* 等待的节点是否共享
*/
final boolean isShared() { return nextWaiter == SHARED;}
/**
* 获取前驱节点,但需要判断下前驱节点是否为空,否则将会抛出空指针异常
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null) throw new NullPointerException();
else return p;
}
Node() { // 用于建立初始的头结点和共享标记}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
属性
private transient volatile Node head;
说明:等待队列的头结点,采用懒初始化的方式(意思就是只有在需要头结点的时候才进行初始化)。 如果头结点存在,那么waitStatus状态就不会被标记位CANCELLED
private transient volatile Node tail;
说明:等待队列的尾节点,采用懒初始化的方式。 使用enq方法进行添加新的节点或修改节点。
private volatile int state;
说明:同步状态值。
原子方法
getState() 方法:获取同步状态的当前值
/**
* 获取同步状态的当前值。操作具有volatile读的语义。
*/
protected final int getState() {
return state;
}
setState() 方法:设置同步状态的值
/**
* 设置同步状态的值。操作具有volatile写的语义。
*/
protected final void setState(int newState) {
state = newState;
}
compareAndSetState() 方法:如果当前状态值等于预期值,那么将以原子的方式将当前同步状态的值更新给定值
/**
* 如果当前状态值等于预期值,那么将以原子的方式将当前同步状态的值更新给定值。
* 这个操作具有volatile读写语义。
* 原子性指要么更新过程成功,要么过程中断,还是原样。
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
解释:这个方法采用cas原则来保证状态设置的原子性,cas本身采用乐观锁的方式来实现的,从而不会产生线程的阻塞问题。由于采用volatile读写语义,那么线程访问是保持一致性的。
理由:因为volatile实现原则是将缓存中的数据写入到主存中的。所以每个线程读写的数据都是从主存中获取来的,而不是每个线程缓存的数据,所以保证了一致性。
需要明确的问题:
(1)问题 :乐观锁和CAS的区别?
首先乐观锁的思想是读多写少,认为并发写的可能性不高。
乐观锁的策略:
① 就是在写之前先读出版本号,然后加锁,保证当前的读出来的值是具有原子性(未被修改)。
② 然后比较当前加锁的版本号和取时的版本号是否一样,如果一样,那么说明别人没有写入值,那么我们就更新值,然后更新版本号。
③ 如果不一样,那么说明在我们取值加锁之后别人更新了值。我们就要重新读取版本号,然后加锁,比较版本号,直到当前加锁的版本号和取时的版本号不一样,才进行写入更新值,并更新版本号。
CAS的策略:
比较当前值和传入的期望值是否一样,如果一样,那么就更新当前值为需要更新的值。CAS只是一种乐观锁的实现而已。
(2)问题 :乐观锁的版本号和 “ABA”问题?
CAS虽然在前面提到了可以高效的解决了并发的原子操作,但是同样也存在很多问题。
"ABA"问题:CAS操作就是检查值是否发生变化,如果发生变化则重新读取检查,如果没有发生变化则更新值。但是可能会出现一个值原来为A,被一个线程读取后更新为B,然后又被另一个线程读取更新为A,那么其他线程过来进行检查更新时,发现值并没有发生变化,但是实际上前面已经被更新过了,那么就采用版本号1A -> 2B -> 3A ,这样1A和3A由于版本号就不会出现发生变化却使其他线程未检查出发生变化。
(3)问题:同步状态值(state)干什么用的?
/**
* 同步状态值
*/
private volatile int state;
①首先在多线程竞争的条件下,采用CAS的方式来获取和设置同步状态值(state)。
②同步状态值state代表获取锁的线程加锁的次数,如果线程获取锁,那么state加1变为1。如果线程释放锁,那么state减1变为0。
③volatile实现原则还是将缓存中的数据写入到主存,每个线程都是从主存中读取值。保证了数据的一致性。
AQS独占式模式
tryAcquire()方法:查询是否允许它在独占模式下获取对象状态,如果允许则获取它。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
说明:ReentrantLock重入锁对tryAcquire方法进行了重写;所以tryAcquire方法要结合其他类来看,因为在aqs中tryAcquire方法没有进行实现。
acquire(int arg)方法:以独占模式获取对象,忽略中断。
/**
* 以独占模式获取对象,忽略中断。
*/
public final void acquire(int arg) {
//如果没获取到队列的状态并且不停循环的方式获取线程的结果中断失败标志为true
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//中断线程
selfInterrupt();
}

addWaiter方法:为当前线程和给定的模式创建节点。
/**
* 为当前的线程和给定的模式创建节点
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
//获取尾节点
Node pred = tail;
//如果尾节点不为空
if (pred != null) {
//设置创建节点的前驱为尾节点
node.prev = pred;
//判断当前状态值是否等于预期的尾节点pred,
//如果等于就将当前同步状态值更新尾节点为node
if (compareAndSetTail(pred, node)) {
//尾节点的后继结点为node
pred.next = node;
return node;
}
}
//将节点插入到队列
enq(node);
return node;
}
解释:
注意:方法compareAndSetTail采用原子的方式进行更新尾节点。
enq方法:将节点插入到同步队列尾部。
/**
* 将节点插入到队列
*/
private Node enq(final Node node) {
for (;;) {
//获取尾节点为t
Node t = tail;
//如果尾节点为空
if (t == null) {
/*
* 初始化一个头结点作为尾节点t
* 如果节点node第一次入队列,会创建新的节点new node()作为头节点
* 而此时队列的尾节点和头结点是同一个节点
*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
//设置节点node的前驱为t
node.prev = t;
//判断node节点是否是预期的尾节点t,如果是就更新尾节点t为node
if (compareAndSetTail(t, node)) {
//尾节点的后继结点为node
t.next = node;
return t;
}
}
}
}
acquiredQueued方法:以死循环的方式不停的获取同步状态,如果获取到返回true,否则false。
/**
* 以死循环的方式不停的获取同步状态
* 如果获取不到则阻塞节点中的线程,
* 而被阻塞线程只能依靠其前驱节点的出队列操作或者阻塞线程中断
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//获取节点node的前驱节点p
final Node p = node.predecessor();
/* 如果p为头节点,并且已经成功获取了同步对象
* 获取成功就将队列头结点取出,
* 将头结点的后继结点作为头结点,
* 删除头结点与后继结点的关系
*/
if (p == head && tryAcquire(arg)) {
//设置头节点为node
setHead(node);
//前驱节点p(头结点)的后继结点设置为null
p.next = null; // help GC
//失败标志设置为false
failed = false;
//返回中断失败标志false
return interrupted;
}
//检查并更新无法获取的节点的状态并且检查是否中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//设置中断失败标志为true
interrupted = true;
}
} finally {
//如果中断标志位为true
if (failed)
//取消正在尝试执行的请求
cancelAcquire(node);
}
}
问题:为什么死循环中,只有前驱节点为头节点才能够尝试获取同步状态?
首先清楚的是头节点是成功获取同步状态的节点。当头节点的线程释放了同步状态之后,将唤醒其后继节点,后继节点唤醒时也得检查其前驱节点是否为头节点。(而先进先出队列本身就是这种前驱后继关系,不可能当前节点的前驱节点没有被唤醒获取同步状态,就直接跳到当前节点。其次如果当前节点的前驱节点不是头节点这个限制条件,那么当前节点的前驱节点还有其前驱节点(就是其前前驱节点没有被唤醒获取同步状态),还是不满足从队列头开始获取同步状态)。
tryRelease(int arg) :在独占模式下,尝试释放对象的状态。
//独占模式下,尝试释放对象的状态
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
release方法的流程图:

release(int arg) :在独占模式下,释放对象的状态。
/**
* 释放对象的状态
*/
public final boolean release(int arg) {
// 是否允许尝试在独占模式下释放对象的状态
if (tryRelease(arg)) {
// 获取队列同步器的头结点
Node h = head;
// 如果头结点不为null并且头结点的等待状态不为0
if (h != null && h.waitStatus != 0)
// 唤醒头结点的后继结点
unparkSuccessor(h);
return true;
}
return false;
}
unparkSuccessor(Node node):唤醒输入节点node的后继节点。
/**
* 唤醒节点的后继结点
*/
private void unparkSuccessor(Node node) {
// 获取线程节点的等待状态值
int ws = node.waitStatus;
// 如果等待状态值小于0
if (ws < 0)
// 使用CAS方式比较更新设置节点的等待状态值为0
compareAndSetWaitStatus(node, ws, 0);
// 获取当前节点的后继结点
Node s = node.next;
// 如果后继结点为null或者后继节点的等待状态值大于0
if (s == null || s.waitStatus > 0) {
// 设置当前节点的后继后继结点为null
s = null;
// 从尾节点向前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
// 节点的等待状态小于等于0(需要执行)
if (t.waitStatus <= 0)
s = t;
}
// 让查找到的线程节点可用
if (s != null)
LockSupport.unpark(s.thread);
}
共享式同步状态的获取
tryAcquireShared() 方法:查询是否允许在共享模式下获取对象的状态,如果允许,那么就获取它。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
acquireShared() 方法:共享模式获取对象,忽略中断。

public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
doAcquireShared() 方法:在共享模式下,以死循环的方式不停地获取同步状态。
private void doAcquireShared(int arg) {
// 获取共享模式下创建的节点
final Node node = addWaiter(Node.SHARED);
// 失败标志位默认为true
boolean failed = true;
try {
// 中断标志位默认为false
boolean interrupted = false;
for (;;) {
// 获取节点的前驱节点p
final Node p = node.predecessor();
// 如果p为头结点
if (p == head) {
// 是否允许在共享模式下获取对象
int r = tryAcquireShared(arg);
if (r >= 0) {
// 设置队列的头结点
setHeadAndPropagate(node, r);
p.next = null; // help GC
// 如果中断标志位为true,则中断当前线程
if (interrupted)
selfInterrupt();
// 设置失败标志位为false
failed = false;
return;
}
}
// 检查并更新无法获取的节点的状态并且检查是否中断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 设置中断失败标志为true
interrupted = true;
}
} finally {
// 如果中断标志位为true
if (failed)
// 取消正在尝试执行的请求
cancelAcquire(node);
}
}
解释:在共享模式获取自旋过程中,成功获取同步状态并退出自旋的条件是tryAcquireShared() 方法返回值大于等于0。如果当前节点的前驱节点为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。
tryReleaseShared(int arg)方法:在共享模式下,尝试释放对象的状态。
// 共享模式下尝试释放对象的状态
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
releaseShared(int arg)方法:在共享模式下,释放对象状态。
// 尝试在共享模式下释放对象的状态
public final boolean releaseShared(int arg) {
// 是否允许在共享模式下获取对象的同步状态
if (tryReleaseShared(arg)) {
// 在共享模式下释放对象的状态
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()方法:在共享模式下,释放对象。
/**
* 在共享模式下释放对象
*/
private void doReleaseShared() {
// 死循环
for (;;) {
// 获取头结点
Node h = head;
// 如果头节点不为null并且头结点不等于尾节点
if (h != null && h != tail) {
// 获取头结点的等待状态值
int ws = h.waitStatus;
// 如果等待状态值为-1,那么通知等待的后继线程节点
if (ws == Node.SIGNAL) {
/**
* 使用CAS方式来比较线程节点h的等待状态值为-1,
* 那么将h的等待状态值更新为0(就是h还在同步队列中)
*/
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
// 唤醒后继结点h
unparkSuccessor(h);
}// 如果等待状态值为0(h节点在同步队列中)并且节点h的等待状态值为0,那么就将等待状态值设置为-3
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // 如果循环过程中,头结点发生改变,中断循环
break;
}
}
其他AQS方法
// 在独占模式下,获取队列中等待的线程集合
public final Collection<Thread> getExclusiveQueuedThreads() {
// 创建一个线程类型数组列表
ArrayList<Thread> list = new ArrayList<Thread>();
// 从尾节点开始向前遍历
for (Node p = tail; p != null; p = p.prev) {
// 线程节点的状态是否处于独占状态
if (!p.isShared()) {
// 获取独占状态的节点线程
Thread t = p.thread;
// 如果线程不为null,那么就将线程加入到数组列表中
if (t != null)
list.add(t);
}
}
return list;
}
// 获取队列中等待的线程数
public final int getQueueLength() {
int n = 0;
// 从尾节点开始向前遍历
for (Node p = tail; p != null; p = p.prev) {
// 如果线程节点对应的线程不为null,就加1
if (p.thread != null)
++n;
}
return n;
}
// 在共享模式下,获取队列中等待的线程集合
public final Collection<Thread> getSharedQueuedThreads() {
// 创建一个线程类型数组列表
ArrayList<Thread> list = new ArrayList<Thread>();
// 从尾节点开始向前遍历
for (Node p = tail; p != null; p = p.prev) {
// 节点处于共享状态
if (p.isShared()) {
// 获取节点的线程
Thread t = p.thread;
// 如果线程不为null,就将其加入到数组列表中
if (t != null)
list.add(t);
}
}
return list;
}
// 查看是否其他线程节点也正要获取同步状态
public final boolean hasContended() {
return head != null;
}
// 队里中是否存在正要获取同步状态的线程节点
public final boolean hasQueuedThreads() {
return head != tail;
}
// 对于当前正调用的线程,是否是以同步的方式独占的运行
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
// 线程节点是否已经加入到了队列
public final boolean isQueued(Thread thread) {
// 如果线程为null,那么抛出空指针异常
if (thread == null)
throw new NullPointerException();
// 从尾节点开始遍历查找
for (Node p = tail; p != null; p = p.prev)
// 如果节点对应的线程等于查找的线程,返回true
if (p.thread == thread)
return true;
return false;
}
阅读总结
(1)AQS是采用双向链队列这种数据结构实现的。
(2)AQS中包含一个同步双端队列和一个等待双端队列。
(3)AQS没有获取同步状态的线程节点都加入到队列的尾部,AQS是从头节点开始获取锁资源。
(4)AQS独占模式:
4.1 获取线程对象acquire
① 首先是否允许获取对象的同步状态,如果允许,那么直接中断当前线程。如果不允许,那么就执行②。
② 将当前线程封装成一个node节点,插入到同步队列的尾部,然后以原子的方式更新尾节点。然后执行③
③ 以死循环的方式不停的获取同步状态,如果获取不到就只能阻塞,等到其前驱节点出队列或者线程节点中断才可以获取同步状态。
4.2 释放线程对象release
① 首先是否允许释放对象的同步状态,如果不允许直接返回false,如果允许执行②。
② 获取当前队列的头节点,如果头节点不为空或者等待状态不为初始化状态,那么唤醒头节点的后继节点。
(5)AQS共享模式:
5.1 获取线程对象doAcquireShared
① 首先是否允许获取对象的同步状态,如果不允许,那么执行②
② 在共享模式下创建节点node,然后采用死循环的方式不停获取对象的同步状态,如果节点node的前驱节点p为头结点并且允许获取对象的同步状态,将创建的节点node设置为头结点,并且断开与前驱节点的关系。
5.2 释放线程对象releaseShared
① 首先是否允许释放对象的同步状态,如果不允许直接返回false,如果允许执行②。
② 以死循环的方式获取对象的同步状态,如果头结点不为空、头节点不等于尾节点、头结点的等待状态为唤醒后继节点,那么唤醒头节点的后继节点。
---------------------------该源码为jdk1.7版本的