1. 前言
AbstractQueuedSynchronizer(AQS)基于FIFO等待队列以及CAS操作实现了基础了同步框架,JUC中包括ReentrantLock,ReentrantReadWriteLock,CountDownLatch,Condition都是基于AbstractQueuedSynchronizer提供的基础同步操作来实现的。
2. AbstractQueuedSynchronizer实现原理
2. 1 CLH队列
CLH(代表Craig, Landin, Hagersten三人)队列是一个FIFO的队列,这个队列用来对资源竞争者(不同的线程就是一个竞争者)进行排队。
2.1.1 队列节点Node
AQS实现中队列中一个节点即一个竞争者,用类Node,Node类有如下主要成员:
成员 | 含义 | 取值 |
---|---|---|
waitStatus | 当前节点状态 | SIGNAL=-1 表示当前节点释放资源后需要唤醒他的后继阻塞中的节点; CANCELLED=1当前节点已经取消对资源的竞争; PROPAGATE=-3 用于竞争共享资源竞争,表示当前节点获取资源后,应该唤醒其他排队阻塞中获取该共享资源的竞争者; CONDITION=-2专门用于Condition的实现 |
Node prev,next | 当前节点的前驱和后继节点 | -- |
Thread thread | 当前排队的线程,也就是竞争资源的线程 | -- |
Node nextWaiter | 如果waitStatus=CONDITION则nextWaiter连接到下一个等待在同一个condition上的node;nextWaiter也可以是class NODE的静态成员SHARED表示当前节点竞争共享资源;等于null表示竞争排他性资源 |
1. 节点种类
根据上表中nextWaiter可以看出,实际上node的种类有三种:
- exclusive node(表示当前节点申请排他新资源)
- shared node(表示当前节点申请的是共享资源)
- condition, 当前节点等待条件变量
不同的node种类获取资源(对应入队操作)和释放资源(对应出队操作)都有所不同
2.2.2 入队操作
也就是去竞争资源。
这里只介绍一下exclusive和shared node的入队操作
2.2.2.1 exclusive node入队操作
使用AQS竞争排他性资源时,需要调用如下两个接口之一:
/*
调用acquire获取资源,这是一个阻塞操作直到获取到资源,且不可被中断。
*/
public final void acquire(long arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
和acquire不同的是竞争者等待期间是可以被中断
*/
public final void acquireInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
两者都调用了tryAcquire(), 这个方法需要使用AQS的用户继承并实现它,并且这个调用不应该阻塞,之所以提供tryAcquire操作是因为当前竞争者可能已经获取了资源,那么在tryAcquire可以做一下判断,避免已经获取了资源后还去盲目的进去CLH队列排队。
典型的就是重进入锁ReentrantLock的实现,获取锁的线程可以再次获取锁,那么就可以在tryAcquire中做一下判断。
关于参数arg,可以理解成尝试获取资源数量吧,实际上这个跟AQS的一个重要的成员state有关,当用户使用AQS时,需要设置state这个整数值,关于这个state,应该可以当成是对资源状态的描述吧。比如ReenctrantLock中可以根据state是否等于0判断是否加锁了,以及state的值大小判断重进入锁加了几次锁。
上面代码addWaiter()其实就是不停的使用CAS操作将当前节点加入CLH队列尾部。
2. acquireQueued()
下面是acquireQueued的代码实现:
final boolean acquireQueued(final Node node, long arg) {
boolean failed = true;
try {
// 线程中断标记
boolean interrupted = false;
for (;;) {
// 获得当前节点的前驱
final Node p = node.predecessor();
// 如果前驱节点已经是队列头,那就意味着看起来自己最优先获取到资源,然后就调用tryAcquire尝试获取资源。
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 当前节点不是头节点则排队阻塞。
// shouldParkAfterFailedAcquire会将前驱节点的waitStatus标记成SIGNAL表示告诉前驱节点:你释放资源后需要唤醒我,我可能阻塞了
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果排队过程中被中断,记下来,不甩异常
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
以上可以看出acquireQueued其实就是在不停的做如下尝试:
- 判断当前节点是不是最前面了,是则调用tryAcquire尝试获取资源,获取成功就返回,获取失败或者不是最前面节点则阻塞等待,然后到2
- 无限等待然后被唤醒,继续1.
2. doAcquireInterruptibly()
和acquireQueued的过程是一样的,就是等待的过程中被中断了立即甩异常
2.2.2.2 shared node入队操作
和exelusive模式一样都有可中断和不可中断两个,如下:
public final void acquireShared(long arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
public final void acquireSharedInterruptibly(long arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
tryAcquireShared()需要使用AQS使用者继承实现,不过和tryAcquire不一样的是它返回一个整数,小于0表示尝试失败了。
1. doAcquireShared
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
// 和exclusive模式下acquireQueued整体流程是一样的,
// 不同的是在这里,成功获取到竞争资源后,调用的这个方法。
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
关于上面setHeadAndPropagate做了什么,先通过下图描述一下为什么会有这个方法的调用吧.
假设现在排队队列如下:
----- ------ ----- -----
| E1 | ->| S1 | -> | S2 | ->| S3 |
----- ------ ----- ----
节点E1是头节点,exclusively持有资源, S1, S2, S3都是在申请共享访问资源,然后因为E1已经持有了,所以都阻塞了。
然后E1释放资源了,所以他会唤醒阻塞中的S1,S1拿到了资源,但是由于S1是共享模式去拿资源,S2,S3也都是去申请共享资源,所以S1应该去同样唤醒S2,S3一起去分享资源。 这也就是setHeadAndPropagate
干的事情--唤醒S2, S2醒了唤醒S3。
2.2.3 出队操作
也就是释放资源.
2.2.3.1 exclusive node释放资源
释放资源通过如下方法实现:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease和tryAcquire一样,是需要使用者继承并实现的,返回true表示资源完全被释放(也就是需要唤醒后面等待的node),tryRelease同样需要结合资源状态state去判断,且应该是一个非阻塞的调用。
下面unparkSuccessor唤醒后继等待者的实现:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
// 后继节点等于null并不能代表没有后继节点,原因跟入队addWaiter的实现有关.
// 调用addWaiter入队newNode时先用CAS将tail设置成newNode,然后再将之前的tail.next设置成newNode,
// 所以这中间的时间间隔中有tail但是之前的tail还没有完成和newNode连接,也就是之前的tail的后继是null
// 而s.waitStatus >0 则表示这个节点状态为CACELLED取消竞争资源,应该再找它后面的节点
// for循环里从tail往前遍历找到最前面的没有CANCELLED的节点,然后唤醒它。
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒找到的节点对应的线程。
LockSupport.unpark(s.thread);
}
2.2.3.2 shared node释放资源
调用如下方法:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared需要使用AQS的继承并实现它,返回true表示成功释放,意味着需要唤醒后续等待者。
关于doReleaseShared就是唤醒后继所有等待的节点,被唤醒的节点又会在前面介绍的各种doAcquireXXX中醒来去尝试获取资源,获取不到的又会等待。