Java多线程(二十三)---Condition接口

移步java多线程系列文章

  • 任意一个Java对象,都拥有一组监视器方法(定义在java.lang.Object上),主要包括wait()、wait(long timeout)、notify()以及notifyAll()方法,这些方法与synchronized同步关键字配合,可以实现等待/通知模式。
  • Condition接口也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式

1.Condition接口

Condition接口意义在于提供一种针对使用条件状态(队列或变量)的线程进行阻塞和唤醒的机制 :
阻塞: 阻塞线程进入等待状态同时释放关联锁,直到被其他线程将条件状态设置为true为止

唤醒: 唤醒等待在条件队列中的线程,同时提供对唤醒全部的支持

锁关联: 值得注意的是,由于多线程下该状态是共享的,因此通常会通过于锁关联保证其原子性

解除等待: 当线程解除等待状态(被唤醒、中断、超时等)后仍需重新竞争锁,获取锁后才能够从暂停位置开始继续往后执行

/*
 * Conditions (also known as `condition queues` or `condition variables`) 
 * provide a means for one thread to suspend execution to wait
 * until notified by another thread that some state condition may now be true.
 * 
 * Conditions(如条件队列或条件变量)的作用如下:
 *  暂停线程并进入等待状态,直到被其他线程将条件状态设置为true为止
 *  值得注意的是:由于多线程下该状态是共享的,因此通常会通过于锁关联保证其原子性
 *  如Lock#newCondition newCondition()方法
 */
public interface Condition {
    /**
     * Causes the current thread to wait until it is signalled or {Thread#interrupt interrupted}
     * 线程等待直到被唤醒或被中断
     * 阻塞方法的实现有几个准则:
     *  1.当前线程持有的锁(关联该条件变量)将被释放
     *  2.当线程解除等待状态后仍需重新竞争锁,获取锁后才能够从当前位置开始继续往后执行
     */
    void await() throws InterruptedException;
    /**
     * Causes the current thread to wait until it is signalled.
     * 线程等待直到被唤醒
     */
    void awaitUninterruptibly();
    /**
     * Causes the current thread to wait until it is signalled or interrupted,
     * or the specified waiting time elapses.
     * 线程等待直到被唤醒或被中断或超时
     */
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    /**
     * Causes the current thread to wait until it is signalled or interrupted,
     * or the specified waiting time elapses. This method is behaviorally
     * equivalent to: {@code awaitNanos(unit.toNanos(time)) > 0}
     * 线程等待直到被唤醒或被中断或超时
     * 该方法等同于 awaitNanos(unit.toNanos(time)) > 0
     */
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    /**
     * Causes the current thread to wait until it is signalled or interrupted,
     * or the specified deadline elapses. 
     * 线程等待直到被唤醒或被中断或超时
     */
    boolean awaitUntil(Date deadline) throws InterruptedException;
    /**
     * Wakes up one waiting thread.
     * 唤醒在条件队列中等待的一个线程
     * 注意:线程被唤醒后仍需重新竞争锁,获取锁后才能够从当前位置开始继续往后执行
     */
    void signal();
    /**
     * Wakes up all waiting threads.
     * 唤醒在条件队列中等待的所有线程
     */
    void signalAll();
}

2.ConditionObject综述

ConditionObject包括如下方面:
定位: ConditionObject是AQS的内部类,其通过实现Condition接口提供对管程形式的条件的支持

作用: ConditionObject在AQS中作为对Lock的实现支持

使用: 当条件不满足时线程入队,当条件满足时出队并重新尝试获取锁,获取成功后从暂停位置开始继续往后执行

3.ConditionObject实现原理

ConditionObject的实现有几个注意事项:
1.内部会维护一个条件等待同步队列,根据FIFO原则执行入队出队操作

2.通过firstWaiter维护头节点,lastWaiter维护尾节点

3.节点通过nextWaiter记录后继条件节点形成链表结构,遍历时从头节点开始沿着nextWaiter顺序遍历

4.条件队列中节点状态waitStatus只能为0或Node.CONDITION

5.使用条件队列的前提是线程需要持有同步锁,且只支持独占模式

6.当节点在条件队列被唤醒(因signal|timeout|interrupt)后,需要进行节点转移,即由条件节点转变为同步节点

4.ConditionObject组成

4.1 类定义

public class ConditionObject implements Condition, java.io.Serializable

4.2 构造器

public ConditionObject() { }

4.3 重要变量

