深入解析AbstractQueuedSynchronizer源码1-Condition源码

简介和对比Object监视器方法对比

任何一组对象都用一组监视器方法,主要就是wait和notify方法,这些方法与synchronized关键字一起使用,实现等待/通知模式。juc也实现了类似Object的监视器方法,就是Condition,与lock可以配合使用实现等待/通知模式。功能还是有些差异,下面是Object监视器方法与Condition监视器方法功能对比。

对比项 Object Condition
实现条件 获取Synchronized锁 获取Lock锁
等待队列个数 只有一个,即一个Synchronized锁下,只有一个等待队列 多个,Lock可以new多个Condition
线程释放锁进入等待状态,是否支持中断 不支持 支持
线程释放锁进入等待状态,是否支持等待到具体某个时间 不支持 支持

以上是Object监视器方法与Condition监视器方法的主要区别,可以看出Condition监视器方法功能更多一点。

简单实现

分别用Object监视器方法和Condition监视器方法实现一个简单的阻塞队列

Object监视器方法

    private Object a = new Object();

    private Object b = new Object();

    private int count = 5;

    private List<String> list = new ArrayList<>(5);

    public void add(String str) {

        synchronized (a) {
            if (list.size() == count) {
                try {
                    a.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        synchronized (this) {
            list.add(str);
        }
        synchronized (b) {
            try {
                b.notifyAll();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public String get() {

        synchronized (b) {
            if (list.size() == 0) {
                try {
                    b.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        String str = "";
        synchronized (this) {
            str = list.get(0);
            list.remove(0);
        }
        synchronized (a) {
            try {
                a.notifyAll();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return str;
    }

Condition监视器方法

    private ReentrantLock lock = new ReentrantLock();

    private Condition a = lock.newCondition();

    private Condition b = lock.newCondition();

    private List<String> list = new ArrayList<>(5);

    private int count = 5;

    public void add(String str) {

        try {
            lock.lock();
            if (list.size() == count) {
                a.await();
            }
            list.add(str);
            b.signalAll();
        } catch (Exception e) {

        } finally {
            lock.unlock();
        }
    }

    public String get() {

        try {
            lock.lock();
            if (list.size() == 0) {
                b.await();
            }
            String str = list.get(0);
            list.remove(0);
            a.signalAll();
            return str;
        } catch (Exception e) {

        } finally {
            lock.unlock();
        }
        return null;
    }

从以上代码中可以看出Object监视器方法只有一个等待队列,因此需要定义两组Object监视器方法来实现生产者-消费者pv操作,而Condition支持多个等待队列,因此不用定义多组,直接用一个锁就行。

源码分析

本文主要分析aqs中的实现,即AbstractQueuedSynchronizer.ConditionObject
具体的一些方法即描述如下:

方法 描述
signal 唤醒一个等待在Condition上的线程
signalAll 唤醒所有等待在Condition上的线程
awaitUninterruptibly 线程进入等待队列等待被唤醒,不响应中断
await() throws InterruptedException 线程进入等待队列等待被唤醒,响应中断,抛出中断异常
awaitNanos(long nanosTimeout) throws InterruptedException 线程进入等待队列等待被唤醒,最多等待nanosTimeout纳秒,响应中断,抛出中断异常
awaitUntil(Date deadline) throws InterruptedException 线程进入等待队列等待被唤醒,最多等待到该具体时间,响应中断,抛出中断异常

简单介绍

深入解析方法实现前先简单介绍实现原理AbstractQueuedSynchronizer共有两个等待队列,Sync Queue和Condition Queue,Lock的加锁和wait,signal之间就是两个等待队列转换的过程,同Object监视器方法一样,Condition运行wait和signal方法时同样要先获取锁才能执行。
wait方法:把当前获的锁线程封装成Node节点放到Condition Queue中,然后释放锁资源。
signal方法:把Condition Queue首节点拿到Sync Queue中,完成之后释放锁资源,然后唤醒Sync Queue中的Node节点。

signal 和 signalAll

        public final void signal() {
             //健康检查,由Lock自己实现,比如ReentrantLock,就是检查当前线程是否是拥有锁的线程
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //唤醒等待队列中的第一个节点
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }

doSignal 和doSignalAll

        private void doSignal(Node first) {
            do {
                // 这里的操作主要是把first节点从Condition Queue中摘出来,如果队列空了,设置lastWaiter为null
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) && // 唤醒该节点,即从Condition Queue转移到Sync Queue中
                     (first = firstWaiter) != null);
        }

        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            // 循环唤醒所有节点
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

transferForSignal

    final boolean transferForSignal(Node node) {
        /*
         * cas改变CONDITION到初始状态,如果没成功,唤醒失败
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // enq方法是将node节点插入到Sync Queue中
        Node p = enq(node);
        int ws = p.waitStatus;
        // 如果节点被取消即(ws > 0 )或者设置需要被唤醒状态SIGNAL失败,就唤醒线程
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

以上是Condition的signal相关方法,唤醒操作,总结一点就是把Condition Queue中节点拿到Sync Queue中,设置成被需要被唤醒状态,等待被唤醒。

中断标志

简单介绍下中断标志,因为Condition的wait支持中断,所以中断很重要

        /**
        * 这两种的区别是是否是在signal后中断的,THROW_IE是唤醒前中断,需要抛出中断异常,REINTERRUPT是被唤醒后中断
        **/
        /** Mode meaning to reinterrupt on exit from wait 意思是wait退出前自我中断,不需要抛出中断异常 */
        private static final int REINTERRUPT =  1;
        /** Mode meaning to throw InterruptedException on exit from wait  退出前需要抛出中断异常*/
        private static final int THROW_IE    = -1;

awaitUninterruptibly

该方法不支持中断异常,实现比较简单,可以参照wait方法理解

        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

wait()throws InterruptedException

wait方法支持中断

        public final void await() throws InterruptedException {
        // 判断下中断状态
        if (Thread.interrupted())
            throw new InterruptedException();
        //封装node节点到Condition Queue中
        Node node = addConditionWaiter();
        // 释放锁资源,唤醒其他等待锁资源线程
        int savedState = fullyRelease(node);
        int interruptMode = 0;
        // 等待是否在Sync Queue中,即是否被其他线程signal唤醒,如果没有一直进行阻塞,指导被唤醒进入Sync Queue队列中
        while (!isOnSyncQueue(node)) {
            LockSupport.park(this);
            // 判断下中断类型,如果有中断且已经被signal唤醒,即节点在Sync Queue中,是THROW_IE,
            //否则是REINTERRUPT,或者是0即没有中断,代码不深入理解,可以自己去看
            // 这里被中断唤醒,而不是signal之后被唤醒,会在Condition Queue中和Sync Queue中都存在
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
        }
        // acquireQueued是循环的独占方式获取锁资源,即已经被唤醒放到Sync Queue中,
        //需要独占的方式再获取锁资源,返回值是是否被中断
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        // 这里判断是否是被中断唤醒的,如果是中断唤醒的,
        //需要把Condition Queue中已被取消的节点清理掉,防止垃圾存在
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            // 最后就是如果有中断,处理下中断,如果是THROW_IE中断模型,抛出中断异常
            reportInterruptAfterWait(interruptMode);

checkInterruptWhileWaiting 和 transferAfterCancelledWait

       private int checkInterruptWhileWaiting(Node node) {
            // 这里就是判断下线程是否被中断,如果中断,判断中断类型
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }
 // 这个函数的主要是判断节点被取消是否在signal唤醒之前,
final boolean transferAfterCancelledWait(Node node) {
       // 如果节点的状态是CONDITION,说明还没被signal唤醒,
      //依然在Condition Queue中,然后返回插入到Sync Queue中返回true,
      //这里说明了在signal唤醒之前被中断,两个等待队列都会存在节点
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        // 否则说明已经被signal唤醒,返回false
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

awaitNanos(long nanosTimeout)throws InterruptedException

该方法和wait方法基本差不多,就多了个超时等待

        public final long awaitNanos(long nanosTimeout)
                throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        Node node = addConditionWaiter();
        int savedState = fullyRelease(node);
        final long deadline = System.nanoTime() + nanosTimeout;
        int interruptMode = 0;
        while (!isOnSyncQueue(node)) {
            // 这里如果超时直接取消,然后退出等待
            if (nanosTimeout <= 0L) {
                transferAfterCancelledWait(node);
                break;
            }
            if (nanosTimeout >= spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                break;
            nanosTimeout = deadline - System.nanoTime();
        }
        if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
            interruptMode = REINTERRUPT;
        if (node.nextWaiter != null)
            unlinkCancelledWaiters();
        if (interruptMode != 0)
            reportInterruptAfterWait(interruptMode);
        return deadline - System.nanoTime(); //返回值表明是否是超时还是被唤醒

剩下的两个与时间相关的方法 public final boolean awaitUntil(Date deadline)和public final boolean await(long time, TimeUnit unit)方法和 awaitNanos(long nanosTimeout)方法基本一样,不做深入理解。

总结

以上是全部的Condition理解,一句话,Condition就是锁状态的一种中间状态转换队列,实现了等待/通知模式

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容