jdk10-ReentrantLock重入锁源码分析(下)

https://www.jianshu.com/p/d560c2d9ea8e
上一篇对ReentrantLock的lock和unlock方法做了详细的讲解这篇将接着讲解Condition,

1. condition 生产者消费者列子

condition 是依赖于 ReentrantLock 的,不管是调用 await 进入等待还是 signal 唤醒,都必须获取到锁才能进行操作。
每个 ReentrantLock 实例可以通过调用多次 newCondition 产生多个 ConditionObject 的实例:

public class ReentrantLock implements Lock, java.io.Serializable {
    public Condition newCondition() {
        return sync.newCondition();
    }
}

public class ConditionTest {
    final Lock lock = new ReentrantLock();
    final Condition producer = lock.newCondition();
    final Condition consumer = lock.newCondition();

    final int max = 100;
    final LinkedList<Integer> item = new LinkedList<>();

    // 生产
    public void put() throws InterruptedException {
        lock.lock();
        try {
            while (item.size() == max) {
                producer.await(); // 队列已满,生产者等待
            }
            item.add(new Random().nextInt());
            consumer.signal(); // 生产成功,通知消费者
        } finally {
            lock.unlock();
        }
    }

    // 消费
    public void take() throws InterruptedException {
        lock.lock();
        try {
            while (item.size() == 0) {
                consumer.await(); // 队列为空,消费者等待
            }
            System.out.println(item.poll());
            producer.signal(); // 被我消费掉一个,给生产者发个通知
        } finally {
            lock.unlock();
        }
    }
}

2. AQS.ConditionObject

我们可以看到ConditionObject中利用两个属性来组成条件队列

public class ConditionObject implements Condition, java.io.Serializable { 
        /** First node of condition queue. 条件队列第一个结点 */
        private transient Node firstWaiter;
        /** Last node of condition queue.  条件队列最后一个节点*/
        private transient Node lastWaiter;

        /** 代表 await 返回的时候,需要重新设置中断状态 */
        private static final int REINTERRUPT =  1;
        /** 代表 await 返回的时候,需要抛出 InterruptedException 异常 */
        private static final int THROW_IE    = -1;
}

3. 线程1调用 ConditionObject.await()

执行方法链

1、ConditionObject.await(),线程1在获取到锁后,调用await()

  1. 包装线程为node加入到条件等待队列
  2. 释放锁
  3. 线程1节点不在AQS同步等待队列中,阻塞当前线程,
    (signal会把线程移到AQS队列中,就跳出死循环)
  4. 线程1先执行到此结束
