1.与Lock的关系
Condition在同步锁synchronized中用的比较多。
Condition本身也是一个接口,其功能和wait/notify类似。
public interface Condition {
void await() throws InterruptedException; //等待
void awaitUninterruptibly();
long awaitNanos(long var1) throws InterruptedException;
boolean await(long var1, TimeUnit var3) throws InterruptedException; //计时等待
boolean awaitUntil(Date var1) throws InterruptedException;
void signal(); //唤醒
void signalAll(); //唤醒全部
}
wait()/notify()必须和synchronized一起使用,Condition也必须和Lock一起使用。因此,在Lock的接口中,有一个与Condition相关的接口:(所有的Condition都是从Lock中构造出来的)
public interface Lock {
void lock();
void lockInterruptibly() throws InterruptedException;
boolean tryLock();
boolean tryLock(long var1, TimeUnit var3) throws InterruptedException;
void unlock();
// 所有的Condition都是从Lock中构造出来的
Condition newCondition();
}
2.使用场景
以以ArrayBlockingQueue为例:为一个用数组实现的阻塞队列,执行put(...)操作的时候,队列满了,生产者线程被阻塞;执行take()操作的时候,队列为空,消费者线程被阻塞。
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, Serializable {
final Object[] items;
int takeIndex;
int putIndex;
int count;
// 一把锁+两个条件
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
transient ArrayBlockingQueue<E>.Itrs itrs;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0) {
throw new IllegalArgumentException();
} else {
this.items = new Object[capacity];
// 构造器中创建一把锁加两个条件
this.lock = new ReentrantLock(fair);
// 构造器中创建一把锁加两个条件
this.notEmpty = this.lock.newCondition();
// 构造器中创建一把锁加两个条件
this.notFull = this.lock.newCondition();
}
}
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while(this.count == this.items.length) {
// 非满条件阻塞,队列容量已满
this.notFull.await();
}
this.enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E e) {
Object[] items = this.items;
items[this.putIndex] = e;
if (++this.putIndex == items.length) {
this.putIndex = 0;
}
++this.count;
// put数据结束,通知消费者非空条件
this.notEmpty.signal();
}
public E take() throws InterruptedException {
ReentrantLock lock = this.lock;
lock.lockInterruptibly();
Object var2;
try {
while(this.count == 0) {
// 阻塞于非空条件,队列元素个数为0,无法消费
this.notEmpty.await();
}
var2 = this.dequeue();
} finally {
lock.unlock();
}
return var2;
}
private E dequeue() {
Object[] items = this.items;
E e = items[this.takeIndex];
items[this.takeIndex] = null;
if (++this.takeIndex == items.length) {
this.takeIndex = 0;
}
--this.count;
if (this.itrs != null) {
this.itrs.elementDequeued();
}
// 消费成功,通知非满条件,队列中有空间,可以生产元素了。
this.notFull.signal();
return e;
}
3.实现原理
Condition的使用很方便,避免了wait/notify的唤醒操作不仅唤醒生产者也会唤醒消费者问题。
由于Condition必须和Lock一起使用,所以Condition的实现也是Lock的一部分。首先查看互斥锁和读写锁中Condition的构造方法:
public class ReentrantLock implements Lock, java.io.Serializable {
public Condition newCondition() {
return this.sync.newCondition();
}
}
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
public static class ReadLock implements Lock, Serializable {
// 读锁不支持Condition
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
public static class WriteLock implements Lock, java.io.Serializable {
public Condition newCondition() {
return this.sync.newCondition();
}
}
}
首先,读写锁中的 ReadLock 是不支持 Condition 的,读写锁的写锁和互斥锁都支持Condition。虽然它们各自调用的是自己的内部类Sync,但内部类Sync都继承自AQS。因此,上面的代码sync.newCondition最终都调用了AQS中的newCondition。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
public class ConditionObject implements Condition, java.io.Serializable {
// Condition的所有实现,都在ConditionObject类中
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final ConditionObject newCondition() {
return new ConditionObject();
}
}
每一个Condition对象上面,都阻塞了多个线程。因此,在ConditionObject内部也有一个双向链表组成的队列
public class ConditionObject implements Condition, java.io.Serializable {
private transient Node firstWaiter;
private transient Node lastWaiter;
}
static final class Node {
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
}
4.await()实现分析
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#await()实现分析
public final void await() throws InterruptedException {
// 刚要执行await()操作,收到中断信号,抛异常
if (Thread.interrupted()) throw new InterruptedException();
// 加入Condition的等待队列
Node node = addConditionWaiter();
// 阻塞在Condition之前必须先释放锁,否则会死锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 阻塞当前线程 LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break;
}
// 重新获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
.....
关于await,有几个关键点要说明:
- 线程调用 await()的时候,肯定已经先拿到了锁。所以,在 addConditionWaiter()内部,对这个双向链表的操作不需要执行CAS操作,线程天生是安全的,代码如下:
private Node addConditionWaiter() {
// ...
Node t = lastWaiter;
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
- 在线程执行wait操作之前,必须先释放锁。也就是fullyRelease(node),否则会发生死锁。这个和wait/notify与synchronized的配合机制一样。
- 线程从await中被唤醒后,必须用acquireQueued(node, savedState)方法重新拿锁。
- checkInterruptWhileWaiting(node)代码在park(this)代码之后,是为了检测在park期间是否收到过中断信号。当线程从park中醒来时,有两种可能:一种是其他线程调用了unpark,另一种是收到中断信号。这里的await()方法是可以响应中断的,所以当发现自己是被中断唤醒的,而不是被unpark唤醒的时,会直接退出while循环,await()方法也会返回。
- isOnSyncQueue(node)用于判断该Node是否在AQS的同步队列里面。初始的时候,Node只 在Condition的队列里,而不在AQS的队列里。但执行notity操作的时候,会放进AQS的同步队列。
5.signal实现
java.util.concurrent.locks.AbstractQueuedSynchronizer.ConditionObject#signal()
public final void signal() {
//// 只有持有锁的线程,才有资格调用signal()方法
if (!AbstractQueuedSynchronizer.this.isHeldExclusively()) {
throw new IllegalMonitorStateException();
} else {
AbstractQueuedSynchronizer.Node first = this.firstWaiter;
if (first != null) {
// 发起通知
this.doSignal(first);
}
}
}
// 唤醒队列中的第1个线程
private void doSignal(AbstractQueuedSynchronizer.Node first) {
do {
if ((this.firstWaiter = first.nextWaiter) == null) {
this.lastWaiter = null;
}
first.nextWaiter = null;
} while(!AbstractQueuedSynchronizer.this.transferForSignal(first) && (first = this.firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
// 先把Node放入互斥锁的同步队列中,再调用unpark方法
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
同 await()一样,在调用 signal()的时候,必须先拿到锁(否则就会抛出上面的异常),是因为前面执行await()的时候,把锁释放了。
然后,从队列中取出firstWaiter,唤醒它。在通过调用unpark唤醒它之前,先用enq(node)方法把这个Node从condition中移除并放入AQS的锁对应的阻塞队列中。也正因为如此,才有了await()方法里面的判断条件:
- while( ! isOnSyncQueue(node))
- 这个判断条件满足,说明await线程不是被中断,而是被unpark唤醒的。
- notifyAll()与此类似。
简述:
阻塞队列使用condition实现,condition必须和Lock一起使用,一般使用ReentrantLock来创建condition阻塞条件,并用ReentrantLock的锁来加锁控制并发。解决了wait/notify同时唤醒生产者和消费者的问题。读写锁中的 ReadLock 是不支持 Condition 的,读写锁的写锁和互斥锁都支持Condition。他们最终都调用了AQS中的newCondition,ConditionObject是condition的具体实现,内部也有一个双向链表组成的队列来存储阻塞的队列。一开始调用await会放入ConditionObject的队列,然后调用最终是调用park原语阻塞,最后调用signal的时候把线程在ConditionObject中移除,放入阻塞队列本身,并调用unpark。