AQS学习

1.独占式同步状态获取

AQS提供了很多模板方法,模板方法中已经定义好了各种行为,只需要实现其中几个关键的行为(接口),就可以复用整体的逻辑,有较好的框架和复用性。

1.1 获取同步执行权-acquire

AQS底层是一个双向队列,也称CLH队列(其实就是仨人名)。当获取执行权时,有两种可能,获取到了(皆大欢喜),没获取到(队尾排队去)。

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

首先尝试获取(==tryAcquire==),如果获取不到则尝试入队(==acquireQueued==,其中会自旋继续尝试获取执行权)。

1.1.1 尝试获取同步执行权-tryAcquire

其中的==tryAcquire==需要继承方线程安全的实现获取方法。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

1.1.2 尝试入队并自旋尝试获取执行权-acquireQueued

尝试入队是通过==addWaiter==方法进行的。完了以后就是在==acquireQueued==方法中,尝试获取执行权。
问:AQS中什么时候能获取到执行权,当封装了==Thread==的节点信息排到队首的时候。
所以==acquireQueued==方法中就会自旋的检查当前节点到没到队首啊,没到的话,继续block。

 1: final boolean acquireQueued(final Node node, int arg) {
 2:     // 记录是否获取同步状态成功
 3:     boolean failed = true;
 4:     try {
 5:         // 记录过程中,是否发生线程中断
 6:         boolean interrupted = false;
 7:         /*
 8:          * 自旋过程,其实就是一个死循环而已
 9:          */
10:         for (;;) {
11:             // 当前线程的前驱节点
12:             final Node p = node.predecessor();
13:             // 当前线程的前驱节点是头结点,且同步状态成功
14:             if (p == head && tryAcquire(arg)) {
15:                 setHead(node);
16:                 p.next = null; // help GC
17:                 failed = false;
18:                 return interrupted;
19:             }
20:             // 获取失败,线程等待--具体后面介绍
21:             if (shouldParkAfterFailedAcquire(p, node) &&
22:                     parkAndCheckInterrupt())
23:                 interrupted = true;
24:         }
25:     } finally {
26:         // 获取同步状态发生异常,取消获取。
27:         if (failed)
28:             cancelAcquire(node);
29:     }
30: }

1.1.3 如何判断是否要进入block状态?- shouldParkAfterFailedAcquire

入参是当前节点的前节点和当前节点
4~9 判断前节点是不是已经处于等待状态(==Node.SIGNAL==),如果前节点已经处于等待状态了,那就说明当前节点更应该处于等待状态,毕竟CLH队列是一个FIFO的队列,判断完成,应该block,返回。
10~18 如果前节点已经被取消(==Node.CANCEL==),已经取消的节点不应该成为当前节点是否应该入队的考虑条件,所以一直向前探,探到第一个状态为非取消的节点,然后返回不应该block,下一次是否应该block由下一次进入==shouldParkAfterFailedAcquire==再决定。【todo,为什么不在本次调用就判断?毕竟可以做到知道前节点的状态。】

 1: private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2:     // 获得前一个节点的等待状态
 3:     int ws = pred.waitStatus;
 4:     if (ws == Node.SIGNAL) //  Node.SIGNAL
 5:         /*
 6:          * This node has already set status asking a release
 7:          * to signal it, so it can safely park.
 8:          */
 9:         return true;
10:     if (ws > 0) { // Node.CANCEL
11:         /*
12:          * Predecessor was cancelled. Skip over predecessors and
13:          * indicate retry.
14:          */
15:         do {
16:             node.prev = pred = pred.prev;
17:         } while (pred.waitStatus > 0);
18:         pred.next = node;
19:     } else { // 0 或者 Node.PROPAGATE
20:         /*
21:          * waitStatus must be 0 or PROPAGATE.  Indicate that we
22:          * need a signal, but don't park yet.  Caller will need to
23:          * retry to make sure it cannot acquire before parking.
24:          */
25:         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
26:     }
27:     return false;
28: }

1.1.4 如何进入block状态? - parkAndCheckInterrupt

什么时候会调用这个方法?首先调用==acquire==方法,尝试获取执行权,如果失败,会尝试入队==acquireQueued==,在入队中,也在尝试获取执行权,如果获取失败,会先调用==shouldParkAfterFailedAcquire==判断当前节点对应的线程是否应该block,如果应该,则调用==parkAndCheckInterrupt==对线程进行block处理。
看下源码实际执行逻辑
首先调用==LockSupport.park==将当前线程block。【todo LockSupport.park的原理】
然后当前线程是否被被打断过,==Thread.interrupted==。【todo interrupted标志位有什么用?】
正常情况下,会有两种被打断的情况

  1. 前节点释放执行权利,唤醒当前节点。
  2. 当前线程被打断导致唤醒。【todo 不甚理解这句话,和前一个todo一起理解】

1.1.5 遇到异常情况,如何取消入队操作 - cancelAcquire