ConditionObject::
        public final void await() throws InterruptedException {
            if (Thread.interrupted()) // 中断抛出异常退出
                throw new InterruptedException();
1、包装当前线程node加入到条件队列
            Node node = addConditionWaiter();
2、释放锁,返回释放之前的状态值(这个方法锁释放)
            int savedState = fullyRelease(node);
            int interruptMode = 0;
3、如果不是在同步队列中,一直while
            while (!isOnSyncQueue(node)) {
4、阻塞当前线程
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

2、 ConditionObject.addConditionWaiter(),此方法是在还没释放锁的情况下,调用所以线程安全

获取条件队列最后一个节点,如果最后一个节点是null,头结点加入,不是null ,t.nextWaiter=node,重置lastWaiter

  1. 获取最后一个节点 t,如果 t == null,头结点加入队列,firstWaiter指向node
  2. 如果 t != null ,t.nextWaiter 指向node
  3. 从新使lastWaiter 指向当前线程node
ConditionObject ::

        private Node addConditionWaiter() {
1、ReentrantLock.isHeldExclusively() 是否是当前线程持有锁
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node t = lastWaiter;
2、 如果最后节点是取消状态,清理条件队列,(因为此次是线程1所以为null)
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
1、包装当前线程为node
            Node node = new Node(Node.CONDITION);
2、加入到条件队列
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
3、last指向最新节点
            lastWaiter = node;
            return node;
        }

        protected final boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }
线程执行后队列情况

3、 AQS.fullyRelease(node) 完全释放锁

  1. 获取当前线程的状态
  2. 调用AQS.release(int) 这个方法就是ReentrantLock.unlock()调用的那个
  3. 释放成功返回释放之前的值
AQS :: 
    final long fullyRelease(Node node) {
        try {
            long savedState = getState();
            if (release(savedState))
                return savedState;
            throw new IllegalMonitorStateException();
        } catch (Throwable t) {
            node.waitStatus = Node.CANCELLED;
            throw t;
        }
    }

4、 AQS.isOnSyncQueue() 如果不在AQS同步等待队列中返回false

  1. 第一个判断,状态是condition := 条件队列中,和前驱节点不存在:=在条件队列中
  2. 只有同步等待队列 next才不会为null
    可以通过判断 node.prev() != null 判断node 在阻塞队列吗? 不能
    AQS 的入队方法,首先设置的是 node.prev 指向 tail,然后cas可能失败
  1. 经过上面两步的判断可以明显的知道 节点在AQS的末尾,
    从后到前遍历同步等待队列查看是否在队列中
AQS ::
    final boolean isOnSyncQueue(Node node) {
1、状态是condition,和前驱节点不存在,那么节点不会再同步等待队列中
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
2、只有同步等待队列 next才不会为null
        if (node.next != null) // If has successor, it must be on queue
            return true;
        return findNodeFromTail(node);
    }
    private boolean findNodeFromTail(Node node) {
        for (Node p = tail;;) {
            if (p == node)
                return true;
            if (p == null)
                return false;
            p = p.prev;
        }
    }

4. ConditionObject.signal() 唤醒线程1

移动第一个线程(等待最长时间的)到AQS等待队列中,并且获取锁

  1. 获取第一个线程节点,非null,调用doSignal(first)
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

1、doSignal(first)

  1. 首先重置 first.nextWaiter = null; 把此节点 清理出条件队列
  2. transferForSignal把节点移到AQS队列返回true 退出循环
  3. 假如移动失败(节点被中断已经移动过了),继续循环移动下一个
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
1、 清理节点
                first.nextWaiter = null;
            } while (  !transferForSignal(first) &&  (first = firstWaiter) != null  );
        }

    final boolean transferForSignal(Node node) {
        /*
         * 如果失败,说明线程可能已经中断
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
1、ws > 0 说明 node 在阻塞队列中的前驱节点取消了等待锁,直接唤醒 node 对应的线程
2、如果 ws <= 0, 那么 compareAndSetWaitStatus 将会被调用,上篇介绍的时候说过,节点入队后,需要把前驱节点的状态设为 Node.SIGNAL(-1)
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

上面介绍了正常的 await(),signal()过程,下面我们分析,特殊情况

5. 线程1调用 await() 被其他线程中断执行流程

中断执行流程

1、ConditionObject.await(),线程1在获取到锁后,调用await()

  1. 包装线程为node加入到条件等待队列
  2. 释放锁
  3. 线程1节点不在AQS同步等待队列中,阻塞当前线程,
    (signal会把线程移到AQS队列中,就跳出死循环)

中断后的执行流程:

  1. 中断 --> 重置中断状态,加入到AQS等待队列 interruptMode = THROW_IE,跳出循环 注意 没有清理节点
  2. acquireQueued获取到锁
  3. unlinkCancelledWaiters上面《中断》并不会把自己清理出条件队列,所以清理取消的节点
  4. 根据interruptMode 判断是否抛出异常还是设置中断状态(第4步清理掉了中断状态)
    public final void await() throws InterruptedException {
            if (Thread.interrupted()) // 中断抛出异常退出
                throw new InterruptedException();
1、包装当前线程node加入到条件队列
            Node node = addConditionWaiter();
2、释放锁,返回释放之前的状态值(这个方法锁释放)
            int savedState = fullyRelease(node);
            int interruptMode = 0;
3、如果不是在同步队列中,一直while
            while (!isOnSyncQueue(node)) {
4、阻塞当前线程
                LockSupport.park(this);
5、中断判断,跳出循环,加入到AQS等待队列  interruptMode  = THROW_IE
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
6. interruptMode   = THROW_IE 所以继续走,acquireQueued获取到锁
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
7. 上面中断并不会把自己清理出条件队列,所以清理取消的节点
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
8. 根据interruptMode 判断是否抛出异常还是设置中断状态
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

2、checkInterruptWhileWaiting(node)

  1. 没有中断:return 0,signalled之前中断:THROW_IE ,之后中断:REINTERRUPT
  2. 特别注意 node.compareAndSetWaitStatus(Node.CONDITION, 0),失败表示signal已经移动节点,这样防止并发
        /**
         * 没有中断:0,signalled之前中断:THROW_IE ,之后中断:REINTERRUPT 
         */
        private int checkInterruptWhileWaiting(Node node) {
            return Thread.interrupted() ?
                (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
                0;
        }

    final boolean transferAfterCancelledWait(Node node) {
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        /*
         * 到这里是因为 CAS 失败,肯定是因为 signal 方法已经将 waitStatus 设置为了 0
         * signal 方法会将节点转移到阻塞队列,但是可能还没完成,这边自旋等待其完成
         * 当然,这种事情还是比较少的吧:signal 调用之后,没完成转移之前,发生了中断
         */
        while (!isOnSyncQueue(node))
            Thread.yield();
        return false;
    }

3、acquireQueued(node, savedState) 获取锁

这个就不说了,上篇有讲解

4、unlinkCancelledWaiters();

等待队列是一个单向链表,遍历链表将已经取消等待的节点清除出去

  1. Node t :作为当前需要清理的线程节点
  2. Node trail :作为Node t 的上一个节点,链表的上一个节点
  3. 循环 t=next;
  4. 如果 t 状态不是在条件队列中
  • 清理 t 指向下一个节点的引用
  • 把 t 节点的上一个节点的next指向t的next,
        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter; 
                1、t = 线程1,如果状态不等于CONDITION,需要清理
                if (t.waitStatus != Node.CONDITION) {
                    2、清理 t 的 next引用
                    t.nextWaiter = null;
                    3、清理 t 的上一个节点,指向t的引用
                    if (trail == null)
                        firstWaiter = next;
                    else
                        trail.nextWaiter = next;
                    4、next=null 说明遍历完毕,把lastWaiter指向t的上一个节点
                    if (next == null)
                        lastWaiter = trail;
                } else {
                    trail = t;                
                }

                5、每次while循环把t重置指向他的next节点
                t = next;
            }
        }

