ReentrantLock的Condition源码解析

前言

这一篇文章,想和大家分享一下Condition的源码学习过程,Condition的应用,其实是很简单的,相信大家在项目中或者demo中或多或少都用过。最不济,在应付面试的时候,相信也有不少小伙伴背过不少的面试题。话不多说,水平有限,文章中有错误的地方也请不吝指正,共同进步。

应用场景

ReentrantLock的Condition的设计场景,我在上一篇博客也分享过,建议先移步上一篇看一下ReentrantLock, Condition只是其中的一个应用。

example:

先上一段代码,看一下如何使用

@Slf4j
public class ReentrantLockTest {
    private static ReentrantLock lock = new ReentrantLock();
    private static boolean hasSmoke = false;
    private static boolean hasSnacks = false;

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 5, 1,
                TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
        executor.setCorePoolSize(20);
        Condition smoke = lock.newCondition();
        Condition snacks = lock.newCondition();
        executor.submit(()->{
            lock.lock();
            try {
                while (!hasSmoke) {
                    log.info(Thread.currentThread().getName() + "我是boy-one,没有烟,干活没动力");
                    try {
                        smoke.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是boy-one,有烟了,嘬一口.");
            } finally {
                lock.unlock();
            }
        });
        executor.submit(()->{
            lock.lock();
            try {
                while (!hasSmoke) {
                    log.info(Thread.currentThread().getName() + "我是boy-two,没有烟,干活没动力");
                    try {
                        smoke.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是boy-two,有烟了,嘬一口.");
            } finally {
                lock.unlock();
            }
        });
        executor.submit(()->{
            lock.lock();
            try {
                while (!hasSnacks) {
                    log.info(Thread.currentThread().getName() + "我是girl-one,没有零食吃,干活没力气");
                    try {
                        snacks.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是girl-one, 有零食了,吃一口.");
            } finally {
                lock.unlock();
            }
        });
        executor.submit(()->{
            lock.lock();
            try {
                while (!hasSnacks) {
                    log.info(Thread.currentThread().getName() + "我是girl-two,没有零食吃,干活没力气");
                    try {
                        snacks.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是girl-two, 有零食了,吃一口.");
            } finally {
                lock.unlock();
            }
        });
        executor.submit(()->{
            lock.lock();
            try {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                hasSmoke = true;
                log.info("烟来喽");
                smoke.signalAll();
                hasSnacks = true;
                log.info("零食来喽");
                snacks.signalAll();
            }finally {
                lock.unlock();
            }
        });
        executor.shutdown();
    }
}
10:48:05.563 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-1我是boy-one,没有烟,干活没动力
10:48:05.565 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-2我是boy-two,没有烟,干活没动力
10:48:05.565 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-3我是girl-one,没有零食吃,干活没力气
10:48:05.565 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-4我是girl-two,没有零食吃,干活没力气
10:48:07.568 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 烟来喽
10:48:07.568 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 零食来喽
10:48:07.569 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - 我是boy-one,有烟了,嘬一口.
10:48:07.569 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - 我是boy-two,有烟了,嘬一口.
10:48:07.569 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - 我是girl-one, 有零食了,吃一口.
10:48:07.569 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - 我是girl-two, 有零食了,吃一口.

这个场景中,男生需要抽烟,女生需要吃零食,不然就会拒绝干活。当烟来了或者零食来了的时候,大家就停止摸鱼,开始干活。如果只给了零食,那么男生就会无限等待,同理,只给了烟, 那么女生就会一直摸鱼下去,具体的例子大家可以自己跑一跑。
例子中我们可以看到,有4个员工都在摸鱼,但是我们可以根据需求把这4个员工分为两部分,一部分是需要抽烟的,一部分是需要吃零食的,如果我们有零食,我们就可以唤醒需要吃零食的人,烟同理。这样就做到了部分唤醒。

分析

我们大致分析一下原理,我们首先使用lock.newCondition()方法new了两个Condition,每个Condition对象都维护了一个队列。然后boy-one和boy-two调用smoke.await(),会进到smoke对象维护的队列里,同理,girl-one和girl-two会进入到snacks对象的队列里。当我们调用smoke.signalAll()的时候,smoke对象就会把boy-one和boy-two扔到aqs的队列里,当我们调用snacks.signalAll()的时候,snacks对象就会把girl-one和girl-two扔到aqs的队列里。还记不记得之前aqs博文里的时候,node节点有个状态。int CONDITION = -2; 没错,就是给Condition用的。
剩下的线程唤醒操作就交给了aqs。
上面的分析大致看懂了以后,我们带着这个分析去看一下源码:

源码

newCondition()

// 通过调用链,大家已经很清楚了吧,就是返回了一个ConditionObject对象,
// 这个对象里维护了一个node节点的链表,这个node节点不用猜,就是aqs封装的node节点。
public Condition newCondition() {
    return sync.newCondition();
}
final ConditionObject newCondition() {
    return new ConditionObject();
}
public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

    /**
           * Creates a new {@code ConditionObject} instance.
           */
    public ConditionObject() { }
}

await()

public final void await() throws InterruptedException {
    if (Thread.interrupted())
      throw new InterruptedException();
    // 这个方法贴在下面了,官方注释解释的简洁明了,就是添加节点到队列尾部
    Node node = addConditionWaiter();
    // 释放资源当前线程拿到的锁资源,下面详细解释
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    // 调用isOnSyncQueue判断是否在aqs队列里,这个下面会详细解释。
    while (!isOnSyncQueue(node)) {
      // 不在aqs队列,就调用park方法阻塞,交出锁。
      LockSupport.park(this);
      // 被唤醒后,首先去检查中断,然后退出循环
      if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
    }
    // 检查完中断去获取锁,然后做一些校验
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
      interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
      unlinkCancelledWaiters();
    if (interruptMode != 0)
      reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
      // 
      unlinkCancelledWaiters();
      t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
      firstWaiter = node;
    else
      t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
// 这个方法会释放当前节点占用的所有资源,注意是所有。
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
      // 直接拿到现在的资源数
      int savedState = getState();
      // 然后释放全部
      if (release(savedState)) {
        failed = false;
        return savedState;
      } else {
        throw new IllegalMonitorStateException();
      }
    } finally {
      // 注意,这里会把node节点的状态置为CANCELLED
      if (failed)
        node.waitStatus = Node.CANCELLED;
    }
}

final boolean isOnSyncQueue(Node node) {
    // 如果node节点是在CONDITION状态,肯定不在aqs队列。不懂的看下面的transferForSignal()方法
    // 如果node节点的prev节点为null,肯定不在aqs队列。这个问题我放到最后单独讲
    if (node.waitStatus == Node.CONDITION || node.prev == null)
      return false;
    if (node.next != null) // If has successor, it must be on queue
      return true;
    /*
           * node.prev can be non-null, but not yet on queue because
           * the CAS to place it on queue can fail. So we have to
           * traverse from tail to make sure it actually made it.  It
           * will always be near the tail in calls to this method, and
           * unless the CAS failed (which is unlikely), it will be
           * there, so we hardly ever traverse much.
           */
    // 这是个兜底逻辑。就是遍历aqs节点,看node节点是否存在。
    return findNodeFromTail(node);
}

signalAll()

public final void signalAll() {
    // 调用signalAll()会首先校验调用signalAll()的线程是不是当前锁持有线程。不是会抛异常
    if (!isHeldExclusively())
      throw new IllegalMonitorStateException();
    // 从头开始处理队列
    Node first = firstWaiter;
    if (first != null)
      doSignalAll(first);
}
/**
 * Removes and transfers all nodes.
 * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
    // 官方注释总是简单明了。。
    lastWaiter = firstWaiter = null;
    do {
      Node next = first.nextWaiter;
      first.nextWaiter = null;
      transferForSignal(first);
      first = next;
    } while (first != null);
}
final boolean transferForSignal(Node node) {
  /*
   * If cannot change waitStatus, the node has been cancelled.
   */
  // 把节点从CONDITION设置为0
  if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
    return false;

  /*
   * Splice onto queue and try to set waitStatus of predecessor to
   * indicate that thread is (probably) waiting. If cancelled or
   * attempt to set waitStatus fails, wake up to resync (in which
   * case the waitStatus can be transiently and harmlessly wrong).
   */
    // 熟悉的方法,不再讲
    Node p = enq(node);
    int ws = p.waitStatus;
    // 把节点从0设置为SIGNAL
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
      LockSupport.unpark(node.thread);
    return true;
}

上述这三个方法,就是把Condition对象的队列remove,然后add到aqs的队列里。需要注意一点,add到aqs队列,并不是被唤醒!!!锁一次只能被一个线程持有,所以还是逐个被唤醒的,只有调用signalAll()的线程在释放锁之后,才去唤醒next节点。至此,Condition的源码已经结束了。

流程分析:

我们用一个表格简单分析一下流程,这里我们不考虑指令优化,前面的代码总是优于后面的代码先执行。本来想画个动态图的,奈何不会AE, 凑合看吧。有好用的免费的画图软件,也请@我。

boy-one boy-two girl-one girl-two 饲养员 队列情况
lock()拿到锁
调用await()
把自己添加到smoke队列 smoke:boy-one
释放锁,休眠
lock()拿到锁
调用await()
把自己添加到smoke队列 smoke:boy-one-->boy-two
释放锁,休眠
lock()拿到锁
调用await()
把自己添加到snacks队列 smoke:boy-one-->boy-two snacks:girl-one
释放锁,休眠
lock()拿到锁
调用await()
把自己添加到snacks队列 smoke:boy-one-->boy-two snacks: girl-one-->girl-two
释放锁,休眠
lock()拿到锁
调用smoke.signalAll() smoke:snacks:girl-one-->girl-two aqs:boy-one-->boy-two
调用snacks.signalAll() smoke:snacks: aqs:boy-one-->boy-two-->girl-one-->girl-two
释放锁
aqs唤醒boy-one
被唤醒,拿到锁
执行逻辑
释放锁,唤醒下一个
被唤醒,拿到锁
执行逻辑
释放锁,唤醒下一个
被唤醒,拿到锁
执行逻辑
释放锁,唤醒下一个
被唤醒,拿到锁
执行逻辑
释放锁,唤醒下一个
发现没有下一个,结束。

我们可以看到,整个流程都是围绕一把锁展开的,那么如果我们不加锁,也就是我们把lock代码都去掉,流程又会怎么样呢?

@Slf4j
public class ReentrantLockTest {
    private static ReentrantLock lock = new ReentrantLock();
    private static boolean hasSmoke = false;
    private static boolean hasSnacks = false;

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
                5, 5, 1,
                TimeUnit.MINUTES, new ArrayBlockingQueue<>(10));
        executor.setCorePoolSize(20);
        Condition smoke = lock.newCondition();
        Condition snacks = lock.newCondition();
        executor.submit(()->{
            
                while (!hasSmoke) {
                    log.info(Thread.currentThread().getName() + "我是boy-one,没有烟,干活没动力");
                    try {
                        smoke.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是boy-one,有烟了,嘬一口.");
            
        });
        executor.submit(()->{
            
                while (!hasSmoke) {
                    log.info(Thread.currentThread().getName() + "我是boy-two,没有烟,干活没动力");
                    try {
                        smoke.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是boy-two,有烟了,嘬一口.");
            
        });
        executor.submit(()->{
            
                while (!hasSnacks) {
                    log.info(Thread.currentThread().getName() + "我是girl-one,没有零食吃,干活没力气");
                    try {
                        snacks.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是girl-one, 有零食了,吃一口.");
            
        });
        executor.submit(()->{
            
                while (!hasSnacks) {
                    log.info(Thread.currentThread().getName() + "我是girl-two,没有零食吃,干活没力气");
                    try {
                        snacks.await();
                    } catch (InterruptedException e) {
                    }
                }
                log.info("我是girl-two, 有零食了,吃一口.");
           
        });
        executor.submit(()->{
            lock.lock();
            try {
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                }
                hasSmoke = true;
                log.info("烟来喽");
                smoke.signalAll();
                hasSnacks = true;
                log.info("零食来喽");
                snacks.signalAll();
            }finally {
                lock.unlock();
            }
        });
        executor.shutdown();
    }
}
11:52:51.473 [pool-1-thread-4] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-4我是girl-two,没有零食吃,干活没力气
11:52:51.473 [pool-1-thread-1] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-1我是boy-one,没有烟,干活没动力
11:52:51.473 [pool-1-thread-3] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-3我是girl-one,没有零食吃,干活没力气
11:52:51.473 [pool-1-thread-2] INFO com.yameng.concurrent.ReentrantLockTest - pool-1-thread-2我是boy-two,没有烟,干活没动力
11:52:53.473 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 烟来喽
11:52:53.473 [pool-1-thread-5] INFO com.yameng.concurrent.ReentrantLockTest - 零食来喽

在调用signalAll()方法的时候,会强制校验当前线程是否持有锁,否则会抛出异常,所以饲养员的锁必须加,boy和girl的锁我们去掉了。

public final void signalAll() {
  if (!isHeldExclusively())
  throw new IllegalMonitorStateException();
  Node first = firstWaiter;
  if (first != null)
  doSignalAll(first);
}

我们看到运行结果里,boy和girl都不干活了,这是咋回事呢?我们分析一下。

当boy和girl调用smoke或者snacks的await()方法,会把自己添加到相应的队列。然后调用fullyRelease(node)释放锁,释放锁的方法里会校验当前持有线程,不是的话会抛异常

protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    // 这里抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
      throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
      free = true;
      setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

然后在fullyRelease(Node node)方法里,finally的代码块会执行,node节点ws状态置为CANCELLED。

饲养员在调用signalAll()方法,把队列挪到aqs队列的过程中在doSignalAll(Node first)里的transferForSignal()方法中

if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

会首先用cas修改node节点状态,由于节点是CANCELLED,所以cas会失败,失败就直接返回。节点从Condition队列中移除,加入aqs队列失败。因此,这个节点就灰飞烟灭了。

遗留问题

  1. 如果node节点的prev节点为null,肯定不在aqs队列
    为什么这么说呢,Condition维护的队列,是单链表,用nextWaiter指向下个节点
    Aqs维护的队列,是双向链表,用prev和next指向前后节点,所以Condition维护的队列的prev必为null,而Aqs维护的队列的真实node节点的prev必不为null,因为初始化队列的时候会在头部放一个dummy node
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容