只有一个变量,这个sync是继承了AQS
private final Sync sync;
当我们new CountdownLatch的时候,构造器里的值会最终被set到AQS的state里面.
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
Sync
Sync(int count) {
//aqs的方法
setState(count);
}
还有个核心的方法就是countdown了
public void countDown() {
sync.releaseShared(1);
}
AQS的
public final boolean releaseShared(int arg) {
//子类实现.
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
CountdownLatch的实现,也是非常的简单:
先获取state的值,如果==0,说明释放完了,不需要再释放了.
如果大于0,那么cas设置成减一的值.最后判断是否等于0.
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
//减一
int nextc = c-1;
//如果不满足,那么接着死循环,重新获取state的值.
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
还有个重要的方法await.
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
AQS里:这个方法是响应中断的,响应中断,就是检查一下,如果发生中断了,就把异常抛上去.
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//子类实现.
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
如果state不等于0,那么就返回-1.
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
原理:死循环等待state==0.然后为了避免线程一直在空转,加个park方法.
unpark的时机,详见unparkSuccessor,看看调用的地方.
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
//死循环
for (;;) {
final Node p = node.predecessor();
//如果前一个节点是首节点.
if (p == head) {
//子类实现
int r = tryAcquireShared(arg);
//此时,state == 0
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//此时前面还有节点,是否要park线程.
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
//如果失败了,cancel.
if (failed)
cancelAcquire(node);
}
}
waitStatus有哪些枚举值?
static final int CANCELLED = 1;
//waitStatus value to indicate successor's thread needs unparking.暗示下一个节点需要unpark.
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
如果获取
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//什么时候等于SIGNAL?
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.(如果上一个节点的状态是signal,返回ture,需要park线程.)
*/
return true;
//过滤掉已经取消的节点.
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//设置的是上一个节点的waitStatus!!!下一次就会park.多走一次循环
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
停止这个线程,然后判断线程有没有被打断.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}