/** 条件等待队列首节点 */
private transient Node firstWaiter;
/** 条件等待队列尾节点 */
private transient Node lastWaiter;
/** 退出时重新中断*/
private static final int REINTERRUPT =  1;
/** 退出时直接抛出异常 */
private static final int THROW_IE    = -1;
/** 独占模式时才能使用条件队列 ,用于链接下一个等待节点 */
Node nextWaiter;

5.条件节点阻塞

5.1 不响应中断阻塞

条件节点阻塞的一般流程:
1.入队: 根据FIFO原则,新节点会被封装成Node并被加入到条件队列队尾

2.释放: 为不影响其他线程,当前已持有锁的线程需要先释放所有锁,让出锁资源

3.阻塞: 在条件队列中的节点线程需要被阻塞,直到条件满足后重试获取同步锁

4.重试: 当条件满足后需要重试获取同步锁,获取成功后才能继续从暂停位置继续向后执行

补充:条件满足指的是线程被其他线程唤醒、或超时、或中断后且位于同步队列中

public final void awaitUninterruptibly() {
    //1.新节点入条件队列
    Node node = addConditionWaiter();
    //2.当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放锁
    int savedState = fullyRelease(node);
    //3.阻塞在条件队列中的不满足条件的节点线程
    boolean interrupted = false;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        //由于不响应中断,因此只是记录是否中断,此时会清除中断标识
        if (Thread.interrupted())
            interrupted = true;
    }
    /**
     * 4.被唤醒需要重新尝试获取锁,获取锁成功后才能继续往后执行
     * 若期间又被中断后,需要再次设置已被清除的中断标识
     */
    if (acquireQueued(node, savedState) || interrupted)
        selfInterrupt();
}

小问:为神马需要循环判断isOnSyncQueue? 即while (!isOnSyncQueue(node)) ?
友情小提示:读者可以回顾一下同步队列获取锁的过程

小答:循环只有一个目的,就是确保节点已经脱离条件队列且进入同步队列,这样才有资格重新获取同步锁

5.2 响应中断阻塞

public final void await() throws InterruptedException {
    //响应中断,直接抛异常,没必要往下走了
    if (Thread.interrupted())
        throw new InterruptedException();
    //新节点入条件队列   
    Node node = addConditionWaiter();
    //当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放锁
    int savedState = fullyRelease(node);
    //只阻塞在条件队列中的节点线程,同步节点才能竞争同步锁
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        /**
         * 检测中断,一旦发生中断
         *  1.将条件队列中因中断而唤醒的节点进行转移(注意此处是中断)
         *  2.退出循环 -> 接下来会在循环外进行中断处理
         *  注意:之所以安心退出是因为会通过执行transferAfterCancelledWait进行节点转移
         *        这样随后就能安心执行acquireQueued同步队列入队操作了
         */    
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    /**
     * 重新尝试获取同步锁,获取成功后且被中断,当中断模式为抛出异常时,需要设置为重新中断
     * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /**
     * 若当前节点存在后继节点时,需要执行出队操作
     * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
     */
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    //interruptMode != 0 说明是需要进行中断处理的
    if (interruptMode != 0)
        //执行中断处理
        reportInterruptAfterWait(interruptMode);
}
/**
 * 中断处理:
 *  1.中断模式为THROW_IE:向上抛出异常
 *  2.中断模式为REINTERRUPT:重新设置中断状态
 */
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}

5.3 响应中断和超时阻塞-纳秒可选

/**
 * Implements timed condition wait.
 * 响应中断和超时阻塞
 * @return long 剩余超时时间 <0 说明已超时
 */
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {
    //响应中断,直接抛异常,没必要往下走了
    if (Thread.interrupted())
        throw new InterruptedException();
    //新节点入条件队列     
    Node node = addConditionWaiter();
    //当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放同步锁
    int savedState = fullyRelease(node);
    //截止时长
    final long deadline = System.nanoTime() + nanosTimeout;
    //只阻塞在条件队列中的节点线程,同步节点才能竞争同步锁
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        /**
         * 将条件队列中因超时而唤醒的节点进行转移(注意此处是超时)
         * 值得注意的是:剩余超时时间允许为0和负数
         */
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        //为了提高效率,超过阈值才执行阻塞,否则接着自旋
        if (nanosTimeout >= spinForTimeoutThreshold)
            //若超过nanosTimeout时长,会自动解除阻塞唤醒线程
            LockSupport.parkNanos(this, nanosTimeout);
        /**
         * 检测中断,一旦发生中断
         *  1.将条件队列中因中断而唤醒的节点进行转移(注意此处是中断)
         *  2.退出循环 -> 接下来会在循环外进行中断处理
         *  注意:之所以安心退出是因为会通过执行transferAfterCancelledWait进行节点转移
         *        这样随后就能安心执行acquireQueued同步队列入队操作了
         */    
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        //剩余超时时间    
        nanosTimeout = deadline - System.nanoTime();
    }
    /**
     * 重新尝试获取同步锁,获取成功后且被中断,当中断模式为抛出异常时,需要设置为重新中断
     * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /**
     * 若当前节点存在后继节点时,需要执行出队操作
     * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
     */
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    //interruptMode != 0 说明是需要进行中断处理的
    if (interruptMode != 0)
        //执行中断处理
        reportInterruptAfterWait(interruptMode);
    //返回已花费时长    
    return deadline - System.nanoTime();
}

