java并发包之AbstractQueuedSynchronizer源码分析(二)

此篇文章分析AbstractQueuedSynchronizer(AQS)类里的ConditionObject。

Condition

首先要明白Condition的应用场景,Condition经常用于生产者-消费者模型中。

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    // condition 依赖于 lock 来产生
    final Condition notFull = lock.newCondition();
    final Condition notEmpty = lock.newCondition();

    final Object[] items = new Object[100];
    int putptr, takeptr, count;

    // 生产
    public void put(Object x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();  // 队列已满,等待,直到 not full 才能继续生产
            items[putptr] = x;
            if (++putptr == items.length) putptr = 0;
            ++count;
            notEmpty.signal(); // 生产成功,队列已经 not empty 了,发个通知出去
        } finally {
            lock.unlock();
        }
    }

    // 消费
    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await(); // 队列为空,等待,直到队列 not empty,才能继续消费
            Object x = items[takeptr];
            if (++takeptr == items.length) takeptr = 0;
            --count;
            notFull.signal(); // 被我消费掉一个,队列 not full 了,发个通知出去
            return x;
        } finally {
            lock.unlock();
        }
    }
}

无论是await()还是signal()都需要获取到锁才可以操作,put()方法里当notFull.await()时,当前线程首先释放持有的锁,然后在此堵塞,等待唤醒并获取锁,获取锁之后才能返回并继续向下。执行take()方法里notFull().signal()会将之前阻塞的线程唤醒,然后lock.unlock()释放锁。
这里要注意的是,await()方法是首先释放锁,然后阻塞,只有当signal()将其唤醒并重新获得锁才会继续向下运行。

Conditon的初始化

每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例

final ConditionObject newCondition() {
          return new ConditionObject();
  }

ConditionObject是AbstractQueuedSynchronizer(AQS)中的内部类。
ConditonObject类的属性:

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue.条件队列的头节点 */ 
        private transient Node firstWaiter;
        /** Last node of condition queue. 条件队列的尾节点*/
        private transient Node lastWaiter;
        
        ..............................
}

上一篇文章分析的是AQS的阻塞队列,这里引入了一个条件队列

