ReentrantLock 与 AQS 独占

ReentrangLock与synchronized对比:

ReentrantLock:

  • 依赖AQS实现
  • 支持响应中断,超时
  • 需要显式调用unlock解锁
  • 支持公平锁 非公平锁
  • 支持多个条件队列

Synchronized:

  • jvm内部实现
  • 自动释放锁,不需要手动调用
  • 只支持非公平锁
  • 支持一个条件队列

公平锁与非公平锁

ReentrantLock 支持公平与非公平两种方式,通过构造函数中的boolean参数,即可选择不同的类型,内部通过Sync来做加锁解锁操作,而Sync就是AQS的一个实现。

    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

FairSync

lock 方法:

// ReentrantLock.java - FairSync
    final void lock() {
        acquire(1);//直接调用acquire方法 参数可以理解为要获取的锁
    }

// AbstractQueuedSynchronizer.java
    public final void acquire(int arg) {// acquire方法位于AQS中,是一个模版方法
         //这里主要分为三步骤
        // 1 tryAcquire尝试拿锁
        // 2 tryAcquire 拿锁失败 则通过addWaiter进行Node创建以及入队操作
        // 3 通过acquireQueued方法 尝试拿锁,拿不到锁的时候 阻塞自己
        if (!tryAcquire(arg) && //针对不同的模式 有不同的traAcquire实现
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            // acquireQueued会返回是否中断标示,因为内部会擦除中断标志位,
            // 所以这里重新设置下,方便用户捕获中断并执行对应操作
            selfInterrupt(); 
    }

// ReentrantLock.java - FairSync
    protected final boolean tryAcquire(int acquires) { // 公平锁的拿锁操作
        final Thread current = Thread.currentThread(); // 获取当前线程
        int c = getState(); // 获取当前状态
        if (c == 0) { // 0则没有线程上锁
            if (!hasQueuedPredecessors() && // 队列中没有先驱
                    compareAndSetState(0, acquires)) { // 设置状态位(拿锁)
                // 拿锁成功
                setExclusiveOwnerThread(current); // 设置独占线程为当前线程
                return true; 
            }
        // 已经被上锁 但是当前线程是上锁的线程(重入)
        } else if (current == getExclusiveOwnerThread()) { 
            int nextc = c + acquires; // state 进一步加1
            if (nextc < 0)
                throw new Error("Maximum lock count exceeded");
            setState(nextc); // 这里不需要通过cas操作了 因为都是一个线程
            return true;
        }
        return false;
    }

unlock方法:

// ReentrantLock.java - FairSync
    public void unlock() {
        sync.release(1); // 借助sync的release方法完成释放锁的操作
    }

// AbstractQueuedSynchronizer.java
    public final boolean release(int arg) { // AQS中的模版方法
        if (tryRelease(arg)) { // 释放锁
            Node h = head; // 拿到AQS队列头节点
            if (h != null && h.waitStatus != 0) // 头节点不为空并且状态不为0(初始状态)
                unparkSuccessor(h); // 唤醒后继节点(后继节点拿锁)
            return true;
        }
        return false;
    }

// ReentrantLock.java - FairSync
    protected final boolean tryRelease(int releases) { // 释放锁操作
        int c = getState() - releases; // 重入数量减1
         // 如果当前线程不是记录的独占的线程 则抛出异常
        if (Thread.currentThread() != getExclusiveOwnerThread())
            throw new IllegalMonitorStateException();
        boolean free = false;
        if (c == 0) { // 所有加的锁都被释放(如:lock 10次 也要unlock 10次)
            free = true;
            setExclusiveOwnerThread(null); // 设置独占线程为null
        }
        setState(c); // 修改状态
        return free;
    }

可以看到 ,整个流程通过state字段表示当前线程加锁的次数,state为0的时候就是没有线程上锁,通过exclusiveOwnerThread表示当前拿到锁的唯一的线程(因为这里是互斥锁),通过state+n支持重入操作。

NonfairSync

NonfairSync 与FairSync在整个加锁流程上并没有什么本质上的区别,唯一的区别就是,加锁的时候,不先去判断队列是否有等待的线程,而是直接尝试拿锁,拿不到锁才进行入队操作。
lock 方法:

   final void lock() {
       // 主要区别:进来直接尝试加锁
       if (compareAndSetState(0, 1))
           setExclusiveOwnerThread(Thread.currentThread());
       else
           acquire(1);
   }

   final boolean nonfairTryAcquire(int acquires) {
       final Thread current = Thread.currentThread();
       int c = getState();
       if (c == 0) {
           // 主要区别:没有执行hasQueuedPredecessors方法判断队列是否有值
           if (compareAndSetState(0, acquires)) {
               setExclusiveOwnerThread(current);
               return true;
           }
       }
       else if (current == getExclusiveOwnerThread()) {
           int nextc = c + acquires;
           if (nextc < 0) // overflow
               throw new Error("Maximum lock count exceeded");
           setState(nextc);
           return true;
       }
       return false;
   }

上面主要讲了ReentrantLock 加锁与解锁的一些流程,还有一些涉及AQS操作的方法我们没有深入,下面来深入看一下这些方法:

AQS内部维护了一个双向链表,用来记录所有等待的线程,其链表节点就是Node:

    static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate
         */
        static final int PROPAGATE = -3;
        // 节点的状态 枚举就是上面这些 这里的状态其实记录的是应该对下一个节点中的线程采取的措施
        // 比如如果该节点状态为-1 那么下一个节点中的线程就是可被唤醒的)
        volatile int waitStatus; 
        volatile Node prev; // 上一个节点
        volatile Node next; // 下一个节点
        volatile Thread thread; // 正在等待的线程
        Node nextWaiter; // 为condition准备
    }

