CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行。
它是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务。
CountDownLatch的组成
首先来一下Sync,它是CountDownLatch的同步控件(计数器),使用AQS状态表示计数。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
// 重写AQS获取共享锁的方法,AQSstate==0的时候,表示锁是空闲的
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
// 重写AQS释放共享锁的方法
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// 计数-1,CAS更新计数。等于0的时候释放成功
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
构造函数
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
构造函数就是初始化Sync的计数,也就是AQS的state属性
等待方法
await 使当前线程等待,直到计数到零为止。除非该线程被中断了
public void await() throws InterruptedException {
// 调用AQS的获取共享锁的方法(可中断),加入同步队列,并挂起当前线程
sync.acquireSharedInterruptibly(1);
}
减少计数方法
减少计数,如果计数达到零,则释放所有等待线程。
public void countDown() {
sync.releaseShared(1);
}
releaseShared源码里可以看到只释放了head.next节点的线程,它是怎么释放所有等待线程的?
这里贴一下releaseShared的源码,可以看到releaseShared调用了tryReleaseShared获取共享模式的锁,如果获取成功了则调用doReleaseShared唤醒后继节点
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
tryReleaseShared必须要子类实现,下面看下CountDownLatch的tryReleaseShared
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
// 减CountDownLatch计数
int nextc = c-1;
if (compareAndSetState(c, nextc))
// 如果计数是0那么返回true
return nextc == 0;
}
}
如果tryReleaseShared返回true就会调用doReleaseShared唤醒后继节点
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
// 旧head的waitStatus=-1,那么调用unparkSuccessor唤醒后继节点
// 并且把旧head的waitStatus更新成0
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
// 旧head的waitStatus已经是0了,那么更新成-3,也就是PROPAGATE
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// head没有变过,break
if (h == head) // loop if head changed
break;
}
}
关键点就在调用unparkSuccessor这,唤醒了后继节点的线程,那么这个被挂起的线程是在哪被挂起的呢?是调用了CountDownLatch的await方法之后最终调到AQS的doAcquireSharedInterruptibly方法,parkAndCheckInterrupt里面挂起的
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
被唤醒的节点被挂起的线程,还会继续循环,此时tryAcquireShared的返回值一定会大于0,因为此时CountDownLatch的计数是0(上面tryAcquireShared方法)。那么进入到setHeadAndPropagate方法,这个方法会唤醒node.next节点的线程,依次类推,node.next还会唤醒node.next.next
也就是说调用countDown方法,如果CountDownLatch的计数是0了,那么会唤醒head.next节点的线程,被唤醒的线程还会唤醒他后继节点的线程.