4.Condition源码解析

Condition 是一个多线程协调通信的工具类,可以让某些线程一起等待某个条件(condition),只有满足条件时,线程才会被唤醒,相当于object的 wait 和 notify 的功能。

public class ConditionWait implements Runnable{

    private Lock lock;
    private Condition condition;

    public ConditionWait(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        try {
            lock.lock(); //竞争锁
            try {
                System.out.println("begin - ConditionWait");
                condition.await();//阻塞(1. 释放锁, 2.阻塞当前线程, FIFO(单向、双向))
                System.out.println("end - ConditionWait");

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }finally {
            lock.unlock();//释放锁
        }
    }
}
public class ConditionNotify implements Runnable{

    private Lock lock;
    private Condition condition;

    public ConditionNotify(Lock lock, Condition condition) {
        this.lock = lock;
        this.condition = condition;
    }

    @Override
    public void run() {
        try{
            lock.lock();//获得了锁.
            System.out.println("begin - conditionNotify");
            condition.signal();//唤醒阻塞状态的线程
            System.out.println("end - conditionNotify");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock(); //释放锁
        }
    }
}

Condition 源码分析

condition.await

调用 Condition,需要获得 Lock 锁,所以意味着会存在一个 AQS 同步队列,在上面那个案例中,假如两个线程同时运行的话,那么 AQS 的队列可能是下面这种情况


image.png

那么这个时候 ThreadA 调用了 condition.await 方法,它做了什么事情呢?

