Condition的简单使用
使用Condition实现的有界队列
public class BoundedQueue<T> {
private Object[] items;
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notFull = lock.newCondition();
private Condition notEmpty = lock.newCondition();
public BoundedQueue(int capacity){
this.items = new Object[capacity];
}
public void add(T obj){
lock.lock();
try{
while (count == items.length) ////这里必须使用while而不是if
notFull.await();
items[addIndex] = obj;
if(++addIndex == items.length)
addIndex = 0;
notEmpty.signal();
count ++;
} catch (InterruptedException e) {
//
}finally {
lock.unlock();
}
}
public T get(){
T toReturn = null;
lock.lock();
try{
while (count == 0) //这里必须使用while而不是if
notEmpty.await();
toReturn = (T) items[removeIndex];
if(++removeIndex == items.length)
removeIndex = 0;
count --;
notFull.signal();
} catch (InterruptedException e) {
//
}finally {
lock.unlock();
}
return toReturn;
}
}
核心方法
void await() throws InterruptedException;//类似于Object对象的wait()方法;可中断;返回时已经获取了对象的锁;
void signal();
void signalAll();//唤醒所有等待在Condition上的线程,能够从等待方法返回的线程必须获得Condition相关联的锁。
await()方法源码分析
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();//当前节点加入到该等待队列
int savedState = fullyRelease(node);//释放锁,唤醒同步队列中的后继节点
int interruptMode = 0;
while (!isOnSyncQueue(node)) {//如果当前线程不在同步列中,就继续等待(signal)
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//当前方法如果被signal了,在同步队列中尝试获取同步状态
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
signal 方法
public final void signal() {
if (!isHeldExclusively())//前置条件
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);//唤醒等待队列中的第一个节点,将其移动到同步队列中并使用LockSupport唤醒线程(上面的 while (!isOnSyncQueue(node))循环)去获取同步状态(acquireQueued(node, savedState) )。
}
LockSupport工具
park();//当前线程进入阻塞状态
unpark(Thread t);//唤醒线程t