从CountDownLatch的构造方法开始一步一步的分析,代码里面有详细的步骤,以数字1开头到16是调用countDown方法并且,state值大于零的情况。以aaa开头是处理state为0的情况。
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
//2、调用从AbstractQueuedSynchronizer继承的setState方法
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
//EEE 这里可以看到判断了一下state值,基于我们刚才的假设【AAA 这里是先基于countDown次数是3 ,刚调用一个countDown接着执行到await的情况分析】
//这返回的是-1 回到刚才CCC
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//8、countdownLatch的count方法实际起作用的首先是这个方法,releases就是构造方法里面的count也是AQS里面的state值,假设传的3
protected boolean tryReleaseShared(int releases) {
for (;;) {
//aaa、这里如果自旋最后直到0了也没拿到看下逻辑,先返回false,结合之前分析的第15步可以看到直接结束了,和没调用一样,看下最后一个人-1成功的处理,搜bbb步
int c = getState();
if (c == 0)
return false;
//9、先看这里不为0的情况,减1操作
int nextc = c-1;
//10、CAS方法在AQS里面,过去看一下
//12、经过第十一步可以知道,这里有可能返回false,所以有了 for (;;) 这个自旋,号称自旋锁就是一个死循环
//13、这里我们先按没有并发,所以成功并且nextc==2,接着往下走
if (compareAndSetState(c, nextc))
//14、这里返回false
//bbb、这里如果是最后一个人拿到会返回true,到了ccc
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
//1、new了一个Sync的类
this.sync = new Sync(count);
}
//AAA 这里是先基于countDown次数是3 ,刚调用一个countDown接着执行到await的情况分析
public void await() throws InterruptedException {
///BBB 这个方法是从AQS继承过来的,过去看
sync.acquireSharedInterruptibly(1);
}
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
//4、开始调用countDown,sync里面没这个方法,从AQS继承的,过去看看
//16、经过第15步结束,总结一下,countDown()就是利用CAS把state-1了
public void countDown() {
sync.releaseShared(1);
}
public long getCount() {
return sync.getCount();
}
public String toString() {
return super.toString() + "[Count = " + sync.getCount() + "]";
}
}
//AQS类部分源码
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
private volatile int state;
protected final int getState() {
return state;
}
//3、把countDownLatch构造方法的count值传到了这里
protected final void setState(int newState) {
state = newState;
}
//5、调用countDownLatch的countDown实际调用的是这里
public final boolean releaseShared(int arg) {
//6、先看tryReleaseShared方法
//15、经过14得出这里是false,先return到了第四步
if (tryReleaseShared(arg)) {
//ccc、最后一个人-1成功到了这里,搜ddd看
doReleaseShared();
return true;
}
return false;
}
//7、这里是一个抽象方法,具体实现交给了Sync([模板设计模式](https://www.jianshu.com/p/ba99bd254494))再回到Sync里面
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
//11、调用的unsafe方法,//public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);native代码,水平有限不往下了,解释下两个参数,就是检查想改的变量还是不是改之前拿到的值 类似sql update a=1 from table1 where a=2;
//结合注释 Atomically sets synchronization state to the given updated,可以看出来state会被设置为update的值
//怎么进去的,搜FirstMark
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
//FirstMark: AQS的代码 就是通过反射来设置的,哈哈哈哈,11补里面的 stateOffset 就是state参数
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}
//ddd、最后一个人countDown成功到了这里
private void doReleaseShared() {
//又是一个自旋锁,从之前分析可以看到只是new CountDownLatch()和调用countDown方法,head是没赋过值的,所以这里直接跳出,啥也没干,await的时候会有用搜AAA进入await方法分析
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
/**UUU 通过TTT过来的 会到这里,刚才TTT里面执行到的shouldParkAfterFailedAcquire方法会设置waitStatus为Node.SIGNA,发现是SIGNAL 调用unpark恢复被await掉的线程,over
这里具体说一下 比如 new CountDownLatch(3) 一直countDown 不调用await,就是第一步分析的,减成0以后
再调用也没用了
tryReleaseShared方法里的这个三行代码
int c = getState();
if (c == 0)
return false;
如果先执行的await,那就是AAA到UUU分析的可能到了unpark(这里还有别的场景),等待最后一次countDown来调用park唤醒
**/
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
//CCC 到了这里
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
//直接看这个方法到DDD
if (tryAcquireShared(arg) < 0)
//FFF 从EEE知道这里-1 执行这个方法
doAcquireSharedInterruptibly(arg);
}
//DDD 抽象方法实现在Sync中
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//GGG 到了这里,往下看
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//HHH 先进入这里 SHARED 是Node里面的new的一个对象 static final Node SHARED = new Node(); 直接放这里了,进入JJJ (addWaiter比较复杂可以参照图1是执行之后的结果)
final Node node = addWaiter(Node.SHARED);
//QQQ 经过addWait的执行,node有了值
boolean failed = true;
try {
for (;;) {
//RRR 这个方法就是把prev拿出来,这时候的prev刚才说 是个双向链表 所有它有值就是那个空内容的Node对象
final Node p = node.predecessor();
//SSS 根据图1 可以知道 这两个不相等
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//TTT 到了这里,shouldParkAfterFailedAcquire这个方法就是设置了waitStatus ,然后parkAndCheckInterrupt 这个就是调用park (和sleep很像,有几点区别比如无编译异常 可以调用unpark主动唤醒等)进入休眠,然后就在这自旋旋啊旋 这时候又得回到countDown中提到的每次countdown完毕都会调用doReleaseShared 这个方法,再过去看看搜UUU
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//III 到这里
private Node addWaiter(Node mode) {
//JJJ 这里其实就是刚才主动new的对象作为当前线程的next 构造方法 实现
/**
Node(Thread thread, Node mode) { // Used by addWaiter 这里都有备注就是为了addWaiter使用的
this.nextWaiter = mode;
this.thread = thread;
}
**/
//也就是到了这里大概的意思就是 谁掉用await就把它设置为当前执行线程的next,这其实就已经构建了一个链表了 单核情况下 只有一个在执行 就是一条链表,多CPU就是多条链表(到这里是这样理解)
Node node = new Node(Thread.currentThread(), mode);
// KKK 这里知道 之前没动过tail 这就是null
Node pred = tail;
//LLL 所以这个不走
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//MMM 进入NNN看
enq(node);
//经过NNN的操作 就是搞了node ,它的thread指向当前的线程,prev指向新new的对象,新new对象的next指向自己,回到QQQ
return node;
}
//NNN到了这里
private Node enq(final Node node) {
for (;;) { //又来一个自旋
Node t = tail;//这里还是null
if (t == null) { // Must initialize 源码的注释 开始初始化,进入OOO
if (compareAndSetHead(new Node()))
//PPP 成功以后赋值给了tail 总结一下就是new 了个空Node 给tail和head,再进入循环以后 由于tail不是null,所以进入了else代码
tail = head;
} else {
//到了这里 就是把参数带进来的node.pre 指向了刚创建的空node,再把刚创建的next指向了node,双向链表回到MMM
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//OOO 又是调用unsafe的方法 通过之前知道 就是head指向了这个对象
private final boolean compareAndSetHead(Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
}