 public final void await() throws InterruptedException {
        if (Thread.interrupted()) //表示 await 允许被中断
            throw new InterruptedException();
        Node node = addConditionWaiter(); //创建一个新的节点,节点状态为 condition,采用的数据结构仍然是链表
        int savedState = fullyRelease(node); //释放当前的锁,得到锁的状态,并唤醒 AQS 队列中的一个线程
        int interruptMode = 0;
        //如果当前节点没有在同步队列上,即还没有被 signal,则将当前线程阻塞
        while (!isOnSyncQueue(node)) {//判断这个节点是否在 AQS 队列上,第一次判断的是 false,因为前面已经释放锁了
            LockSupport.park(this); //通过 park 挂起当前线程
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // 当这个线程醒来,会尝试拿锁, 当 acquireQueued 返回 false 就是拿到锁了.
        // interruptMode != THROW_IE -> 表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了.
        // 将这个变量设置成 REINTERRUPT.
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 如果 node 的下一个等待者不是 null, 则进行清理,清理 Condition 队列上的节点.
        // 如果是 null ,就没有什么好清理的了.
        if (node.nextWaiter != null) // clean up if cancelled
            unlinkCancelledWaiters();
        // 如果线程被中断了,需要抛出异常.或者什么都不做
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

此方法使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await()方法返回,当前线程一定获取了 Condition相关联的锁

查看Node node = addConditionWaiter(); 方法看看

    private Node addConditionWaiter() {
        Node t = lastWaiter;
        // 如果 lastWaiter 不等于空并且 waitStatus 不等于 CONDITION 时,把这个节点从链表中移除
        if (t != null && t.waitStatus != Node.CONDITION) {
            unlinkCancelledWaiters();
            t = lastWaiter;
        }
        //构建一个 Node,waitStatus=CONDITION。 这里的链表是一个单向的,所以相比 AQS 来说会简单很多
        Node node = new Node(Thread.currentThread(), Node.CONDITION);
        if (t == null)
            firstWaiter = node;
        else
            t.nextWaiter = node;
        lastWaiter = node;
        return node;
    }

这个方法的主要作用是把当前线程封装成 Node,添加到等待队列。这里的队列不再是双向链表,而是单向链表

执行完 addConditionWaiter 这个方法之后,就会产生一个这样的 condition 队

image.png

再看看fullyRelease(node); 方法

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState(); //获得重入的次数
            if (release(savedState)) {//释放锁并且唤醒下一个同步队列中的线程
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

fullRelease,就是彻底的释放锁,什么叫彻底呢,就是如果当前锁存在多次重入,那么在这个方法中只需要释放一次就会把所有的重入次数归零。

此时,同步队列会触发锁的释放和重新竞争,ThreadB 获得了锁


image.png

接着看 while (!isOnSyncQueue(node)) 方法

final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev
                == null)
            return false;
        if (node.next != null) // If has successor, it must
            be on queue
        return true;

        return findNodeFromTail(node);
    }

此方法判断当前节点是否在同步队列中,返回 false 表示不在,返回 true 表示在
如果不在 AQS 同步队列,说明当前节点没有唤醒去争抢同步锁,所以需要把当前线程阻塞起
来,直到其他的线程调用 signal 唤醒
如果在 AQS 同步队列,意味着它需要去竞争同步锁去获得执行程序执行权限
为什么要做这个判断呢?原因是在 condition 队列中的节点会重新加入到 AQS 队列去竞争锁。也就是当调用 signal 的时候,会把当前节点从 condition 队列转移到 AQS 队列
➢ 思考一下,基于现在的逻辑结构。如何去判断 ThreadA 这个节点是否存在于 AQS 队列中呢?

  1. 如果 ThreadA 的 waitStatus 的状态为 CONDITION,说明它存在于 condition 队列中,不在 AQS 队列。因为 AQS 队列的状态一定不可能有 CONDITION
  2. 如果 node.prev 为空,说明也不存在于 AQS 队列,原因是 prev=null 在 AQS 队列中只有一种可能性,就是它是 head 节点,head 节点意味着它是获得锁的节点。
  3. 如果 node.next 不等于空,说明一定存在于 AQS 队列中,因为只有 AQS 队列才会存在next 和 prev 的关系
  4. findNodeFromTail,表示从 tail 节点往前扫描 AQS 队列,一旦发现 AQS 队列的节点和当前节点相等,说明节点一定存在于 AQS 队列中

Condition.signal

await 方法会阻塞 ThreadA,然后 ThreadB 抢占到了锁获得了执行权限,这个时候在 ThreadB中调用了 Condition 的 signal()方法,将会唤醒在等待队列中节点

    public final void signal() {
        if (!isHeldExclusively()) //先判断当前线程是否获得了锁,这个判断比较简单,直接
            用获得锁的线程和当前线程相比即可
        throw new IllegalMonitorStateException();
        Node first = firstWaiter; // 拿到 Condition 队列上第一个节点
        if (first != null)
            doSignal(first);
    }
  private void doSignal(Node first) {
        do {
            //从 Condition 队列中删除 first 节点
            if ( (firstWaiter = first.nextWaiter) == null)
                lastWaiter = null; // 将 next 节点设置成 null
            first.nextWaiter = null;
        } while (!transferForSignal(first) &&
                (first = firstWaiter) != null);
    }

对 condition 队列中从首部开始的第一个 condition 状态的节点,执行 transferForSignal 操作,将 node 从 condition 队列中转换到 AQS 队列中,同时修改 AQS 队列中原先尾节点的状态

接着看transferForSignal方法

 final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//更新节点的状态为0,如果更新失败,只有一种可能就是节点被 CANCELLED 了
            return false;
        Node p = enq(node);//调用 enq,把当前节点添加到 AQS 队列。并且返回返回按当前节点的上一个节点,也就是原 tail 节点
        int ws = p.waitStatus;
        // 如果上一个节点的状态被取消了, 或者尝试设置上一个节点的状态为 SIGNAL 失败了(SIGNAL 表示它的 next 节点需要停止阻塞),
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread); // 唤醒节点上的线程.
        return true; //如果 node 的 prev 节点已经是 signal 状态,那么被阻塞的 ThreadA 的唤醒工作由 AQS 队列来完成
    }

该方法先是 CAS 修改了节点状态,如果成功,就将这个节点放到 AQS 队列中,然后唤醒这个节点上的线程。此时,那个节点就会在 await 方法中苏醒

执行完 doSignal 以后,会把 condition 队列中的节点转移到 aqs 队列上,逻辑结构图如下,这个时候会判断 ThreadA 的 prev 节点也就是 head 节点的 waitStatus,如果大于 0 或者设置 SIGNAL 失败,表示节点被设置成了 CANCELLED 状态。这个时候会唤醒 ThreadA 这个线程。否则就基于 AQS 队列的机制来唤醒,也就是等到 ThreadB 释放锁之后来唤醒ThreadA


image.png

被阻塞的线程唤醒后的逻辑

前面在分析 await 方法时,线程会被阻塞。而通过 signal 被唤醒之后又继续回到上次执行的
checkInterruptWhileWaiting方法那里。

