java源码 - ReentrantLock之FairSync

开篇

 这篇文章主要是讲解FairSync公平锁的源码分析,整个内容分为加锁过程、解锁过程,CLH队列等概念。
 首先一直困扰我的CLH队列的CLH的缩写我终于明白,看似三个人的人名的首字符缩写"CLH" (Craig, Landin, andHagersten)。
 加锁过程主要核心逻辑在于尝试获取锁,获锁失败后进入等待队列,以及进入等待队列的过程是需要进行多次循环判断的。
 解锁过程相对加锁过程会简单许多,核心逻辑在释放锁、唤醒下一个等待线程两个过程。
  CLH的概念在加锁过程已经提及了,可以一并看看。

java源码 - ReentrantLock
java源码 - ReentrantLock之FairSync
java源码 - ReentrantLock之NonfairSync
java源码 - ReentrantLock图解加锁过程


加锁过程

ReentrantLock的的锁过程如下:

  • 1、先尝试获取锁,通过tryAcquire()实现。
  • 2、获取锁失败后,线程被包装成Node对象后添加到CLH队列,通过addWaiter()实现。
  • 3、添加CLH队列后,逐步的去执行CLH队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。

tryAcquire的操作流程
1、如果锁未占用的情况下:判断当前线程是否处于CLH的首位,如果位于首位就通过原子更新操作设置锁占用。
2、如果锁被占用的情况下:判断当前线程是否是占用锁线程,如果是则实现锁的可重入功能,设置锁占用次数。

    static final class FairSync extends Sync {
        
        // lock的入口,内部调用acquire方法实现加锁操作
        final void lock() {
            // lock的入口
            acquire(1);
        }

      public final void acquire(int arg) {
          // 第一步尝试获取锁,成功则返回
          // 获取锁失败后通过addWaiter添加到CLH队列的末尾
          // 通过acquireQueued判断是否轮到自己唤醒了
          // 可以理解为之前没获取锁但是等执行到这里的时候可能锁已经释放了
          if (!tryAcquire(arg) &&
              acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
              selfInterrupt();
      }
  
        // acquires的参数值为1
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            // 获取当前锁状态,0表示锁未占用,>0表示被占用
            int c = getState();
            if (c == 0) {
                // 首先判断是不是CLH队列的第一个元素,没有祖先则表示第一个元素
                // 然后从unsafe把state设置为1,表示锁被占用
                // 设置锁占用线程为当前线程
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    // 设置锁占用线程为当前线程
                    setExclusiveOwnerThread(current);
                    // 返回锁占用成功
                    return true;
                }
            }
            // 判断锁占用线程是不是本线程,说明是可重入锁
            else if (current == getExclusiveOwnerThread()) {
                // 重入锁增加锁定次数
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                // 设置state会占用次数
                setState(nextc);
                // 返回锁占用成功
                return true;
            }

            // 否则返回锁占用失败
            return false;
        }
    }