然后是 上面acquire方法中的几个关键方法:
首先会创建节点并将节点入队:
addWaiter:

  public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
           // 这里调用addWaiter会传入节点模式 这里可以看到是EXCLUSIVE模式
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
           selfInterrupt();
   }

   private Node addWaiter(Node mode) {
       Node node = new Node(Thread.currentThread(), mode); // 创建一个对应模式的node
       // Try the fast path of enq; backup to full enq on failure
       Node pred = tail;
       if (pred != null) { // 如果队列中已经有节点 则先尝试下入队 否则直接直接enq()
           node.prev = pred; // 节点入队
           if (compareAndSetTail(pred, node)) { // 可能别的线程也在尝试入队 所以这里用cas
               pred.next = node;
               return node; // 如果cas成功 世界返回
           }
       }
       enq(node); // 如果上面入队失败 这里开始自旋入队
       return node;
   }

   private Node enq(final Node node) {
       for (;;) {
           Node t = tail;
           if (t == null) { // Must initialize 队列必须被初始化
               if (compareAndSetHead(new Node())) // 设置头节点为空节点 (链表基本操作)
                   tail = head;
           } else { // 本节点入队
               node.prev = t;
               if (compareAndSetTail(t, node)) {
                   t.next = node;
                   return t;
               }
           }
       }
   }

然后会通过acquireQueued方法 执行抢锁或者阻塞逻辑:

  public final void acquire(int arg) {
       if (!tryAcquire(arg) &&
           // 通过addWaiter拿到当前节点后 执行acquireQueued
           acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
           selfInterrupt();
   }

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false; // 标识该线程是否被阻塞
            for (;;) {
                final Node p = node.predecessor(); // 拿到上一个节点
                // 上一个节点是头节点 就可以进行加锁操作了
                if (p == head && tryAcquire(arg)) { 
                    // 加锁成功则设置当前节点为头节点 并清除记录的线程
                    setHead(node); 
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                // 这里拿锁失败则通过shouldParkAfterFailedAcquire判读是否要park阻塞线程 
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt()) 
                    // 走到这里说明 当前节点有被中断过 所以将标示返出去 方便外界补操作(再次中断)
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    // 判断前驱节点的状态来决定 当前线程是否应该被park
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL) // 前驱节点为SIGNAL 则线程可以被park
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) { // ws大于0 则该节点已被废弃
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {  // 更新前驱节点状态为SIGNAL
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    private final boolean parkAndCheckInterrupt() { // 挂起当前线程
        LockSupport.park(this);
        return Thread.interrupted(); // 将当前线程是否被中断return出去 
    }

整个AQS等待队列大概就这样,不过这里只分析了独占的状态,还有一些放在后面的文章分析。

后记

        Thread t1 = new Thread(() -> {
            while (true){
                System.out.println("prepare");
                LockSupport.park();
                System.out.println("continue");
            }
        });
        t1.start();


        Thread.sleep(1000);
//        LockSupport.unpark(t1);
        t1.interrupt();
        t1.join();


输出:
prepare
continue
prepare
continue
prepare
continue
prepare
continue
prepare
continue
prepare
continue
...

interrupt后线程会记录标识,后面不会被park了 除非调用 Thread.interrupted()擦除标识位:

        Thread t1 = new Thread(() -> {
            while (true){
                System.out.println("prepare");
                LockSupport.park();
                System.out.println("continue");
                Thread.interrupted();
            }
        });
        t1.start();


        Thread.sleep(1000);
//        LockSupport.unpark(t1);
        t1.interrupt();
        t1.join();


输出:
prepare
continue
prepare

可以看到 这样就会再次park了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,616评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,020评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,078评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,040评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,154评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,265评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,298评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,072评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,491评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,795评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,970评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,654评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,272评论 3 318
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,985评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,223评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,815评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,852评论 2 351

推荐阅读更多精彩内容