上一篇文章对ThreadPoolExecutor源码分析中关键内部类Worker继承AbstractQueuedSynchronizer(以下简写AQS)这个抽象类,若对JUC下的源码进行解读, 发现 ThreadPoolExecutor、ReentrantLock、CountDownLatch、Semaphore、FutureTask等都以AQS为基础, 下面我们进行AQS分析.
AQS结构
// 头结点 当前持有锁的节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
// 当前锁的状态 0没有被持有 大于0标示被持有
// 这个值标志这已AQS为基础的的锁是可重入锁
private volatile int state;
// 当前独占锁的线程
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer
每个线程被包装成一个 Node 实例,数据结构是链表
static final class Node {
// 标识节点当前在共享模式下
static final Node SHARED = new Node();
// 标识节点当前在独占模式下
static final Node EXCLUSIVE = null;
// ======== 下面的几个int常量是给waitStatus用的 ===========
// 代表此线程取消了争抢这个锁
static final int CANCELLED = 1;
// 其表示当前node的后继节点对应的线程需要被唤醒
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
// 取值为上面的1、-1、-2、-3,或者0
// 这么理解,暂时只需要知道如果这个值 大于0 代表此线程取消了等待,
// 半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
volatile int waitStatus;
// 前驱节点的引用
volatile Node prev;
// 后继节点的引用
volatile Node next;
// 线程节点
volatile Thread thread;
}
下面我们ReentrantLock抢锁,解锁对AQS进行分析
public class AQSTest {
@Test
public void test() {
Lock lock = new ReentrantLock(true);
lock.lock();
try {
// 执行业务
} finally {
lock.unlock();
}
}
}
ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。
abstract static class Sync extends AbstractQueuedSynchronizer {}
Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),我们看 FairSync 部分。
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}
抢锁
final void lock() {
acquire(1);
}
public final void acquire(int arg) {
// 尝试获取, 如果获取不到添加进链表
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();// 中断此线程
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取锁的状态
int c = getState();
// 没有线程获取锁
if (c == 0) {
// 没有排队的队列与同一时刻没有被别的线程获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 设置独占锁的线程
setExclusiveOwnerThread(current);
return true;
}
}
// 若进入else if分支,说明是重入了,需要操作:state=state+1
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 若执行到这里, 没有获取到锁
return false;
}
// 没有获取到锁, 执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg), 先执行addWaiter(Node.EXCLUSIVE)
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) {
// 尾节点不为null会进入这里
// 将新建的节点的上一个节点指向尾节点
node.prev = pred;
// CAS将新建的节点设置成尾节点
if (compareAndSetTail(pred, node)) {
// 设置成功进入这里, 将之前的尾节点的下一个节点指向新的尾节点
pred.next = node;
return node;
}
}
// 尾节点为null, 或者设置尾节点失败会执行下面流程
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// AQS中tail, head初始化值都为null, 对head,tail进行延时初始化
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// 初始化完成或者上一步设置尾节点失败会进入这里
// 将新建的节点的上一个节点指向尾节点, CAS设置尾节点
// 设置成功,将之前的尾节点的下一个节点指向新的尾节点,返回, 否则一直循环
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
// 现在回到acquireQueued(addWaiter(Node.EXCLUSIVE), arg))方法, 经过上一步处理, node为尾节点
final boolean acquireQueued(final Node node, int arg) {
// 失败的标志
boolean failed = true;
try {
// 中断的标志
boolean interrupted = false;
for (;;) {
// 获取尾节点的上一个节点
final Node p = node.predecessor()
// 这里我们说一下,为什么可以去试试:
// 首先,它是队头,这个是第一个条件,其次,当前的head有可能是刚刚初始化的node,
// enq(node) 方法里面有提到,head是延时初始化的,而且new Node()的时候没有设置任何线程
// 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试,
if (p == head && tryAcquire(arg)) {
// 上一个节点是头节点而且尝试获取成功进入这里
// 这几步的处理是, 将尾节点设置为头结点, 将尾节点的上一个节点的下一个节点设置为null,
// 将失败标示设置为false, 返回中断标志为false,不需要中断
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 尾节点的上一个节点不是头节点来到这个判断
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// tryAcquire() 方法抛异常failed会为true
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 说明前驱节点状态正常,当前线程需要挂起,直接可以返回true
return true;
if (ws > 0) {
// 前驱节点 waitStatus大于0 ,之前说过,大于0 说明前驱节点取消了排队。
// 这里需要知道这点:进入阻塞队列排队的线程会被挂起,而唤醒的操作是由前驱节点完成的。
// 所以下面这块代码说的是将当前节点的prev指向waitStatus<=0的节点,
// 简单说,为了找到前驱节点, 等着先驱节点唤醒
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// ws 为0, -2, -3会进入这里, 设置为-1, 然后返回false
// 因为上层for循环, 所以再次进来此方法,此时会从第一个分支返回 true
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// shouldParkAfterFailedAcquire(p, node)返回true会执行这个方法
// 这里用了LockSupport.park(this)来挂起线程,等待被唤醒
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
解锁
// 开始解锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
// 先看tryRelease(arg)
if (tryRelease(arg)) {
// 回到这里, 如果没有嵌套锁, 执行这里
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // c==0 没有嵌套锁, 设置free为true
free = true;
setExclusiveOwnerThread(null);
}
// 修改state值, 如果没有嵌套锁,返回true, 否则返回false
setState(c);
return free;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0); // ws小于0设置为0
Node s = node.next;
if (s == null || s.waitStatus > 0) {
// head.next不为null, head.next取消等待会进来这里
s = null;
// 从tail开始往前查找, 若有t.waitSatus<=0, 赋值给s
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒线程
LockSupport.unpark(s.thread);
}
唤醒线程以后,被唤醒的线程将从以下代码中继续往前走:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 刚刚线程被挂起在这里了
return Thread.interrupted();
}
// 又回到这个方法了:acquireQueued(final Node node, int arg),这个时候,node的pred是head了
总结
1. 使用volatile修饰 state, 保证多线程的可用性,使用CAS来为state设置值, 它为 0 的时候代表没有线程占有锁,可以去争抢这个锁, 用 CAS 将 state 设为 1,如果 CAS 成功,说明抢到了锁,这样其他线程就抢不到了,如果锁重入的话,state进行 +1 就可以,解锁就是减 1,直到 state 又变为 0,代表释放锁,所以 lock() 和 unlock() 必须要配对啊。然后唤醒等待队列中的第下一个线程。
- 采用了 LockSupport.park与unpark 来挂起与唤醒线程, 操作更精准,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程),增加了灵活性.