Java8 源码阅读 - AQS之Condition

Java8 源码阅读 - AbstractQueuedSynchronizer
Java8 源码阅读 - AQS之Condition

Condition配合ReentrantLock来使用实现线程间的通信,相较于Object+synchronized的组合来说,Condition用于控制线程之间的协作会更加的方便和高效,这也是JUC下其他类的默认使用锁的方式,所以说了解Condition的原理方便我们阅读和理解其他的类实现的含义;

public class ConditionObject implements Condition, java.io.Serializable {
    /** 队列的第一个节点 */
    private transient Node firstWaiter;
    /** 队列的最后一个节点 */
    private transient Node lastWaiter;
...

每个Condition对象都包含着一个FIFO的等待队列,在队列中的每个节点都包含了一个线程引用,该线程就是在对象上等待的线程;

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        // 支持中断处理
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    ...
}

调用Conditionawait()方法后,将会以当前线程构造节点,并将节点从尾部加入等待队列,会使当前线程进入等待状态,同时线程状态变为等待状态,当从方法返回时,当前线程一定获取了Condition相关联的锁;

private Node addConditionWaiter() {
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 如果队列中最后一个节点的状态不为CONDITION
        // 则移除condition队列中所有的cancelled节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null)
        firstWaiter = node;
    else
        t.nextWaiter = node;
    lastWaiter = node;
    return node;
}

这个可以看到队列插入新元素的过程,直接将新节点赋值给末尾节点的下一个节点,因为使用Condition的条件就是在ReentrantLock加锁中使用,所以这里没有考虑对线程安全的问题;

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();
        if (release(savedState)) {
            // 只有在资源全部被释放的时候返回成功
            failed = false;
            return savedState;
        } else {
            // 否则会认为发生了多线程的安全问题,将抛出异常
            throw new IllegalMonitorStateException();
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;
    }
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        // 如果资源释放完毕
        Node h = head;
        // 头结点的waitStatus不为CACCELLED
        if (h != null && h.waitStatus != 0)
            // 尝试唤醒head节点的第一个后继节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

protected final boolean tryRelease(int releases) {
    // 将资源释放
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

fullyRelease的目的主要在于唤醒(如果存在)第一个等待队列节点中所保存的线程,注意这里有两个队列,一个是AQS同步器的等待队列,另一个是condition等待队列,唤醒之后的节点就移出了等待队列了;

public final void await() throws InterruptedException {
    ... 
    while (!isOnSyncQueue(node)) {
        // 阻塞当前线程,等待其他线程唤醒
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            // 如果线程恢复后发现线程仍处于中断状态
            break;
    }
    ... 
}
final boolean isOnSyncQueue(Node node) {
    // 如果节点状态为CONDITION则表明该节点已经在condition队列中
    if (node.waitStatus == Node.CONDITION || node.prev == null)
        return false;
    if (node.next != null)
        // 只有AQS等待队列才会有next关系维持,condition队列是依赖nextWaiter
        return true;
    return findNodeFromTail(node);
}
// 从等待队列的队尾开始查询
private boolean findNodeFromTail(Node node) {
    Node t = tail;
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

这里isOnSyncQueue分为两种情况,一是使用await方法的线程刚刚进入到condition队列中,这时候一般情况下会在第一个判断语句中就能退出,因为isOnSyncQueue前刚刚新建了一个新的waiter节点的状态为CONDITION,且被移除了等待队列,所以node.prev也应该是null,这一层的意思是当前节点node不在等待队列中;
二是在其他线程调用signal之后,该线程被唤醒了,此时waiter节点会被添加到等待队列的队尾中(signal代码中可以看的),所以通常来说会在findNodeFromTail中找到对应节点,这一层的意思应该是线程被重新唤醒,又可以在等待队列中找到该node节点;

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
         (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

final boolean transferAfterCancelledWait(Node node) {
    // 如果signal方法没有正确的恢复节点状态
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        // 重新通过cas设置,并重新加入到等待队列
        enq(node);
        return true;
    }
    // cas失败或者已经被其他线程修改,只需要等待其他线程完成操作
    // 通常意味着interrupt是发生在await之后
    while (!isOnSyncQueue(node))
         // 让出cpu时间片
        Thread.yield();
    return false;
}

处理中断的逻辑,如果在线程阻塞过程中发现该线程被中断了,则会根据该线程节点nodewaitStatus来判断是否被其他线程signal,如果是则表示中断是发生在signal之后的,反之则表示中断是在signal之前的;

public final void await() throws InterruptedException {
    ... 
    // 前面已经保证node节点是等待队列的队尾节点了
    // 如果acquireQueued过程中被中断且中断是发生在await之后发生的
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

final boolean acquireQueued(final Node node, int arg) {
    // node节点是排队到队尾的节点
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取node的前置节点
            final Node p = node.predecessor();
            // 如果前置节点是头节点 则尝试获取锁、
            if (p == head && tryAcquire(arg)) {
                // 该线程获取锁成功 将node节点(p的下一任节点)设置为head 
                // setHead里面将node的线程信息清除 因为head只做哨兵作用
                setHead(node);
                // 将p节点出队 释放p节点 
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            // 如果p不是头节点 或者 该线程获取锁失败
            if (shouldParkAfterFailedAcquire(p, node) && // 若p节点的状态为SIGNAL,则表示p节点允许被阻塞
                parkAndCheckInterrupt())  //  如果该线程被阻塞且线程状态为中断
                // 将中断标志置为true 这是lock方法处理中断的方式
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
// 根据中断类型来恢复中断或者抛出中断异常
private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        selfInterrupt();
}
static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

首先先进入acquireQueued重新给线程上锁,因为每个await方法都是在ReentrantLocklockunlock之间发生的,在await中会短暂的将其解锁,这里需要重新加锁,如果node是等待队列中的第一个节点,那么会立刻上锁成功,否则则需要阻塞等待其他线程释放锁资源;
然后根据线程中断类型来觉得是抛出InterruptedException异常还是恢复中断,中断是发生在signal之后的是选择恢复中断,反之抛出异常;

public final void signal() {
    // 必须在加锁条件下使用signal
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        // 每次循环都将首个waiter节点移除队列
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null; 
    } while (!transferForSignal(first) &&
             // 如果发现该节点状态为CANCELLED,则尝试signal下一个节点
             (first = firstWaiter) != null);
}
// 将waiter节点重新插入等待队列,只有该节点已经被取消才会返回false
final boolean transferForSignal(Node node) {
    // 如果cas失败,则表示节点已经被其他线程所取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;
    Node p = enq(node); // 重新将节点插入等待队列
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        // 如果前置节点的状态为CANCELLED(>0),
        // 或者将前置节点的状态设置SIGNAL时失败,
        // 将尝试唤醒当前线程;
        LockSupport.unpark(node.thread);
    return true;
}

signal方法做的就是将condition队列中的第一个状态不为CANCELLED的节点的移出去,并且重新添加会等待队列中,等到使用signal的线程调用使用unlock之后才会唤醒await的线程;

每一个condition队列中节点就代表了这个conditionObject对象使用过的await的次数,如果condition队列中有剩余节点,也就意味着awaitsignal不是成对出现,除非使用signalAll

public final void signalAll() {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;
    if (first != null)
        doSignalAll(first);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

signalAllsignal的唯一区别就是将所有condition队列上的节点一次性全部移回到等待队列中;

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,029评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,395评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,570评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,535评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,650评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,850评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,006评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,747评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,207评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,536评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,683评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,342评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,964评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,772评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,004评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,401评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,566评论 2 349

推荐阅读更多精彩内容