简介和对比Object监视器方法对比
任何一组对象都用一组监视器方法,主要就是wait和notify方法,这些方法与synchronized关键字一起使用,实现等待/通知模式。juc也实现了类似Object的监视器方法,就是Condition,与lock可以配合使用实现等待/通知模式。功能还是有些差异,下面是Object监视器方法与Condition监视器方法功能对比。
对比项 | Object | Condition |
---|---|---|
实现条件 | 获取Synchronized锁 | 获取Lock锁 |
等待队列个数 | 只有一个,即一个Synchronized锁下,只有一个等待队列 | 多个,Lock可以new多个Condition |
线程释放锁进入等待状态,是否支持中断 | 不支持 | 支持 |
线程释放锁进入等待状态,是否支持等待到具体某个时间 | 不支持 | 支持 |
以上是Object监视器方法与Condition监视器方法的主要区别,可以看出Condition监视器方法功能更多一点。
简单实现
分别用Object监视器方法和Condition监视器方法实现一个简单的阻塞队列
Object监视器方法
private Object a = new Object();
private Object b = new Object();
private int count = 5;
private List<String> list = new ArrayList<>(5);
public void add(String str) {
synchronized (a) {
if (list.size() == count) {
try {
a.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
synchronized (this) {
list.add(str);
}
synchronized (b) {
try {
b.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}
public String get() {
synchronized (b) {
if (list.size() == 0) {
try {
b.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
String str = "";
synchronized (this) {
str = list.get(0);
list.remove(0);
}
synchronized (a) {
try {
a.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}
return str;
}
Condition监视器方法
private ReentrantLock lock = new ReentrantLock();
private Condition a = lock.newCondition();
private Condition b = lock.newCondition();
private List<String> list = new ArrayList<>(5);
private int count = 5;
public void add(String str) {
try {
lock.lock();
if (list.size() == count) {
a.await();
}
list.add(str);
b.signalAll();
} catch (Exception e) {
} finally {
lock.unlock();
}
}
public String get() {
try {
lock.lock();
if (list.size() == 0) {
b.await();
}
String str = list.get(0);
list.remove(0);
a.signalAll();
return str;
} catch (Exception e) {
} finally {
lock.unlock();
}
return null;
}
从以上代码中可以看出Object监视器方法只有一个等待队列,因此需要定义两组Object监视器方法来实现生产者-消费者pv操作,而Condition支持多个等待队列,因此不用定义多组,直接用一个锁就行。
源码分析
本文主要分析aqs中的实现,即AbstractQueuedSynchronizer.ConditionObject
具体的一些方法即描述如下:
方法 | 描述 |
---|---|
signal | 唤醒一个等待在Condition上的线程 |
signalAll | 唤醒所有等待在Condition上的线程 |
awaitUninterruptibly | 线程进入等待队列等待被唤醒,不响应中断 |
await() throws InterruptedException | 线程进入等待队列等待被唤醒,响应中断,抛出中断异常 |
awaitNanos(long nanosTimeout) throws InterruptedException | 线程进入等待队列等待被唤醒,最多等待nanosTimeout纳秒,响应中断,抛出中断异常 |
awaitUntil(Date deadline) throws InterruptedException | 线程进入等待队列等待被唤醒,最多等待到该具体时间,响应中断,抛出中断异常 |
简单介绍
深入解析方法实现前先简单介绍实现原理AbstractQueuedSynchronizer共有两个等待队列,Sync Queue和Condition Queue,Lock的加锁和wait,signal之间就是两个等待队列转换的过程,同Object监视器方法一样,Condition运行wait和signal方法时同样要先获取锁才能执行。
wait方法:把当前获的锁线程封装成Node节点放到Condition Queue中,然后释放锁资源。
signal方法:把Condition Queue首节点拿到Sync Queue中,完成之后释放锁资源,然后唤醒Sync Queue中的Node节点。
signal 和 signalAll
public final void signal() {
//健康检查,由Lock自己实现,比如ReentrantLock,就是检查当前线程是否是拥有锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//唤醒等待队列中的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
doSignal 和doSignalAll
private void doSignal(Node first) {
do {
// 这里的操作主要是把first节点从Condition Queue中摘出来,如果队列空了,设置lastWaiter为null
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) && // 唤醒该节点,即从Condition Queue转移到Sync Queue中
(first = firstWaiter) != null);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
// 循环唤醒所有节点
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
transferForSignal
final boolean transferForSignal(Node node) {
/*
* cas改变CONDITION到初始状态,如果没成功,唤醒失败
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// enq方法是将node节点插入到Sync Queue中
Node p = enq(node);
int ws = p.waitStatus;
// 如果节点被取消即(ws > 0 )或者设置需要被唤醒状态SIGNAL失败,就唤醒线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
以上是Condition的signal相关方法,唤醒操作,总结一点就是把Condition Queue中节点拿到Sync Queue中,设置成被需要被唤醒状态,等待被唤醒。
中断标志
简单介绍下中断标志,因为Condition的wait支持中断,所以中断很重要
/**
* 这两种的区别是是否是在signal后中断的,THROW_IE是唤醒前中断,需要抛出中断异常,REINTERRUPT是被唤醒后中断
**/
/** Mode meaning to reinterrupt on exit from wait 意思是wait退出前自我中断,不需要抛出中断异常 */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait 退出前需要抛出中断异常*/
private static final int THROW_IE = -1;
awaitUninterruptibly
该方法不支持中断异常,实现比较简单,可以参照wait方法理解
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
wait()throws InterruptedException
wait方法支持中断
public final void await() throws InterruptedException {
// 判断下中断状态
if (Thread.interrupted())
throw new InterruptedException();
//封装node节点到Condition Queue中
Node node = addConditionWaiter();
// 释放锁资源,唤醒其他等待锁资源线程
int savedState = fullyRelease(node);
int interruptMode = 0;
// 等待是否在Sync Queue中,即是否被其他线程signal唤醒,如果没有一直进行阻塞,指导被唤醒进入Sync Queue队列中
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
// 判断下中断类型,如果有中断且已经被signal唤醒,即节点在Sync Queue中,是THROW_IE,
//否则是REINTERRUPT,或者是0即没有中断,代码不深入理解,可以自己去看
// 这里被中断唤醒,而不是signal之后被唤醒,会在Condition Queue中和Sync Queue中都存在
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// acquireQueued是循环的独占方式获取锁资源,即已经被唤醒放到Sync Queue中,
//需要独占的方式再获取锁资源,返回值是是否被中断
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 这里判断是否是被中断唤醒的,如果是中断唤醒的,
//需要把Condition Queue中已被取消的节点清理掉,防止垃圾存在
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
// 最后就是如果有中断,处理下中断,如果是THROW_IE中断模型,抛出中断异常
reportInterruptAfterWait(interruptMode);
checkInterruptWhileWaiting 和 transferAfterCancelledWait
private int checkInterruptWhileWaiting(Node node) {
// 这里就是判断下线程是否被中断,如果中断,判断中断类型
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
// 这个函数的主要是判断节点被取消是否在signal唤醒之前,
final boolean transferAfterCancelledWait(Node node) {
// 如果节点的状态是CONDITION,说明还没被signal唤醒,
//依然在Condition Queue中,然后返回插入到Sync Queue中返回true,
//这里说明了在signal唤醒之前被中断,两个等待队列都会存在节点
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
// 否则说明已经被signal唤醒,返回false
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
awaitNanos(long nanosTimeout)throws InterruptedException
该方法和wait方法基本差不多,就多了个超时等待
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
final long deadline = System.nanoTime() + nanosTimeout;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
// 这里如果超时直接取消,然后退出等待
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return deadline - System.nanoTime(); //返回值表明是否是超时还是被唤醒
剩下的两个与时间相关的方法 public final boolean awaitUntil(Date deadline)和public final boolean await(long time, TimeUnit unit)方法和 awaitNanos(long nanosTimeout)方法基本一样,不做深入理解。
总结
以上是全部的Condition理解,一句话,Condition就是锁状态的一种中间状态转换队列,实现了等待/通知模式