此篇文章分析AbstractQueuedSynchronizer(AQS)类里的ConditionObject。
Condition
首先要明白Condition的应用场景,Condition经常用于生产者-消费者模型中。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class BoundedBuffer {
final Lock lock = new ReentrantLock();
// condition 依赖于 lock 来产生
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生产
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); // 队列已满,等待,直到 not full 才能继续生产
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
} finally {
lock.unlock();
}
}
// 消费
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
return x;
} finally {
lock.unlock();
}
}
}
无论是await()还是signal()都需要获取到锁才可以操作,put()方法里当notFull.await()时,当前线程首先释放持有的锁,然后在此堵塞,等待唤醒并获取锁,获取锁之后才能返回并继续向下。执行take()方法里notFull().signal()会将之前阻塞的线程唤醒,然后lock.unlock()释放锁。
这里要注意的是,await()方法是首先释放锁,然后阻塞,只有当signal()将其唤醒并重新获得锁才会继续向下运行。
Conditon的初始化
每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例
final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject是AbstractQueuedSynchronizer(AQS)中的内部类。
ConditonObject类的属性:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue.条件队列的头节点 */
private transient Node firstWaiter;
/** Last node of condition queue. 条件队列的尾节点*/
private transient Node lastWaiter;
..............................
}
上一篇文章分析的是AQS的阻塞队列,这里引入了一个条件队列。
node里的属性:
volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
volatile Node prev; //阻塞队列的前驱
volatile Node next;//阻塞队列的后继
volatile Thread thread;
Node nextWaiter; //条件队列的后继
不管是阻塞队列还是条件队列的节点,都是node的实例,因为条件队列中的节点需要移动到阻塞队列里。
一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。每个condition就是一个条件队列。
根据这个图片再具体说明其简单流程:
1.线程1调用condition1.await()时首先释放锁,将当前线程1实例化成一个node加入条件队列condition1中,并在此阻塞。
2.线程2调用condition1.signal()将条件队列condition1中的firstWaiter(队头)移动到阻塞队列的队尾,等待获取锁,抢到锁之后condition1.await()才能返回,继续向下执行。
await()
public final void await() throws InterruptedException {
//首先判断线程的中断状态,如果中断标志位为true抛出异常。
if (Thread.interrupted())
throw new InterruptedException();
//将此线程实例成node加入条件队列。
Node node = addConditionWaiter();
//完全释放锁(完全释放的原因是可能存在可重入的情况),返回值是释放锁之前的state值。
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断是否已经转移到阻塞队列(同步队列)中,不在的话在此处挂起阻塞。
//退出循环有两种情况:1.节点被转移到阻塞队列中 2.响应中断,break退出循环
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//interruptMode代表不同的中断情况。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//无论哪种情况在while循环中退出,此节点一定已经在阻塞队列中,等待获取锁。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//进入这个if条件体里是因为此节点是通过中断进入到阻塞队列中的,需要调用一次unlinkCancelledWaiters(),清除在条件队列中已经取消等待的节点。
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//根据中断的不同情况进行不同的操作。
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
addConditionWaiter()
private Node addConditionWaiter() {
//判断当前线程是否是持有锁的线程,如果不是抛出异常。所以进行await()首先要获取到锁。
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//判断条件队列中节点是否取消排队通过(t.waitStatus != Node.CONDITION)来判断,如果尾节点取消排队,将队列中取消排队的节点清除。
if (t != null && t.waitStatus != Node.CONDITION) {
//从头节点遍历条件队列,把取消排队的节点清除。
unlinkCancelledWaiters();
t = lastWaiter;
}
//把此线程实例化一个node,node.waitStatus=Node.CONDITION;node.thread=currentThread;
Node node = new Node(Node.CONDITION);
//把此node从队尾加入到条件队列中。
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
将此线程实例化成节点加入到条件队列中的过程:
1.判断此线程是否已经获取锁,没有获取锁抛出异常。
2.遍历条件队列,把已经取消排队的节点从队列中移除。
3.把此线程实例化成节点从队尾加入到条件队列中。
fullyRelease(Node node)
因为在await时首先是获取独占锁,所以在把节点加入条件队列之后需要把锁释放,才可以让其他线程获取锁进行signal。
final int fullyRelease(Node node) {
try {
int savedState = getState();
//因为ReentrantLock是可重入锁,所以通过savedState作为参数完全释放锁,将state置为0。重入n次,savedState即为n。
if (release(savedState))
return savedState;
throw new IllegalMonitorStateException();
} catch (Throwable t) {
//释放锁失败,会将节点设置为取消状态,并抛出IllegalMonitorStateException()异常
node.waitStatus = Node.CANCELLED;
throw t;
}
}
isOnSyncQueue(node)
正常情况下完全释放锁之后节点没有被signal或者中断,所以不在阻塞队列中,将此线程在这里挂起。
while (!isOnSyncQueue(node)) {
//不在阻塞队列中,线程在此挂起。
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
isOnSyncQueue判断是否在阻塞队列中
final boolean isOnSyncQueue(Node node) {
//节点的waitStatus== Node.CONDITION说明节点还没有移动到阻塞队列中,因为后面的signal方法将节点移动到阻塞队列中会把waitStatus置为0。
//如果在阻塞队列中,节点的前驱一定不为null。
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
//node有后继,说明一定在阻塞队列中。因为next在阻塞队列中才会使用到。
if (node.next != null) // If has successor, it must be on queue
return true;
//那么这里判断是否在阻塞队列中还剩一种情况,node.prev() != null ,但是这个情况并不能判断是否在阻塞队列中。
//因为在入阻塞队列时,首先是node.prev=tail,然后通过CAS将自己设置为新的tail,但是这次CAS可能失败,所以通过findNodeFromTail(node)来判断。
//此方法从阻塞队尾开始找,如果找到此节点,说明在阻塞队列中,返回true。
return findNodeFromTail(node);
}
//从阻塞队列队尾开始遍历寻找。
private boolean findNodeFromTail(Node node) {
// We check for node first, since it's likely to be at or near tail.
// tail is known to be non-null, so we could re-order to "save"
// one null check, but we leave it this way to help the VM.
for (Node p = tail;;) {
if (p == node)
return true;
if (p == null)
return false;
p = p.prev;
}
}
从上面的循环中可知,不在阻塞队列时,isOnSyncQueue返回false,进入循环体,LockSupport.park(this);线程在此挂起,等待唤醒。接下来进行signal,但是signal只是把节点从条件队列中移动到了阻塞队列中,真正的唤醒其实还是从lock.unlock()这一步进行唤醒。
signal()
把条件队列中的节点转移到阻塞队列。
public final void signal() {
//和await一样,首先判断是否此线程已经获取锁,没有获取锁则抛出异常。
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
//从条件队列队头开始遍历,找出第一个符合转移条件的节点,因为有些节点会取消排队,但可能还在条件队列中。
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
//从条件队列头节点开始转移,转移不成功就转移下一个,直到转移成功或者队列为null。
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
//把节点从条件队列转移到阻塞队列中。
final boolean transferForSignal(Node node) {
//CAS失败说明当前节点的waitStatus不是Node.CONDITION,即说明此节点取消,不需要进行转移,返回false。 否则将 waitStatus 置为 0。
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
//将此节点加入阻塞队列的队尾。返回的p为此节点在阻塞队列的前驱。
Node p = enq(node);
int ws = p.waitStatus;
//如果前驱节点>0,说明其前驱节点取消了排队。
//果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
//进入此处说明前驱节点取消了排队或者CAS失败,直接从此处唤醒此线程。
LockSupport.unpark(node.thread);
return true;
}
正常情况下不进入 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))语句块中,所以正常情况下就是把节点从条件队列转移到阻塞队列中。如果其阻塞队列中的前驱节点取消等待或者CAS失败,那么直接唤醒此节点对应的线程。
唤醒后检查中断状态
在进行signal的操作时正常情况是把节点加入到阻塞队列中,但并没有唤醒对应挂起的节点,而是等待lock.unlock()释放锁并唤醒阻塞队列中的节点。
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//唤醒后从这里继续运行,执行checkInterruptWhileWaiting(node)。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
interruptMode可以取值为 REINTERRUPT(1),THROW_IE(-1),0。
- REINTERRUPT: 代表 await 返回的时候,需要重新设置中断状态
- THROW_IE: 代表 await 返回的时候,需要抛出 InterruptedException 异常
- 0 :说明在 await 期间,没有发生中断
有三种情况会让LockSupport.park(this);返回,继续向下执行:
- 正常情况:signal—>把条件队列中的节点转移到阻塞队列—>lock.unlock()唤醒阻塞队列中的节点对应的线程。
- 线程中断:线程中断会使 LockSupport.park(this);立即返回,退出阻塞状态。
- 在进行signal时将节点转移到阻塞队列之后。其前驱节点取消排队或者CAS失败。
线程在被唤醒之后会进入if语句执行checkInterruptWhileWaiting(Node node),此方法用来判断在线程挂起期间是否发生中断,没有发生中断返回0,在signal之前发生中断返回THROW_IE(-1),在signal之后发生中断返回REINTERRUPT(1)。
方法Thread.interrupted():如果当前线程被中断,返回true,并把中断标志位设为false。即第二次访问时会返回false。
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
//发生中断,执行 (transferAfterCancelledWait(node)。否则直接返回0
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
final boolean transferAfterCancelledWait(Node node) {
//如果进入这个if语句,说明中断是signal之前发生的。
//因为在signal过程中,会把node.waitStauts由Node.CONDITION改成0,所以如果这里CAS成功说明waitStatus还没有被signal改为0,即中断是signal之前发生的。
if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
//把节点加到阻塞队列里,即使中断也会把节点放入阻塞队列中。
//这里需要注意的是,在正常情况下signal将节点放入阻塞队列时会把 first.nextWaiter = null;即把此节点从条件队列中断开。
//但是这里并没有从条件队列中断开,而是保留了其在条件队列中的结构。
enq(node);
return true;
}
//到这里即CAS失败,说明进行了signal将waitStatus置为0,但是signal可能还没有将节点转移到阻塞队列,所以这里自旋等待signal将节点转移到阻塞队列。
//这种情况是罕见的:signal执行后,没完成转移之前,发生了中断
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
中断也会将节点加入到阻塞队列,但是没有切断与条件队列的联系。因为如果从这里切断了与条件队列的联系,在正常的signal()转移节点时会找不到后面的节点,因为signal()转移时是根据node.nextWaiter找到下一个节点。
准备获取锁
退出while循环后说明节点一定进入了阻塞队列,准备换取锁。acquireQueued(node, savedState)进行自旋获取锁,没有获取成功从这里再次挂起。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
这里需要注意的是前面因为中断到达这一步时,此节点的线程可能并没有成功获取锁,所以需要再次从此处挂起。等待正常唤醒或者中断继续尝试获取锁。因为acquireQueued返回值是线程是否被中断,但是因为方法Thread.interrupted()返回后会将中断标志位置为false,即第二次会返回false。所以这里比较绕的有五种情况:
- 通过正常lock.unlock()唤醒,进入acquireQueued(node, savedState)挂起,没有通过中断唤醒,返回false,interruptMode=0,继续向下执行。
- 通过正常lock.unlock()唤醒,进入acquireQueued(node, savedState)挂起,通过中断唤醒,返回true,interruptMode != THROW_IE,所以将 interruptMode = REINTERRUPT;,也可以说明其是在signal之后中断的。
- 第一次通过中断唤醒,进入acquireQueued(node, savedState)直接获取锁成功,acquireQueued(node, savedState)返回false,根据第一次中断设置的interruptMode来进行对应的中断处理。
- 第一次通过中断唤醒,进入acquireQueued(node, savedState)后没有成功获取锁,挂起,通过lock.unlock()唤醒,acquireQueued(node, savedState)同样返回false,根据第一次中断设置的interruptMode来进行对应的中断处理
- 第一次通过中断唤醒,进入acquireQueued(node, savedState)后没有成功获取锁,挂起,再次对其进行中断,这时acquireQueued(node, savedState)返回true,然后判断interruptMode的值进行对应的中断处理。
获取锁之后继续执行:
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
前面有提到,因为在signal之前进行中断时,也会将节点加入到阻塞队列中,但是并没有设置 node.nextWaiter = null,所以在这里会调用unlinkCancelledWaiters();清除条件队列中已经取消等待的节点。
如果发生过中断,那么interruptMode!=0,所以执行reportInterruptAfterWait(interruptMode);来处理不同的中断请求。
interruptMode的取值:
- 0:什么都不做,没有被中断过;
- THROW_IE:await 方法抛出 InterruptedException 异常,因为它代表在 await() 期间发生了中断,即在signal()之前发生中断;
- REINTERRUPT:重新中断当前线程,因为它代表 await() 期间没有被中断,而是 signal() 以后发生的中断。
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
这个方法执行完就彻底从await()返回,结束。