8. 并发终结之Condition条件变量

Condition接口作为wait/notify的替代品来实现等待/通知,很好的解决了过早唤醒的问题,并且boolean await(long time, TimeUnit unit)解决了public final native void wait(long timeout)方法不能断定是超时返回还是被唤醒的问题。

方法摘要

public interface Condition {
    //使当前线程进入等待直到被唤醒或者中断
    //调用await方法会释放lock锁,并且当前线程进入WAITING状态,直到
    //1. 别的线程调用了signal并且当前线程被选择唤醒
    //2. 别的线程调用了signalAll方法
    //3. 别的线程中断该等待线程,并且当前线程能够响应线程中断
    //4. 发生虚假唤醒(所以await与Object.wait一样需要放在while循环)
    //并且被唤醒之后在await方法返回前,当前线程需要再次申请锁
    void await() throws InterruptedException;
    //大体与await方法类似,只是不会被中断
    //所以进入WAITING状态后只有1,2,4会导致awaitUninterruptibly()结束
    void awaitUninterruptibly();
    //大体与await方法类似,只是等待会超时
    //所以进入WAITING状态后除了1,2,3,4之外,还有
    //5. 规定的等待时间结束
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    //类似于awaitNanos(unit.toNanos(time)) > 0
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    //同样是有过期时间的等待,只是这个是设置了固定的deadline
    boolean awaitUntil(Date deadline) throws InterruptedException;
    //唤醒一个等待线程
    //唤醒跟当前Condition相关的众多等待线程中的一个,
    //且被唤醒的线程在await方法结束前需要重新申请lock
    void signal();
    //唤醒所有的等待线程,每个线程都会去申请锁
    void signalAll();
}

Condition.await/signal的执行线程需要持有创建相应Condition变量的Lock显式锁,对保护条件的判断以及Condition.await的调用也要放在循环语句中,并且该循环语句与目标动作的执行都在一个显式锁引导的临界区内。这都是为了避免虚假唤醒信号丢失等问题。
Condition.wait与Object.wait一样,使得当前线程进入等待并且释放了相应的lock,并且此时await()的调用并未返回
被唤醒的线程继续运行的时候需要再次申请相应的Lock,再次获得显式锁成功之后,Condition.await方法才会返回。

代码示例

class AwaitClass{
    private static final Lock lock = new ReentrantLock();
    private static final Condition condition = lock.newCondition();
    public void awaitMethod(){
        lock.lock();
        try{
            while(保护条件不成立){
                condition.await();
            }
            //在临界区之内执行目标动作
            doActionMethod();
        }finally {
            lock.unlock();
        }
    }
    public void signalMethod(){
        lock.lock();
        try{
            //更新共享变量使得保护条件成立
            changeSharedState();
            //唤醒等待线程,尽量在临界区最边上
            condition.signal();
        }finally {
            lock.unlock();
        }
    }
}

解决过早唤醒的问题

image.png

代码里我们需要注意的是保护条件与Condition之间的关系,让使用不同的保护条件等待线程调用不同的Condition.await()方法来实现其等待;并让通知线程在更新了共享变量之后,仅调用涉及了这些共享变量的保护条件对应的Condition.signal/signalAll()方法来实现通知。

Condition接口有一个具体实现ConditionObject是在AbstractQueuedSynchronizer里面。
ConditionObject里面有两个Node类型的变量

private transient Node firstWaiter;
private transient Node lastWaiter;

而Node是AQS的一个静态内部类,Node是一种CLH同步队列(它是一个自旋锁spinlocks,能确保无饥饿性,提供先来先服务的公平性。同时它也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程只在本地变量上自旋,它不断轮询前驱的状态,如果发现前驱释放了锁就结束自旋。)

