前言
这一篇文章,想和大家分享一下Condition的源码学习过程,Condition的应用,其实是很简单的,相信大家在项目中或者demo中或多或少都用过。最不济,在应付面试的时候,相信也有不少小伙伴背过不少的面试题。话不多说,水平有限,文章中有错误的地方也请不吝指正,共同进步。
应用场景
ReentrantLock的Condition的设计场景,我在上一篇博客也分享过,建议先移步上一篇看一下ReentrantLock, Condition只是其中的一个应用。
example:
先上一段代码,看一下如何使用
@Slf4j
public class ReentrantLockTest {
private static ReentrantLock lock = new ReentrantLock();
private static boolean hasSmoke = false;
private static boolean hasSnacks = false;
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 5, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
executor.setCorePoolSize(20);
Condition smoke = lock.newCondition();
Condition snacks = lock.newCondition();
executor.submit(()->{
lock.lock();
try {
while (!hasSmoke) {
log.info(Thread.currentThread().getName() + "我是boy-one,没有烟,干活没动力");
try {
smoke.await();
} catch (InterruptedException e) {
}
}
log.info("我是boy-one,有烟了,嘬一口.");
} finally {
lock.unlock();
}
});
executor.submit(()->{
lock.lock();
try {
while (!hasSmoke) {
log.info(Thread.currentThread().getName() + "我是boy-two,没有烟,干活没动力");
try {
smoke.await();
} catch (InterruptedException e) {
}
}
log.info("我是boy-two,有烟了,嘬一口.");
} finally {
lock.unlock();
}
});
executor.submit(()->{
lock.lock();
try {
while (!hasSnacks) {
log.info(Thread.currentThread().getName() + "我是girl-one,没有零食吃,干活没力气");
try {
snacks.await();
} catch (InterruptedException e) {
}
}
log.info("我是girl-one, 有零食了,吃一口.");
} finally {
lock.unlock();
}
});
executor.submit(()->{
lock.lock();
try {
while (!hasSnacks) {
log.info(Thread.currentThread().getName() + "我是girl-two,没有零食吃,干活没力气");
try {
snacks.await();
} catch (InterruptedException e) {
}
}
log.info("我是girl-two, 有零食了,吃一口.");
} finally {
lock.unlock();
}
});
executor.submit(()->{
lock.lock();
try {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
hasSmoke = true;
log.info("烟来喽");
smoke.signalAll();
hasSnacks = true;
log.info("零食来喽");
snacks.signalAll();
}finally {
lock.unlock();
}
});
executor.shutdown();
}
}
10:48:05.563 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-1我是boy-one,没有烟,干活没动力
10:48:05.565 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-2我是boy-two,没有烟,干活没动力
10:48:05.565 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-3我是girl-one,没有零食吃,干活没力气
10:48:05.565 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-4我是girl-two,没有零食吃,干活没力气
10:48:07.568 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 烟来喽
10:48:07.568 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 零食来喽
10:48:07.569 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - 我是boy-one,有烟了,嘬一口.
10:48:07.569 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - 我是boy-two,有烟了,嘬一口.
10:48:07.569 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - 我是girl-one, 有零食了,吃一口.
10:48:07.569 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - 我是girl-two, 有零食了,吃一口.
这个场景中,男生需要抽烟,女生需要吃零食,不然就会拒绝干活。当烟来了或者零食来了的时候,大家就停止摸鱼,开始干活。如果只给了零食,那么男生就会无限等待,同理,只给了烟, 那么女生就会一直摸鱼下去,具体的例子大家可以自己跑一跑。
例子中我们可以看到,有4个员工都在摸鱼,但是我们可以根据需求把这4个员工分为两部分,一部分是需要抽烟的,一部分是需要吃零食的,如果我们有零食,我们就可以唤醒需要吃零食的人,烟同理。这样就做到了部分唤醒。
分析
我们大致分析一下原理,我们首先使用lock.newCondition()方法new了两个Condition,每个Condition对象都维护了一个队列。然后boy-one和boy-two调用smoke.await(),会进到smoke对象维护的队列里,同理,girl-one和girl-two会进入到snacks对象的队列里。当我们调用smoke.signalAll()的时候,smoke对象就会把boy-one和boy-two扔到aqs的队列里,当我们调用snacks.signalAll()的时候,snacks对象就会把girl-one和girl-two扔到aqs的队列里。还记不记得之前aqs博文里的时候,node节点有个状态。int CONDITION = -2; 没错,就是给Condition用的。
剩下的线程唤醒操作就交给了aqs。
上面的分析大致看懂了以后,我们带着这个分析去看一下源码:
源码
newCondition()
// 通过调用链,大家已经很清楚了吧,就是返回了一个ConditionObject对象,
// 这个对象里维护了一个node节点的链表,这个node节点不用猜,就是aqs封装的node节点。
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
}
await()
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 这个方法贴在下面了,官方注释解释的简洁明了,就是添加节点到队列尾部
Node node = addConditionWaiter();
// 释放资源当前线程拿到的锁资源,下面详细解释
int savedState = fullyRelease(node);
int interruptMode = 0;
// 调用isOnSyncQueue判断是否在aqs队列里,这个下面会详细解释。
while (!isOnSyncQueue(node)) {
// 不在aqs队列,就调用park方法阻塞,交出锁。
LockSupport.park(this);
// 被唤醒后,首先去检查中断,然后退出循环
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 检查完中断去获取锁,然后做一些校验
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
//
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
// 这个方法会释放当前节点占用的所有资源,注意是所有。
final int fullyRelease(Node node) {
boolean failed = true;
try {
// 直接拿到现在的资源数
int savedState = getState();
// 然后释放全部
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
// 注意,这里会把node节点的状态置为CANCELLED
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
final boolean isOnSyncQueue(Node node) {
// 如果node节点是在CONDITION状态,肯定不在aqs队列。不懂的看下面的transferForSignal()方法
// 如果node节点的prev节点为null,肯定不在aqs队列。这个问题我放到最后单独讲
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
// 这是个兜底逻辑。就是遍历aqs节点,看node节点是否存在。
return findNodeFromTail(node);
}
signalAll()
public final void signalAll() {
// 调用signalAll()会首先校验调用signalAll()的线程是不是当前锁持有线程。不是会抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 从头开始处理队列
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(Node first) {
// 官方注释总是简单明了。。
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
// 把节点从CONDITION设置为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
// 熟悉的方法,不再讲
Node p = enq(node);
int ws = p.waitStatus;
// 把节点从0设置为SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
上述这三个方法,就是把Condition对象的队列remove,然后add到aqs的队列里。需要注意一点,add到aqs队列,并不是被唤醒!!!锁一次只能被一个线程持有,所以还是逐个被唤醒的,只有调用signalAll()的线程在释放锁之后,才去唤醒next节点。至此,Condition的源码已经结束了。
流程分析:
我们用一个表格简单分析一下流程,这里我们不考虑指令优化,前面的代码总是优于后面的代码先执行。本来想画个动态图的,奈何不会AE, 凑合看吧。有好用的免费的画图软件,也请@我。
boy-one | boy-two | girl-one | girl-two | 饲养员 | 队列情况 |
---|---|---|---|---|---|
lock()拿到锁 | |||||
调用await() | |||||
把自己添加到smoke队列 | smoke:boy-one | ||||
释放锁,休眠 | |||||
lock()拿到锁 | |||||
调用await() | |||||
把自己添加到smoke队列 | smoke:boy-one-->boy-two | ||||
释放锁,休眠 | |||||
lock()拿到锁 | |||||
调用await() | |||||
把自己添加到snacks队列 | smoke:boy-one-->boy-two snacks:girl-one | ||||
释放锁,休眠 | |||||
lock()拿到锁 | |||||
调用await() | |||||
把自己添加到snacks队列 | smoke:boy-one-->boy-two snacks: girl-one-->girl-two | ||||
释放锁,休眠 | |||||
lock()拿到锁 | |||||
调用smoke.signalAll() | smoke:snacks:girl-one-->girl-two aqs:boy-one-->boy-two | ||||
调用snacks.signalAll() | smoke:snacks: aqs:boy-one-->boy-two-->girl-one-->girl-two | ||||
释放锁 | |||||
aqs唤醒boy-one | |||||
被唤醒,拿到锁 | |||||
执行逻辑 | |||||
释放锁,唤醒下一个 | |||||
被唤醒,拿到锁 | |||||
执行逻辑 | |||||
释放锁,唤醒下一个 | |||||
被唤醒,拿到锁 | |||||
执行逻辑 | |||||
释放锁,唤醒下一个 | |||||
被唤醒,拿到锁 | |||||
执行逻辑 | |||||
释放锁,唤醒下一个 | |||||
发现没有下一个,结束。 |
我们可以看到,整个流程都是围绕一把锁展开的,那么如果我们不加锁,也就是我们把lock代码都去掉,流程又会怎么样呢?
@Slf4j
public class ReentrantLockTest {
private static ReentrantLock lock = new ReentrantLock();
private static boolean hasSmoke = false;
private static boolean hasSnacks = false;
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 5, 1,
TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
executor.setCorePoolSize(20);
Condition smoke = lock.newCondition();
Condition snacks = lock.newCondition();
executor.submit(()->{
while (!hasSmoke) {
log.info(Thread.currentThread().getName() + "我是boy-one,没有烟,干活没动力");
try {
smoke.await();
} catch (InterruptedException e) {
}
}
log.info("我是boy-one,有烟了,嘬一口.");
});
executor.submit(()->{
while (!hasSmoke) {
log.info(Thread.currentThread().getName() + "我是boy-two,没有烟,干活没动力");
try {
smoke.await();
} catch (InterruptedException e) {
}
}
log.info("我是boy-two,有烟了,嘬一口.");
});
executor.submit(()->{
while (!hasSnacks) {
log.info(Thread.currentThread().getName() + "我是girl-one,没有零食吃,干活没力气");
try {
snacks.await();
} catch (InterruptedException e) {
}
}
log.info("我是girl-one, 有零食了,吃一口.");
});
executor.submit(()->{
while (!hasSnacks) {
log.info(Thread.currentThread().getName() + "我是girl-two,没有零食吃,干活没力气");
try {
snacks.await();
} catch (InterruptedException e) {
}
}
log.info("我是girl-two, 有零食了,吃一口.");
});
executor.submit(()->{
lock.lock();
try {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
hasSmoke = true;
log.info("烟来喽");
smoke.signalAll();
hasSnacks = true;
log.info("零食来喽");
snacks.signalAll();
}finally {
lock.unlock();
}
});
executor.shutdown();
}
}
11:52:51.473 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-4我是girl-two,没有零食吃,干活没力气
11:52:51.473 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-1我是boy-one,没有烟,干活没动力
11:52:51.473 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-3我是girl-one,没有零食吃,干活没力气
11:52:51.473 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-2我是boy-two,没有烟,干活没动力
11:52:53.473 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 烟来喽
11:52:53.473 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 零食来喽
在调用signalAll()方法的时候,会强制校验当前线程是否持有锁,否则会抛出异常,所以饲养员的锁必须加,boy和girl的锁我们去掉了。
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
我们看到运行结果里,boy和girl都不干活了,这是咋回事呢?我们分析一下。
当boy和girl调用smoke或者snacks的await()方法,会把自己添加到相应的队列。然后调用fullyRelease(node)释放锁,释放锁的方法里会校验当前持有线程,不是的话会抛异常
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 这里抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
然后在fullyRelease(Node node)方法里,finally的代码块会执行,node节点ws状态置为CANCELLED。
饲养员在调用signalAll()方法,把队列挪到aqs队列的过程中在doSignalAll(Node first)里的transferForSignal()方法中
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
会首先用cas修改node节点状态,由于节点是CANCELLED,所以cas会失败,失败就直接返回。节点从Condition队列中移除,加入aqs队列失败。因此,这个节点就灰飞烟灭了。
遗留问题
- 如果node节点的prev节点为null,肯定不在aqs队列
为什么这么说呢,Condition维护的队列,是单链表,用nextWaiter指向下个节点
Aqs维护的队列,是双向链表,用prev和next指向前后节点,所以Condition维护的队列的prev必为null,而Aqs维护的队列的真实node节点的prev必不为null,因为初始化队列的时候会在头部放一个dummy node