深入理解Condition

我们知道在线程的同步时可以使一个线程阻塞而等待一个信号,同时放弃锁使其他线程可以能竞争到锁。在synchronized中我们可以使用Object的wait()和notify方法实现这种等待和唤醒,但是在Lock中怎么实现这种wait和notify呢?答案是Condition,学习Condition主要是为了方便以后学习blockqueue和concurrenthashmap的源码,同时也进一步理解ReentrantLock。
首先,Condition是一个接口,主要有以下的方法:

Paste_Image.png

截取一段Condition的注释,也是它的用法:

 *class BoundedBuffer {
 *   <b>final Lock lock = new ReentrantLock();</b>
 *   final Condition notFull  = <b>lock.newCondition(); </b>
 *   final Condition notEmpty = <b>lock.newCondition(); </b>
 *
 *   final Object[] items = new Object[100];
 *   int putptr, takeptr, count;
 *
 *   public void put(Object x) throws InterruptedException {
 *     <b>lock.lock();
 *     try {</b>
 *       while (count == items.length)
 *         <b>notFull.await();</b>
 *       items[putptr] = x;
 *       if (++putptr == items.length) putptr = 0;
 *       ++count;
 *       <b>notEmpty.signal();</b>
 *     <b>} finally {
 *       lock.unlock();
 *     }</b>
 *   }
 *
 *   public Object take() throws InterruptedException {
 *     <b>lock.lock();
 *     try {</b>
 *       while (count == 0)
 *         <b>notEmpty.await();</b>
 *       Object x = items[takeptr];
 *       if (++takeptr == items.length) takeptr = 0;
 *       --count;
 *       <b>notFull.signal();</b>
 *       return x;
 *     <b>} finally {
 *       lock.unlock();
 *     }</b>
 *   }
 * }

主要是ReentrantLock的实例lock调用的newCondition()方法:

    public Condition newCondition() {
        return sync.newCondition();
    }

此方法生成了一个新的Condition实例,这也就是说在多线程共享一个Lock的时候,只能调用一次newCondition()使所有的线程共享此Condition。
调用Sync的newCondition()。Sync是AbstractQueuedSynchronizer的子类:

        final ConditionObject newCondition() {
            return new ConditionObject();
        }

