看这篇文章前最好看下https://www.jianshu.com/p/89132109d49d
回顾一下java中的等待/通知机制
我们有时会遇到这样的场景:线程A执行到某个点的时候,因为某个条件condition不满足,需要线程A暂停;等到线程B修改了条件condition,使condition满足了线程A的要求时,A再继续执行。
自旋实现的等待通知
最简单的实现方法就是将condition设为一个volatile的变量,当A线程检测到条件不满足时就自旋,类似下面:
public class Test {
private static volatile int condition = 0;
public static void main(String[] args) throws InterruptedException {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
while (!(condition == 1)) {
// 条件不满足,自旋
}
System.out.println("a executed");
}
});
A.start();
Thread.sleep(2000);
condition = 1;
}
}
这种方式的问题在于自旋非常耗费CPU资源,当然如果在自旋的代码块里加入Thread.sleep(time)将会减轻CPU资源的消耗,但是如果time设的太大,A线程就不能及时响应condition的变化,如果设的太小,依然会造成CPU的消耗。
Object提供的等待通知
因此,java在Object类里提供了wait()和notify()方法,使用方法如下:
class Test1 {
private static volatile int condition = 0;
private static final Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
synchronized (lock) {
while (!(condition == 1)) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
System.out.println("a executed by notify");
}
}
});
A.start();
Thread.sleep(2000);
condition = 1;
synchronized (lock) {
lock.notify();
}
}
}
通过代码可以看出,在使用一个对象的wait()、notify()方法前必须要获取这个对象的锁。
当线程A调用了lock对象的wait()方法后,线程A将释放持有的lock对象的锁,然后将自己挂起,直到有其他线程调用notify()/notifyAll()方法或被中断。可以看到在lock.wait()前面检测condition条件的时候使用了一个while循环而不是if,那是因为当有其他线程把condition修改为满足A线程的要求并调用notify()后,A线程会重新等待获取锁,获取到锁后才从lock.wait()方法返回,而在A线程等待锁的过程中,condition是有可能再次变化的。
因为wait()、notify()是和synchronized配合使用的,因此如果使用了显示锁Lock,就不能用了。所以显示锁要提供自己的等待/通知机制,Condition应运而生。
显示锁提供的等待通知
我们用Condition实现上面的例子:
class Test2 {
private static volatile int condition = 0;
private static Lock lock = new ReentrantLock();
private static Condition lockCondition = lock.newCondition();
public static void main(String[] args) throws InterruptedException {
Thread A = new Thread(new Runnable() {
@Override
public void run() {
lock.lock();
try {
while (!(condition == 1)) {
lockCondition.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
System.out.println("a executed by condition");
}
});
A.start();
Thread.sleep(2000);
condition = 1;
lock.lock();
try {
lockCondition.signal();
} finally {
lock.unlock();
}
}
}
可以看到通过 lock.newCondition() 可以获得到 lock 对应的一个Condition对象lockCondition ,lockCondition的await()、signal()方法分别对应之前的Object的wait()和notify()方法。整体上和Object的等待通知是类似的。
实现一个阻塞队列
上面我们看到了Condition实现的等待通知和Object的等待通知是非常类似的,而Condition提供的等待通知功能更强大,最重要的一点是,一个lock对象可以通过多次调用 lock.newCondition() 获取多个Condition对象,也就是说,在一个lock对象上,可以有多个等待队列,而Object的等待通知在一个Object上,只能有一个等待队列。用下面的例子说明,下面的代码实现了一个阻塞队列,当队列已满时,add操作被阻塞有其他线程通过remove方法删除元素;当队列已空时,remove操作被阻塞直到有其他线程通过add方法添加元素。
public class BoundedQueue1<T> {
public List<T> q; //这个列表用来存队列的元素
private int maxSize; //队列的最大长度
private Lock lock = new ReentrantLock();
private Condition addConditoin = lock.newCondition();
private Condition removeConditoin = lock.newCondition();
public BoundedQueue1(int size) {
q = new ArrayList<>(size);
maxSize = size;
}
public void add(T e) {
lock.lock();
try {
while (q.size() == maxSize) {
addConditoin.await();
}
q.add(e);
removeConditoin.signal(); //执行了添加操作后唤醒因队列空被阻塞的删除操作
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public T remove() {
lock.lock();
try {
while (q.size() == 0) {
removeConditoin.await();
}
T e = q.remove(0);
addConditoin.signal(); //执行删除操作后唤醒因队列满而被阻塞的添加操作
return e;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;
} finally {
lock.unlock();
}
}
}
demo
Condition是JUC里面提供于控制线程释放锁, 然后进行等待其他获取锁的线程发送 signal 信号来进行唤醒的工具类.
- Condition内部主要是由一个装载线程节点 Node 的 Condition Queue 实现。
- 对 Condition 的方法(await, signal等) 的调用必需是在本线程获取了独占锁的前提下。
- 因为 操作Condition的方法的前提是获取独占锁, 所以 Condition Queue 内部是一条不支持并发安全的单向 queue
public class ConditionTest {
private static final Logger logger = Logger.getLogger(ConditionTest.class);
static final Lock lock = new ReentrantLock();
static final Condition condition = lock.newCondition();
public static void main(String[] args) throws Exception{
final Thread thread1 = new Thread("Thread 1 "){
@Override
public void run() {
lock.lock(); // 线程 1获取 lock
logger.info(Thread.currentThread().getName() + " 正在运行 .....");
try {
Thread.sleep(2 * 1000);
logger.info(Thread.currentThread().getName() + " 停止运行, 等待一个 signal ");
condition.await(); // 调用 condition.await 进行释放锁, 将当前节点封装成一个 Node 放入 Condition Queue 里面, 等待唤醒
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(Thread.currentThread().getName() + " 获取一个 signal, 继续执行 ");
lock.unlock(); // 释放锁
}
};
thread1.start(); // 线程 1 线运行
Thread.sleep(1 * 1000);
Thread thread2 = new Thread("Thread 2 "){
@Override
public void run() {
lock.lock(); // 线程 2获取lock
logger.info(Thread.currentThread().getName() + " 正在运行.....");
thread1.interrupt(); // 对线程1 进行中断 看看中断后会怎么样? 结果 线程 1还是获取lock, 并且最后还进行 lock.unlock()操作
try {
Thread.sleep(2 * 1000);
}catch (Exception e){
}
condition.signal(); // 发送唤醒信号 从 AQS 的 Condition Queue 里面转移 Node 到 Sync Queue
logger.info(Thread.currentThread().getName() + " 发送一个 signal ");
logger.info(Thread.currentThread().getName() + " 发送 signal 结束");
lock.unlock(); // 线程 2 释放锁
}
};
thread2.start();
}
}
执行过程
线程 1 开始执行, 获取 lock, 然后开始睡眠 2秒
当线程1睡眠到 1秒时, 线程2开始执行, 但是lock被线程1获取, 所以 等待
线程 1 睡足2秒 调用 condition.await() 进行锁的释放, 并且将 线程1封装成一个 node 放到 condition 的 Condition Queue里面,等待其他获取锁的线程给他 signal, 或对其进行中断,中断后可以到 Sync Queue里面进而获取 锁
线程 2 获取锁成功, 中断 线程1, 线程被中断后, node 从 Condition Queue 转移到 Sync Queue 里面, 但是 lock 还是被 线程2获取者, 所以 node呆在 Sync Queue 里面等待获取 lock
线程 2睡了 2秒, 开始用signal唤醒 Condition Queue 里面的节点(此时代表 线程1的node已经到 Sync Queue 里面
线程 2释放lock, 并且在 Sync Queue 里面进行唤醒等待获取锁的节点 node
7.线程1 得到唤醒, 获取锁
- 线程1 释放锁
[2017-02-08 22:43:09,557] INFO Thread 1 (ConditionTest.java:26) - Thread 1 正在运行 .....
[2017-02-08 22:43:11,565] INFO Thread 1 (ConditionTest.java:30) - Thread 1 停止运行, 等待一个 signal
[2017-02-08 22:43:11,565] INFO Thread 2 (ConditionTest.java:48) - Thread 2 正在运行.....
java.lang.InterruptedException
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:57) - Thread 2 发送一个 signal
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
[2017-02-08 22:43:13,566] INFO Thread 2 (ConditionTest.java:58) - Thread 2 发送 signal 结束
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
[2017-02-08 22:43:13,567] INFO Thread 1 (ConditionTest.java:35) - Thread 1 获取一个 signal, 继续执行
at com.lami.tuomatuo.search.base.concurrent.aqs.ConditionTest$1.run(ConditionTest.java:31)
源码分析
之前我们介绍AQS的时候说过,AQS的同步排队用了一个隐式的双向队列,同步队列的每个节点是一个AbstractQueuedSynchronizer.Node实例。
Node的主要字段有:
- waitStatus:等待状态,所有的状态见下面的表格。
- prev:前驱节点
- next:后继节点
- thread:当前节点代表的线程
- nextWaiter:Node既可以作为同步队列节点使用,也可以作为Condition的等待队列节点使用(将会在后面讲Condition时讲到)。在作为同步队列节点时,nextWaiter可能有两个值:EXCLUSIVE、SHARED标识当前节点是独占模式还是共享模式;在作为等待队列节点使用时,nextWaiter保存后继节点。
状态 | 值 | 含义 |
---|---|---|
CANCELLED | 1 | 当前节点因为超时或中断被取消同步状态获取,该节点进入该状态不会再变化 |
SIGNAL | -1 | 标识后继的节点处于阻塞状态,当前节点在释放同步状态或被取消时,需要通知后继节点继续运行。每个节点在阻塞前,需要标记其前驱节点的状态为SIGNAL。 |
CONDITION | -2 | 标识当前节点是作为等待队列节点使用的。 |
PROPAGATE | -3 | |
0 | 0 | 初始状态 |
Condition实现等待的时候内部也有一个等待队列,等待队列是一个隐式的单向队列,等待队列中的每一个节点也是一个AbstractQueuedSynchronizer.Node实例。
每个Condition对象中保存了firstWaiter和lastWaiter作为队列首节点和尾节点,每个节点使用Node.nextWaiter保存下一个节点的引用,因此等待队列是一个单向队列。
每当一个线程调用Condition.await()方法,那么该线程会释放锁,构造成一个Node节点加入到等待队列的队尾。
等待
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //构造一个新的等待队列Node加入到队尾
int savedState = fullyRelease(node); //释放当前线程的独占锁,不管重入几次,都把state释放为0
int interruptMode = 0;
//如果当前节点没有在同步队列上,即还没有被signal,则将当前线程阻塞
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//后面的蓝色代码都是和中断相关的,主要是区分两种中断:是在被signal前中断还是在被signal后中断,如果是被signal前就被中断则抛出 InterruptedException,否则执行 Thread.currentThread().interrupt();
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中断则直接退出自旋
break;
}
//退出了上面自旋说明当前节点已经在同步队列上,但是当前节点不一定在同步队列队首。acquireQueued将阻塞直到当前节点成为队首,即当前线程获得了锁。然后await()方法就可以退出了,让线程继续执行await()后的代码。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
final boolean isOnSyncQueue(Node node) {
//如果当前节点状态是CONDITION或node.prev是null,则证明当前节点在等待队列上而不是同步队列上。之所以可以用node.prev来判断,是因为一个节点如果要加入同步队列,在加入前就会设置好prev字段。
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//如果node.next不为null,则一定在同步队列上,因为node.next是在节点加入同步队列后设置的
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node); //前面的两个判断没有返回的话,就从同步队列队尾遍历一个一个看是不是当前节点。
}
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
通知
Condition.signal() 方法的源码如下:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException(); //如果同步状态不是被当前线程独占,直接抛出异常。从这里也能看出来,Condition只能配合独占类同步组件使用。
Node first = firstWaiter;
if (first != null)
doSignal(first); //通知等待队列队首的节点。
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && //transferForSignal方法尝试唤醒当前节点,如果唤醒失败,则继续尝试唤醒当前节点的后继节点。
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//如果当前节点状态为CONDITION,则将状态改为0准备加入同步队列;如果当前状态不为CONDITION,说明该节点等待已被中断,则该方法返回false,doSignal()方法会继续尝试唤醒当前节点的后继节点
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node); //将节点加入同步队列,返回的p是节点在同步队列中的先驱节点
int ws = p.waitStatus;
//如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,线程被唤醒后会执行acquireQueued方法,该方法会重新尝试将节点的先驱状态设为SIGNAL并再次park线程;如果当前设置前驱节点状态为SIGNAL成功,那么就不需要马上唤醒线程了,当它的前驱节点成为同步队列的首节点且释放同步状态后,会自动唤醒它。
//其实笔者认为这里不加这个判断条件应该也是可以的。只是对于CAS修改前驱节点状态为SIGNAL成功这种情况来说,如果不加这个判断条件,提前唤醒了线程,等进入acquireQueued方法了节点发现自己的前驱不是首节点,还要再阻塞,等到其前驱节点成为首节点并释放锁时再唤醒一次;而如果加了这个条件,线程被唤醒的时候它的前驱节点肯定是首节点了,线程就有机会直接获取同步状态从而避免二次阻塞,节省了硬件资源。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
Condition等待通知的本质
总的来说,Condition的本质就是等待队列和同步队列的交互:
当一个持有锁的线程调用Condition.await()时,它会执行以下步骤:
- 构造一个新的等待队列节点加入到等待队列队尾
- 释放锁
- 自旋,直到它在等待队列上的节点移动到了同步队列(通过其他线程调用signal())或被中断
- 阻塞当前节点,直到它获取到了锁,也就是它在同步队列上的节点排队排到了队首。
当一个持有锁的线程调用Condition.signal()时,它会执行以下操作:
- 从等待队列的队首开始,尝试对队首节点执行唤醒操作;如果节点CANCELLED,就尝试唤醒下一个节点;如果再CANCELLED则继续迭代。
对每个节点执行唤醒操作时,首先将节点加入同步队列,此时await()操作的步骤3的解锁条件就已经开启了。然后分两种情况讨论:
- 如果先驱节点的状态为CANCELLED(>0) 或设置先驱节点的状态为SIGNAL失败,那么就立即唤醒当前节点对应的线程,此时await()方法就会完成步骤3,进入步骤4.
- 如果成功把先驱节点的状态设置为了SIGNAL,那么就不立即唤醒了。等到先驱节点成为同步队列首节点并释放了同步状态后,会自动唤醒当前节点对应线程的,这时候await()的步骤3才执行完成,而且有很大概率快速完成步骤4.
如果知道Object的等待通知机制,Condition的使用是比较容易掌握的,因为和Object等待通知的使用基本一致。
对Condition的源码理解,主要就是理解等待队列,等待队列可以类比同步队列,而且等待队列比同步队列要简单,因为等待队列是单向队列,同步队列是双向队列。
之所以同步队列要设计成双向的,是因为在同步队列中,节点唤醒是接力式的,由每一个节点唤醒它的下一个节点,如果是由next指针获取下一个节点,是有可能获取失败的,因为虚拟队列每添加一个节点,是先用CAS把tail设置为新节点,然后才修改原tail的next指针到新节点的。因此用next向后遍历是不安全的,但是如果在设置新节点为tail前,为新节点设置prev,则可以保证从tail往前遍历是安全的。因此要安全的获取一个节点Node的下一个节点,先要看next是不是null,如果是null,还要从tail往前遍历看看能不能遍历到Node。
而等待队列就简单多了,等待的线程就是等待者,只负责等待,唤醒的线程就是唤醒者,只负责唤醒,因此每次要执行唤醒操作的时候,直接唤醒等待队列的首节点就行了。等待队列的实现中不需要遍历队列,因此也不需要prev指针。
Condition 构造函数级基本属性
主要是Condition Queue 的头尾节点
/** First node of condition queue */
/** Condition Queue 里面的头节点 */
private transient Node firstWaiter;
/** Last node of condition queue */
/** Condition Queue 里面的尾节点 */
private transient Node lastWaiter;
/** Creates a new {@code ConditionObject} instance */
/** 构造函数 */
public ConditionObject(){}
Condition Queue enqueue节点方法 addConditionWaiter
addConditionWaiter方法主要用于调用 Condition.await 时将当前节点封装成 一个Node, 加入到 Condition Queue里面。
/**
* Adds a new waiter to wait queue
* 将当前线程封装成一个 Node 节点 放入大 Condition Queue 里面
* 大家可以注意到, 下面对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况
* @return
*/
private Node addConditionWaiter(){
Node t = lastWaiter; // 1. Condition queue 的尾节点
// If lastWaiter is cancelled, clean out // 2.尾节点已经Cancel, 直接进行清除,
// 这里有1个问题, 1 何时出现t.waitStatus != Node.CONDITION -> 在对线程进行中断时 ConditionObject -> await -> checkInterruptWhileWaiting -> transferAfterCancelledWait "compareAndSetWaitStatus(node, Node.CONDITION, 0)" <- 导致这种情况一般是 线程中断或 await 超时
// 一个注意点: 当Condition进行 awiat 超时或被中断时, Condition里面的节点是没有被删除掉的, 需要其他 await 在将线程加入 Condition Queue 时调用addConditionWaiter而进而删除, 或 await 操作差不多结束时, 调用 "node.nextWaiter != null" 进行判断而删除 (PS: 通过 signal 进行唤醒时 node.nextWaiter 会被置空, 而中断和超时时不会)
if(t != null && t.waitStatus != Node.CONDITION){
unlinkCancelledWaiters(); // 3. 调用 unlinkCancelledWaiters 对 "waitStatus != Node.CONDITION" 的节点进行删除(在Condition里面的Node的waitStatus 要么是CONDITION(正常), 要么就是 0 (signal/timeout/interrupt))
t = lastWaiter; // 4. 获取最新的 lastWaiter
}
Node node = new Node(Thread.currentThread(), Node.CONDITION); // 5. 将线程封装成 node 准备放入 Condition Queue 里面
if(t == null){
firstWaiter = node; // 6 .Condition Queue 是空的
}else{
t.nextWaiter = node; // 7. 最加到 queue 尾部
}
lastWaiter = node; // 8. 重新赋值 lastWaiter
return node;
}
对 Condition Queue 的操作都没考虑到 并发(Sync Queue 的队列是支持并发操作的), 这是为什么呢? 因为在进行操作 Condition 是当前的线程已经获取了AQS的独占锁, 所以不需要考虑并发的情况
Condition 唤醒 first节点方法 doSignal
这里的唤醒指的是将节点从 Condition Queue 转移到 Sync Queue 里面
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters
* @param first
*/
/**
* 唤醒 Condition Queue 里面的头节点, 注意这里的唤醒只是将 Node 从 Condition Queue 转到 Sync Queue 里面(这时的 Node 也还是能被 Interrupt)
*/
private void doSignal(Node first){
do{
if((firstWaiter = first.nextWaiter) == null){ // 1. 将 first.nextWaiter 赋值给 nextWaiter 为下次做准备
lastWaiter = null; // 2. 这时若 nextWaiter == null, 则说明 Condition 为空了, 所以直接置空 lastWaiter
}
first.nextWaiter = null; // 3. first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt
}while(!transferForSignal(first) && (first = firstWaiter) != null); // 4. 调用 transferForSignal将 first 转移到 Sync Queue 里面, 返回不成功的话, 将 firstWaiter 赋值给 first
}
Condition 唤醒 所有 节点方法 doSignalAll
/**
* Removes and transfers all nodes
* @param first (non-null) the first node on condition queue
*/
/**
* 唤醒 Condition Queue 里面的所有的节点
*/
private void doSignalAll(Node first){
lastWaiter = firstWaiter = null; // 1. 将 lastWaiter, firstWaiter 置空
do{
Node next = first.nextWaiter; // 2. 初始化下个换新的节点
first.nextWaiter = null; // 3. first.nextWaiter == null 是判断 Node 从 Condition queue 转移到 Sync Queue 里面是通过 signal 还是 timeout/interrupt
transferForSignal(first); // 4. 调用 transferForSignal将 first 转移到 Sync Queue 里面
first = next; // 5. 开始换新 next 节点
}while(first != null);
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
Condition 删除取消节点的方法 unlinkCancelledWaiters
一般的节点都会被 signal 唤醒, 从 Condition Queue 转移到 Sync Queue, 而若遇到 interrupt 或 等待超时, 则直接改变 node 的状态(从 CONDITION 变成 0), 并直接放入 Sync 里面, 而不清理Condition Queue 里面的节点,所以需要下面的函数
/**
* 在 调用 addConditionWaiter 将线程放入 Condition Queue 里面时 或 awiat 方法获取 差不多结束时 进行清理 Condition queue 里面的因 timeout/interrupt 而还存在的节点
* 这个删除操作比较巧妙, 其中引入了 trail 节点, 可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点
*/
private void unlinkCancelledWaiters(){
Node t = firstWaiter;
Node trail = null;
while(t != null){
Node next = t.nextWaiter; // 1. 先初始化 next 节点
if(t.waitStatus != Node.CONDITION){ // 2. 节点不有效, 在Condition Queue 里面 Node.waitStatus 只有可能是 CONDITION 或是 0(timeout/interrupt引起的)
t.nextWaiter = null; // 3. Node.nextWaiter 置空
if(trail == null){ // 4. 一次都没有遇到有效的节点
firstWaiter = next; // 5. 将 next 赋值给 firstWaiter(此时 next 可能也是无效的, 这只是一个临时处理)
}else{
trail.nextWaiter = next; // 6. next 赋值给 trail.nextWaiter, 这一步其实就是删除节点 t
}
if(next == null){ // 7. next == null 说明 已经 traverse 完了 Condition Queue
lastWaiter = trail;
}
}else{
trail = t; // 8. 将有效节点赋值给 trail
}
t = next;
}
}
这是一段非常精巧的queue节点删除, 主要还是在 节点 trail 上, trail 节点可以理解为traverse整个 Condition Queue 时遇到的最后一个有效的节点
Condition 唤醒首节点方法 signal
/**
* 将 Condition queue 的头节点转移到 Sync Queue 里面
* 在进行调用 signal 时, 当前的线程必须获取了 独占的锁
*/
@Override
public void signal() {
if(!isHeldExclusively()){ // 1. 判断当前的线程是否已经获取 独占锁
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignal(first); // 2. 调用 doSignal 进行转移
}
}
Condition 唤醒所有节点方法 signalAll
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively()}
* return {@code false}
*/
/**
* 将 Condition Queue 里面的节点都转移到 Sync Queue 里面
*/
public final void signalAll(){
if(!isHeldExclusively()){
throw new IllegalMonitorStateException();
}
Node first = firstWaiter;
if(first != null){
doSignalAll(first);
}
}
Condition 释放锁进行等待方法 awaitUninterruptibly
awaitUninterruptibly 方法是一个不响应 中断的方法
整个流程
将当前的线程封装成 Node 加入到 Condition 里面
丢弃当前线程所拥有的 独占锁
等待 其他获取 独占锁的线程的唤醒, 唤醒从 Condition Queue 到 Sync Queue 里面, 进而获取 独占锁
最后获取 lock 之后, 在根据线程唤醒的方式(signal/interrupt) 进行处理
/**
* 不响应线程中断的方式进行 await
*/
public final void awaitUninterruptibly(){
Node node = addConditionWaiter(); // 1. 将当前线程封装成一个 Node 放入 Condition Queue 里面
int savedState = fullyRelease(node); // 2. 释放当前线程所获取的所有的独占锁(PS: 独占的锁支持重入), 等等, 为什么要释放呢? 以为你调用 awaitUninterruptibly 方法的前提就是你已经获取了 独占锁
boolean interrupted = false; // 3. 线程中断标识
while(!isOnSyncQueue(node)){ // 4. 这里是一个 while loop, 调用 isOnSyncQueue 判断当前的 Node 是否已经被转移到 Sync Queue 里面
LockSupport.park(this); // 5. 若当前 node 不在 sync queue 里面, 则先 block 一下等待其他线程调用 signal 进行唤醒; (这里若有其他线程对当前线程进行 中断的换, 也能进行唤醒)
if(Thread.interrupted()){ // 6. 判断这是唤醒是 signal 还是 interrupted(Thread.interrupted()会清楚线程的中断标记, 但没事, 我们有步骤7中的interrupted进行记录)
interrupted = true; // 7. 说明这次唤醒是被中断而唤醒的,这个标记若是true的话, 在 awiat 离开时还要 自己中断一下(selfInterrupt), 其他的函数可能需要线程的中断标识
}
}
if(acquireQueued(node, savedState) || interrupted){ // 8. acquireQueued 返回 true 说明线程在 block 的过程中式被 inetrrupt 过(其实 acquireQueued 返回 true 也有可能其中有一次唤醒是 通过 signal)
selfInterrupt(); // 9. 自我中断, 外面的线程可以通过这个标识知道, 整个 awaitUninterruptibly 运行过程中 是否被中断过
}
}
Condition 中断标示
/**
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire
*/
/**
* 下面两个是用于追踪 调用 awaitXXX 方法时线程有没有被中断过
* 主要的区别是
* REINTERRUPT: 代表线程是在 signal 后被中断的 (REINTERRUPT = re-interrupt 再次中断 最后会调用 selfInterrupt)
* THROW_IE: 代表在接受 signal 前被中断的, 则直接抛出异常 (Throw_IE = throw inner exception)
*/
/** Mode meaning to reinterrupt on exit from wait */
/** 在离开 awaitXX方法, 退出前再次 自我中断 (调用 selfInterrupt)*/
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
/** 在离开 awaitXX方法, 退出前再次, 以为在 接受 signal 前被中断, 所以需要抛出异常 */
private static final int THROW_IE = -1;
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted
*/
/**
* 检查 在 awaitXX 方法中的这次唤醒是否是中断引起的
* 若是中断引起的, 则将 Node 从 Condition Queue 转移到 Sync Queue 里面
* 返回值的区别:
* 0: 此次唤醒是通过 signal -> LockSupport.unpark
* THROW_IE: 此次的唤醒是通过 interrupt, 并且 在 接受 signal 之前
* REINTERRUPT: 线程的唤醒是 接受过 signal 而后又被中断
*/
private int checkInterruptWhileWaiting(Node node){
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode
*/
`Condition 中断处理方法 reportInterruptAfterWait`
/**
* 这个方法是在 awaitXX 方法离开前调用的, 主要是根据
* interrupMode 判断是抛出异常, 还是自我再中断一下
*/
private void reportInterruptAfterWait(int interrupMode) throws InterruptedException{
if(interrupMode == THROW_IE){
throw new InterruptedException();
}
else if(interrupMode == REINTERRUPT){
selfInterrupt();
}
}
Condition 释放锁 进行等待的方法 await
await 此方法响应中断请求, 当接受到中断请求后会将节点从 Condition Queue 转移到 Sync Queue
/**
* Implements interruptible condition wait
*
* <li>
* If current thread is interrupted, throw InterruptedException
* Save lock state returned by {@link #getState()}
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitorStateException if it fails
* Blocking until signalled or interrupted
* Reacquire by invoking specifized version of
* {@link #acquire(int)} with saved state as argument.
* If interrupted while blocked in step 4, throw InterruptedException
* </li>
*
* @throws InterruptedException
*/
/**
* 支持 InterruptedException 的 await <- 注意这里即使是线程被中断,
* 还是需要获取了独占的锁后, 再 调用 lock.unlock 进行释放锁
*/
@Override
public final void await() throws InterruptedException {
if(Thread.interrupted()){ // 1. 判断线程是否中断
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 4. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
LockSupport.park(this); // 5. 当前线程没在 Sync Queue 里面, 则进行 block
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 6. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
break; // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
}
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 7. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // clean up if cancelled // 8. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
unlinkCancelledWaiters(); // 9. 进行 cancelled 节点的清除
}
if(interruptMode != 0){ // 10. "interruptMode != 0" 代表通过中断的方式唤醒线程
reportInterruptAfterWait(interruptMode); // 11. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
}
}
Condition 释放锁 进行等待的方法 awaitNanos
awaitNanos 具有超时功能, 与响应中断的功能, 不管中断还是超时都会 将节点从 Condition Queue 转移到 Sync Queue
**
* Impelemnts timed condition wait
*
* <li>
* If current thread is interrupted, throw InterruptedException
* Save lock state returned by {@link #getState()}
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitorStateException if it fails
* Block until aignalled, interrupted, or timed out
* Reacquire by invoking specified version of
* {@link #acquire(int)} with saved state as argument
* If interrupted while blocked in step 4, throw InterruptedException
* </li>
*/
/**
* 所有 awaitXX 方法其实就是
* 0. 将当前的线程封装成 Node 加入到 Condition 里面
* 1. 丢到当前线程所拥有的 独占锁,
* 2. 等待 其他获取 独占锁的线程的唤醒, 唤醒从 Condition Queue 到 Sync Queue 里面, 进而获取 独占锁
* 3. 最后获取 lock 之后, 在根据线程唤醒的方式(signal/interrupt) 进行处理
* 4. 最后还是需要调用 lock./unlock 进行释放锁
*/
@Override
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
if(Thread.interrupted()){ // 1. 判断线程是否中断
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
final long deadline = System.nanoTime() + nanosTimeout; // 4. 计算 wait 的截止时间
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 5. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
if(nanosTimeout <= 0L){ // 6. 等待时间超时(这里的 nanosTimeout 是有可能 < 0),
transferAfterCancelledWait(node); // 7. 调用 transferAfterCancelledWait 将 Node 从 Condition 转移到 Sync Queue 里面
break;
}
if(nanosTimeout >= spinForTimeoutThreshold){ // 8. 当剩余时间 < spinForTimeoutThreshold, 其实函数 spin 比用 LockSupport.parkNanos 更高效
LockSupport.parkNanos(this, nanosTimeout); // 9. 进行线程的 block
}
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 10. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
break; // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
}
nanosTimeout = deadline - System.nanoTime(); // 11. 计算剩余时间
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 12. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // 13. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
unlinkCancelledWaiters(); // 14. 进行 cancelled 节点的清除
}
if(interruptMode != 0){ // 15. "interruptMode != 0" 代表通过中断的方式唤醒线程
reportInterruptAfterWait(interruptMode); // 16. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
}
return deadline - System.nanoTime(); // 17 这个返回值代表是 通过 signal 还是 超时
}
Condition 释放锁 进行等待的方法 awaitUntil
/**
* Implements absolute timed condition wait
* <li>
* If current thread is interrupted, throw InterruptedException
* Save lock state returned by {@link #getState()}
* Invoke {@link #release(int)} with saved state as argument,
* throwing IllegalMonitorStateException if it fails
* Block until signalled, interrupted, or timed out
* Reacquire by invoking specialized version of
* {@link #acquire(int)} with saved state as argument
* if interrupted while blocked in step 4, throw InterruptedException
* If timeed out while blocked in step 4, return false, else true
* </li>
*/
/**
* 所有 awaitXX 方法其实就是
* 0. 将当前的线程封装成 Node 加入到 Condition 里面
* 1. 丢到当前线程所拥有的 独占锁,
* 2. 等待 其他获取 独占锁的线程的唤醒, 唤醒从 Condition Queue 到 Sync Queue 里面, 进而获取 独占锁
* 3. 最后获取 lock 之后, 在根据线程唤醒的方式(signal/interrupt) 进行处理
* 4. 最后还是需要调用 lock./unlock 进行释放锁
*
* awaitUntil 和 awaitNanos 差不多
*/
@Override
public boolean awaitUntil(Date deadline) throws InterruptedException {
long abstime = deadline.getTime(); // 1. 判断线程是否中断
if(Thread.interrupted()){
throw new InterruptedException();
}
Node node = addConditionWaiter(); // 2. 将线程封装成一个 Node 放到 Condition Queue 里面, 其中可能有些清理工作
int savedState = fullyRelease(node); // 3. 释放当前线程所获取的所有的锁 (PS: 调用 await 方法时, 当前线程是必须已经获取了独占的锁)
boolean timeout = false;
int interruptMode = 0;
while(!isOnSyncQueue(node)){ // 4. 判断当前线程是否在 Sync Queue 里面(这里 Node 从 Condtion Queue 里面转移到 Sync Queue 里面有两种可能 (1) 其他线程调用 signal 进行转移 (2) 当前线程被中断而进行Node的转移(就在checkInterruptWhileWaiting里面进行转移))
if(System.currentTimeMillis() > abstime){ // 5. 计算是否超时
timeout = transferAfterCancelledWait(node); // 6. 调用 transferAfterCancelledWait 将 Node 从 Condition 转移到 Sync Queue 里面
break;
}
LockSupport.parkUntil(this, abstime); // 7. 进行 线程的阻塞
if((interruptMode = checkInterruptWhileWaiting(node)) != 0){ // 8. 判断此次线程的唤醒是否因为线程被中断, 若是被中断, 则会在checkInterruptWhileWaiting的transferAfterCancelledWait 进行节点的转移; 返回值 interruptMode != 0
break; // 说明此是通过线程中断的方式进行唤醒, 并且已经进行了 node 的转移, 转移到 Sync Queue 里面
}
}
if(acquireQueued(node, savedState) && interruptMode != THROW_IE){ // 9. 调用 acquireQueued在 Sync Queue 里面进行 独占锁的获取, 返回值表明在获取的过程中有没有被中断过
interruptMode = REINTERRUPT;
}
if(node.nextWaiter != null){ // 10. 通过 "node.nextWaiter != null" 判断 线程的唤醒是中断还是 signal, 因为通过中断唤醒的话, 此刻代表线程的 Node 在 Condition Queue 与 Sync Queue 里面都会存在
unlinkCancelledWaiters(); // 11. 进行 cancelled 节点的清除
}
if(interruptMode != 0){ // 12. "interruptMode != 0" 代表通过中断的方式唤醒线程
reportInterruptAfterWait(interruptMode); // 13. 根据 interruptMode 的类型决定是抛出异常, 还是自己再中断一下
}
return !timeout; // 13. 返回是否通过 interrupt 进行线程的唤醒
}