5.4 响应中断和超时阻塞-纳秒日期单位可选

/**
 * Implements timed condition wait.
 * 响应中断和超时阻塞-纳秒日期单位可选
 * @return boolean 超时是否发生在被唤醒之前
 */
public final boolean await(long time, TimeUnit unit)
            throws InterruptedException {
    //注意是纳秒        
    long nanosTimeout = unit.toNanos(time);
    //响应中断,直接抛异常,没必要往下走了
    if (Thread.interrupted())
        throw new InterruptedException();
    //新节点入条件队列 
    Node node = addConditionWaiter();
    //当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放同步锁
    int savedState = fullyRelease(node);
    //截止时长
    final long deadline = System.nanoTime() + nanosTimeout;
    //只阻塞在条件队列中的节点线程,同步节点才能竞争同步锁
    boolean timedout = false;
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        /**
         * 将条件队列中因超时而唤醒的节点进行转移(注意此处是超时)
         * 值得注意的是:剩余超时时间允许为0和负数
         */
        if (nanosTimeout <= 0L) {
            timedout = transferAfterCancelledWait(node);
            break;
        }
        //为了提高效率,超过阈值才执行阻塞,否则接着自旋
        if (nanosTimeout >= spinForTimeoutThreshold)
            //若超过nanosTimeout时长,会自动解除阻塞唤醒线程
            LockSupport.parkNanos(this, nanosTimeout);
        /**
         * 检测中断,一旦发生中断
         *  1.将条件队列中因中断而唤醒的节点进行转移(注意此处是中断)
         *  2.退出循环 -> 接下来会在循环外进行中断处理
         *  注意:之所以安心退出是因为会通过执行transferAfterCancelledWait进行节点转移
         *        这样随后就能安心执行acquireQueued同步队列入队操作了
         */   
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
        //剩余超时时间 
        nanosTimeout = deadline - System.nanoTime();
    }
    /**
     * 重新尝试获取同步锁,获取成功后且被中断,当中断模式为抛出异常时,需要设置为重新中断
     * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /**
     * 若当前节点存在后继节点时,需要执行出队操作
     * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
     */
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    //interruptMode != 0 说明是需要进行中断处理的
    if (interruptMode != 0)
        //执行中断处理
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

5.5 响应中断和超时阻塞-毫秒日期可选

/**
 * Implements absolute timed condition wait.
 * 响应中断和超时阻塞-毫秒日期可选
 * @return boolean 超时是否发生在被唤醒之前
 */
public final boolean awaitUntil(Date deadline)
            throws InterruptedException {
    //注意:与awaitNano区别的是这里用的是毫秒
    long abstime = deadline.getTime();
    //响应中断,直接抛异常,没必要往下走了
    if (Thread.interrupted())
        throw new InterruptedException();
    //新节点入条件队列 
    Node node = addConditionWaiter();
    //当前线程已持有锁,但由于要被阻塞,为不影响其他线程,需要先释放同步锁
    int savedState = fullyRelease(node);
    //记录节点是否超时
    boolean timedout = false;
    //只阻塞在条件队列中的节点线程,同步节点才能竞争同步锁
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        //毫秒超时,将条件队列中因超时而唤醒的节点进行转移(注意此处是超时)
        if (System.currentTimeMillis() > abstime) {
            /**
             * 记录节点是否超时
             * 注意:当取消发生在节点被唤醒之前才返回true
             */
            timedout = transferAfterCancelledWait(node);
            //转换一定会成功,因此安心退出即可
            break;
        }
        //若超过abstime时长,会自动解除阻塞唤醒线程
        LockSupport.parkUntil(this, abstime);
        /**
         * 检测中断,一旦发生中断
         *  1.将条件队列中因中断而唤醒的节点进行转移(注意此处是中断)
         *  2.退出循环 -> 接下来会在循环外进行中断处理
         *  注意:之所以安心退出是因为会通过执行transferAfterCancelledWait进行节点转移
         *        这样随后就能安心执行acquireQueued同步队列入队操作了
         */    
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
        break;
    }
    /**
     * 重新尝试获取同步锁,获取成功后且被中断,当中断模式为抛出异常时,需要设置为重新中断
     * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
     */
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    /**
     * 若当前节点存在后继节点时,需要执行出队操作
     * 补充:acquireQueued会返回获取锁过程中线程是否有过中断,true则说明发生过中断
     */
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    //interruptMode != 0 说明是需要进行中断处理的    
    if (interruptMode != 0)
         //执行中断处理
        reportInterruptAfterWait(interruptMode);
    return !timedout;
}