public final void await() throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            //唤醒之后接着往下走
            if ((interruptMode =
                    checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        if (acquireQueued(node, savedState) &&
                interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null) // clean up if
            cancelled
        unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
    }

checkInterruptWhileWaiting 这个方法是干嘛呢?其实从名字就可以看出来,就是 ThreadA在 condition 队列被阻塞的过程中,有没有被其他线程触发过中断请求

 private int checkInterruptWhileWaiting(Node node) {
        return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE :
                        REINTERRUPT) : 0;
    }

    final boolean transferAfterCancelledWait(Node node) {
        //使用 cas 修改节点状态,如果还能修改成功,说明线程被中断时, signal 还没有被调用。
        // 这里有一个知识点,就是线程被唤醒,并不一定是在 java 层面执行了
        //locksupport.unpark,也可能是调用了线程的 interrupt()方法,这
        //个方法会更新一个中断标识,并且会唤醒处于阻塞状态下的线程。
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node); //如果 cas 成功,则把 node 添加到 AQS 队列
            return true;
        }
        //如果 cas 失败,则判断当前 node 是否已经在 AQS 队列上,如果不在,则让给其他线程执行
        //当 node 被触发了 signal 方法时,node 就会被加到 aqs 队列上
        while (!isOnSyncQueue(node))//循环检测 node 是否已经成功添加到 AQS 队列中。如果没有,则通过 yield,
            Thread.yield();
        return false;
    }

如果当前线程被中断,则调用 transferAfterCancelledWait 方法判断后续的处理应该是抛出
InterruptedException 还是重新中断。
这里需要注意的地方是,如果第一次 CAS 失败了,则不能判断当前线程是先进行了中断还是先进行了 signal 方法的调用,可能是先执行了 signal 然后中断,也可能是先执行了中断,后执行了 signal,当然,这两个操作肯定是发生在 CAS 之前。这时需要做的就是等待当前线程的 node 被添加到 AQS 队列后,也就是 enq 方法返回后,返回 false 告诉checkInterruptWhileWaiting 方法返回 REINTERRUPT(1),后续进行重新中断。简单来说,该方法的返回值代表当前线程是否在 park 的时候被中断唤醒,如果为 true 表示中断在 signal 调用之前,signal 还未执行,那么这个时候会根据 await 的语义,在 await 时遇到中断需要抛出 interruptedException,返回 true 就是告诉checkInterruptWhileWaiting返回 THROW_IE(-1)。如果返回 false,否则表示 signal 已经执行过了,只需要重新响应中断即可

再继续走acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
 boolean failed = true;
 try {
    boolean interrupted = false;
    for (;;) {
       final Node p = node.predecessor();//获取当前节点的 prev 节点
       if (p == head && tryAcquire(arg)) {//如果是 head 节点,说明有资格去争抢锁
          setHead(node);//获取锁成功,也就是ThreadA 已经释放了锁,然后设置 head 为 ThreadB 获得执行权限
          p.next = null; //把原 head 节点从链表中移除
          failed = false;
          return interrupted;
      }
      //ThreadA 可能还没释放锁,使得 ThreadB 在执行 tryAcquire 时会返回 false
      if (shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt())
          interrupted = true; //并且返回当前线程在等待过程中有没有中断过。
   }
 } finally {
    if (failed)
      cancelAcquire(node);
    }
}

这个方法在讲上一篇 ReentrantLock 分析过,是当前被唤醒的节点 ThreadA 去抢占同步锁。并且要恢复到原本的重入次数状态。调用完这个方法之后,AQS 队列的状态如下,将 head 节点的 waitStatus 设置为-1,Signal 状态。


image.png

最后看看最下面的reportInterruptAfterWait方法

  private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
        if (interruptMode == THROW_IE)
            throw new InterruptedException();
        else if (interruptMode == REINTERRUPT)
            selfInterrupt();
    }

根据 checkInterruptWhileWaiting 方法返回的中断标识来进行中断上报。
如果是 THROW_IE,则抛出中断异常
如果是 REINTERRUPT,则重新响应中断

await 和 signal 的总结

image.png

线程 awaitThread 先通过lock.lock()方法获取锁成功后调用了 condition.await 方法进入等待队列,而另一个线程signalThread 通过 lock.lock()方法获取锁成功后调用了 condition.signal 或者 signalAll 方法,使得线程 awaitThread 能够有机会移入到同步队列中,当其他线程释放 lock 后使得线程 awaitThread 能够有机会获取 lock,从而使得线程 awaitThread 能够从 await 方法中退出执行后续操作。如果 awaitThread 获取 lock 失败会直接进入到同步队列。

阻塞:await()方法中,在线程释放锁资源之后,如果节点不在 AQS 等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁
释放:signal()后,节点会从 condition 队列移动到 AQS 等待队列,则进入正常锁的获取流程

——学自咕泡学院

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,539评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,911评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,337评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,723评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,795评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,762评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,742评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,508评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,954评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,247评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,404评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,104评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,736评论 3 324
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,352评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,557评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,371评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,292评论 2 352

推荐阅读更多精彩内容