acquire的操作流程

  • 1、第一步通过tryAcquire()尝试获取锁,成功则返回
  • 2、获取锁失败后通过addWaiter添加到CLH队列的末尾
  • 3、添加CLH队列后,通过acquireQueued()方法逐步的去执行CLH队列的线程,如果当前线程获取到了锁则返回;否则当前线程进行休眠,直到唤醒并重新获取锁后返回。
    public final void acquire(int arg) {
        // 第一步尝试获取锁,成功则返回
        // 获取锁失败后通过addWaiter添加到CLH队列的末尾
        // 通过acquireQueued判断是否轮到自己唤醒了
        // 可以理解为之前没获取锁但是等执行到这里的时候可能锁已经释放了
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter的操作流程
1、将当前线程包装成Node对象。
2、先尝试通过快速失败法尝试在CLH队尾插入Node对象
3、如果快速插入失败后那么就通过enq方法在CLH队尾插入Node对象

    private Node addWaiter(Node mode) {
        // 将线程包装成为Node对象,便于添加CLH队列
        Node node = new Node(Thread.currentThread(), mode);

        // 先尝试快速插入到CLH队尾,插入成功就返回Node对象
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }

        // 快速插入CLH队尾失败后,通过enq方法实现
        enq(node);
        return node;
    }

    // 将Node节点插入CLH队尾的实现
    private Node enq(final Node node) {
        for (;;) {
           
            Node t = tail;

            // 如果CLH队列为空,那么设置Head和Tail都为Node
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                // 通过unsafe来保证Node插入队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

acquireQueued的操作流程

  • 1、如果当前节点Node的前驱节点属于head,当前节点属于老二地位通过tryAcquire()尝试获取锁,获取成功后那么就释放原head节点(可以理解为head已经释放锁然后从CLH删除),把当前节点设置为head节点。
  • 2、通过shouldParkAfterFailedAcquire()方法判断Node代表的线程是否进入waiting状态,直到被unpark()。
  • 3、parkAndCheckInterrupt()方法将当前线程进入waiting状态。
  • 4、休眠线程被唤醒的时候会执行 if (p == head && tryAcquire(arg))逻辑判断
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                // 先判断节点的祖先
                final Node p = node.predecessor();
                // 如果前驱是head,即该结点已成老二,
                // 那么便有资格去尝试获取资源,
                // tryAcquire成功说明head已经释放锁
                // 休眠线程被唤醒的时候会继续执行这里
                if (p == head && tryAcquire(arg)) {
                    // 设置当前节点为head节点
                    setHead(node);
                     // 释放原head节点用于gc回收
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }

                // 如果自己可以休息了,就进入waiting状态,直到被unpark()
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

shouldParkAfterFailedAcquire的操作流程

  • 1、如果前置节点处于SIGNAL状态,那么当前线程进入阻塞状态,返回true
  • 2、如果前置节点处于ws>0也就是取消状态,那么当前线程节点就往前查找第一个状态处于ws<=0的节点
  • 3、如果前置状态ws=0的节点,那么就把前置节点设置为SIGNAL状态
  • 4、整个shouldParkAfterFailedAcquire函数是在for()循环当中循环执行的,我们可以想象按照步骤2->3->1的顺序执行,按照前置遍历寻找合适的前置节点,接着发现前置节点ws状态为0后重新设置为SIGNAL,最后发现前置节点状态为SINGAL后休眠线程自身。
  • 5、线程从运行态进入waiting状态其实也是经历了一系列的处理过程的。
    // shouldParkAfterFailedAcquire外层for循环调用
    // 第一次设置Node前置节点状态为SIGNAL
    // 下一次循环就前置节点庄为SIGNAL,那么线程自身就需要被阻塞了
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
       
       // 如果前继节点是SIGNAL状态,则意味这当前线程需要被阻塞。此时,返回true。
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            return true;

        // ws>0代表线程被取消了
        // static final int CANCELLED =  1;
        //  waitStatus value to indicate thread has cancelled
        if (ws > 0) {
            // 如果前驱处于取消状态,那就一直往前找,直到找到最近一个正常等待的状态,并排在它的后边
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
            // 状态为0的情况只可能是初始化的时候的默认值
            // 当前线程进入等待状态的时候需要设置前置状态为SIGNAL
            // SIGNAL状态表示后置线程需要被唤醒
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() {
        // parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。
        LockSupport.park(this);
        return Thread.interrupted();
    }

Node的介绍

  • 1、Node节点作为CLH队列的节点元素,内部包含线程对象
  • 2、Node节点包含多种状态,每种状态都在源码中注释了,默认初始化应该为0
  • 3、Node节点是一个双向列表的节点,包含前置和后置节点的指针
  • 4、Node节点处于AbstractQueuedSynchronizer类当中,其中AbstractQueuedSynchronizer包含state变量标记是否处于锁状态
public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

    protected AbstractQueuedSynchronizer() { }

    /**
     *      +------+  prev +-----+       +-----+
     * head |      | <---- |     | <---- |     |  tail
     *      +------+       +-----+       +-----+
     */
    static final class Node {

        static final Node SHARED = new Node();

        static final Node EXCLUSIVE = null;
        // 当前线程已被取消
        static final int CANCELLED =  1;
        // “当前线程的后继线程需要被unpark(唤醒)”。
        // 一般发生情况是:当前线程的后继线程处于阻塞状态,
        // 而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
        static final int SIGNAL    = -1;
        // 当前线程(处在Condition休眠状态)在等待Condition唤醒
        static final int CONDITION = -2;
        // (共享锁)其它线程获取到“共享锁”,状态为0表示当前线程不属于上面的任何一种状态。
        static final int PROPAGATE = -3;
        
        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;

        volatile Thread thread;

        Node nextWaiter;

        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    private transient volatile Node head;

    private transient volatile Node tail;

    private volatile int state;
}


解锁过程

release过程

  • 1、通过tryRelease()方法尝试让当前线程释放锁对象
  • 2、通过unparkSuccessor()方法设置当前节点状态ws=0并且唤醒CLH队列中的下一个等待线程
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease过程

  • 1、如果占用锁线程非当前线程直接抛异常
  • 2、递减锁计数后如果值为0那么就释放当前锁占用者
  • 3、更新锁状态为未占用,即state为0
     protected final boolean tryRelease(int releases) {
        int c = getState() - releases;
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
         boolean free = false;
         if (c == 0) {
            free = true;
            setExclusiveOwnerThread(null);
         }
        setState(c);
        return free;
    }

unparkSuccessor过程
1、设置当前Node状态为0
2、寻找下一个等待线程节点来唤醒等待线程并通过LockSupport.unpark()唤醒线程
3、寻找下一个等待线程,如果当前Node的下一个节点符合状态就直接进行唤醒,否则从队尾开始进行倒序查找,找到最优先的线程进行唤醒。

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            // 找到状态<0的线程进行唤醒
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }

        if (s != null)
            LockSupport.unpark(s.thread);
    }


##参考文章
Java多线程:AQS源码分析

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容