AQS——AbstractQueuedSynchronizer 同步框架(基于JDK14)
1、结构说明
AbstractQueuedSynchronizer 继承 AbstractOwnableSynchronizer,含有五个内部类,其中比较重要的是Node,ConditionNode及ConditionObject。下面是具体类图(由于AbstractQueuedSynchronizer 方法太多,此处已屏蔽大多数方法):
AbstractOwnableSynchronizer:
AbstractQueuedSynchronizer 的父类,主要是设置当前独占锁的线程:
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
/**
* Empty constructor for use by subclasses.
*/
protected AbstractOwnableSynchronizer() { }
/**
* The current owner of exclusive mode synchronization.
*/
private transient Thread exclusiveOwnerThread;
/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
* 设置独占锁
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* Returns the thread last set by {@code setExclusiveOwnerThread},
* or {@code null} if never set. This method does not otherwise
* impose any synchronization or {@code volatile} field accesses.
* @return the owner thread
* 获取独占锁
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
}
Node:节点类,AQS队列的节点,包含prev,next,由此可以看出,AQS队列是个基于链表实现的双向队列。waiter为在该节点的线程,既等待获取锁的线程。status用来标识当前节点线程的状态,有以下值:
表示节点的状态。其中包含的状态有(此处有待):
- CANCELLED,值小于0,表示当前的线程被取消;
- COND,值为 2,表示当前节点在等待condition,也就是在condition队列中(此时Node的实际值是3);
- WAITING,值为1,表示当前节点在sync队列中,等待着获取锁;
ExclusiveNode:独占锁节点,继承Node,无扩展;
SharedNode:共享锁节点,继承Node,无扩展;
ConditionNode:继承Node,与Node相比,多了nextWaiter属性(),为Condition队列的节点;
// Node status bits, also used as argument and return values
static final int WAITING = 1; // must be 1
static final int CANCELLED = 0x80000000; // must be negative
static final int COND = 2; // in a condition wait
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // link to next waiting node
/**
* Allows Conditions to be used in ForkJoinPools without
* risking fixed pool exhaustion. This is usable only for
* untimed Condition waits, not timed versions.
*/
public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}
public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}
ConditionObject:实现了Condition的接口,提供操作Condition队列方法,主要包含Condition类型的firstWaiter及lastWaiter属性(队列头及队列尾);这个类是为了让子类支持独占模式的。await()、sign()
方法就是让线程阻塞、加入队列、唤醒线程。AQS框架下基本各种独占的加锁,解锁等操作到最后都是基于这个类实现的。
该类是提供给子类去使用的,在Reentrantlock有相关的使用。有人可能觉得为什么实现这个内部类,又不用,而是给子类去用,那为什么不放到子类去呢?其实答案,很简单,抽象加模板模式。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient ConditionNode firstWaiter;
/** Last node of condition queue. */
private transient ConditionNode lastWaiter;
...
}
从以上的结构可以看出,AQS中包含两个队列,一个是由Node组成的双向队列,也就是同步队列,另一个则是由ConditionNode组成的单向队列,即Condition队列。Condition队列由ConditionObject操作,ConditionObject实现Condition接口。
两个队列的关系如下图所示(此图来自网络):
2、同步队列
AQS是一个基础的同步框架,只是定义了部分基础的方法,具体实现部分交由子类去实现,以便扩展不同的功能。
获取锁的流程如下:
可以看出来,在线程未暂停之前,每进行一次循环,都会调用tryAcquire/tryAcquireShared方法。
加入同步队列(同时,离队也在该方法中实现):
//子类进入同步队列时,调用此方法
public final void acquire(int arg) {
//tryAcquire方法交由子类去实现
if (!tryAcquire(arg))
acquire(null, arg, false, false, false, 0L);
}
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
//first表示当前node是否是head的下一个节点,在出队时需要
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued
/*
* Repeatedly:
* Check if node now first
* if so, ensure head stable, else ensure valid predecessor
* if node is first or not yet enqueued, try acquiring
* else if node not yet created, create it
* else if not yet enqueued, try once to enqueue
* else if woken from park, retry (up to postSpins times)
* else if WAITING status not set, set and retry
* else park and clear WAITING status, and check cancellation
*/
//第一次调用时,此时的node必定为null(conditionNode获取锁时,此时node不为null)
//当初始化node之后,node的prev、next等都是null
for (;;) {
//first == false 且 node不为null,node的前置节点不为null,node的前置节点不是head节点
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) {
//node 的status < 0
if (pred.status < 0) {
//清除已取消的节点
//这个方法暂时没看懂
cleanQueue(); // predecessor cancelled
continue;
} else if (pred.prev == null) {
//进入此步的情景:同步队列只有一个节点(即head的prev才会等于null),但是if条件中
//限制了这种情景
//这步没看懂
Thread.onSpinWait(); // ensure serialization
continue;
}
}
if (first || pred == null) {
boolean acquired;
try {
//此处,调用子类的 tryAcquire/tryAcquireShared 的实现。如果成功,则获取锁,不进入同步队列(cas修改status变量)
//相比JDK1.8,此处是一个小优化,当线程在进入同步队列期间,线程还没暂停之前,在执行方法进入同步方法时,不时直接去获取锁,
//在并发较低的场景下,省去了进入队列、暂停线程的操作;但是在并发较高的情况下,tryAcquire基本是失败的,多了tryAcquire的消耗。
//线程被唤醒之后,也会通过此处获取锁,走出循环
//共享锁
if (shared)
acquired = (tryAcquireShared(arg) >= 0);
else
//独占锁
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
//获取锁成功
if (acquired) {
if (first) {
//离队
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared)
signalNextIfShared(node);
if (interrupted)
current.interrupt();
}
return 1;
}
}
//根据是否是独占锁,创建对应的节点
if (node == null) { // allocate; retry before enqueue
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { // try to enqueue
//初始化节点
node.waiter = current;
Node t = tail;
//尾节点
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null)
//初始化头尾节点(head == tail)
tryInitializeHead();
//cas修改尾节点和新节点(即移动tail节点的位置)
//此处是节点入队列的关键操作
else if (!casTail(t, node))
node.setPrevRelaxed(null); // back out
else
t.next = node;
} else if (first && spins != 0) {
//非公平锁时会进入该步骤
//即线程被唤醒,却没有获取到锁
--spins; // reduce unfairness on rewaits
//jdk9新方法,优化自旋 改善自旋等待循环中的响应时间
Thread.onSpinWait();
} else if (node.status == 0) {
//初始化status
node.status = WAITING; // enable signal and recheck
} else {
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed)
//暂停线程
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L)
LockSupport.parkNanos(this, nanos);
else
break;
//被唤醒,此时清除状态
node.clearStatus();
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
return cancelAcquire(node, interrupted, interruptible);
}
唤醒线程,通知线程离队:
public final boolean release(int arg) {
//调用子类方法
if (tryRelease(arg)) {
//唤醒下一个节点
signalNext(head);
return true;
}
return false;
}
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
//设置等待状态(此时该节点的线程将要被唤醒)
s.getAndUnsetStatus(WAITING);
//唤醒线程
LockSupport.unpark(s.waiter);
}
}
3、Condition队列
Condition队列是独立与同步队列的队列,一个同步队列可以对应多个Condition队列。
入队:
/**
* Implements interruptible condition wait.
* <ol>
* <li>If current thread is interrupted, throw InterruptedException.
* <li>Save lock state returned by {@link #getState}.
* <li>Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li>Block until signalled or interrupted.
* <li>Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li>If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//构造ConditionNode
ConditionNode node = new ConditionNode();
//填充conditionNode,并且condition队列入队,同步队列离队
int savedState = enableWait(node);
//暂存线程 jdk14新方法
LockSupport.setCurrentBlocker(this); // for back-compatibility
boolean interrupted = false, cancelled = false;
//当node不在condition队列时,当线程被唤醒时,node已不在condition队列,而是在同步队列
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
//如果线程被中断并且当前状态值为3
//COND 按位非得 mask = -(COND+1),status = status & mask,
//mask == -3,以status的取值,只有当status == 3时,才退出
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
//挂起线程,调用 ConditionNode 的block方法
// 唤醒条件:
// status <= 1 || Thread.currentThread().isInterrupted()
ForkJoinPool.managedBlock(node);
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
LockSupport.setCurrentBlocker(null);
node.clearStatus();
//重新竞争锁
acquire(node, savedState, false, false, false, 0L);
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}
/**
* Adds node to condition list and releases lock.
*
* @param node the node
* @return savedState to reacquire after wait
*/
private int enableWait(ConditionNode node) {
//是否持有独占锁
if (isHeldExclusively()) {
//设置node的线程
node.waiter = Thread.currentThread();
//coonditionNode的status 为2|1 = 3
node.setStatusRelaxed(COND | WAITING);
ConditionNode last = lastWaiter;
//进入condition队列
if (last == null)
firstWaiter = node;
else
last.nextWaiter = node;
lastWaiter = node;
//获取status
int savedState = getState();
//释放锁,注意,进入condition队列将释放锁,且离开同步队列
if (release(savedState))
return savedState;
}
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}
出队:
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
public final void signal() {
ConditionNode first = firstWaiter;
//如果当前线程没有独占锁的话,抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
if (first != null)
//condition队列还有节点
doSignal(first, false);
}
/**
* Removes and transfers one or all waiters to sync queue.
*/
private void doSignal(ConditionNode first, boolean all) {
while (first != null) {
ConditionNode next = first.nextWaiter;
if ((firstWaiter = next) == null)
lastWaiter = null;
//此处判断status是否等于3,即是否处于等待中
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
//重新进入同步队列队尾
enqueue(first);
if (!all)
break;
}
first = next;
}
}
/**
* Enqueues the node unless null. (Currently used only for
* ConditionNodes; other cases are interleaved with acquires.)
*/
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) {
t.next = node;
if (t.status < 0) // wake up to clean link
//唤醒线程
LockSupport.unpark(node.waiter);
break;
}
}
}
}