源码分析之AbstractQueuedSynchronizer

java.util.concurrent包中,大部分的同步器都是基于AbstractQueuedSynchronizer(AQS)这个框架实现的。这个框架为同步状态提供原子性管理、线程的阻塞和解除阻塞以及排队提供了一种通用机制。

同步器一般包含2种方法,一种是acquire,另一种是releaseacquire操作阻塞线程,获取锁。release通过某种方式改变让被acquire阻塞的线程继续执行,释放锁。为了实现这2种操作,需要以下3个基本组件的相互协作:

  • 同步状态的原子性管理
  • 线程的阻塞和解除阻塞
  • 队列管理

同步状态

    /**
     * The synchronization state.
     */
    private volatile int state;

AQS使用一个int变量来保存同步状态,并暴露出getStatesetState以及compareAndSet来读取或更新这个状态。并且用了volatile来修饰,保证了在多线程环境下的可见性。通过使用compare-and-swap(CAS)指令来实现compareAndSetState

这里的同步状态用int而非long,主要是因为64位long字段的原子性操作在很多平台上是使用内部锁的方式来模拟实现的,这会使得同步器的会有性能问题。绝对多数int型的state足够我们使用,但JDK也提供了longstate的实现:java.util.concurrent.locks.AbstractQueuedLongSynchronizer

阻塞

JDK1.5之前,阻塞线程和解除线程阻塞都是基于Java自身的监控器。在AQS中实现阻塞是用java.util.concurrent包的LockSuport类。方法LockSupport.park阻塞当前线程,直到有个LockSupport.unpark方法被调用。

队列管理

AQS框架关键就在于如何管理被阻塞的线程队列。提供了2个队列,分别是线程安全Sync Queue(CLH Queue)、普通的Condition Queue

Sync Queue

Sync Queue是基于FIFO的队列,用于构建锁或者其他相关同步装置。CLH锁可以更容易地去实现取消(cancellation)超时功能,因此我们选择了CLH锁作为实现的基础。

队列中的元素Node是保存线程的引用和线程状态。NodeAQS的一个静态内部类:

 static final class Node {
        static final Node SHARED = new Node();
        static final Node EXCLUSIVE = null;
        static final int CANCELLED =  1;
        static final int SIGNAL    = -1;
        static final int CONDITION = -2;
        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        Node nextWaiter;
    }

Node类的成员变量如上所示,主要负责保存线程引用、队列的前继和后继节点,以及同步状态:

成员 描述
waitStatus 用来标记Node的状态:
CANCELLED:1, 表示当前线程已经被取消
SIGNAL:-1,表示当前节点的后继节点等待运行
CONDITION:-2, 表示当前节点已被加入Condition Queue
PROPAGATE:-3, 共享锁的最终状态是PROPAGATE
thread 当前获取lock的线程
SHARED 表示节点是共享模式
EXCLUSIVE 表示节点是独占模式
prev 前继节点
next 后继节点
nextWaiter 存储Condition Queue中的后继节点

Node元素是Sync Queue构建的基础。当获取锁的时候,请求形成节点挂载在尾部。而锁资源的释放再获取的过程是从开始向后进行的。

acquire 获取锁

AQS自身仅定义了类似acquire方法。在实现锁的时候,一般会实现一个继承AQS的内部类Sync。而在Sync类中,我们根据需求来实现重写tryAcquire方法和tryRelease方法。独占锁acquire方法如下:

    public final void acquire(int arg) {
  
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  • 通过tryAcquire(由不同的实现类实现)尝试获取锁,如果可以获取锁直接返回。获取不到锁,则调用addWaiter方法;
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
    
        private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
  • addWaiter方法作用是把当前线程封装成Node节点,通过CAS操作快速尝试挂载至队列尾部。
    • 如果tail节点t已经有了:将t节点更新为当前节点node的前继节点node.prev,将t.next更新为当前节点node
    • 如果tail节点添加失败:
      • 如果tail节点为空,那么原子化的分配一个头节点,并将尾节点指向头节点,这一步是初始化;
      • 如果tail节点不为空,循环重复addWaiter方法的工作直至当前节点入队为止。
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • 节点加入Sync Queue之后,接下来就是要进行锁的获取,或者说是访问控制了,只有一个线程能够在同一时刻继续的运行,而其他的进入等待状态。
    • 获取当前节点的前继节点
    • 当前继节点是头结点并且能够获取状态,代表该当前节点占有锁;如果满足上述条件,那么代表能够占有锁,根据节点对锁占有的含义,设置头结点为当前节点。
    • 否则进入等待状态。

至此,可以总结一次acquire的过程大致为:

release 释放锁
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
  • 首先通过CAS操作变更同步状态state
  • 释放成功后,通过LockSupport.unpark方法来唤醒后继节点,后继节点继续获取锁。

Condition Queue

AQS框架提供了一个ConditionObject内部类,给维护独占同步的类以及实现Lock接口的类使用。一个锁对象可以关联任意数目的条件对象,可以提供典型的Java监视器风格的awaitsignalsignalAll操作,包括带有超时的,以及一些检测、监控的方法。Condition Queue是普通的队列并不要求是线程安全,原因是在线程在操作Condition时,要求线程必须独占锁,不需要考虑并发的问题。

Condition Queue也是以Node为基础的队列。

        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
await操作
  • Condition在执行await操作时,首先会调用addConditionWaiter()方法将当前线程封装的Node节点加入到wait queue
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

上述addConditionWaiter的逻辑是:

  • 首先清除Condition Queue队列中cancelled状态的尾节点;
  • Condition Queue队列为空,封装当前线程的node节点为Condition QueuefirstWaiter。如Condition Queue队列不为空,则把该节点加至队列尾部。
 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
        
  • 加入Condition queue之后,要释放当前线程获取的所有的锁;
  • 如果线程没有在Sync Queue中,将调用LockSupport.park阻塞当前线程,直到signalled或者interrupted唤醒去获取锁。
single 操作
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }
  • 首先检查线程是否独占锁;
  • 获取Codition QueuefirstWaiter,将节点转移至Sync Queue中去。
singleAll 操作
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

signalAll唤醒Condition Queue的所有等待线程,将所有的Condition Queue中的node元素转移至Sync Queue中去。

其他API

这里只介绍了独占锁模式下,普通acquirerelease方法的原理,AQS还提供了很多可以供我们选择的API

  • 如优先考虑中断、超时的:acquireInterruptiblytryAcquireNanos
  • 如共享锁模式下acquireSharedreleaseShared

等等...

这里暂时不详细分析,后面有时间的话可以再做了解。

参考:

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容