Lock接口
public interface Lock {
// 阻塞式获取锁。如果不能立刻获取,则加入到FIFO队列中等待被唤醒。
void lock();
// 如果线程在FIFO中AQS挂起,或者在方法执行前线程中断,则直接抛异常。
void lockInterruptibly() throws InterruptedException;
// 不加入AQS的FIFO队列, 只执行一次, 如果当前锁被其他线程占用,则获取失败。
boolean tryLock();
// 在指定时间内获取锁,且在指定时间内线程未中断,则成功获取锁。
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
void unlock();
//返回绑定到此锁实例的新条件实例。
Condition newCondition();
}
AbstractQueuedSynchronizer 抽象队列同步器
Java并发之AQS源码分析(一) ps:感谢作者, 写的真好。
AQS 全称是 AbstractQueuedSynchronizer,顾名思义,是一个用来构建锁和同步器的框架,它底层用了 CAS 技术来保证操作的原子性,同时利用 FIFO 队列实现线程间的锁竞争,将基础的同步相关抽象细节放在 AQS,这也是 ReentrantLock、CountDownLatch 等同步工具实现同步的底层实现机制。它能够成为实现大部分同步需求的基础,也是 J.U.C 并发包同步的核心基础组件。
AQS核心字段和方法
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
// ...
}
head 字段为等待队列的头节点,表示当前正在执行的节点;
tail 字段为等待队列的尾节点;
state 字段为同步状态,其中 state > 0 为有锁状态,每次加锁就在原有 state 基础上加 1,即代表当前持有锁的线程加了 state 次锁,反之解锁时每次减一,当 statte = 0 为无锁状态;
通过 compareAndSetState 方法操作 CAS 更改 state 状态,保证 state 的原子性。
有没有发现,这几个字段都用 volatile 关键字进行修饰,以确保多线程间保证字段的可见性。
AQS 提供了两种锁,分别是独占锁和共享锁,独占锁指的是操作被认作一种独占操作,比如 ReentrantLock,它实现了独占锁的方法,而共享锁则指的是一个非独占操作,比如一些同步工具 CountDownLatch 和 Semaphore 等同步工具,下面是 AQS 对这两种锁提供的抽象方法。
独占锁:
// 获取锁方法
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 释放锁方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
共享锁:
// 获取锁方法
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
// 释放锁方法
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
在我们平时开发中,基本不用直接使用 AQS,我们平时都是直接使用 JDK 自带的同步类工具,如 ReentrantLock、CountDownLatch 和 Semaphore 等,它们已经可以满足绝大部分的需求了,后面会抽几篇文章单独讲一下这些同步类工具是如何使用 AQS 的,这对于我们如何构建自定义的同步工具,有很大的帮助。
下面是同步队列节点的结构:
用大神的注释来形象地描述一下队列的模型:
/**
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*/
这是一个普通双向链表的节点结构,多了 thread 字段用于存储当前线程对象,同时每个节点都有一个 waitStatus 等待状态,一共有四种状态:
CANCELLED(1):取消状态,如果当前线程的前置节点状态为 CANCELLED,则表明前置节点已经等待超时或者已经被中断了,这时需要将其从等待队列中删除。
SIGNAL(-1):等待触发状态,如果当前线程的前置节点状态为 SIGNAL,则表明当前线程需要阻塞。
CONDITION(-2):等待条件状态,表示当前节点在等待 condition,即在 condition 队列中。
PROPAGATE(-3):状态需要向后传播,表示 releaseShared 需要被传播给后续节点,仅在共享锁模式下使用。
可以这么理解:head 节点可以表示成当前持有锁的线程的节点,其余线程竞争锁失败后,会加入到队尾,tail 始终指向队列的最后一个节点。
AQS 的结构大概可总结为以下 3 部分:
用 volatile 修饰的整数类型的 state 状态,用于表示同步状态,提供 getState 和 setState 来操作同步状态;
提供了一个 FIFO 等待队列,实现线程间的竞争和等待,这是 AQS 的核心;
AQS 内部提供了各种基于 CAS 原子操作方法,如 compareAndSetState 方法,并且提供了锁操作的acquire和release方法。
独占锁
独占锁的原理是如果有线程获取到锁,那么其它线程只能是获取锁失败,然后进入等待队列中等待被唤醒。
获取锁
获取独占锁方法:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
源码解读:
通过 tryAcquire(arg) 方法尝试获取锁,这个方法需要实现类自己实现获取锁的逻辑,获取锁成功后则不执行后面加入等待队列的逻辑了;
如果尝试获取锁失败后,则执行 addWaiter(Node.EXCLUSIVE) 方法将当前线程封装成一个 Node 节点对象,并加入队列尾部;
把当前线程执行封装成 Node 节点后,继续执行 acquireQueued 的逻辑,该逻辑主要是判断当前节点的前置节点是否是头节点,来尝试获取锁,如果获取锁成功,则当前节点就会成为新的头节点,这也是获取锁的核心逻辑。
基于上面源码的步骤分析后,我们一步步往下看源码具体实现:
private Node addWaiter(Node mode) {
// 创建一个基于当前线程的节点,该节点是 Node.EXCLUSIVE 独占式类型
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
// 这里先判断队尾是否为空,如果不为空则直接将节点加入队尾
if (pred != null) {
node.prev = pred;
// 采取 CAS 操作,将当前节点设置为队尾节点,由于采用了 CAS 原子操作,无论并发怎么修改,都有且只有一条线程可以修改成功,其余都将执行后面的enq方法
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
简单来说 addWaiter(Node mode) 方法做了以下事情:
创建基于当前线程的独占式类型的节点;
利用 CAS 原子操作,将节点加入队尾。
我们继续看 enq(Node node) 方法:
private Node enq(final Node node) {
// 自旋操作
for (;;) {
Node t = tail;
// 如果队尾节点为空,那么进行CAS操作初始化队列
if (t == null) {
// 这里很关键,即如果队列为空,那么此时必须初始化队列,初始化一个空的节点表示队列头,用于表示当前正在执行的节点,头节点即表示当前正在运行的节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// 这一步也是采取CAS操作,将当前节点加入队尾,如果失败的话,自旋继续修改直到成功为止
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq(final Node node) 方法主要做了以下事情:
采用自旋机制,这是 aqs 里面很重要的一个机制;
如果队尾节点为空,则初始化队列,将头节点设置为空节点,头节点即表示当前正在运行的节点;
如果队尾节点不为空,则继续采取 CAS 操作,将当前节点加入队尾,不成功则继续自旋,直到成功为止;
对比了上面两段代码,不难看出,首先是判断队尾是否为空,先进行一次 CAS 入队操作,如果失败则进入 enq(final Node node) 方法执行完整的入队操作。
完整的入队操作简单来说就是:如果队列为空,初始化队列,并将头节点设为空节点,表示当前正在运行的节点,然后再将当前线程的节点加入到队列尾部。
关于队列的初始化与入队,务必理解透彻。
经过上面 CAS 不断尝试,这时当前节点已经成功加入到队尾了,接下来就到了acquireQueued 的逻辑,我们继续往下看源码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// 线程中断标记字段
boolean interrupted = false;
for (;;) {
// 获取当前节点的 pred 节点
final Node p = node.predecessor();
// 如果 pred 节点为 head 节点,那么再次尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁之后,那么当前节点也就成为了 head 节点
setHead(node);
p.next = null; // help GC
failed = false;
// 不需要挂起,返回 false
return interrupted;
}
// 获取锁失败,则进入挂起逻辑
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 唤醒线程后, 判断线程当前中断状态, 如果当前未被中断,则返回false, 所以if(false) , 不执行 interrupted = true;
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private final boolean parkAndCheckInterrupt() {
// 挂起线程, stop execute
LockSupport.park(this);
// 唤醒线程后, 判断线程当前中断状态, 如果当前未被中断,则返回false
return Thread.interrupted();
}
这一步 acquireQueued(final Node node, int arg) 方法主要做了以下事情:
判断当前节点的 pred 节点是否为 head 节点,如果是,则尝试获取锁;
获取锁失败后,进入挂起逻辑。
提醒一点:我们上面也说过,head 节点代表当前持有锁的线程,那么如果当前节点的 pred 节点是 head 节点,很可能此时 head 节点已经释放锁了,所以此时需要再次尝试获取锁。
接下来继续看挂起逻辑源码:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果 pred 节点为 SIGNAL 状态,返回true,说明当前节点需要挂起
return true;
// 如果ws > 0,说明节点状态为CANCELLED,需要从队列中删除
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果是其它状态,则操作CAS统一改成SIGNAL状态
// 由于这里waitStatus的值只能是0或者PROPAGATE,所以我们将节点设置为SIGNAL,从新循环一次判断
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
这一步 shouldParkAfterFailedAcquire(Node pred, Node node) 方法主要做了以下事情:
判断 pred 节点状态,如果为 SIGNAL 状态,则直接返回 true 执行挂起;
删除状态为 CANCELLED 的节点;
若 pred 节点状态为 0 或者 PROPAGATE,则将其设置为为 SIGNAL,再从 acquireQueued 方法自旋操作从新循环一次判断。
通俗来说就是:根据 pred 节点状态来判断当前节点是否可以挂起,如果该方法返回 false,那么挂起条件还没准备好,就会重新进入 acquireQueued(final Node node, int arg) 的自旋体,重新进行判断。如果返回 true,那就说明当前线程可以进行挂起操作了,那么就会继续执行挂起。
这里需要注意的时候,节点的初始值为 0,因此如果获取锁失败,会尝试将节点设置为 SIGNAL。
继续看挂起逻辑:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
LockSupport 是用来创建锁和其他同步类的基本线程阻塞原语。LockSupport 提供 park() 和 unpark() 方法实现阻塞线程和解除线程阻塞。release 释放锁方法逻辑会调用 LockSupport.unPark 方法来唤醒后继节点。
获取独占锁流程图:
释放锁
释放锁方法:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 由后继节点自己主动升级为head节点
return true;
}
return false;
}
释放锁的方法源码就很好理解,通过 tryRelease(arg) 方法尝试释放锁,这个方法需要实现类自己实现释放锁的逻辑,释放锁成功后则执行后面的唤醒后续节点的逻辑了,然后判断 head 节点不为空并且 head 节点状态不为 0,因为 addWaiter 方法默认的节点状态为 0,此时节点还没有进入就绪状态。
继续往下看源码:
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
// 将头节点的状态设置为0
// 这里会尝试清除头节点的状态,改为初始状态
compareAndSetWaitStatus(node, ws, 0);
// 后继节点
Node s = node.next;
// 如果后继节点为null,或者已经被取消了
if (s == null || s.waitStatus > 0) {
s = null;
// for循环从队列尾部一直往前找可以唤醒的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 唤醒后继节点
LockSupport.unpark(s.thread);
}
从源码可看出:释放锁主要是将头节点的后继节点唤醒,如果后继节点不符合唤醒条件,则从队尾一直往前找,直到找到符合条件的节点为止。
总结
这篇文章主要讲述了 AQS 的内部结构和它的同步实现原理,并从源码的角度深度剖析了 AQS 独占锁模式下的获取锁与释放锁的逻辑,并且从源码中我们得出:在独占锁模式下,用 state 值表示锁并且 0 表示无锁状态,0 -> 1 表示从无锁到有锁,仅允许一条线程持有锁,其余的线程会被包装成一个 Node 节点放到队列中进行挂起,队列中的头节点表示当前正在执行的线程,当头节点释放后会唤醒后继节点,从而印证了 AQS 的队列是一个 FIFO 同步队列。
Java并发之AQS源码分析(二)
共享锁
获取锁
public final void acquireShared(int arg) {
// 尝试获取共享锁,小于0表示获取失败
if (tryAcquireShared(arg) < 0)
// 执行获取锁失败的逻辑
doAcquireShared(arg);
}
这里的 tryAcquireShared 方法是留给实现方去实现获取锁的具体逻辑的,我们主要看 doAcquireShared 方法的实现逻辑:
private void doAcquireShared(int arg) {
// 添加共享锁类型节点到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 再次尝试获取共享锁
int r = tryAcquireShared(arg);
// 如果在这里成功获取共享锁,会进入共享锁唤醒逻辑
if (r >= 0) {
// 共享锁唤醒逻辑
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 与独占锁相同的挂起逻辑
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
看到上面的代码,是不是有一种熟悉的感觉,同样是采用了自旋机制,在线程挂起之前,不断地循环尝试获取锁,不同的是,一旦获取共享锁,会调用 setHeadAndPropagate 方法同时唤醒后继节点,实现共享模式,下面是唤醒后继节点代码逻辑:
private void setHeadAndPropagate(Node node, int propagate) {
// 头节点
Node h = head;
// 设置当前节点为新的头节点
// 这里不需要加锁操作,因为获取共享锁后,会从FIFO队列中依次唤醒队列,并不会产生并发安全问题
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
// 后继节点
Node s = node.next;
// 如果后继节点为空或者后继节点为共享类型,则进行唤醒后继节点
// 这里后继节点为空意思是只剩下当前头节点了
if (s == null || s.isShared())
doReleaseShared();
}
}
该方法主要做了两个重要的步骤:
将当前节点设置为新的头节点,这点很重要,这意味着当前节点的前置节点(旧头节点)已经获取共享锁了,从队列中去除;
调用 doReleaseShared 方法,它会调用 unparkSuccessor 方法唤醒后继节点。
释放锁
public final boolean releaseShared(int arg) {
// 由用户自行实现释放锁条件
if (tryReleaseShared(arg)) {
// 执行释放锁
doReleaseShared();
return true;
}
return false;
}
下面是释放锁逻辑:
private void doReleaseShared() {
for (;;) {
// 从头节点开始执行唤醒操作
// 这里需要注意,如果从setHeadAndPropagate方法调用该方法,那么这里的head是新的头节点
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
//表示后继节点需要被唤醒
if (ws == Node.SIGNAL) {
// 初始化节点状态
//这里需要CAS原子操作,因为setHeadAndPropagate和releaseShared这两个方法都会顶用doReleaseShared,避免多次unpark唤醒操作
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
// 如果初始化节点状态失败,继续循环执行
continue; // loop to recheck cases
// 执行唤醒操作
unparkSuccessor(h);
}
//如果后继节点暂时不需要唤醒,那么当前头节点状态更新为PROPAGATE,确保后续可以传递给后继节点
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果在唤醒的过程中头节点没有更改,退出循环
// 这里防止其它线程又设置了头节点,说明其它线程获取了共享锁,会继续循环操作
if (h == head) // loop if head changed
break;
}
}
共享锁的释放锁逻辑比独占锁的释放锁逻辑稍微复杂,原因是共享锁需要释放队列中所有共享类型的节点,因此需要循环操作,由于释放锁过程中会涉及多个地方修改节点状态,此时需要 CAS 原子操作来并发安全。
java.util.concurrent.locks.ReentrantLock
非公平锁、可重入实现
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 不管之前是否有线程在队列中等待, 只要锁是空闲的, 直接抢占
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入实现, 是同一个线程, 则可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
公平锁的实现、可重入实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 如果之前就有线程在等待,则入队。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入实现, 是同一个线程, 则可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
java.util.concurrent.locks.Condition
AbstractQueuedSynchronizer 原理分析 - Condition 实现原理
public interface Condition {
/**
* Causes the current thread to wait until it is signalled or
* {@linkplain Thread#interrupt interrupted}.
*/
void await() throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled.
*/
void awaitUninterruptibly();
/**
* Causes the current thread to wait until it is signalled or interrupted,
* or the specified waiting time elapses.
*/
long awaitNanos(long nanosTimeout) throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled or interrupted,
* or the specified waiting time elapses.
*/
boolean await(long time, TimeUnit unit) throws InterruptedException;
/**
* Causes the current thread to wait until it is signalled or interrupted,
* or the specified deadline elapses.
*/
boolean awaitUntil(Date deadline) throws InterruptedException;
/**
* Wakes up one waiting thread.
*/
void signal();
/**
* Wakes up all waiting threads.
*/
void signalAll();
}
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject
头节点和尾节点, 并且都是Node类型的
private transient Node firstWaiter;
private transient Node lastWaiter;
添加一个waiter到FIFO队列
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
首次添加waiter节点, firstWaiter=lastWaiter=node;
第二次添加 waiter节点, lastWaiter.waitStatus=Node.Condition则继续在队列尾添加节点。 lastWaiter.waitStatus!=Node.Condition, 清空队列中所有waitStatus!=Node.Condition的节点。
清空队列中所有waitStatus!=Node.Condition的节点
private void unlinkCancelledWaiters() {
Node t = firstWaiter; // 头节点
Node trail = null; // 尾节点
while (t != null) {
Node next = t.nextWaiter; //头节点的后继节点、
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next; // 因为头节点被清空了, 之前的后继节点升级为头节点
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
从头节点开始, 向后继查找所有waitStatus!=Node.Condition的节点,并清空掉。
线程主动释放锁, 并被挂起
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 加入到等待队列, 当前node的waitStatus变成了Node.Condition
Node node = addConditionWaiter();
// 当前线程释放锁, 并唤醒锁等待队列中的线程
// 等价于lock.unlock()
// 当前线程被从锁同步队列中给移除了; 同时在condition等待同步队列中新增了一个节点
int savedState = fullyRelease(node);
int interruptMode = 0;
//Block until signalled or interrupted.
while (!isOnSyncQueue(node)) { // 当前节点不在锁同步队列中,则进入循环, 否则推出循环
// 如果当前线程节点不在锁等待队列中了, 则挂起当前线程
// 当在其他线程调用被挂起的这个线程的interrupt方法, 被挂起的线程会被唤醒。
LockSupport.park(this);
// 线程被挂起于此, 代码终止
----- stop execute-----
// SignallALL 被其他线程唤醒了;
// 当线程被重新唤醒, 继续执行下面的代码
----- start execute -----
// 检测线程在挂起期间是否被打断过
Checks for interrupt, returning THROW_IE if interrupted before signalled, REINTERRUPT if after signalled, or 0 if not interrupted.
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 如果node不在锁等待队列中, 则加入
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
lock.unlock() 将会把释放锁的线程从锁同步队列中移除的
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
// 移除头节点操作
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
释放当前线程占用的锁
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState(); //获取当前线程的锁同步状态
if (release(savedState)) { // 释放当前线程的锁同步状态,并唤醒锁队列中的后继节点
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED; //如果当前线程释放锁失败, ,则将该节点从等待队列移除
}
}
public final boolean release(int arg) {
if (tryRelease(arg)) { // 当前线程释放锁
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 锁队列的头节点从挂起中恢复, 执行获取锁的逻辑
return true;
}
return false;
}
判断节点是否在同步队列中
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
signalALL唤醒其他线程
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
// 关键代码
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 啥都不做, 因为已经被其他线程改过了。有两个线程调用调用了signalAll; 避免重复插入
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 将condition等待节点加入到锁同步队列, 重新开启锁的竞争。
Node p = enq(node);
int ws = p.waitStatus;
// 因为是并发环境, 锁已经被线程取消了, 唤醒线程
// 回到await
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
Condition总结
AQS在维护了锁等待队列的同时,又维护了一个Condition队列,并且锁等待队列只有一个, 但是Condition队列可以存在多个。
如果调用了await
不管因为哪个Condition导致线程等待, 总之线程应该交出锁,并被移除出锁同步队列, 直到 等待被Condition唤醒或者被打断, signall, signalAll 还会加入到锁同步队列, 如果被打断, 则直接抛异常。
同步工具:
java.util.concurrent.CountDownLatch
CountDownLatch,它的含义是允许一个或多个线程等待其它线程的操作执行完毕后再执行后续的操作。
java.util.concurrent.Semaphore
同步工具 java.util.concurrent.CyclicBarrier
CyclicBarrier,回环栅栏,当一组线程都达到某个状态,在最后一个达到的线程中先执行barrierAction,然后放行所有线程继续执行后续操作。
用于并行计算, 将大任务拆分成几个小任务, 每个线程执行一个小任务, 提前完成任务的线程进入Condition等待队列,最后一个完成小任务的线程负责执行barrierAction, 在barrierAction中负责将多个小任务的执行结果合并。