1.独占式同步状态获取
AQS提供了很多模板方法,模板方法中已经定义好了各种行为,只需要实现其中几个关键的行为(接口),就可以复用整体的逻辑,有较好的框架和复用性。
1.1 获取同步执行权-acquire
AQS底层是一个双向队列,也称CLH队列(其实就是仨人名)。当获取执行权时,有两种可能,获取到了(皆大欢喜),没获取到(队尾排队去)。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先尝试获取(==tryAcquire==),如果获取不到则尝试入队(==acquireQueued==,其中会自旋继续尝试获取执行权)。
1.1.1 尝试获取同步执行权-tryAcquire
其中的==tryAcquire==需要继承方线程安全的实现获取方法。
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
1.1.2 尝试入队并自旋尝试获取执行权-acquireQueued
尝试入队是通过==addWaiter==方法进行的。完了以后就是在==acquireQueued==方法中,尝试获取执行权。
问:AQS中什么时候能获取到执行权,当封装了==Thread==的节点信息排到队首的时候。
所以==acquireQueued==方法中就会自旋的检查当前节点到没到队首啊,没到的话,继续block。
1: final boolean acquireQueued(final Node node, int arg) {
2: // 记录是否获取同步状态成功
3: boolean failed = true;
4: try {
5: // 记录过程中,是否发生线程中断
6: boolean interrupted = false;
7: /*
8: * 自旋过程,其实就是一个死循环而已
9: */
10: for (;;) {
11: // 当前线程的前驱节点
12: final Node p = node.predecessor();
13: // 当前线程的前驱节点是头结点,且同步状态成功
14: if (p == head && tryAcquire(arg)) {
15: setHead(node);
16: p.next = null; // help GC
17: failed = false;
18: return interrupted;
19: }
20: // 获取失败,线程等待--具体后面介绍
21: if (shouldParkAfterFailedAcquire(p, node) &&
22: parkAndCheckInterrupt())
23: interrupted = true;
24: }
25: } finally {
26: // 获取同步状态发生异常,取消获取。
27: if (failed)
28: cancelAcquire(node);
29: }
30: }
1.1.3 如何判断是否要进入block状态?- shouldParkAfterFailedAcquire
入参是当前节点的前节点和当前节点
4~9 判断前节点是不是已经处于等待状态(==Node.SIGNAL==),如果前节点已经处于等待状态了,那就说明当前节点更应该处于等待状态,毕竟CLH队列是一个FIFO的队列,判断完成,应该block,返回。
10~18 如果前节点已经被取消(==Node.CANCEL==),已经取消的节点不应该成为当前节点是否应该入队的考虑条件,所以一直向前探,探到第一个状态为非取消的节点,然后返回不应该block,下一次是否应该block由下一次进入==shouldParkAfterFailedAcquire==再决定。【todo,为什么不在本次调用就判断?毕竟可以做到知道前节点的状态。】
1: private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
2: // 获得前一个节点的等待状态
3: int ws = pred.waitStatus;
4: if (ws == Node.SIGNAL) // Node.SIGNAL
5: /*
6: * This node has already set status asking a release
7: * to signal it, so it can safely park.
8: */
9: return true;
10: if (ws > 0) { // Node.CANCEL
11: /*
12: * Predecessor was cancelled. Skip over predecessors and
13: * indicate retry.
14: */
15: do {
16: node.prev = pred = pred.prev;
17: } while (pred.waitStatus > 0);
18: pred.next = node;
19: } else { // 0 或者 Node.PROPAGATE
20: /*
21: * waitStatus must be 0 or PROPAGATE. Indicate that we
22: * need a signal, but don't park yet. Caller will need to
23: * retry to make sure it cannot acquire before parking.
24: */
25: compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
26: }
27: return false;
28: }
1.1.4 如何进入block状态? - parkAndCheckInterrupt
什么时候会调用这个方法?首先调用==acquire==方法,尝试获取执行权,如果失败,会尝试入队==acquireQueued==,在入队中,也在尝试获取执行权,如果获取失败,会先调用==shouldParkAfterFailedAcquire==判断当前节点对应的线程是否应该block,如果应该,则调用==parkAndCheckInterrupt==对线程进行block处理。
看下源码实际执行逻辑
首先调用==LockSupport.park==将当前线程block。【todo LockSupport.park的原理】
然后当前线程是否被被打断过,==Thread.interrupted==。【todo interrupted标志位有什么用?】
正常情况下,会有两种被打断的情况
- 前节点释放执行权利,唤醒当前节点。
- 当前线程被打断导致唤醒。【todo 不甚理解这句话,和前一个todo一起理解】
1.1.5 遇到异常情况,如何取消入队操作 - cancelAcquire
入队发生异常时,调用cancelAcquire(Node node),node为想要入队的节点。目的是取消当前节点的入队操作,并且当前节点从同步队列中删除。
当前节点分三种状态,队首,队尾和队中。位于队尾和队中的节点,将自己从队中删除即可。但是涉及到队首的节点,队首的节点有什么特殊意义呢?排到队首的节点自动获取当前同步状态的执行权利,所以不能简单的还将自己删除,还需要将执行权利向下传递。这也就是24~26(队首),29~36(队中)和38(队首)代码的含义。具体的传递执行权利的逻辑还需要看 1.1.6 unparkSuccessor。
3行 常规的判空操作
6行 将要删除的节点对应的线程置空 【todo,为什么要置空,是不是其他调用处用到了==thread == null==去做已删除节点的判断】
9~11行 跳过已经处于取消状态的节点,前探到第一个非取消状态的节点。
16行 获取当前节点的前节点。看下注释,==predNext==引用指向的节点是明显的第一个不用再跳过的节点,如果不是的化,下面的CAS操作将会失败。失败的原因是另一个线程比我们执行的快,它可能提前进行了取消或者通知操作。所以在这步,不需要额外的操作。
21行 将当前节点状态置成取消状态。下面的删除操作有可能执行不到,所以需要将当前节点状态置成取消状态,这样的话,其他节点操作时,可以凭借此状态,判断当前节点不需要再被执行,继而跳过当前节点。
24~26 如果当前节点是尾节点,则使用CAS,将前节点的==next域==置null。这里就用到了第16行的逻辑,使用CAS保证的是当其他线程已经更改了当前节点的前节点的话,这里将会失败,然后退出==cancelAcquire方法==。继而使用第21行的逻辑,其他节点执行操作时,会凭借==Node.CANCELLED==状态,跳过当前节点。
27~36 删除队中节点。【todo,看下30~33行的逻辑,为什么当前节点要被取消了,就要改变前节点的ws?只要<=0就要改成==Node.Signal==,我推测应该是ws之间有正确的流转状态,这个很重要,要探索出来。】
36行 然后给前节点和后节点做桥,将二者关联起来。相当于将自己从队列中删除掉了。
33行处于29~36(队中),但是是为了防止误删队首节点,导致执行权利无法向下传递的保证。因为可能开始执行29行时,当前节点还是队中,但随着执行,当前节点可能就被消费到了队首的位置,只有队首节点的==Node.Thread==才有可能为null。
38行 队首节点对应的分支,唤醒继任者,具体的逻辑看1.1.6 unparkSuccessor。
看下源码
1: private void cancelAcquire(Node node) {
2: // Ignore if node doesn't exist
3: if (node == null)
4: return;
5:
6: node.thread = null;
7:
8: // Skip cancelled predecessors
9: Node pred = node.prev;
10: while (pred.waitStatus > 0)
11: node.prev = pred = pred.prev;
12:
13: // predNext is the apparent node to unsplice. CASes below will
14: // fail if not, in which case, we lost race vs another cancel
15: // or signal, so no further action is necessary.
16: Node predNext = pred.next;
17:
18: // Can use unconditional write instead of CAS here.
19: // After this atomic step, other Nodes can skip past us.
20: // Before, we are free of interference from other threads.
21: node.waitStatus = Node.CANCELLED;
22:
23: // If we are the tail, remove ourselves.
24: if (node == tail && compareAndSetTail(node, pred)) {
25: compareAndSetNext(pred, predNext, null);
26: } else {
27: // If successor needs signal, try to set pred's next-link
28: // so it will get one. Otherwise wake it up to propagate.
29: int ws;
30: if (pred != head &&
31: ((ws = pred.waitStatus) == Node.SIGNAL ||
32: (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
33: pred.thread != null) {
34: Node next = node.next;
35: if (next != null && next.waitStatus <= 0)
36: compareAndSetNext(pred, predNext, next);
37: } else {
38: unparkSuccessor(node);
39: }
40:
41: node.next = node; // help GC
42: }
43: }
1.1.6 唤醒执行权利的继承者 - unparkSuccessor
【todo,将waitstutas置为0,0的含义是什么,在cancelAcquire中,删除队中节点时(29~36),也判断了0】
首先判断当前当前节点的状态,如果<0,意味着当前节点的状态还为存活状态,因为只有被取消的节点状态,才置为1。然后将当前节点状态置为0,0我理解是已经获取过执行权利的节点状态值。
随后唤醒下一个可被唤醒的节点,已经取消的节点会被跳过【todo,不会存在 可用-不可用-可用-不可用的节点顺序吗?】
最终调用==LockSupport.unpark==对后继结点进行唤醒操作。【todo,LockSupport原理】
private void unparkSuccessor(Node node) {
//当前节点状态
int ws = node.waitStatus;
//当前状态 < 0 则设置为 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//当前节点的后继节点
Node s = node.next;
//后继节点为null或者其状态 > 0 (超时或者被中断了)
if (s == null || s.waitStatus > 0) {
s = null;
//从tail节点来找可用节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//唤醒后继节点
if (s != null)
LockSupport.unpark(s.thread);
}