JUC-AQS-ReentrantLock流程分析

JUC-AQS-ReentrantLock

  • 前置知识

公平锁和非公平锁
可重入锁
自旋锁
LockSupport
数据结构之链表
设计模式之模板设计模式

  • java.util.concurrent.locks 接口摘要
Package java.util.concurrent.locks

什么是AQS

AQS(Abstract Queue Synchronizor,抽象队列同步器)框架:通过一个volatile修饰的state来表示同步的状态,使用内置的一个FIFO队列来完成抢占资源的排队状态,把每个未抢占资源的线程封装成双向链表的一个节点,各线程通过CAS来修改state的值。整个过程都围绕着自旋锁、CAS和Support类提供的park/unpark来完成线程的睡眠与唤醒。

用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量表示持有锁的状态.

CLH
  • 抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排人等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
  • 既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
  • 如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及LockSupport.park()的方式维护state变量的状态,使并发达到同步的效果。
image-20210511110103766
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.
image-20210511110810311

AQS内部维护属性volatile int state (32位)

  • state表示资源的可用状态

State三种访问方式

getState()、setState()、compareAndSetState()

AQS定义两种资源共享方式

  • Exclusive-独占,只有一个线程能执行,如ReentrantLock
  • Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch

AQS定义两种队列

  • 同步等待队列
  • 条件等待队列

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。

AQS内部体系架构

ReentrantLock

同步等待队列

AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。

image-20210511111956740

条件等待队列

Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁

image-20210511112030119

abstract void lock()

 abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();
image-20210511114337010

对比公平锁和非公平锁的tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断!hasQueuedPredecessors()

hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:

  1. 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
  2. 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一- 个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)
image-20210511114428109
image-20210511114837941
  • 模拟三个线程
public class AQSDemo {
    public static void main(String[] args) {
        ReentrantLock reentrantLock = new ReentrantLock();//非公平锁

        // A B C三个顾客,去银行办理业务,A先到,此时窗口空无一人,他优先获得办理窗口的机会,办理业务。
        // A 耗时严重,估计长期占有窗口
        new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("----come in A");
                //暂停50分钟线程
                try {
                    TimeUnit.MILLISECONDS.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } finally {
                reentrantLock.unlock();
            }
        }, "A").start();

        //B是第2个,B一看到受理窗口被A占用,只能去候客区等待,进入AQS队列,等待着A办理完成,尝试去抢占受理窗口。
        new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("----come in B");
            } finally {
                reentrantLock.unlock();
            }
        }, "B").start();


        //C是第3个,C一看到受理窗口被A占用,只能去候客区等待,进入AQS队列,等待着A办理完成,尝试去抢占受理窗口,前面是B顾客,FIFO
        new Thread(() -> {
            reentrantLock.lock();
            try {
                System.out.println("----come in C");
            } finally {
                reentrantLock.unlock();
            }
        }, "C").start();

    }
}

RUN> 🚗🚗🚗🚙🚙🚙

----come in A
----come in B
----come in C
image-20210511135857297

AQS 初始状态

image-20210511144350543
static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

         protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
}

ThreadA 来来抢占业务,当前state=0compareAndSetState(0, 1)设置成功

image-20210511144925008

主流程-分支1-tryAcquire

接下来B线程来抢占业务 lock-->acquire(1)--> tryAcquire(1) -->nonfairTryAcquire(1)

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }

    protected final boolean tryAcquire(int acquires) {
        return nonfairTryAcquire(acquires);
    }
}

主流程-分支2-addWaiter

image-20210511152029864
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

主流程-分支3-acquireQueued

addWaiter()入队成功后,接着acquireQueued(Node(ThreadB))

Acquires in exclusive uninterruptible mode for thread already in queue. Used by condition wait methods as well as acquire.
Params:
node – the node
arg – the acquire argument
Returns:
true if interrupted while waiting
final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();//前驱节点
                if (p == head && tryAcquire(arg)) {//哨兵节点等于head,则再去抢占锁tryAcquire()
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && //shouldParkAfterFailedAcquire 自旋两次,返回true
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }


 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
image-20210511152839078
image-20210511153633671

shouldParkAfterFailedAcquire 自旋两次,返回true,接着parkAndCheckInterrupt

线程ThreadB--> LockSupport.park挂起成功!🚥🚥🚥

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }
  • C是第3个,C一看到受理窗口被A占用,只能去候客区等待,进入AQS队列,等待着A办理完成,尝试去抢占受理窗口,前面是B顾客,FIFO
new Thread(() -> {
    reentrantLock.lock();
    try {
        System.out.println("----come in C");
    } finally {
        reentrantLock.unlock();
    }
}, "C").start();

addWaiter(Node.EXCLUSIVE), arg) ThreadC的Node入队列

image-20210511154810627

acquireQueued

acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 内部自旋两次,第一次把前置节点waitStatus修改成Node.SIGNAL第二次,挂起当前线程Thread-C

image-20210511155604676

线程Thread-C入队完毕

image-20210511155947052
image-20210511182114373

unlock解锁流程

image-20210511183724863
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    Node s = node.next;//拿到傀儡节点的下一个节点 s;
    if (s == null || s.waitStatus > 0) {//s.waitStatus 不能于-1 ,从后往前找到最前面waitStatus<=0 的节点
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

唤醒节点的处理器

参数:哨兵节点(傀儡节点) h

1.拿到傀儡节点的ws状态值-1;

2.判断ws < 0 ? CAS设置ws的值为0;

3.拿到傀儡节点的下一个节点 s;

4.判断 s == null 或者 s.waitStatus > 0

5.如果s != null,LockSupport.unpark(s.thread)唤醒线程;

state已经改为0了,阻塞的线程被唤醒,回到accquireQueued的死循环加锁的流程

image-20210511194828117

经过线程A-> unparkSuccessor

image-20210511195301673

线程Thread-B唤醒,return Thread.interrupted();

image-20210511200327974

老哨兵退位,新哨兵上位,新的线程变成ThreadOwen

image-20210511201505945
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容