Condition
Condition是J.U.C包中的一个接口,提供了三个主要方法await、signal和signalAll。和Object中的wait、notify、notifyAll三个监视器方法语义一致。
监视器方法必须放到synchronized修饰的同步体内,Condition接口也是一样,要放到Lock的监视范围也就是lock和unlock之间的代码块内调用,否则会抛出IllegalMonitorStateException异常。
一个小栗子:
public class ConditionTest {
public static class ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private String response;
public boolean isDone() {// 是否处理完成
return response != null;
}
public String getResponse() throws InterruptedException{ // 获取响应
if (!isDone()) {
lock.lock();
try {
while (!isDone()) {
condition.await();// 线程阻塞等待
if (isDone()) {
break;
}
}
} finally {
lock.unlock();
}
}
return response;
}
public void done(String response) {// 处理完成
lock.lock();
try {
this.response = response;
condition.signal();// 唤醒阻塞等待的线程
} finally {
lock.unlock();
}
}
}
public static void main(String[] args) {
final ResponseFuture responseFuture = new ResponseFuture();
new Thread(new Runnable() {// 请求线程
public void run() {
System.out.println("发送一个同步请求");
try {
// 获取反馈内容,请求没有反馈就会一直等待
System.out.println(responseFuture.getResponse());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {// 处理线程
public void run() {
try {
Thread.sleep(10000);// 模拟处理一会
}catch (InterruptedException e) {
e.printStackTrace();
}
// 处理完成
responseFuture.done("请求处理完成");
}
}).start();
}
}
ConditionObjec
Condition接口的实现ConditionObject是AbstractQueuedSynchronizer的内部
在AQS()中,:
public class ConditionObject implements Condition,
java.io.Serializable {
private static final int REINTERRUPT = 1;
private static final int THROW_IE = -1;
/** 条件队列首节点 */
private transient Node firstWaiter;
/** 条件队列尾节点 */
private transient Node lastWaiter;
public ConditionObject() {}
... ...
}
原理
维护一个链表Node队列,用来进行阻塞线程的排队和调度,称为条件队列。
线程调用了condition.await()后将先释放该线程持有的锁,构造成Node加入条件队列,然后再挂起该线程。
其他线程调用了condition.signal()后,将条件队列中第一个等待的Node节点转移到AQS同步队列,在AQS同步队列中参与锁获取的调度,如果获取到锁则该线程被唤醒。
await流程
// s1
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();// 如果中断 抛出异常
Node node = addConditionWaiter();// 入列 s2
int savedState = fullyRelease(node);// 释放当前锁 s3
int interruptMode = 0;
while (!isOnSyncQueue(node)) {// 不在与同步队列
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
s1:线程未中断开始入列,转入s2,否则直接抛出异常。
入列成功开始释放当前线程持有的锁进入s3。
isOnSyncQueue(node)判断节点是否在AQS同步队列中,如果不在,说明该线程还没有资格参与锁的竞争,将当前线程挂起。
直到node加入了AQS同步队列或者node对应的线程被中断,才退出while{}循环。
checkInterruptWhileWaiting方法会检查中断发生的时机:
signal之前发生的中断返回THROW_IE,表示抛出InterruptedException
signal之后发生的中断返回REINTERRUPT,表示记录中断状态
否则返回0,表示无中断
即便是发生中断transferAfterCancelledWait方法依然会将node转移到AQS同步队列。
调用acquireQueued()方法在AQS同步队列中竞争锁,拿到锁后开始处理中断,reportInterruptAfterWait方法中根据interruptMode选择是抛出异常还是记录状态。
// s2
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾节点状态不为CONDITION,清出队列
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 构造节点并入列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
s2:如果尾节点t不为空或状态不为CONDITION,调用
unlinkCancelledWaiters()方法,这是一个循环方法目的是从头节点开始摘掉所以状态为不为CONDITION的节点。
t==null说明当前的队列还是空队列,将node赋给头节点firstWaiter,否则,将node链入尾部。将尾节点lastWaiter指向node,返回s1。
// s3
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();// 获取当前同步状态
if (release(savedState)) {// 释放锁
failed = false;
return savedState;// 释放成功返回同步状态
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)// 释放失败将节点标识为取消
node.waitStatus = Node.CANCELLED;
}
}
s3:用release()方法释放当前锁,这是独占锁的释放方法,所以Condition对象只支持在独占锁中使用。释放完毕返回s1。
signal流程
public final void signal() {
if (!isHeldExclusively())//非持有线程 抛出异常
throw new IllegalMonitorStateException();
Node first = firstWaiter;//头节点
if (first != null)//队列非空
doSignal(first);
}
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
s1: 如果first.nextWaiter==null说明队列为空,将尾节点lastWaiter也置空。
first.nextWaiter = null将头节点从队列摘掉。
转入s2进行节点转移。
转移成功则退出循环,转移不成功并且队列中还有等待的节点继续转移下个节点。
// s2
final boolean transferForSignal(Node node) {
//节点状态从Node.CONDITION 置换为0,
//因为CONDITION是条件节点的状态
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);//入同步队列
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
s2:CAS将节点状态由CONDITION设置为0。
enq(node)为AQS中同步队列的入列方法,将节点加入同步队列。
ws > 0说明节点被取消了(有可能中断)或者将节点状态置为SIGNAL失败,则直接唤醒线程。
被唤醒的线程会在"await流程s1处"参与锁的竞争或者处理中断。
signalAll和signal逻辑一样,只是在循环中执行transferForSignal(first),将条件队列中的节点依次全部转移到同步队列。
小结:
1:Condition只能使用在独占锁中使用,如ReentrantLock、ReentrantReadWriteLock.writerLock。在共享锁中则无法使用 ,如ReentrantReadWriteLock.readerLock。
2:Condition中的方法,必须要放到Lock的监视范围内,也就是lock和unlock之间的代码块内调用。
码字不易,转载请保留原文连接https://www.jianshu.com/p/e72b43ebd788