从(https://www.javadoop.com/) 里偷来的图。。。。

node里的属性:

volatile int waitStatus; // 可取值 0、CANCELLED(1)、SIGNAL(-1)、CONDITION(-2)、PROPAGATE(-3)
volatile Node prev; //阻塞队列的前驱
volatile Node next;//阻塞队列的后继
volatile Thread thread;
Node nextWaiter; //条件队列的后继

不管是阻塞队列还是条件队列的节点,都是node的实例,因为条件队列中的节点需要移动到阻塞队列里。
一个 ReentrantLock 实例可以通过多次调用 newCondition() 来产生多个 Condition 实例,这里对应 condition1 和 condition2。每个condition就是一个条件队列。
根据这个图片再具体说明其简单流程:
1.线程1调用condition1.await()时首先释放锁,将当前线程1实例化成一个node加入条件队列condition1中,并在此阻塞。
2.线程2调用condition1.signal()将条件队列condition1中的firstWaiter(队头)移动到阻塞队列的队尾,等待获取锁,抢到锁之后condition1.await()才能返回,继续向下执行。

await()

public final void await() throws InterruptedException {
            //首先判断线程的中断状态,如果中断标志位为true抛出异常。
            if (Thread.interrupted())
                throw new InterruptedException();
            //将此线程实例成node加入条件队列。
            Node node = addConditionWaiter();
            //完全释放锁(完全释放的原因是可能存在可重入的情况),返回值是释放锁之前的state值。
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            //判断是否已经转移到阻塞队列(同步队列)中,不在的话在此处挂起阻塞。
            //退出循环有两种情况:1.节点被转移到阻塞队列中   2.响应中断,break退出循环
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //interruptMode代表不同的中断情况。
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //无论哪种情况在while循环中退出,此节点一定已经在阻塞队列中,等待获取锁。
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            //进入这个if条件体里是因为此节点是通过中断进入到阻塞队列中的,需要调用一次unlinkCancelledWaiters(),清除在条件队列中已经取消等待的节点。
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            //根据中断的不同情况进行不同的操作。
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

addConditionWaiter()

private Node addConditionWaiter() {
            //判断当前线程是否是持有锁的线程,如果不是抛出异常。所以进行await()首先要获取到锁。
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            //判断条件队列中节点是否取消排队通过(t.waitStatus != Node.CONDITION)来判断,如果尾节点取消排队,将队列中取消排队的节点清除。
            if (t != null && t.waitStatus != Node.CONDITION) {
                //从头节点遍历条件队列,把取消排队的节点清除。
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //把此线程实例化一个node,node.waitStatus=Node.CONDITION;node.thread=currentThread;
            Node node = new Node(Node.CONDITION);
            //把此node从队尾加入到条件队列中。
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

将此线程实例化成节点加入到条件队列中的过程:
1.判断此线程是否已经获取锁,没有获取锁抛出异常。
2.遍历条件队列,把已经取消排队的节点从队列中移除。
3.把此线程实例化成节点从队尾加入到条件队列中。

fullyRelease(Node node)

因为在await时首先是获取独占锁,所以在把节点加入条件队列之后需要把锁释放,才可以让其他线程获取锁进行signal。

final int fullyRelease(Node node) {
        try {
            int savedState = getState();
            //因为ReentrantLock是可重入锁,所以通过savedState作为参数完全释放锁,将state置为0。重入n次,savedState即为n。
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            //释放锁失败,会将节点设置为取消状态,并抛出IllegalMonitorStateException()异常
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }

isOnSyncQueue(node)

正常情况下完全释放锁之后节点没有被signal或者中断,所以不在阻塞队列中,将此线程在这里挂起。

while (!isOnSyncQueue(node)) {
                //不在阻塞队列中,线程在此挂起。
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
}

isOnSyncQueue判断是否在阻塞队列

final boolean isOnSyncQueue(Node node) {
        //节点的waitStatus== Node.CONDITION说明节点还没有移动到阻塞队列中,因为后面的signal方法将节点移动到阻塞队列中会把waitStatus置为0。
        //如果在阻塞队列中,节点的前驱一定不为null。
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        //node有后继,说明一定在阻塞队列中。因为next在阻塞队列中才会使用到。
        if (node.next != null) // If has successor, it must be on queue
            return true;
        //那么这里判断是否在阻塞队列中还剩一种情况,node.prev() != null ,但是这个情况并不能判断是否在阻塞队列中。
        //因为在入阻塞队列时,首先是node.prev=tail,然后通过CAS将自己设置为新的tail,但是这次CAS可能失败,所以通过findNodeFromTail(node)来判断。
        //此方法从阻塞队尾开始找,如果找到此节点,说明在阻塞队列中,返回true。
        return findNodeFromTail(node);
    }
//从阻塞队列队尾开始遍历寻找。
private boolean findNodeFromTail(Node node) {
        // We check for node first, since it's likely to be at or near tail.
        // tail is known to be non-null, so we could re-order to "save"
        // one null check, but we leave it this way to help the VM.
        for (Node p = tail;;) {
            if (p == node)
                return true;
            if (p == null)
                return false;
            p = p.prev;
        }
  }

从上面的循环中可知,不在阻塞队列时,isOnSyncQueue返回false,进入循环体,LockSupport.park(this);线程在此挂起,等待唤醒。接下来进行signal,但是signal只是把节点从条件队列中移动到了阻塞队列中,真正的唤醒其实还是从lock.unlock()这一步进行唤醒。

signal()

条件队列中的节点转移到阻塞队列

        public final void signal() {
            //和await一样,首先判断是否此线程已经获取锁,没有获取锁则抛出异常。
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
//从条件队列队头开始遍历,找出第一个符合转移条件的节点,因为有些节点会取消排队,但可能还在条件队列中。
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
 //从条件队列头节点开始转移,转移不成功就转移下一个,直到转移成功或者队列为null。
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
//把节点从条件队列转移到阻塞队列中。
    final boolean transferForSignal(Node node) {
        //CAS失败说明当前节点的waitStatus不是Node.CONDITION,即说明此节点取消,不需要进行转移,返回false。 否则将 waitStatus 置为 0。
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
        //将此节点加入阻塞队列的队尾。返回的p为此节点在阻塞队列的前驱。
        Node p = enq(node);
        int ws = p.waitStatus;
        //如果前驱节点>0,说明其前驱节点取消了排队。
        //果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            //进入此处说明前驱节点取消了排队或者CAS失败,直接从此处唤醒此线程。
            LockSupport.unpark(node.thread);
        return true;
    }

正常情况下不进入 if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))语句块中,所以正常情况下就是把节点从条件队列转移到阻塞队列中。如果其阻塞队列中的前驱节点取消等待或者CAS失败,那么直接唤醒此节点对应的线程。

唤醒后检查中断状态

在进行signal的操作时正常情况是把节点加入到阻塞队列中,但并没有唤醒对应挂起的节点,而是等待lock.unlock()释放锁并唤醒阻塞队列中的节点。

while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                //唤醒后从这里继续运行,执行checkInterruptWhileWaiting(node)。
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
}

interruptMode可以取值为 REINTERRUPT(1),THROW_IE(-1),0。

  • REINTERRUPT: 代表 await 返回的时候,需要重新设置中断状态
  • THROW_IE: 代表 await 返回的时候,需要抛出 InterruptedException 异常
  • 0 :说明在 await 期间,没有发生中断

有三种情况会让LockSupport.park(this);返回,继续向下执行:

  1. 正常情况:signal—>把条件队列中的节点转移到阻塞队列—>lock.unlock()唤醒阻塞队列中的节点对应的线程。
  2. 线程中断:线程中断会使 LockSupport.park(this);立即返回,退出阻塞状态。
  3. 在进行signal时将节点转移到阻塞队列之后。其前驱节点取消排队或者CAS失败。

线程在被唤醒之后会进入if语句执行checkInterruptWhileWaiting(Node node),此方法用来判断在线程挂起期间是否发生中断,没有发生中断返回0,在signal之前发生中断返回THROW_IE(-1),在signal之后发生中断返回REINTERRUPT(1)。
方法Thread.interrupted():如果当前线程被中断,返回true,并把中断标志位设为false。即第二次访问时会返回false。

private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                 //发生中断,执行 (transferAfterCancelledWait(node)。否则直接返回0
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
}
final boolean transferAfterCancelledWait(Node node) {
 //如果进入这个if语句,说明中断是signal之前发生的。
//因为在signal过程中,会把node.waitStauts由Node.CONDITION改成0,所以如果这里CAS成功说明waitStatus还没有被signal改为0,即中断是signal之前发生的。
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            //把节点加到阻塞队列里,即使中断也会把节点放入阻塞队列中。
            //这里需要注意的是,在正常情况下signal将节点放入阻塞队列时会把 first.nextWaiter = null;即把此节点从条件队列中断开。
            //但是这里并没有从条件队列中断开,而是保留了其在条件队列中的结构。
            enq(node);
            return true;
        }
 //到这里即CAS失败,说明进行了signal将waitStatus置为0,但是signal可能还没有将节点转移到阻塞队列,所以这里自旋等待signal将节点转移到阻塞队列。
//这种情况是罕见的:signal执行后,没完成转移之前,发生了中断
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

中断也会将节点加入到阻塞队列,但是没有切断与条件队列的联系。因为如果从这里切断了与条件队列的联系,在正常的signal()转移节点时会找不到后面的节点,因为signal()转移时是根据node.nextWaiter找到下一个节点。

准备获取锁

退出while循环后说明节点一定进入了阻塞队列,准备换取锁。acquireQueued(node, savedState)进行自旋获取锁,没有获取成功从这里再次挂起。

if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;

这里需要注意的是前面因为中断到达这一步时,此节点的线程可能并没有成功获取锁,所以需要再次从此处挂起。等待正常唤醒或者中断继续尝试获取锁。因为acquireQueued返回值是线程是否被中断,但是因为方法Thread.interrupted()返回后会将中断标志位置为false,即第二次会返回false。所以这里比较绕的有五种情况:

  • 通过正常lock.unlock()唤醒,进入acquireQueued(node, savedState)挂起,没有通过中断唤醒,返回false,interruptMode=0,继续向下执行。
  • 通过正常lock.unlock()唤醒,进入acquireQueued(node, savedState)挂起,通过中断唤醒,返回true,interruptMode != THROW_IE,所以将 interruptMode = REINTERRUPT;,也可以说明其是在signal之后中断的。
  • 第一次通过中断唤醒,进入acquireQueued(node, savedState)直接获取锁成功,acquireQueued(node, savedState)返回false,根据第一次中断设置的interruptMode来进行对应的中断处理。
  • 第一次通过中断唤醒,进入acquireQueued(node, savedState)后没有成功获取锁,挂起,通过lock.unlock()唤醒,acquireQueued(node, savedState)同样返回false,根据第一次中断设置的interruptMode来进行对应的中断处理
  • 第一次通过中断唤醒,进入acquireQueued(node, savedState)后没有成功获取锁,挂起,再次对其进行中断,这时acquireQueued(node, savedState)返回true,然后判断interruptMode的值进行对应的中断处理。

获取锁之后继续执行:

if (node.nextWaiter != null) // clean up if cancelled
    unlinkCancelledWaiters();
if (interruptMode != 0)
    reportInterruptAfterWait(interruptMode);

前面有提到,因为在signal之前进行中断时,也会将节点加入到阻塞队列中,但是并没有设置 node.nextWaiter = null,所以在这里会调用unlinkCancelledWaiters();清除条件队列中已经取消等待的节点。

如果发生过中断,那么interruptMode!=0,所以执行reportInterruptAfterWait(interruptMode);来处理不同的中断请求。
interruptMode的取值:

  • 0:什么都不做,没有被中断过;
  • THROW_IE:await 方法抛出 InterruptedException 异常,因为它代表在 await() 期间发生了中断,即在signal()之前发生中断;
  • REINTERRUPT:重新中断当前线程,因为它代表 await() 期间没有被中断,而是 signal() 以后发生的中断。
private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
}

这个方法执行完就彻底从await()返回,结束。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,036评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,046评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,411评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,622评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,661评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,521评论 1 304
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,288评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,200评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,644评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,837评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,953评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,673评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,281评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,889评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,011评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,119评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,901评论 2 355

推荐阅读更多精彩内容