6.条件节点唤醒

唤醒操作主要干了三个事情:
1.清除节点: 从条件队列中移除该节点

2.节点转移: 将条件节点转换为同步节点,即从条件队列转移到同步队列

3.唤醒节点: 将转移成功的节点重新唤醒

6.1 唤醒单个

唤醒单个的实质:根据FIFO原则唤醒first,即条件队列头节点

/**
 * Moves the longest-waiting thread, if one exists, from the
 * wait queue for this condition to the wait queue for the
 * owning lock.
 *
 * 从条件队列中唤醒一个节点
 * 原则:将条件队列中已存在的等待时间最长的线程转移到等待队列中,即头节点
 */
public final void signal() {
    //条件队列只适用于独占模式且只能由持有锁的线程执行唤醒操作
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    //先进先出原则    
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}
/**
 * Removes and transfers nodes until hit non-cancelled one or null. 
 * @param first (non-null) the first node on condition queue 条件队列非空头节点
 */
private void doSignal(Node first) {
    do {
        //清空队列
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
        /**
         * 将节点从条件队列转换到同步队列中并唤醒线程
         */
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}

6.2 唤醒全部

唤醒全部的实质:沿着nextWaiter顺序遍历依次唤醒

/**
 * Moves all threads from the wait queue for this condition to
 * the wait queue for the owning lock.
 *
 * 唤醒条件队列中的全部节点
 */
public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        //跟signal的区别就是多了个All...
        doSignalAll(first);
}
/**
 * Removes and transfers all nodes.
 * 
 * 全部唤醒的实质:
 *     沿着nextWaiter顺序遍历依次转移并唤醒
 */
private void doSignalAll(Node first) {
    //1.注意要清空条件队列
    lastWaiter = firstWaiter = null;
    //2.沿着nextWaiter顺序遍历依次唤醒
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        //3.其实质就是沿着nextWaiter顺序遍历
        first = next;
    } while (first != null);
}

7.条件节点转移

转移操作指的是节点由条件节点转换为同步节点,主要进行了如下两步进行转移:
1.更新节点状态: 根据同步队列入队原则,新节点初始状态必须为0
2.同步队列入队: 当前节点肯定能入队成功,同时返回前驱节点

7.1 因正常唤醒转移

/**
 * Transfers a node from a condition queue onto sync queue.
 * Returns true if successful.
 *
 * 将节点从条件队列转换到同步队列中并唤醒线程
 *  1.在正常调用signalXXX()方法时才会调用该方法
 *  2.若因为超时或中断进行转移,不会调用该方法,但这两种情况的转移都会置为0
 *  3.注意:在调用该方法之前都会执行first.nextWaiter = null,即从等待队列中移除当前头节点
 * 
 * @param node the node 根据FIFO原则,通常为first节点
 * @return true if successfully transferred (else the node was
 * cancelled before signal) 转换成功或节点在唤醒前被取消 才返回true
 */