5、reportInterruptAfterWait(interruptMode);

判断是抛出异常,还是设置中断状态

        private void reportInterruptAfterWait(int interruptMode)
            throws InterruptedException {
            if (interruptMode == THROW_IE)
                throw new InterruptedException();
            else if (interruptMode == REINTERRUPT)
                selfInterrupt();
        }

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

6.假如线程中断的时候 ,又执行了唤醒 transferForSignal 移动节点

  1. 我们可以看到在入队之前,都先使用cas,保证了节点不会重复加入AQS队列
  2. 我上文中也有特别注意,表明:中断和signal同时发生的时候,处理方式
    final boolean transferForSignal(Node node) {
        /*
         * 如果失败,说明线程可能已经中断
         */
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;
    }

    final boolean transferAfterCancelledWait(Node node) {
        if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) {
            enq(node);
            return true;
    }

7. signalAll

可以很简单的看到do while循环移动节点

        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }
    final boolean transferForSignal(Node node) {
        if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

8. awaitUninterruptibly() 不抛出异常 InterruptedException

加入队列,阻塞线程,获取锁,不过多介绍

        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if (Thread.interrupted())
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)
                selfInterrupt();
        }

9. 具备超时的await

三个方法都是和上面的await差不多,就是多个时间判断

  1. public final long awaitNanos(long nanosTimeout) throws InterruptedException
  2. public final boolean awaitUntil(Date deadline) throws InterruptedException
  3. public final boolean await(long time, TimeUnit unit) throws InterruptedException
public final long awaitNanos(long nanosTimeout)  throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // We don't check for nanosTimeout <= 0L here, to allow
            // awaitNanos(0) as a way to "yield the lock".
1、过期时间=当前时间 + 等待时长
            final long deadline = System.nanoTime() + nanosTimeout;
            long initialNanos = nanosTimeout;

            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;

            while (!isOnSyncQueue(node)) {
2、时间到了
                if (nanosTimeout <= 0L) {
3、取消等待 transferAfterCancelledWait(node) 移动节点到 AQS等待队列
                    transferAfterCancelledWait(node);
                    break;
                }
4、如果等待时间大于 自旋阈值  ,阻塞
                if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
5、检测中断
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
6、等待时间 = 过期时间-当前时间
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
7、剩余时间
            long remaining = deadline - System.nanoTime(); // avoid overflow
            return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
        }
        public final boolean await(long time, TimeUnit unit)
                throws InterruptedException {
            long nanosTimeout = unit.toNanos(time);
            if (Thread.interrupted())
                throw new InterruptedException();
            // We don't check for nanosTimeout <= 0L here, to allow
            // await(0, unit) as a way to "yield the lock".
            final long deadline = System.nanoTime() + nanosTimeout;
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (nanosTimeout <= 0L) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
                    LockSupport.parkNanos(this, nanosTimeout);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
                nanosTimeout = deadline - System.nanoTime();
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }
        public final boolean awaitUntil(Date deadline)
                throws InterruptedException {
            long abstime = deadline.getTime();
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            boolean timedout = false;
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                if (System.currentTimeMillis() >= abstime) {
                    timedout = transferAfterCancelledWait(node);
                    break;
                }
                LockSupport.parkUntil(this, abstime);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
            return !timedout;
        }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容