简介
AQS(AbstractQueuedSynchronizer)是Java并发工具基础,要掌握Java并发工具类首先得熟悉AQS,通过对AQS的学习,我们将进一部理解共享锁、独占锁、公平锁、非公平锁、重入锁等常见锁。AQS作为一个模板方法,它定义并发工具类处理流程,接下来让我们来了解下AQS的处理流程。
AQS接口
AQS既然是基于模板方法实现,那它即提供了模板方法同时也提供可重新的方法。
可重写的方法
方法名称 | 描述 |
---|---|
protected boolean tryAcquire(int arg) | 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态 |
protected boolean tryRelease(int arg) | 独占式释放同步状态,等待后去同步状态的线程将有机会获取同步状态 |
protected boolean tryAcquireShare(int arg) | 共享式获取同步状态,返回大于等于0的值则成功,反之,获取失败 |
protected boolean tryReleaseShare(int arg) | 共享式释放同步状态 |
protected boolean isHeldExclusively() | 当前同步器是否被当前线程独占 |
模板方法
方法名称 | 描述 |
---|---|
void acquire(int arg) | 独占式获取同步状态,当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待,将会调用tryAcquire(int arg)方法 |
void acquireInterruptibly(int arg) | 与acquire(int arg),但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则该线程会抛出InterrupteredException并返回 |
protected boolean tryAcquireNanos(int arg,long nanos) | 在acquireInterruptibly基础上新增超时时间,如果当前线程在超时时间内没有获取到同步状态,返回false,如果获取到返回true |
protected boolean acquireShared(int arg) | 共享式获取同步状态,与独占的区别是在同一时刻可以友有多个线程获取同步状态 |
protected boolean tryAcquireInterruptibly(int arg) | 与acquireShared(int arg)相同,该方法响应中断 |
protected boolean tryAcquireSharedNanos(int arg,long nanos) | 在tryAcquireInterruptibly(int arg)方法上新增超时限制 |
boolean release(int arg) | 独占式的释放同步状态,该方法会在释放同步状态后 ,将同步队列中第一个节点包含的线程唤醒 |
boolean releaseShared(int arg) | 同步式的释放同步状态 |
Collection<Tread> getQueuedThreads() | 获取等待在同步队列上的线程集合 |
AQS基本结构
AQS依赖内部的同步队列来完成同步状态的管理,同步队列中的节点(Node)用来保存"获取同步状态失败的线程"引用、等待状态以及前驱和后继节点。节点的属性类型与名称及描述:
属性类型和名称 | 描述 |
---|---|
int waitStatus | 等待状态:(1)CANCELLED,值为1,由于在同步队列中的线程等待超时或者被中断,需要从同步队列中取消等待 (2)SIGNAL,值为-1,后继节点的线程处于等待状态,而当前节点的线程如果或者释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行 (3)CONDITION,值是-2,节点在等待队列中,节点线程在Condition上,其他线程对Condition调用了signal()方法后,该节点将会从等待队列中转移到同步队列中,加入到该同步状态的获取中 (4)PROPAGATE,值为-3,表示下一次共享式同步状态获取将会无条件地传播下去 (5)INITIAL,值为0,初始状态 |
Node prev | 前驱节点 |
Node next | 后继节点 |
Node nextWaiter | 等待队列的后继节点。如果当前节点是共享的,那么这个字段将是一个SHARE常量,也就是说节点类型和等待队列中的后继节点共用同一个字段 |
Thread thread | 获取同步状态的线程 |
根据Node基本属性可得下图:AQS基于一个双向链表实现的同步队列,同时有个基于单向链表实现的条件队列,注意这个条件队列不是必须的,它也可以有多个。
源码走读
独占式获取锁
首先我们看一个流程图,熟悉其大致流程
源码
//acquire(int arg)
public final void acquire(int arg) {
//首先调用tryAcquire(arg)方法,这个具体在子类中实现,通过这个方法可以实现公平锁和不公平锁,如果这个方法获取锁成功则直接返回,不再执行后面的代码
//获取失败后便执行addWaiter(Node.EXCLUSIVE), arg)方法,将其添加到队列中,同时标记为独占锁
//acquireQueued(final Node node, int arg)方法中首先尝试获取锁,如果获取不到则阻塞线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//addWaiter(Node mode)
private Node addWaiter(Node mode) {
//创建当前线程的Node
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
//如果tail节点不为null
if (pred != null) {
//当前线程节点前驱指向tail
node.prev = pred;
if (compareAndSetTail(pred, node)) {//CAS 当前线程Node设置为tail成功
//上个tail的后继节点指向现在的tail
pred.next = node;
//返回当前线程Node
return node;
}
}
enq(node);
return node;
}
//enq(node);
private Node enq(final Node node) {
for (;;) {//自旋
Node t = tail;
if (t == null) { // 如果tail节点为null
//初始化head,同时tail指向head
//从后面的代码可以看出头结点是傀儡节点,
//每次通过判读线程节点的前驱是否是头结点来决定是否获取锁
if (compareAndSetHead(new Node()))
tail = head;
} else {//将node插入到队尾
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//acquireQueued(final Node node, int arg)
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {自旋
//获取当前线程节点的前驱继节点
final Node p = node.predecessor();
//如果前驱是头节点,尝试获取锁
if (p == head && tryAcquire(arg)) {//获取成功
//设置当前节点是头节点
setHead(node);
p.next = null;
failed = false;
return interrupted;
}
//获取失败阻塞其他线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//获取失败
if (failed)
//取消正在进行的尝试获取
cancelAcquire(node);
}
}
// shouldParkAfterFailedAcquire(Node pred, Node node)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//如果前驱节点状态是Node.SIGNAL直接返回,
//Node.SIGNAL表示前驱获取锁后会唤醒后继节点的线程
return true;
if (ws > 0) {
//pred.waitStatus表示Node.CANCELLED,下面的操作跳过状态为CANCELLED的所有节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//waitStatus 等于0(初始化)或PROPAGATE,
//说明线程还没有park,会先重试确定无法acquire到再park,
//这里主要是共享锁时传播共享状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
//阻塞当前线程
LockSupport.park(this);
//返回线程状态同时清除标记位,在acquire(int arg)中会再次中断状态的中断线程
return Thread.interrupted();
}
//cancelAcquire(Node node)
private void cancelAcquire(Node node) {
//如果节点为null,直接返回
if (node == null)
return;
node.thread = null;
//跳过当前节点的所有前驱是CANCELLED状态的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
Node predNext = pred.next;
//标记节点状态为Node.CANCELLED
node.waitStatus = Node.CANCELLED;
//如果当期节点是尾节点则直接删除
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
int ws;
//下面的操作是如果当前线程的节点的前驱不能满足唤醒后继的条件,则删除当前节点,
//满足则唤醒当前节点的后继节点
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
独占式释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {//tryRelease(arg)方法子类重新
Node h = head;
if (h != null && h.waitStatus != 0) //head存在
//唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}
通过上面连个方法我们可以了解独占锁获取释放锁的流程,除去独占锁外还有共享锁、带有超时时间的独占式和共享锁。共享锁的获取锁后唤醒后继节点传播共享,具体大家可以查看源码。
自己简单的理解:其实具体的流程很简单,首先创建一个节点,head和tail都指向这个节点,然后各个线程去获取锁,其实就是判断它的前驱节点是不是头结点同时还有tryAcquire()是否返回true。如果获取成功设置当前节点为头节点,获取失败则将其插入到同步队列,设置状态为SIGNAL同时阻塞当前线程,SIGNAL可以保证前驱节点释放后唤醒当前节点,当前节点可以放心阻塞。然后每个出去头结点的节点开始自旋尝试获取锁,直到获取到锁返回。