final boolean transferForSignal(Node node) {
    /**
     * 1.CAS更新节点状态为0,一旦失败立即返回false
     * 注意:必须将节点状态更新为0,因为同步队列入队时新节点必须为0,
     *       否则就不符合同步队列入队原则,因此一旦失败立即返回
     * 补充:CAS失败的原因在于其他线程已经执行唤醒操作将该节点变更为0,因此其实无需再次enq了
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    //2.进入同步队列,注意enq方法返回的是前驱节点
    Node p = enq(node);
    /**
     * 3.在因前驱节点被取消或状态突变发生后需要唤醒节点线程 
     *  ws > 0:
     *     说明前驱节点被取消(CANCELLED),因此需要唤醒当前节点线程
     *  !compareAndSetWaitStatus(p, ws, Node.SIGNAL):
     *     此时发生状态突变,比如ws刚好变成CANCELLED
     * 补充:因为enq保证一定入队成功,因此实质是唤醒在同步队列中的节点
     *       结合awaitXXX(),线程会从park之后继续往后执行
     * 
     * !!特别注意!!:正常情况下,根据流程可知
     *  ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)
     *  这种判断是不会返回true的,此时是不会唤线程;
     *  因此真正唤醒线程的地方在于调用signal()方法的线程发送完signal
     *  信号后再调用release(1)方法(比如调用ReentrantLock的unlock()),
     *  因为其已入队,因此可以被唤醒
     */
    int ws = p.waitStatus.
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}

小问:读者还记得enq方法都做了什么吗?
小答:针对enq方法,笔者简单回顾一下作用,帮助读者理解一下:
1.初始化时对head和tail进行赋值

2.当前节点进入同步对了并作为新的tail

3.入队一定可以成功因为自旋

4.注意enq返回的是前驱节点

7.2 因中断或超时唤醒转移

/**
 * Transfers node, if necessary, to sync queue after a cancelled wait.
 * Returns true if thread was cancelled before being signalled.
 *
 * 将条件队列中因中断或超时而唤醒的节点进行转移
 * 取消发生在被唤醒之前返回true
 * 两种情况需要调用该方法:
 *  1.中断:checkInterruptWhileWaiting() -> 该方法会被awaitXXX()调用
 *  2.超时:带超时的awaitXXX()方法
 */
final boolean transferAfterCancelledWait(Node node) {
    //Node.CONDITION状态节点只用于条件队列,因此需要设置为0才能进入同步队列
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        //当CAS变更waitStatus为0时,说明节点已跟条件队列无关,随后进入同步队列即可
        enq(node);
        //由于enq方法能够保证进入同步队列成功,因此当enq执行完毕,可以放心的返回true
        return true;
    }
    //若CAS失败且节点还不在同步队列中,线程需要先释放资源,提高效率
    while (!isOnSyncQueue(node))
        Thread.yield();
    return false;
}

小问:为什么需要执行Thread.yield()?
小答:对于这个问题,笔者的考虑如下:
1.若CAS变更为0失败,说明已被其他线程执行唤醒转换并变更为0
2.但由于enq操作可能还在进行中,因此此时该节点可能还没真正入队,因此需要循环检测是否已入队
3.由于入队操作已经在执行中,因此无需重复执行enq操作,可以先释放资源,并通过isOnSyncQueue退出循环即可,优化效率

8.条件节点队列操作

8.1 条件节点入队

新节点入队遵循FIFO原则,主要会做如下操作:

1.清洗: 若尾节点非CONDITION状态,需要清除所有非CONDITION状态节点并重设头尾节点

2.封装: 封装当前线程为Node,同时初始化节点状态为CONDITION

3.入队: 队列为空时,头尾节点指向同一个元素;非空时通过nextWaiter形成链表

/**
 * Adds a new waiter to wait queue.
 *
 * 新增条件等待队列队尾节点 - FIFO
 * 补充:由于条件队列只适用于独占模式,因此该方法不会有并发问题
 * @return its new wait node 返回新节点
 */
private Node addConditionWaiter() {
    //FIFO原则:新入队元素需队尾插入
    Node t = lastWaiter;
    //1.若尾节点非Condition状态,需要清除所有非CONDITION状态节点并重设头尾节点
    if (t != null && t.waitStatus != Node.CONDITION) {
        /**
         * 删除条件等待队列中的所有非CONDITION状态节点
         * 补充:由于在条件队列中节点状态只能是CONDITION或0(signal|timeout|interrupt)
         *      因此该方法实质就清除空节点或状态为0节点
         */
        unlinkCancelledWaiters();
        //上个方法执行后会使得lastWaiter肯定为非取消状态节点(只可能为空或CONDITION状态)
        t = lastWaiter;
    }
    //2.封装当前线程为Node,同时初始化节点状态为CONDITION
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    //3.队尾元素为空,正说明条件队列为空
    if (t == null)
        firstWaiter = node;
    else
        //4.新元素入队需队尾插入
        t.nextWaiter = node;
    //5.这里需要注意的是:队列为空时,头尾节点是指向同一个元素的;非空时就会形成链表
    lastWaiter = node;
    return node;
}

