2022-02-10 condition await和singal

消费者和生产者的例子

public class ConditionDemo {
    private static final Queue<Apple> queue = new LinkedList();
    private static final Lock lock = new ReentrantLock(false);
    private static final Condition customer = lock.newCondition();
    private static final Condition producer = lock.newCondition();

    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        produce();
                    }
                }

                private void produce() {

                    lock.lock();
                    try {
                        while (queue.size() == 5) {
                            System.out.println("队列已满,无法添加");
                            producer.await();
                        }
                        queue.offer(new Apple(1));
                        System.out.println(Thread.currentThread().getName() + "生产了一个苹果,目前有" + queue.size() + "个苹果");
                        customer.signal();
                        Thread.sleep(200);
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }, i + "号生产者").start();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        custome();
                    }
                }

                private void custome() {

                    lock.lock();
                    try {
                        while (queue.size() == 0) {
                            System.out.println("队列为空,无法消费");
                            customer.await();
                        }
                        queue.poll();
                        System.out.println(Thread.currentThread().getName() + "消费了一个苹果,目前有" + queue.size() + "个苹果");
                        producer.signal();
                        Thread.sleep(200);

                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        lock.unlock();
                    }
                }
            }, i + "号消费者者").start();
        }
    }
}

逐行看await方法

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

能够执行到这里证明当前线程肯定是获取到了锁,也就是说已经lock.lock()住了
条件队列是condition中的队列 每次await()之后都会把线程放到条件队列里面,每次signal()之后就会把条件队列里面的firstNode加入到阻塞队列里面
阻塞队列就是lock里面的队列
这个方法挺绕的 要先将node放到条件队列里面,然后释放锁,最后把线程park住,等着signal调用

        public final void await() throws InterruptedException {
            //判断是不是中断了 这里不是很重要
            if (Thread.interrupted())
                throw new InterruptedException();
            //将当前线程 包装成一个node放到条件队列里面
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

考虑几种情况
lock.newCondition() 实际是创建了一个ConditionObject对象 这个对象有两个属性一个是firstWaiter 一个是 lastWaiter 俩都是node
这俩node都是sqs里面的node
1.条件队列没有其他node

        private Node addConditionWaiter() {
            //这时first和last 都是null
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            //跳过这个判断
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //创建一个node 并且设置waitStatus = -2 1是cancel取消 初始化不填是0 -1是signal 只有是-1时才能唤醒aqs里面的内容
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //t确实是null
            if (t == null)
                //firstWaiter赋值
                firstWaiter = node;
            else
                t.nextWaiter = node;
            //lastWaiter赋值
            lastWaiter = node;
            return node;
        }

此时条件队列应该是 node->null

2.条件队列里面已经有两个了 node1->node2->null first是node1 last是node2

        private Node addConditionWaiter() {
            //t现在是node2
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            //假如 node2 的状态不是CONDITION 里面这个unlink方式是清楚调这些不是状态不是condition的node 倒着清除?
            //假如 状态是CONDITION那么直接跳过这个方法
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //创建一个node 并且设置waitStatus = -2 1是cancel取消 初始化不填是0 -1是signal 只有是-1时才能唤醒aqs里面的内容
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //t现在是last了不是null
            if (t == null)
                //firstWaiter赋值
                firstWaiter = node;
            else
                //把node 赋值给t 也就是last的nextWaiter
                t.nextWaiter = node;
            //最后把last指向node
            lastWaiter = node;
            return node;
        }

这之后node就被加入到了条件队列里面了
但是node 还是拿着锁 需要释放锁
int savedState = fullyRelease(node);
//不考虑可重入的问题

final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            //这里返回的值为1
            int savedState = getState();
            //release()方法就是释放锁的那个方法 主要做了两件事
            //第一是吧  (tryRelease(arg)) -> ExclusiveOwnerThread这个属性置为空 然后把state设为0 不考虑可重入
            //释放这个锁之后 这时head还是它本身 if (h != null && h.waitStatus != 0) unparkSuccessor(h); 
            //head不为null 然后node的状态也不是0 0是初始化的状态 没有什么以外的话 这时应该是-1 因为-1是后继节点加入的时候给设置的
            //unparkSuccessor(h)就是第二件事 唤醒后继
            //这里比较难以理解的是 那这个head怎么处理 后边 后继节点被唤醒后会从park哪里继续运行
            //然后 acquireQueued(...)这个方法这里重新设置为新的节点(也就是这个被唤醒的后继)
            if (release(savedState)) {
                //释放成功 进入这个判断
                failed = false;
                //返回1 
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            //最后不会走这里
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

程序接着会 回到这里 从翻译可以看出是判断它是不是进入了同步队列,这个就是之前说的阻塞队列
如果正常走 这时候肯定是没有在阻塞队列里面

            while (!isOnSyncQueue(node)) {
                //如果返回false 那么这里就会park住这个node 也就是在这里阻塞了
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

这个方法可以看出 如果是正常node 那么刚刚进入条件队列时这个状态就是condition 并且前驱也是空的
这里node的前驱和后继是在阻塞队列里面的前驱和后继 条件队列里面只有一个nextWaiter 还是node对象
正常情况下返回false
如果不是的话 比如进入到第二个判断 后边源码写的很清楚 如果它存在后继那么肯定是在阻塞队列里面了

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
        }

这里猜想一下 如果想跳出这个循环 那么肯定是需要signal()方法唤醒 ,然后改变某些属性,才能继续;
看不懂了后边

            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);

signal()方法 这个方法是从条件队列里面唤醒上边被park住的node

        public final void signal() {
            //方法是判断是不是当前线程调用的
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

考虑两种情况吧 一种是条件队列里面就一个node 既是first也是last

        private void doSignal(Node first) {
            do {
                //这种情况下 是first的nextwaiter是null  firstwaiter这之后也就成了null  first不是null
                if ( (firstWaiter = first.nextWaiter) == null)
                    //把last置位 null
                    lastWaiter = null;
                //最后first的next也为null
                first.nextWaiter = null;
                //first之后就成了一个孤岛了 如果想要跳出去 那就必须下面两个判断有一个是false
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

程序走到这里 正常情况下是希望返回true的

    final boolean transferForSignal(Node node) {
        /*
         * 如果不能改变状态 那么这个node 就是被取消了
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        //上边变更成功后走到这里 这一段和之前进入条件队列类似  就是一直添加到队尾然后把前一个不是取消状态的node的状态改为-1
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            //改好以后唤醒这个node,这个node 封装的线程继续从 之前 while (!isOnSyncQueue(node)) {park()}的这里运行
            //它运行它的 这个signal方法应该是其他线程调用的 返回true 就行了 
            LockSupport.unpark(node.thread);
        return true;
    }

这里想了半天
从park哪里开始以后 可以看到 其实它是在生产者消费者代码那个await方法哪里继续的
那它是怎么抢到锁的呢

            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);

第一个if里面的第一个判断就是抢锁的过程 而且这个savedState是之前存的状态
这里就明白多了如果是可重入这里可能是大于1的 然后 这里抢锁成功再设置进去
后边那几个就不看了 看不懂
这个acquireQueue抢到锁 然后增加一个苹果通知消费者的条件队列 最后再释放

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,701评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,649评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,037评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,994评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,018评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,796评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,481评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,370评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,868评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,014评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,153评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,832评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,494评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,039评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,437评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,131评论 2 356

推荐阅读更多精彩内容