AbstractQueuedSynchronizer 队列同步器是用来构建锁或者其他同步组件的基础框架,它使用int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作,并发包的作者期望它能够成为实现大部分同步需求的基础。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的三个方法(getState,setState,compareAndSetState)来进行操作因为他们能够保证状态的改变是安全的。继承AQS的子类推荐被定义为同步组件的静态内部类,AQS自身并没有实现任何同步接口,它仅仅是定义若干同步状态获取和释放的方法供自定义同步组件使用,AQS可以支持独占式的获取同步状态共享式的获取同步状态。
在锁的实现中聚合同步器,利用同步器实现锁的语义。锁是面向使用者的,定义了使用者与锁的交互接口,隐藏实现细节;同步器面向的是锁的实现者,简化锁的实现方式,屏蔽同步状态的管理,线程排队,等待,唤醒等底层操作。
同步器的设计基于模版方法,同步组件的实现者需要继承同步器并重写指定的抽象方法,调用同步器提供的模版方法的时候,组件重写的方法将被调用。
接下来分析同步器是如何完成线程同步的,主要包括:同步队列,同步状态获取与释放。
同步队列
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败后,同步器会将当前线程以及等待状态等信息封装为Node节点,加入同步队列中,同时会阻塞当前线程,让同步状态释放,会把首节点,会把首节点唤醒,使其再次尝试获取同步状态。
Node节点中存放线程引用,等待状态,前驱,后继节点
Node节点属性如下:
-
waitStatus
CANCELLED = 1 由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化
SIGNAL = -1 后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
CONDITION = -2 节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用single方法后,该节点将从等待队列中转移到同步队列中,加入到对同步状态的获取中
PROPAGATE = -3 表示下一次共享式同步状态获取将会无条件地被传播下去
INITIAL =0 初始状态
Node prev 前驱节点,当节点加入同步队列时被设置
Node next 后继节点
Node nextWaiter 等待队列中的后继节点。如果当前节点是共享的,那么这个字段是一个SHARED常量,也就是说节点类型(独占式,共享式)和等待队列中的后继节点公用同一个字段
Thread thread 获取同步状态的线程
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;
同步器可以重写的方法:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
共享式获取同步状态,返回大于等于0的值,表示获取成功,反之获取失败
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
共享式释放同步状态
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
同步器提供的模版方法:
独占式获取同步状态,成功就从该方法返回,失败的话当前线程信息及等待信息就被包装为Node节点放入同步队列中
public final void acquire(int arg) {
//调具体同步组件实现的方法获取同步状态,失败后将当前线程封装为等待获取同步状态的Node节点放入同步队列中
if (!tryAcquire(arg) &&
//由于添加节点操作时enq函数中是循环进行的,成功入队后需要acquireQueued再检查
acquireQueued(
addWaiter(Node.EXCLUSIVE)//添加节点到同步队列
, arg))
selfInterrupt();
}
//如上模版方法中
tryAcquire 由各个同步组件自己实现
private Node addWaiter(Node mode) {
//将当前线程以等待获取同步的状态构建Node节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//尾节点不为null说明队列中存在节点
//将尾节点赋值给pred
//CAS将新节点设置尾部节点,设置成功则将当前节点赋值给历史尾节点的后继节点,失败说明有别的线程入队了
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//进行相关队列初始化
enq(node);
return node;
}
--------
//CAS将新节点设置为尾节点
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
--------
//进行相关队列初始化
//如果队列为空,首先将新节点设置为头节点,并将首节点赋值给尾节点,第二波循环的时候,设置尾节点为当前节点的前驱节点,CAS设置新节点为尾节点,并将当前节点设置为前驱节点的后继节点
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//尾节点为空,CAS新节点为head节点,并将首节点赋值给尾节点,首尾节点相同
if (t == null) { // Must initialize
//尾节点为null,说明是第一个节点入队,需要一个哨兵节点 new Node()(和线程无关的节点)
if (compareAndSetHead(new Node()))
tail = head;
} else {
//尾节点不为null(可能是第一次加的哨兵节点也可能是真实的线程节点),将尾节点设置为当前节点的前驱节点,CAS当前节点为尾部节点,CAS成功则将当前节点设置为历史尾节点的后继节点,CAS失败意味着当前队列尾节点变了--其他线程加入队列成功,则继续循环设置,直到将当前节点放入同步队列,并将当前节点设置为历史尾节点的后继节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
////入参为成功入同步队列的当前节点和获取同步状态的参数。---addWaiter添加节点到同步队列的方法中的enq方法通过循环CAS操作将并发的入队操作串行化,保证节点入队,节点入队后就进入自旋状态(阻塞当前线程),当条件满足时获取同步状态并退出同步队列
final boolean acquireQueued(final Node node, int arg) {//方法名称也表示该方法是条件满足时获取同步状态退出同步队列,条件不满足时自旋阻塞当前线程
boolean failed = true;
try {
boolean interrupted = false;
//入队后的节点在死循环中尝试获取同步状态,
for (;;) {
//返回当前节点的前驱节点,,如果前驱节点为null 抛出NEP---上面enq方法的循环操作保证节点前驱节点不会为null,enq方法保证第一个节点进入同步队列的时候,head,tail,prev,next节点为同一个节点,
final Node p = node.predecessor();
//只有前驱节点是头节点的当前节点才能尝试获取同步状态,因为1 头节点是成功获取到同步状态的节点,而头节点的的线程释放同步状态后会唤醒其后继节点,后继节点被唤醒后需要检查自己的前驱节点是否是头节点;2 维护同步队列的FIFO原则
if (p == head && tryAcquire(arg)) {
//头节点的后继节点获取同步状态成功后,将当前节点设置为头节点,将历史头节点移除FIFO队列(方式就是将历史头节点的后继节点置为null)所以头节点是获取同步状态成功的节点,下一个获取同步状态的节点就是头节点的后继节点,这里就理解了上面if中的条件,
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//循环获取同步状态失败时的处理,
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//入参为当前节点前驱节点和当前插入节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//检查前驱节点等待状态,
int ws = pred.waitStatus;
//如果是SIGNAL表示当前节点等待被唤醒,当前节点前驱节点执行介绍或被取消时唤醒当前节点
if (ws == Node.SIGNAL)
return true;
//当前节点前驱节点被取消了,继续寻找当前节点前驱节点的前驱节点,一直往前检查节点,直到将当前节点设置为未被取消的最后一个节点的后继节点
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//一直往前检查节点,直到将当前节点设置为未被取消的最后一个节点的后继节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
//取消当前节点获取同步状态的操作
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
//一直寻找当前节点前驱节点的前驱节点没有被取消的最后一个节点,并设置为当前节点的前驱节点
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
与独占式获取同步状态相同,但是该方法响应中断,线程被中断时抛出 InterruptedException
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
//
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//将当前节点添加到同步队列
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
//获取当前节点前驱节点
final Node p = node.predecessor();
//如果前驱节点是头节点且获取到同步状态,
if (p == head && tryAcquire(arg)) {
//将当前节点设置为头节点,前驱节点置为null,节点线程信息设置null
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
独占式获取同步状态附加中断响应和超时返回失败
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
//当前线程已经被中断就响应中断抛出异常
throw new InterruptedException();
//否则获取同步状态,同步状态获取失败后尝试在超时前再获取
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
共享式获取同步状态,成功就从该方法返回,失败的话当前线程信息及等待信息就被包装为Node节点放入同步队列中,可以有多个线程获取到同步状态
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
//获取共享状态失败后,
doAcquireShared(arg);
}
//
private void doAcquireShared(int arg) {
//将当前节点以共享的状态添加到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
//加入同步队列的节点死循环获取同步状态
for (;;) {
//当前节点前驱节点
final Node p = node.predecessor();
//前驱节点为头节点时获取同步状态
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以上模版方法中tryAcquireShared需要各个同步组件自己实现,
响应中断
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
等待超时返回
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
独占式释放同步状态,释放状态后,将同步队列中第一个节点包含的线程唤醒
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//唤醒节点获取同步状态
private void unparkSuccessor(Node node) {
//获取待唤醒节点等待状态
int ws = node.waitStatus;
if (ws < 0)
//CAS设置等待状态为初始化
compareAndSetWaitStatus(node, ws, 0);
//获取当前节点后继节点,后继节点null或被取消时
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
//循环从tail开始寻找tail的前驱节点,直到找到最后一个或找到当前节点位置, 未被取消的节点,并将该节点设置为当前节点的后继节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//如果当前节点后继节点不为null,则唤醒线程
if (s != null)
LockSupport.unpark(s.thread);
}
共享式释放同步状态
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}