static final class Node {
    //共享模式下的等待标记
    static final Node SHARED = new Node();
     //独占模式下的等待标记
    static final Node EXCLUSIVE = null;
    //waitStatus值,标识线程已经被cancelled
    static final int CANCELLED =  1;
    //waitStatus值,标识后继Node的线程需要唤醒
    static final int SIGNAL    = -1;
    //waitStatus值,标识当前Node的线程正在条件队列里等待
    static final int CONDITION = -2;
    //waitStatus值,标识下一个acquireShared应该无条件传播
    static final int PROPAGATE = -3;
    //SIGNAL:此节点的后继节点已经是(或者即将)被阻塞(通过park方法),
    //       所以当前节点在它要释放锁或者被取消Acquire的时候唤醒unpark它的后继节点,
    //       为了避免竞争,申请锁的方法必须先标识他们需要signal,然后atomic acquire
    //CANCELLED:一个节点由于timeout或者Interrupt而成为CANCELLED状态。
    //       这个状态的节点永远不会变,特别是,一个线程在Cancelled的node里就再也不会阻塞。
    //CONDITION:当前节点正在条件等待队列(condition queue),它不是用作同步节点(sync queue)
    //       直到被transferred,这时候Status会被设置成0
    //PROPAGATE:应该将releaseShared传播到其他节点,仅仅在doReleaseShared里面对头结点进行设置
    //       来确保传播继续进行
    //0:除了上面的集中Status
    volatile int waitStatus;
    /**
     链接到当前节点/线程所依赖的前继节点,用于检查waitStatus。 在入队期间分配,
     并且(出于GC的考虑)仅在出队时退出时设成null。 
     另外,取消前任节点的时候,我们会短路直到找到一个会一直存在的且不是cancelled的节点,
     因为头节点永远不会被取消:一个节点称为头结点只有成功acquire。
     一个被取消的线程永远不会成功acquire,而且一个线程只能取消自身,而不取消其他任何节点。
     */
    volatile Node prev;
     /**
     在当前节点/线程释放锁被unpark,链接到当前节点/线程的后继节点。 
     入队时分配,在绕过cancelled的那些前继节点的时候调整,并且在出队列的时候设置成null
     */
    volatile Node next;
    /**
     *保存当前入节点相关联的线程 
     */
    volatile Thread thread;
    /**
     * 指向条件队列里下一个等待的node,或者对于share mode会放一个特殊的值
     */
    Node nextWaiter;
    /**
     * 如果是shared mode,则返回true
     */
    final boolean isShared() {
        return nextWaiter == SHARED;
    }
    /**
     * 返回当前节点的前继节点
     */
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }
    Node() {    // Used to establish initial head or SHARED marker
    }
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

代码解析await()

1.If current thread is interrupted, throw InterruptedException.
2.Save lock state returned by getState.
3.Invoke release with saved state as argument, throwing IllegalMonitorStateException if it fails.
4.Block until signalled or interrupted.
5.Reacquire by invoking specialized version of acquire with saved state as argument.
6.If interrupted while blocked in step 4, throw InterruptedException.

Java doc上明确说明await方法主要有6个步骤

public final void await() throws InterruptedException {
      //先做Interrupt的检查,如果当前线程已经被中断,则抛出InterruptedException
      if (Thread.interrupted())
          throw new InterruptedException();
      //添加一个新的Node(CONDITION状态)到condition queue
      Node node = addConditionWaiter();
      //释放当前线程所持有的锁,不管重入多少次,并唤醒该锁上的其他等待线程
      int savedState = fullyRelease(node);
      int interruptMode = 0;
      //检查当前node是不是在sync queue,如果不是就会一直挂起当前线程
      while (!isOnSyncQueue(node)) {
          //不在sync queue,则会挂起当前线程,当前await方法并没有返回
          LockSupport.park(this);
          //这一步线程被唤醒之后,需要检查当前线程是不是被中断,如果被中断则跳出循环
          //   且被中断的node会尝试进入sync queue
          if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
              break;
      }
      //acquireQueued开始尝试获取锁(只有前继节点是head的情况下),
      //  如果失败则阻塞当前线程,直到获取成功
      if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
          interruptMode = REINTERRUPT;
      if (node.nextWaiter != null) // clean up if cancelled
          unlinkCancelledWaiters();
      if (interruptMode != 0)
          //acquireQueued是不响应中断的,这里会响应中断
          reportInterruptAfterWait(interruptMode);
  }