小问:何时出现t != null && t.waitStatus != Node.CONDITION?
小答:尾节点的状态不为Node.CONDITION,那只可能为0,这意味着节点需要转移,那么节点状态变更时机都有哪些呢?
1.当因中断或超时被唤醒后会通过调用transferAfterCancelledWait将节点状态CAS为0

2.当被正常唤醒后会通过调用transferForSignal将节点状态CAS为0

分析:但问题是正常唤醒后会先执行first.nextWaiter = null,因此此时尾节点应为空

结论:排除这种情况后可知,只有第一种情况,即因中断或超时被唤醒后才会出现这种情况

8.2 条件节点出队

条件节点的出队时机:
1.新节点入队: 通过判断t.waitStatus != Node.CONDITION为true时执行unlinkCancelledWaiters()

2.节点重新尝试获取同步锁后: 通过判断node.nextWaiter != null为true时执行unlinkCancelledWaiters()

3.节点被正常唤醒后: 通过调用signal()方法并执行first.nextWaiter = null

补充:3与1,2的区别在于前者只是清除头节点,后者是遍历清除所有非CONDITION状态节点
注意:无论入队还是出队,前提都是线程已经持有同步锁

/**
 * Unlinks cancelled waiter nodes from condition queue.
 *
 * 移除条件队列中的所有非CONDITION状态节点
 * 两种情况会触发该方法:
 *  1.新节点加入条件队列时 -> addConditionWaiter()
 *  2.节点被唤醒之后 -> awaitXXX()
 *    补充一点:注意不是在signal()方法中执行的(因为后者只有转移和唤醒操作,前者才有获取锁操作)
 * 注意:条件队列中的节点只有CONDITION和0两种状态
 */
private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    //临时节点,主要用于在遍历时记录上一个非CONDITION节点(因为要跳过所有非CONDITION节点)
    Node trail = null;
    //从前往后顺序遍历条件队列,剔除全部非CONDITION状态节点
    while (t != null) {
        //下一个节点,注意next可能为空
        Node next = t.nextWaiter;
        /**
         * 非CONDITION状态(即0),需要执行移除操作
         * 强调:在条件队列中节点的waitStatus,只可能是CONDITION或是0(signal|timeout|interrupt)
         */
        if (t.waitStatus != Node.CONDITION) {
            /**
             * 移除后继节点 等同于 将当前节点从队列中移除
             * 同时由于不再有引用,会help GC
             * 注意:GC同时会考虑firstWaiter和lastWaiter的引用情况
             *      即若当前线程无用,上述变量最终也会移除对该节点的引用
             */
            t.nextWaiter = null;
            /**
             * 若之前没有非CONDITION状态节点,就先让next当一下头头
             * 注意:该方法执行完毕后,firstWaiter只能为空或CONDITION
             */
            if (trail == null)
                firstWaiter = next;
            else
                /**
                 * 一旦当前节点非CONDITION状态,那么需要先将它的后继节点与上一个非取消节点建立起联系
                 * 即使后继节点是非CONDITION状态也没事,因为在下次遍历时会重新建立联系的
                 * 说白了,其本质就是删除t
                 */
                trail.nextWaiter = next;
            /**
             * 最后一个非CONDITION状态节点即是条件队列的尾节点
             * 注意:该方法执行完毕后,lastWaiter最终只能为空或CONDITION
             */
            if (next == null)
                lastWaiter = trail;
        }
        else
            //CONDITION状态,就记录一下以作为下次遍历时的上一个非CONDITION节点
            trail = t;
        //开启下一次循环
        t = next;
    }
}

小问:为神马调用unlinkCancelledWaiters后firstWaiter只能为空或CONDITION?
小答:头节点非空时,有且只有条件队列中全部都是非CONDITION状态的节点时,新的头节点才能为空,原因在于firstWaiter = next;的前提是无可用的CONDITION状态节点,而trail只有在有CONDITION状态节点时才会被赋值更新

9.重要方法

9.1 释放同步锁

/**
 * Invokes release with current state value; returns saved state.
 * Cancels node and throws exception on failure.
 *
 * 释放同步状态并返回原状态
 * 当释放失败时节点会被取消同时抛出IllegalMonitorStateException异常
 */