入队发生异常时,调用cancelAcquire(Node node),node为想要入队的节点。目的是取消当前节点的入队操作,并且当前节点从同步队列中删除。
当前节点分三种状态,队首,队尾和队中。位于队尾和队中的节点,将自己从队中删除即可。但是涉及到队首的节点,队首的节点有什么特殊意义呢?排到队首的节点自动获取当前同步状态的执行权利,所以不能简单的还将自己删除,还需要将执行权利向下传递。这也就是24~26(队首),29~36(队中)和38(队首)代码的含义。具体的传递执行权利的逻辑还需要看 1.1.6 unparkSuccessor
3行 常规的判空操作
6行 将要删除的节点对应的线程置空 【todo,为什么要置空,是不是其他调用处用到了==thread == null==去做已删除节点的判断】
9~11行 跳过已经处于取消状态的节点,前探到第一个非取消状态的节点。
16行 获取当前节点的前节点。看下注释,==predNext==引用指向的节点是明显的第一个不用再跳过的节点,如果不是的化,下面的CAS操作将会失败。失败的原因是另一个线程比我们执行的快,它可能提前进行了取消或者通知操作。所以在这步,不需要额外的操作。
21行 将当前节点状态置成取消状态。下面的删除操作有可能执行不到,所以需要将当前节点状态置成取消状态,这样的话,其他节点操作时,可以凭借此状态,判断当前节点不需要再被执行,继而跳过当前节点。
24~26 如果当前节点是尾节点,则使用CAS,将前节点的==next域==置null。这里就用到了第16行的逻辑,使用CAS保证的是当其他线程已经更改了当前节点的前节点的话,这里将会失败,然后退出==cancelAcquire方法==。继而使用第21行的逻辑,其他节点执行操作时,会凭借==Node.CANCELLED==状态,跳过当前节点。
27~36 删除队中节点。【todo,看下30~33行的逻辑,为什么当前节点要被取消了,就要改变前节点的ws?只要<=0就要改成==Node.Signal==,我推测应该是ws之间有正确的流转状态,这个很重要,要探索出来。】
36行 然后给前节点和后节点做桥,将二者关联起来。相当于将自己从队列中删除掉了。
33行处于29~36(队中),但是是为了防止误删队首节点,导致执行权利无法向下传递的保证。因为可能开始执行29行时,当前节点还是队中,但随着执行,当前节点可能就被消费到了队首的位置,只有队首节点的==Node.Thread==才有可能为null。
38行 队首节点对应的分支,唤醒继任者,具体的逻辑看1.1.6 unparkSuccessor

看下源码

 1: private void cancelAcquire(Node node) {
 2:     // Ignore if node doesn't exist
 3:     if (node == null)
 4:         return;
 5: 
 6:     node.thread = null;
 7: 
 8:     // Skip cancelled predecessors
 9:     Node pred = node.prev;
10:     while (pred.waitStatus > 0)
11:         node.prev = pred = pred.prev;
12: 
13:     // predNext is the apparent node to unsplice. CASes below will
14:     // fail if not, in which case, we lost race vs another cancel
15:     // or signal, so no further action is necessary.
16:     Node predNext = pred.next;
17: 
18:     // Can use unconditional write instead of CAS here.
19:     // After this atomic step, other Nodes can skip past us.
20:     // Before, we are free of interference from other threads.
21:     node.waitStatus = Node.CANCELLED;
22: 
23:     // If we are the tail, remove ourselves.
24:     if (node == tail && compareAndSetTail(node, pred)) {
25:         compareAndSetNext(pred, predNext, null);
26:     } else {
27:         // If successor needs signal, try to set pred's next-link
28:         // so it will get one. Otherwise wake it up to propagate.
29:         int ws;
30:         if (pred != head &&
31:             ((ws = pred.waitStatus) == Node.SIGNAL ||
32:              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
33:             pred.thread != null) {
34:             Node next = node.next;
35:             if (next != null && next.waitStatus <= 0)
36:                 compareAndSetNext(pred, predNext, next);
37:         } else {
38:             unparkSuccessor(node);
39:         }
40: 
41:         node.next = node; // help GC
42:     }
43: }

1.1.6 唤醒执行权利的继承者 - unparkSuccessor

【todo,将waitstutas置为0,0的含义是什么,在cancelAcquire中,删除队中节点时(29~36),也判断了0】
首先判断当前当前节点的状态,如果<0,意味着当前节点的状态还为存活状态,因为只有被取消的节点状态,才置为1。然后将当前节点状态置为0,0我理解是已经获取过执行权利的节点状态值。
随后唤醒下一个可被唤醒的节点,已经取消的节点会被跳过【todo,不会存在 可用-不可用-可用-不可用的节点顺序吗?】
最终调用==LockSupport.unpark==对后继结点进行唤醒操作。【todo,LockSupport原理】


private void unparkSuccessor(Node node) {
    //当前节点状态
    int ws = node.waitStatus;
    //当前状态 < 0 则设置为 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //当前节点的后继节点
    Node s = node.next;
    //后继节点为null或者其状态 > 0 (超时或者被中断了)
    if (s == null || s.waitStatus > 0) {
        s = null;
        //从tail节点来找可用节点
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //唤醒后继节点
    if (s != null)
        LockSupport.unpark(s.thread);
}

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容