任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。
通过对比Object的监视器方法和Condition接口,可以更详细地了解Condition的特性在功能特性上还是有很多的不同:
1.Condition能够支持响应中断,而通过使用Object方式不支持;
2.Condition能够支持多个等待队列(new 多个Condition对象),而Object方式只能支持一个
3.Condition能够支持超时时间的设置,而Object不支持
关于实现:以可重入锁为例
Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的newCondition()方法)创建出来的,换句话说,Condition是依赖Lock对象的。
可重入锁为例
//如下的代码
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
lock.lock();
try {
condition.await();
} finally {
lock.unlock();
}
} public void conditionSignal() throws InterruptedException {
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
//在ReentrantLock类中
public Condition newCondition() {
return sync.newCondition();
}
//在ReentrantLock类的sync中重写了
final ConditionObject newCondition() {
return new ConditionObject();
}
然后我们接着翻,可以看到
该类真正实现是在AQS中
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
// public methods
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
以上只留了几个重要的实现方法。
可以看出来ConditionObject通过持有等待队列的头尾指针来管理等待队列。
当具有多个不同等待队列时有以下应用场景。
https://www.jianshu.com/p/3e9458fb56ff
Condition的(部分)方法以及描述:
下面来看下Condition的实现,主要包括:等待队列、等待和通知
等待队列
等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node。
一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点
Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的
Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器
提供的方法,相当于每个Condition都拥有所属同步器的引用。
2.等待await()
调用condition.await()方法会发生什么是呢?当前线程会被构造成一个节点,然后从尾部加入到等待队里中并释放当前线程持有的锁,当前先被阻塞,然后唤醒同步队列中的后继节点,同时当前线程的状态变为等待状态(释放同步状态),然后就在等待队列中一直等待着...
上面的过程可以这样看待,就是将同步器中同步队列的首节点(获取了锁的节点)的线程移动到了等待队列的尾部。
代码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程构造成节点然后添加到condition等待队列的尾部
Node node = addConditionWaiter();
//释放锁(释放同步状态)
int savedState = fullyRelease(node);
int interruptMode = 0;
//如果当前线程在同步队列中,isOnSyncQueue(node)返回true
while (!isOnSyncQueue(node)) {
//阻塞当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//调用同步器的acquireQueued()方法加入到获取同步状态的竞争中
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode)
}
调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出
InterruptedException。
同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方
法把当前线程构造成一个新的节点并将其加入等待队列中。
3.通知
调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了
isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。
被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。
成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。
整理不易,欢迎交流。
待续。。。。。。。。