final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        //获取当前线程持有锁的同步状态 -- 支持可重入  
        int savedState = getState();
        /**
         * 由于该方法只用于独占模式,因此使用的是独占独有的release方法
         * 关于release方法请参见笔者的并发番@AbstractQueuedSynchronizer一文通
         * 作用是更新state状态(为0)同时唤醒后继节点(如果存在的话)
         * 释放锁的目的是为了让其他线程能够获取锁去执行任务,
         * 并等到其他线程调用signal()和release()后能够唤醒该线程
         */
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            //一旦释放失败,直接抛出异常 -- 干脆的很
            throw new IllegalMonitorStateException();
        }
    } finally {
        /**
         * 释放失败的补救措施:
         *   由于实际上节点已完成使命,节点状态需要变成取消状态以用于跳过和回收
         * 注意:此时节点还是同步节点,因此需要设置为CANCELLED
         */
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

9.2 检测中断

/**
 * Checks for interrupt, returning THROW_IE if interrupted
 * before signalled, REINTERRUPT if after signalled, or
 * 0 if not interrupted.
 *
 * 检测线程的中断情况,通过返回状态码告知线程下一步应如何处理中断,情况有如下三种:
 *  1.中断发生在被唤醒之前返回THROW_IE(需要抛出异常)
 *  2.中断发生在被唤醒之后返回REINTERRUPT(需要重新中断)
 *  3.无中断返回0
 */
private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ? (transferAfterCancelledWait(node) ? 
        THROW_IE : REINTERRUPT) : 0
}

小问:为神马中断发生在被唤醒之前返回THROW_IE?之后返回REINTERRUPT?
小答:唤醒前抛出异常主要是为了快速失败,提高效率,而重新中断是要处理被唤醒后一系列
唤醒前: 若被唤醒之前线程被中断,说明线程此时没有获取到资源,尽快抛出异常就可以结束等待并解放生产力;

唤醒后: 若被唤醒后线程被中断,说明线程基本已经获取锁,这时可能要多执行一些操作,如释放锁等

9.3 判断节点是否位于同步队列

/**
 * Returns true if a node, always one that was initially placed on
 * a condition queue, is now waiting to reacquire on sync queue.
 *
 * 判断节点是否位于同步队列
 * @param node the node
 * @return true if is reacquiring
 */
final boolean isOnSyncQueue(Node node) {
    //CONDITION只用于条件队列 || prev为空说明其一定不在同步队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    //如果next非空,它肯定在同步队列中    
    if (node.next != null) 
        return true;
    //node.prev非空并不意味着在同步队列中,因此需要从后往前遍历以判断是否在同步队列中    
    return findNodeFromTail(node);
}
/**
 * Returns true if node is on sync queue by searching backwards from tail.
 * 
 * node.prev非空并不意味着在同步队列中,原因是CAS(放入同步队列)可能失败
 * 因此需要从后往前遍历以判断其是否在同步队列中
 * 由于该方法被调用时,该节点总是靠近tail,因此除非CAS失败(可能性很低),否则几乎无须遍历
 */
private boolean findNodeFromTail(Node node) {
    //从同步队列的队尾节点开始沿着prev依次往前遍历
    Node t = tail;
    for (;;) {
        //存在,返回true
        if (t == node)
            return true;
        //都遍历到头了还不存在,那只能返回false了    
        if (t == null)
            return false;
        //沿着prev依次往前遍历吧    
        t = t.prev;
    }
}

10.流程图

10.1 无响应中断

注意:仅当调用signal()的线程再调用release()方法之后才会真正解除被阻塞线程的阻塞状态,但release()之后该线程并不是被立即唤醒,而是重新竞争锁直到变成head节点的后继节点且head节点为SIGNAl状态时才能被真正唤醒


条件队列.jpg

10.2 响应中断

中断的条件队列.jpg

10.3 响应中断与超时

超时中断的条件队列.jpg

1. 通过对比Object的监视器方法和Condition接口

qq_pic_merged_1535605094135.jpg

2 Condition接口与示例

Condition是依赖Lock对象的
使用示例

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
public void conditionWait() throws InterruptedException {
    lock.lock();
    try {
            condition.await();
    } finally {
            lock.unlock();
    }
}
public void conditionSignal() throws InterruptedException {
    lock.lock();
    try {
            condition.signal();
    } finally {
            lock.unlock();
    }
}

v般都会将Condition对象作为成员变量。当调用await()方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的signal()方法,通知当前线程后,当前线程才从await()方法返回,并且在返回前已经获取了锁。

Condition定义的方法

qq_pic_merged_1535605905139.jpg

qq_pic_merged_1535605921452.jpg