/**
 * 添加一个新的waiter到等待队列,设置ConditionObject的firstWaiter和lastWaiter
 */
private Node addConditionWaiter() {
    //拿到等待队列里的最后一个node
    Node t = lastWaiter;
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {
        //将condition queue里面cancelled的node移除
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    //新建一个CONDITION的node并添加到condition queue中
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}
//释放当前线程所持有的锁,并唤醒其他阻塞的线程,释放锁异常时cancel这个node
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}
//判断这个node是不是转移到sync queue了
final boolean isOnSyncQueue(Node node) {
    //如果node的状态时Condition或者node的前继节点是空的,则表明不在sync queue
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //如果这个节点的后继节点不为空,说明有线程争抢锁失败被阻塞,则当前节点实在sync queue
    if (node.next != null) // If has successor, it must be on queue
        return true;
    //如果上面条件都不满足,就要从sync queue的最后一个开始check,是不是当前node在queue里面
    return findNodeFromTail(node);
}
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT =  1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE    = -1;

/**
 * 检查当前线程是否被中断,
 * 如果重新入sync queue成功,则表示是唤醒之前被中断,则返回THROW_IE(-1)
 * 如果唤醒之后被中断则返回REINTERRUPT(1)
 * 如果没有被中断则返回0
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}
//在唤醒之后转移node到sync queue
final boolean transferAfterCancelledWait(Node node) {
    //CAS操作,如果当前node的状态时CONDITION,则设置成0,并且enq()到sync queue,返回true
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }
    /*
     *上面CAS设值失败,这种情况很少见,所以这边就自旋一直到这个node加入sync queue
     */
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}
//对于已经在sync queue的线程开始尝试获取锁
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            //拿到当前节点的前继节点
            final Node p = node.predecessor();
            //只有前继节点是head,才开始tryAcquire(这里就涉及Fair和NonFair)尝试获取锁
            if (p == head && tryAcquire(arg)) {
                //尝试获取锁成功,则设置head = node,设置node的prev=null,thread=null
                setHead(node);
                //将原来的head节点踢出sync queue
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //如果当前节点不是head,或者尝试获取锁失败,就看前继节点的状态是不是SIGNAL
            //  如果是SIGNAL,那么就要挂起当前线程,等待unlock释放锁唤醒
            //  如果不是SIGNAL,那么就一直循环直到变成head并且获取锁成功,
            //      或者前继节点是SIGNAL,park挂起当前线程
            if (shouldParkAfterFailedAcquire(p, node) &&
                //如果shouldParkAfterFailedAcquire返回true,
                //  则挂起当前线程,等待head释放唤醒或者被中断唤醒
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
//根据前继节点的状态判断是不是要挂起当前线程
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        /*
         * 如果前继节点已经是signal,则node节点的线程可以阻塞,等待前继节点释放锁并唤醒
         */
        return true;
    if (ws > 0) {
        /*
         * 如果前继节点被cancelled,说明前继节点等待超时或者相应中断而取消了自己,
           就需要绕过这些cancelled的节点,找到waitStatus<=0的节点,
           并且继续for循环尝试获取锁,直到前继节点是SIGNAL,然后park当前线程
         */
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * 到了这说明waitStatus要么是0要么是PROPAGARE,must be 0 or PROPAGATE,
           CAS设置waitStatus为signal,并且继续for循环尝试获取锁,
            直到前继节点是SIGNAL,然后park当前线程
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

await方法在释放所持有的锁之后就park挂起当前线程,等待signal唤醒。

代码解析signal()

public final void signal() {
    //检查当前线程是不是独占锁的拥有者
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //拿到condition queue里面的第一个等待者
        doSignal(first);
}
//自旋知道唤醒一个condition queue的node
private void doSignal(Node first) {
    do {
         //将firstWaiter往后移动,将nextWaiter设置成firstWaiter
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        //将first提出等待队列
        first.nextWaiter = null;
        //transferForSignal唤醒first,如果失败则first去往下一个节点进行循环
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
//将node从condition queue转移到sync queue
final boolean transferForSignal(Node node) {
    /*
     * 如果这里CAS更新node的waitStatus失败,说明node已经不是Condition状态
        -- 参考await方法里checkInterruptWhileWaiting,
            如果线程被中断这会更新Condition并enq到sync queue
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;

    /*
     * 这边Condition状态更新成功之后,开始enq压入sync queue。
        --参考await方法里while(!isOnSyncQueue(node)),那边会一直循环(如果没有中断的话)
          直到这边enq到sync queue,while循环才会退出
       另外enq(node)返回的是node的prev继节点
     */
    Node p = enq(node);//返回的p是node的前继节点
    int ws = p.waitStatus;
    //如果前继节点是cancelled或者设置前继节点的状态到Signal失败,则唤醒当前线程(提升性能)
    //  参考await方法while(!isOnSyncQueue(node))退出之后进入acquireQueued()方法
    //    acquireQueued里面如果尝试获取锁失败会调用shouldParkAfterFailedAcquire()
    //    这里面会看node的前继节点如果是SIGNAL,则会park当前线程;
    //    如果不是SIGNAL,且不是cancelled,会尝试设置前继节点为SIGNAL,
    //    并且不会park当前线程,而是继续for循环尝试获取锁,直到前继节点是SIGNAL,然后park当前线程
    //所以这里如果设置前继节点状态SIGNAL失败,就唤醒node的线程,让它也进入for循环开始尝试获取锁
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        //为什么要唤醒当前线程?
        //  前继节点不是SIGNAL,就需要唤醒当前线程去尝试获取锁(提升性能)
        LockSupport.unpark(node.thread);
    return true;
}

以上分析完Condition里面的await()和signal()的主要方法。
在signal之后,唤醒线程会进行unlock,unlock方法会拿到sync queue的head,并唤醒head的后继节点(因为await方法里面acquireQueued()在前继节点是head,并且获取锁成功后,会将当前node设为head,并继续执行await()的后续代码,所以unlock唤醒的是head的后继节点)。
后续将分析基于AQS的ReentrantLock的实现。
在分析完AQS之后,我们再看看sync queue和condition queue是怎么合作来完成lock, await, unlock, lock ,signal, unlock的。


lock-await过程

我们来看lock-await这个过程,假设lock2-thread线程里面,会判断共享条件不成立,然后调用await():

  1. 首先lock2-thread对应的Node2节点,在unlock被唤醒之后,会重新进入acquireQueued的for循环,判断前继节点Node1是不是head,现在是,那么就开始tryAcquire尝试获取锁,获取锁成功之后,将Node2设置为head节点。
  2. 获取锁并设置head节点之后,开始进入await方法,会新增一个Node2,waitStatus是CONDITION,然后释放锁,并park当前线程,等待signal。
  3. 上一步释放锁之后,会唤醒lock3-thread来尝试获取锁。假设有lock4-thead提供condition.signal方法, 因为是非公平锁,lock4-thread和lock3-thread会争抢锁,如果lock4-thread争抢成功,那么lock3-thread还是继续在sync queue中等待唤醒;lock4-thread获取到锁之后会进行signal,拿到condition queue中firstWaiter,即Node2,将它的状态变成0,并加入sync queue的末端,并设置它的前继节点状态到SIGNAL,此时firstWaiter=lastWaiter=null;


    lock-signal
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335