ConditionObject是AbstractQueuedSynchronizer的内部类,ConditionObject的实现代码很长,就不贴出来了。
在获取到了Condition实例之后,在获取了lock之后,调用Condition的await()方法,可以实现当前线程阻塞等待,同时放弃锁的效果。await()方法的实现如下:

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with
         *      saved state as argument, throwing
         *      IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         * tips:调用await,当前线程在此Condition上等待,同时将此线程实例化成一个Node,加入等待队列中
         * 
         * 
         */
        public final void await() throws InterruptedException {
            //判断是否中断
            if (Thread.interrupted())
                throw new InterruptedException();
            //在此Condition维护的等待队列中增加节点
            Node node = addConditionWaiter();
            //此处做锁的释放,将当前的线程占用的Lock释放,通过修改Lock的State实现
            int savedState = fullyRelease(node);
            
            int interruptMode = 0;
            //判断当前的线程是否在Syn队列中,此处isOnSyncQueue方法实现的功能比较多,而且不是Condition的方法,怀疑是一个方法做了多个事情。
            while (!isOnSyncQueue(node)) {
                //如果当前的node的WaitStatus是Condition,那么park此线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //线程唤醒之后继续争抢Lock
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

首先是检查中断。如果中断,那么抛出中断异常。
如果没有中断,调用addConditionWaiter():

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         * 
         * tips: 在Condition的队列中增加一个Node节点,代表当前Wait的线程,等待Signal信号唤醒。
         *         此处Node的首尾节点都没有用Volatile修饰,是因为到await方法的线程都是获取锁的,利用锁已经能保证了
         *       Node的可见性了。也就是说,此方法本身就是线程安全的,这里就没必要利用Volatile保证可见性。
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            //遍历整个双向链表,清除已经取消的节点
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

这里主要是构建了一个等待队列的双向链表,我们叫它等待队列,新建一个waitStatus为CONDITION的Node,并加到队尾。这里没有用volatile遍历保证Node的可见性,但是因为默认是已经获取锁,所以方法默认是线程安全的。
然后我们需要释放当前的锁,如果当前锁不释放,那就死锁了,so调用fullyRelease():

    /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

这里主要是释放锁的逻辑。实现主要是在Release方法中,Release方法在unLock的时候也会调用到,主要是释放线程占用的锁和唤醒同步队列中阻塞的线程:

    /**
     * Releases in exclusive mode.  Implemented by unblocking one or
     * more threads if {@link #tryRelease} returns true.
     * This method can be used to implement method {@link Lock#unlock}.
     *
     * @param arg the release argument.  This value is conveyed to
     *        {@link #tryRelease} but is otherwise uninterpreted and
     *        can represent anything you like.
     * @return the value returned from {@link #tryRelease}
     */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

这段代码在之前的《深入理解ReentrantLock》中写过了,就不分析了。
接下来是判断当前的节点是不是在同步队列中,这里可能会有一点疑问,就是为什么是判断同步队列而不是等待队列呢?
其实在后面signal的时候会有一个将node转入到同步队列的过程,从而使线程重新加入到锁竞争中。这里判断是否在同步队列中,其实就是判断是不是已经signal:

/**
     * Returns true if a node, always one that was initially placed on
     * a condition queue, is now waiting to reacquire on sync queue.
     * @param node the node
     * @return true if is reacquiring
     * tips:此处第一个判断当前Node的状态,如果是CONDITION,表明是正常的没有被唤醒的节点,返回false 表名当前的节点还没进入到Syn队列
     *      第二个判断做了个取巧,从而避免遍历这个歌Syn队列,直接判断Node的下一个节点是否为null,不为null表明在Syn队列中
     *      第三个判断遍历整个Node的Syn队列,如果存在返回true
     *      这里有一个Wait队列到Syn队列的转化过程
     */
    
    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }
            //判断当前的线程是否在Syn队列中,判断节点是不是在同步队列中,因为上一步已经释放了锁,也就是说此时可能有线程已经获取锁同时可能已经调用了singal方法,如果已经唤醒,那么就不应该park了,而是退出while方法,从而继续争抢锁
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

如果之前返回了true,那么表示已经在队列中,那么继续争抢锁,这种主要场景是已经释放锁,但是还没有park线程的时候,线程被singal。如果返回false,证明没有singal,所以要park当前的线程。
这样await的逻辑就清晰了,接下来是一个singal的过程,调用singal的时候:

        /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            //检查当前线程是不是,Lock占用的线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //取出Condition队列中第一个Node
            Node first = firstWaiter;
            if (first != null)
                //唤醒第一个Node
                doSignal(first);
        }

首先判断当前线程是不是持有锁的先,如果不是,那么锁没意义,抛出异常。如果是,那么去除等待队列中首节点,调用doSignal唤醒:

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

这里主要是遍历整个等待队列,知道找到一个可唤醒的节点,所以主要关心唤醒逻辑,即transferForSignal:

/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal).
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         * 如果cancelled 那么会继续唤醒下一个节点,
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        
        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

这里其实是有疑惑的,我开始也是没看明白,后来想了想才明白。这里主要的疑惑是!compareAndSetWaitStatus(p, ws, Node.SIGNAL)一致false的,所以一直不会执行LockSupport.unpark(node.thread)。那也就是说无法唤醒await的线程?其实不是的,因为唤醒操作时在unlock或者是await时进行的。因为在release的方法中会做同步队列中节点的唤醒,而signal之后的线程是加入到了同步队列中的,也就是说signal之后的线程还是需要重新竞争锁的,而不是直接获取锁。
这一点和synchronized的设计不谋而合,因为在synchronized中调用notify也不是及时的获取锁的,而是要等待synchronized块结束之后才竞争。
ReentrantLock还是有很多的优秀实现思想,比如CAS,转换严格的双向链表队列等。后面我还会继续学习总结,不断进步。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,558评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,002评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,024评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,144评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,255评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,295评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,068评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,478评论 1 305
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,789评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,965评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,649评论 4 336
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,267评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,982评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,800评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,847评论 2 351

推荐阅读更多精彩内容