获取一个Condition必须通过Lock的newCondition()方法。

Condition的使用方式
一个有界队列的示例来深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”

public class BoundedQueue<T> {
    private Object[]    items;
    // 添加的下标,删除的下标和数组当前数量
    private int addIndex, removeIndex, count;
    private Lock lock     = new ReentrantLock();
    private Condition    notEmpty = lock.newCondition();
    private Condition    notFull = lock.newCondition();
    public BoundedQueue(int size) {
            items = new Object[size];
    }
    // 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
    public void add(T t) throws InterruptedException {
            lock.lock();
            try {
                    while (count == items.length)
                            notFull.await();
                    items[addIndex] = t;
                    if (++addIndex == items.length)
                            addIndex = 0;

                    ++count;
                    notEmpty.signal();
            } finally {
                    lock.unlock();
            }
    }
    // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
            lock.lock();
            try {
                    while (count == 0)
                            notEmpty.await();
                    Object x = items[removeIndex];
                    if (++removeIndex == items.length)
                            removeIndex = 0;
                    --count;
                    notFull.signal();
                    return (T) x;
            } finally {
                    lock.unlock();
            }
    }
}

上述示例中,BoundedQueue通过add(T t)方法添加一个元素,通过remove()方法移出一个元素。以添加方法为例。

  • 首先需要获得锁,目的是确保数组修改的可见性和排他性。当数组数量等于数组长度时,表示数组已满,则调用notFull.await(),当前线程随之释放锁并进入等待状态。
  • 如果数组数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在notEmpty上的线程,数组中已经有新元素可以获取。
  • 在添加和删除方法中使用while循环而非if判断,目的是防止过早或意外的通知,只有条件符合才能够退出循环。

3 Condition的实现分析

  • ConditionObject是同步器AbstractQueuedSynchronizer的内部类,因为Condition的操作需首先需要获得锁
  • 每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

3.1 等待队列

  • 等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了Condition.await()方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
  • 节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类AbstractQueuedSynchronizer.Node**。
  • 一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。
  • 当前线程调用Condition.await()方法,将会以当前线程构造节点,并将节点从尾部加入等待队列

等待队列的基本结构图


qq_pic_merged_1535807449758.jpg

Condition拥有首尾节点的引用,而新增节点只需要将原有的尾节点nextWaiter指向它,并且更新尾节点即可。

没有使用CAS保证,原因在于调用await()方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,
并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列

qq_pic_merged_1538233209041.jpg

Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。

3.2 等待

  • 调用Condition的await()方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。
  • 当从await()方法返回时,当前线程一定获取了Condition相关联的锁。 如果从队列(同步队列和等待队列)的角度看await()方法,当调用await()方法时,相当于同步队列的首节点(获取了锁的节点)移动到Condition的等待队列中。

Condition的await()方法

 public final void await() throws Interrupt
edException {
    if (Thread.interrupted())
            throw new InterruptedException();
    // 当前线程加入等待队列
    Node node = addConditionWaiter();
    // 释放同步状态,也就是释放锁
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
            unlinkCancelledWaiters();
    if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
}
  • 调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
  • 当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用Condition.signal()方法唤醒,而是对等待线程进行中断,则会抛出InterruptedException。
  • 如果从队列的角度去看,当前线程加入Condition的等待队列,该过程如图5-11示。
qq_pic_merged_1538234152233.jpg
  • 如图所示,同步队列的首节点并不会直接加入等待队列,而是通过addConditionWaiter()方法把当前线程构造成一个新的节点并将其加入等待队列中。

3.3 通知

调用Condition的signal()方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
ConditionObject的signal方法

  public final void signal() {
    if (!isHeldExclusively())
            throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
            doSignal(first);
}

调用该方法的前置条件是当前线程必须获取了锁,可以看到signal()方法进行了isHeldExclusively()检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport唤醒节点中的线程。
节点从等待队列移动到同步队列的过程

qq_pic_merged_1538234438845.jpg

  • 通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。
  • 当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。 被唤醒后的线程,将从await()方法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。
  • 成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已经成功地获取了锁。
  • Condition的signalAll()方法,相当于对等待队列中的每个节点均执行一次signal()方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。

参考

《java并发编程的艺术》
并发番@ConditionObject一文通

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,029评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,238评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,576评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,214评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,324评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,392评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,416评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,196评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,631评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,919评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,090评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,767评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,410评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,090评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,328评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,952评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,979评论 2 351

推荐阅读更多精彩内容