消费者和生产者的例子
public class ConditionDemo {
private static final Queue<Apple> queue = new LinkedList();
private static final Lock lock = new ReentrantLock(false);
private static final Condition customer = lock.newCondition();
private static final Condition producer = lock.newCondition();
public static void main(String[] args) {
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
produce();
}
}
private void produce() {
lock.lock();
try {
while (queue.size() == 5) {
System.out.println("队列已满,无法添加");
producer.await();
}
queue.offer(new Apple(1));
System.out.println(Thread.currentThread().getName() + "生产了一个苹果,目前有" + queue.size() + "个苹果");
customer.signal();
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}, i + "号生产者").start();
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
custome();
}
}
private void custome() {
lock.lock();
try {
while (queue.size() == 0) {
System.out.println("队列为空,无法消费");
customer.await();
}
queue.poll();
System.out.println(Thread.currentThread().getName() + "消费了一个苹果,目前有" + queue.size() + "个苹果");
producer.signal();
Thread.sleep(200);
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}, i + "号消费者者").start();
}
}
}
逐行看await方法
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
能够执行到这里证明当前线程肯定是获取到了锁,也就是说已经lock.lock()住了
条件队列是condition中的队列 每次await()之后都会把线程放到条件队列里面,每次signal()之后就会把条件队列里面的firstNode加入到阻塞队列里面
阻塞队列就是lock里面的队列
这个方法挺绕的 要先将node放到条件队列里面,然后释放锁,最后把线程park住,等着signal调用
public final void await() throws InterruptedException {
//判断是不是中断了 这里不是很重要
if (Thread.interrupted())
throw new InterruptedException();
//将当前线程 包装成一个node放到条件队列里面
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
考虑几种情况
lock.newCondition() 实际是创建了一个ConditionObject对象 这个对象有两个属性一个是firstWaiter 一个是 lastWaiter 俩都是node
这俩node都是sqs里面的node
1.条件队列没有其他node
private Node addConditionWaiter() {
//这时first和last 都是null
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//跳过这个判断
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个node 并且设置waitStatus = -2 1是cancel取消 初始化不填是0 -1是signal 只有是-1时才能唤醒aqs里面的内容
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//t确实是null
if (t == null)
//firstWaiter赋值
firstWaiter = node;
else
t.nextWaiter = node;
//lastWaiter赋值
lastWaiter = node;
return node;
}
此时条件队列应该是 node->null
2.条件队列里面已经有两个了 node1->node2->null first是node1 last是node2
private Node addConditionWaiter() {
//t现在是node2
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//假如 node2 的状态不是CONDITION 里面这个unlink方式是清楚调这些不是状态不是condition的node 倒着清除?
//假如 状态是CONDITION那么直接跳过这个方法
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个node 并且设置waitStatus = -2 1是cancel取消 初始化不填是0 -1是signal 只有是-1时才能唤醒aqs里面的内容
Node node = new Node(Thread.currentThread(), Node.CONDITION);
//t现在是last了不是null
if (t == null)
//firstWaiter赋值
firstWaiter = node;
else
//把node 赋值给t 也就是last的nextWaiter
t.nextWaiter = node;
//最后把last指向node
lastWaiter = node;
return node;
}
这之后node就被加入到了条件队列里面了
但是node 还是拿着锁 需要释放锁
int savedState = fullyRelease(node);
//不考虑可重入的问题
final int fullyRelease(Node node) {
boolean failed = true;
try {
//这里返回的值为1
int savedState = getState();
//release()方法就是释放锁的那个方法 主要做了两件事
//第一是吧 (tryRelease(arg)) -> ExclusiveOwnerThread这个属性置为空 然后把state设为0 不考虑可重入
//释放这个锁之后 这时head还是它本身 if (h != null && h.waitStatus != 0) unparkSuccessor(h);
//head不为null 然后node的状态也不是0 0是初始化的状态 没有什么以外的话 这时应该是-1 因为-1是后继节点加入的时候给设置的
//unparkSuccessor(h)就是第二件事 唤醒后继
//这里比较难以理解的是 那这个head怎么处理 后边 后继节点被唤醒后会从park哪里继续运行
//然后 acquireQueued(...)这个方法这里重新设置为新的节点(也就是这个被唤醒的后继)
if (release(savedState)) {
//释放成功 进入这个判断
failed = false;
//返回1
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
//最后不会走这里
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
程序接着会 回到这里 从翻译可以看出是判断它是不是进入了同步队列,这个就是之前说的阻塞队列
如果正常走 这时候肯定是没有在阻塞队列里面
while (!isOnSyncQueue(node)) {
//如果返回false 那么这里就会park住这个node 也就是在这里阻塞了
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
这个方法可以看出 如果是正常node 那么刚刚进入条件队列时这个状态就是condition 并且前驱也是空的
这里node的前驱和后继是在阻塞队列里面的前驱和后继 条件队列里面只有一个nextWaiter 还是node对象
正常情况下返回false
如果不是的话 比如进入到第二个判断 后边源码写的很清楚 如果它存在后继那么肯定是在阻塞队列里面了
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
这里猜想一下 如果想跳出这个循环 那么肯定是需要signal()方法唤醒 ,然后改变某些属性,才能继续;
看不懂了后边
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
signal()方法 这个方法是从条件队列里面唤醒上边被park住的node
public final void signal() {
//方法是判断是不是当前线程调用的
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
考虑两种情况吧 一种是条件队列里面就一个node 既是first也是last
private void doSignal(Node first) {
do {
//这种情况下 是first的nextwaiter是null firstwaiter这之后也就成了null first不是null
if ( (firstWaiter = first.nextWaiter) == null)
//把last置位 null
lastWaiter = null;
//最后first的next也为null
first.nextWaiter = null;
//first之后就成了一个孤岛了 如果想要跳出去 那就必须下面两个判断有一个是false
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
程序走到这里 正常情况下是希望返回true的
final boolean transferForSignal(Node node) {
/*
* 如果不能改变状态 那么这个node 就是被取消了
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//上边变更成功后走到这里 这一段和之前进入条件队列类似 就是一直添加到队尾然后把前一个不是取消状态的node的状态改为-1
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//改好以后唤醒这个node,这个node 封装的线程继续从 之前 while (!isOnSyncQueue(node)) {park()}的这里运行
//它运行它的 这个signal方法应该是其他线程调用的 返回true 就行了
LockSupport.unpark(node.thread);
return true;
}
这里想了半天
从park哪里开始以后 可以看到 其实它是在生产者消费者代码那个await方法哪里继续的
那它是怎么抢到锁的呢
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
第一个if里面的第一个判断就是抢锁的过程 而且这个savedState是之前存的状态
这里就明白多了如果是可重入这里可能是大于1的 然后 这里抢锁成功再设置进去
后边那几个就不看了 看不懂
这个acquireQueue抢到锁 然后增加一个苹果通知消费者的条件队列 最后再释放