AQS 全称 AbstractQueuedSynchronizer 中文翻译同步器 同步器是java中各种锁实现的基础,非常重用的 。比如我们线程池里面的Worker 类 我们的重入锁ReentrantLock,信号量Semaphore,CountDownLatch 等
- Java-AQS同步器 源码解读<一>独占锁加锁
- Java-AQS同步器 源码解读<二>独占锁解锁
- Java-AQS同步器 源码解读<三>共享锁
- Java-AQS同步器 源码解读<四>-条件队列上
- Java-AQS同步器 源码解读<五>-条件队列下
我们一步步的来看 是怎么实现的
首先 我们先看下 这个类文件
abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer
implements java.io.Serializable
从文件中我们看出 AbstractQueuedSynchronizer是个抽象类继承了AbstractOwnableSynchronizer抽象类
那我们就去看看AbstractOwnableSynchronizer有什么
AbstractOwnableSynchronizer
public abstract class AbstractOwnableSynchronizer
implements java.io.Serializable {
/** Use serial ID even though all fields transient. */
private static final long serialVersionUID = 3737899427754241961L;
/**
* 构造器
*/
protected AbstractOwnableSynchronizer() { }
/**
* The current owner of exclusive mode synchronization.
* 从上面的英文翻译 和下面字段的命名 我们能知道 这个是一个线程对象
* 是独占锁模式下 同步器的当前拥有者线程
*/
private transient Thread exclusiveOwnerThread;
/**
* 设置 独占锁 同步器的拥着线程 没什么好说的
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* 获取当前 同步器的拥着线程 也没什么
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
从以上的代码中 我们可以分析得到 AQS 基础这个类 是为了 得到独占锁模式拥有锁的线程 方便监控
AbstractQueuedSynchronizer
那看完 上面的代码 那我们进入AbstractQueuedSynchronizer 里面看下 这个类里面又有什么
从源码中 我们可以将AQS的内部属性分为4类
- 同步器的简单属性,
- 同步队列属性,
- 条件队列属性,
- 公用的Node类,和一个ConditionObject类
一个一个来看吧
先看Node这个类 这个类很重要 它即是同步队列的节点 也是条件队列的节点
AQS 内部Node
/**
* 等待队列的节点类,通过Node 可以实现2个队列 一个是线程同步队列(双向队列) 一个是条件线程队列(单向队列)
*/
static final class Node {
/**
* 标识这个节点用于共享模式
* Marker to indicate a node is waiting in shared mode
*/
static final AbstractQueuedSynchronizer.Node SHARED = new AbstractQueuedSynchronizer.Node();
/**
* 标识这个节点用于 独占模式(排它 反正一个意思)
* Marker to indicate a node is waiting in exclusive mode
*/
static final AbstractQueuedSynchronizer.Node EXCLUSIVE = null;
/** 下面是 waitStatus 的几个常量值 */
/**
* 表明等待线程已经取消
* waitStatus value to indicate thread has cancelled
*/
static final int CANCELLED = 1;
/**
* 表述如果当前节点的前一个节点状态是 SIGNAL 那么就可以阻塞当前自己的线程 不用去争抢资源了 没用 不然会一直尝试去获取资源
* waitStatus value to indicate successor's thread needs unparking
*/
static final int SIGNAL = -1;
/**
* 线程在条件队列中等待
* waitStatus value to indicate thread is waiting on condition
*/
static final int CONDITION = -2;
/**
* 共享模式下 无条件传播 该状态的进程处于可运行状态
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
* 当前node 状态
*/
volatile int waitStatus;
/**
* 独占模式下 节点指向的上一个节点
*/
volatile AbstractQueuedSynchronizer.Node prev;
/**
* 独占模式下 节点指向的下一个节点
*/
volatile AbstractQueuedSynchronizer.Node next;
/**
* 入队时的线程
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;
/**
* condition 条件队列中的后继节点
*/
AbstractQueuedSynchronizer.Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* 返回当前节点的前驱节点
*
* @return the predecessor of this node
*/
final AbstractQueuedSynchronizer.Node predecessor() throws NullPointerException {
AbstractQueuedSynchronizer.Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
/**
* 用于addWaiter
*/
Node(Thread thread, AbstractQueuedSynchronizer.Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
/**
* 用于Condition
*/
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
AQS 实例属性
看完了 Node 节点 我看看下AQS 本身的实例属性 有哪些
/**
* 同步队列的头 初始化 或者setHead方法可修改
*/
private transient volatile Node head;
/**
* 同步队列的尾节点 通过enq方法去修改尾节点 后面会分析到
*/
private transient volatile Node tail;
/**
* 同步器的状态 这里 可以去 去看下 Semaphore 这个类 我们初始化的时候 实际
上就是调用的setState 方法 给同步器的状态赋值的 每个线程获取的时候都会去消 耗这个值 达到控制并发效果
*/
private volatile int state;
/**
* 获取同步器状态
*/
protected final int getState() {
return state;
}
/**
* 同步器状态赋值
*/
protected final void setState(int newState) {
state = newState;
}
看完了 上面的2中模式 解锁下独占和共享什么意思吧
独占的意思的同一时刻 只能有一个线程可以获得锁 其余的都排队阻塞,释放资源的时候也只有线程可以释放 比如重入锁ReentrantLock
共享就是允许多个线程获得同一个锁,并且可以设置获取锁的线程数量 比如我 用Semaphore初始状态设置为4 每个线程获取2个 那同时可以有2个线程获取锁
AQS 独占锁
看完了上面的一些接受 下面我们来跟着代码去分析下独占锁 是怎么实现的
独占锁 加锁
-
acquire方法
为什么首先 我们要看这个方法呢,因为这是锁获取资源的入口,那我们就看下代码,用ReentrantLock来举例看看
我们使用ReentrantLock 开始加锁
public void lock() {
sync.lock();
}
我们看下sync.lock 是什么
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
......
}
可以看到sync的lock 方法也是个抽象方法 应该是Sync的子类去实现的
那我们就看下ReentrantLock的构造方法
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
其中得到ReentrantLock 默认构造函数中 sync的实现是NonfairSync公平锁,那我们继续看NonfairSync这个类
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
如果 我们找到了Sync的抽象方法lock 在子类中的实现,其实就是执行了acquire 方法
acquire方法在那呢 ?
FairSync 继承了内部抽象类 Sync类 而从上面的截取的代码中
我们看到Sync类其实就继承了AbstractQueuedSynchronizer 类 这样最终我们会定位到AQS类中acquire方法
那下面我们就来看下acquire 这个方法
/*
*以独占锁的方式 去获取资源 响应中断
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
- tryAcquire(arg) 方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
首先我们看到这个AQS里面 虽然写了tryAcquire方法 但是 抛出一个异常,为什么要这么做呢 其实JDK 作者 是为了方便开发者 我什么这么说呢 因为AQS是个抽象类 这个方法其实可以写成抽象方法 让子类来实现 但是这带来一个问题 因为AQS是很多锁实现的基础 如果写成抽象方法 子类就必须实现 但是可能有些锁 并不需要实现这个方法 没用。所以AQS里面这么去写,是OK的,因为要使用的这个功能的锁 必须去实现重写这个方法,不让就报错 ,不要使用到这个方法的子类 不用管。这样的思想 我们可以去参考 在以后得开发中!
那我们去看下ReentrantLock 类中是否是这样做的
看下代码:
从上面的截图中 我们可以看到 确实是实现了的 ReentrantLock 有一个抽象类Sync 2个静态类 且继承抽象类 Sync 继承了我们AQS 并且实现释放资源的方法,加锁去获取资源的方法tryAcquire 分别由2个子类实现就 一个实现FairSync 公平锁 一个实现NonfairSync非公平锁
tryAcquire 方法是尝试以独占的方式去获取资源
具体各个锁里面的方法实现 自己可以去看下 我们还是看下吧ReentrantLock
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
上面的代码是非公平锁 实现的TryAcquire方法的版本 其实也很好读懂就是判断我们之前提到过的AQS里面的state ,上面我提到过ReentrantLock 默认是非公平锁,初始化的state值是0,这个一开始用getState 去获取方法state当前的值,如果值等于0,说明锁没有其他线程占用,可以获取锁,使用CAS原子操作State 新增1,如果CAS 成功就设置当前锁的所属的线程 这个一开始我描述过AbstractOwnableSynchronizer里面的功能 ,如果获取的时候state不为0,就判断下当前的线程是否等于锁所属的线程,因为是独占的 ,如果相等就把State 再次加1,这个判断也就是说 我们调了同个线程多次lock了,这个时非公平锁的代码,公平锁的实现和上面的差不多 就是多了一个队列的判断,要从阻塞队列里面的头部节点开始一个个地排队获取,而非公平锁就没有,只要释放了 看谁先来 就谁先获取到锁,看多不公平啊,哈哈!
是否满足,如果满足就返回True说明获取到了锁,不满足就返回false acquire后面的方法会继续执行
好的 我们继续回到acquire方法中 如果tryacquire方法返回false 也就是没有获取到锁,执行下一个方法acquireQueued(addWaiter(Node.EXCLUSIVE), arg) 那进入这个方法的时候 又先执行addWaiter 后执行acquireQueued
- addWaiter(Node.EXCLUSIVE)方法
/**
* 这个方法就是加入当前节点到队列的尾节点
*/
private Node addWaiter(Node mode) {
/*构建node 节点 还记得 我们上面写的方法 用于addWaiter么 可以上面去看下
这里的node 几点里面的nextWaiter 这个字段被复用了,在排他锁模式下用于记录锁类型*/
Node node = new Node(Thread.currentThread(), mode);
/*Try the fast path of enq; backup to full enq on failure
从上面的英文注解上面 我们能得到下面的方法是先尝试快速加入队尾,
成功则返回当前节点,如果失败就进入的 enq方法 最后返回节点*/
Node pred = tail;
if (pred != null) {
//当前节点的头节点指向队列的尾部节点,相当于加入到最后了
node.prev = pred;
//CAS 赋值队列尾部节点
if (compareAndSetTail(pred, node)) {
// 如果成功将尾部节点的下个节点指向我们新加的节点,因为是双向链表,所以要互相指一下的
pred.next = node;
return node;
}
}
// 如果 尾部节点是null 或者CAS赋值尾部节点失败,则进入enq 方法,其实就是采用了自旋的方法加入 走我们进去看下怎么做的
enq(node);
return node;
}
-enq 方法
老规矩 上源码
/*
* 使用自旋的方法 加入尾部节点
*/
private Node enq(final Node node) {
//无条件的循环 自旋 好好理解下这个方法 其实循环只有一个出口 就是成功加入尾部节点返回
for (;;) {
Node t = tail;
if (t == null) { // 这边尾部节点是null 说明整个队列都是空的 必须初始化一下
/*CAS 赋值一个头部节点 如果成功 就头节点赋值给尾节点,
也很好理解初始化么 没有任何节点,当然头就是尾,尾就是头,不理解的可以看看 链表结构如果CAS失败,
就不做什么 反正又出不来这个循环,就继续循坏吧,总会成功的,不要觉得这边会死循环,因为就算CAS不成功,
可能是有别的线程修改了值导致的,下次循环的时候就走不到这边了 t也就是尾节点就不为null了,走到下面的判断*/
if (compareAndSetHead(new Node()))
tail = head;
} else {
//走呀走 终于到这边了,看看这部代码 是不是很熟悉,好好想想,往上面看看,其实addWaiter的方法是一样的,
//只不过这边做了自旋操作,保证了一定加入队尾成功
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
ok 看到这边 小伙伴们 不知道有没有晕哈,不管怎么样,革命尚未成功,继续哈,好的 我们回到acquire中
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
上面我们看了tryAcquire方法 和addWaiter方法,那最后我们看下acquireQueued方法是什么 acquireQueued方法的参数一个就是addWaiter 返回加入尾部的节点,还有一个arg是我们各个实现锁方法传进来的
-acquireQueued方法
/**
* 当前节点加入的队尾后,尝试自旋的方式去获取资源,
* 这个方法还有个作用就是自己的前置节点变为SIGNAL 这样他自己就能阻塞了
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//表述是否拿到资源失败 也就是说false表述拿到资源了
try {
//标记 是否执行中断 为什么有这个是因为此方法没有抛出异常 如果内部发生异常 要通知上一级调用,所有有此标识
boolean interrupted = false;
for (;;) {//这边又是自旋的方法 方法出口只有一个 就是获取到资源
/*
P 是当前节点node的前一个节点 这边判断的意思 是如果当前节点的前一个节点就是头部节点,那就尝试去获取锁资源,
如果能获取到 说明头节点已经释放了锁,那就迅速让自己成为队列的头部节点,让之前的头部节点next指向null
是为了方便GC的回收,然后赋值failed 为true 表示已经获取到资源,并且返回中断状态
*/
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果P不是head节点或者是head节点但是尝试获取资源失败,说明锁资源还没被释放,那就执行到这边
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
那我们在进入shouldParkAfterFailedAcquire 方法中看看 是什么
-shouldParkAfterFailedAcquire 方法
/**
* 更新prev节点状态 ,并根据prev节点状态判断是否自己当前线程需要阻塞
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;// node的prev节点的状态
if (ws == Node.SIGNAL)
// 如果SIGNAL 就返回true 就会执行到parkAndCheckInterrupt方法里面
return true;
if (ws > 0) {
/*
* 如果ws 大于0 这里是只能为1,如果是1说明线程已经取消,相当于无效节点
* 者说明 当前node 节点加入到了一个无效节点后面,那这个必须处理一下
node.prev = pred = pred.prev
* 这个操作 我们拆解下来看,下看 pred = pred.prev这个的意思是把prev节点的prev*节点 赋值给prev节点
*后面再看 node.prev = pred 联合 刚才的赋值 这个的意思就是把prev节点的prev节点和node关联起来,
*原因我上面也说了因为pre节点线程取消了,所以node节点不能指向pre节点 只能一个一个的往前找,
*找到waitStatus 小于或者等于0的结束循环最后再把找到的pre节点执行node节点 ,这样就跳过了所有无效的节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
*这边的操作就是把pred 节点的状态设置为SIGNAL,这样返回false 这样可以返回到上面的自旋中
*再次执行一次,如果还是获取不到锁,那么又回到当前的shouldParkAfterFailedAcquire方法 执行到方法最上面的判断
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
-parkAndCheckInterrupt方法
回到刚才 如果shouldParkAfterFailedAcquire方法返回true 那么就会执行parkAndCheckInterrupt方法
这个方法很简单
/**
* Convenience method to park and then check if interrupted
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
/*
* 这边线程阻塞,只有2中方式唤醒当前线程,
* 一种方式就是 当前线程发生中断
* 另外一个情况就是 资源释放的时候会调unpark 方法 唤醒当前的线程 这个会在下一篇会讲到
*/
LockSupport.park(this);
return Thread.interrupted();
}
没什么好说的 就是阻塞当前线程,并且检查当前线程有没有过中断 返回 正常的话 只有release释放资源后 会唤醒线程 但是线程发生中断也是可以的 所以这边要坚持下 是那种情况 唤醒当前线程的方式 并返回
最后的最后我们再次回到刚才的方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
最后这边 currentThread做出线程中断 如果上面都返回true的话
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
不知道 大家有没有了解acquire 怎么去实现获取资源的了
如果还是不知道 AQS里面的独占锁怎么加锁的 最后做下简短的总结:
1.首先第一步 用tryAcquire方法尝试去获取资源 此方法由各个继承类重写,如果成功就直接返回 获取锁成功,如果失败 则进入步骤2
2.进入addWaiter方法 这个方法主要做的就是把当前的线程包装成node节点放入阻塞队列的队尾,里面有enq方法里面有自旋的操作
3.然后进入acquireQueued方法,入参一是addWaiter方法返回的no的节点,入参二就是acquire 方法的参数arg,因为里面还要调用tryAcquire方法,所以此参数要传的,这个方法主要做的就是 尝试自旋去获取资源,但是尝试获取资源也是条件的 就是当前node节点的prev节点是head,这样就避免了无脑去循环去获取资源,这个方法里面还做了一个事情 就是判断下 如果当前node 节点的prev节点 waitStatus 如果是SIGNAL 那就直接执行下面的parkAndCheckInterrupt方法去阻塞当前线程,为什么会这么写呢,因为如果当前节点状态是SIGNAL 说明自己已经获取过锁,并且没获取到,所以后面的节点 就不要再去获取了 直接阻塞吧!
4.最后 如果阻塞的线程 有中断,那会执行到最后一步selfInterrupt中,就是做出线程中断,好让当前的线程 后面的方法 知道这个中断,并做出回应,如果不清楚这个的话 就要好好去了解下线程的interrupt 的作用 和三个常用的方法
好的简单的总结完毕,如果还是不了解 自己一遍看着我的解锁一个看着源码,自己理解下,我相信一定会有收获的!加油!