同步实现原理:队列同步器

1. 队列同步器介绍


  • 队列同步器AbstractQueuedSynchronizer,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

  • 同步器的主要使用方式是继承,一般作为同步器组件的静态内部类,在同步器中仅定义了与状态相关的方法,且状态既可以独占获取也可以共享获取,这样就可以实现不同的同步组件(ReetrantLock、CountDownLatch等)。同步组件利用同步器进行锁的实现,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等顶层操作。

  • 同步器的设计是基于模板方法模式的,使用者需要继承同步器并重写指定的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些模板方法将会调用使用者重写的方法。

  • 同步器提供了3个方法来修改和访问同步状态:

1、getState():获取当前同步状态
2、setState(int newState):设置当前同步状态
3、compareAndSetState(int expect, int update):使用CAS设置当前状态,该方法能保证操作的原子性。

  • 同步器可以重写的方法
方法名称 描述
tryAcquire(int arg) 独占获取同步状态,实现该方法需要查询当前状态,并判断同步状态是否符合预期状态,然后再进行CAS设置同步状态。
treRelease(int arg) 独占式释放同步状态,等待获取同步状态的线程将有机会获取同步状态
tryAcquireShared(int arg) 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之失败
tryReleaseShared(int arg) 共享式释放同步状态
isHeldExclusively() 当前同步器是否在独占模式下被线程占用,一般该方法表示是否被当前线程所独占
  • 同步器的模板方法基本上分为3类:
  1. 独占式获取与释放同步状态
  2. 共享式获取与释放同步状态
  3. 查询同步队列中的等待线程情况

2. 队列同步器示例


  • 只有掌握了同步器的工作原理才能更加深入的理解并发包中其他的并发组件,下面通过一个独占锁的示例来深入了解一下同步器的工作原理
  • 独占锁就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能获取锁。
public class Mutex implements Lock, java.io.Serializable {
    // 内部类,自定义同步器
    private static class Sync extends AbstractQueuedSynchronizer {
      // 是否处于占用状态
      protected boolean isHeldExclusively() {
        return getState() == 1;
      }
      // 当状态为0的时候获取锁
      public boolean tryAcquire(int acquires) {
        assert acquires == 1; // Otherwise unused
        if (compareAndSetState(0, 1)) {
           setExclusiveOwnerThread(Thread.currentThread());
           return true;
        }
        return false;
      }
      // 释放锁,将状态设置为0
      protected boolean tryRelease(int releases) {
        assert releases == 1; // Otherwise unused
        if (getState() == 0) throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
      }
      // 返回一个Condition,每个condition都包含了一个condition队列
      Condition newCondition() { return new ConditionObject(); }
    }
    // 仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();
    public void lock()                { sync.acquire(1); }
    public boolean tryLock()          { return sync.tryAcquire(1); }
    public void unlock()              { sync.release(1); }
    public Condition newCondition()   { return sync.newCondition(); }
    public boolean isLocked()         { return sync.isHeldExclusively(); }
    public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
    public void lockInterruptibly() throws InterruptedException {
      sync.acquireInterruptibly(1);
    }
    public boolean tryLock(long timeout, TimeUnit unit)
        throws InterruptedException {
      return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
  }
  • 可以看到Mutex将Lock接口均代理给了同步器的实现。
  • 使用方将Mutex构造出来之后,调用lock获取锁,调用unlock进行解锁。
  • 用户使用Mutex时并不会直接和内部同步器的实现打交道,而是通过调用Mutex提供的方法。

3. 队列同步器实现分析(独占方式)


1) 同步队列

  • 同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
  • 同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点。
  • 同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。

同步队列中节点源码如下:

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer  
    implements java.io.Serializable {  
    ......  
    private transient volatile Node head;//头节点  
    private transient volatile Node tail;//尾节点  
    private volatile int state;//*同步状态*  
    ......  
    static final class Node {  
        volatile int waitStatus;//等待状态  
        volatile Node prev;//前驱  
        volatile Node next;//后继  
        volatile Thread thread;//线程引用  
        ......  
    }  
    ......  
}   

同步队列基本结构图如下:


同步队列基本结构

同步队列涉及两个动作:

  1. 节点加入到同步队列中
  • 线程加入队列的过程必须保证线程安全,同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),保证线程安全。
  • 为什么必须保证线程安全:同时有多条线程没有获取同步状态要加入同步队列,这时如果不是线程安全的,请问谁先谁后呢?所以在此处的这个操作必须是线程安全的

添加过程如下图所示


节点加入到同步队列中
  1. 设置首节点
  • 同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后续节点,而后续节点将会在获取同步状态成功时将自己设置为首节点。
  • 设置首节点是由获取同步状态成功的线程来完成的,由于只有一个线程能够成功的获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点后继节点,并断开首节点的next引用即可。

首节点设置过程如下图所示:


设置首节点

2) 独占式同步状态获取与释放

  • 通过同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是说由于线程获取同步状态失败后进入同步队列中,后继对线程进行中断操作时,线程不会从同步队列移除。acquire方法:
public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
}

上述代码中完成了同步状态的获取、节点构造、加入同步队列以及同步队列中自旋等待的相关工作。

主要逻辑:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node)方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

节点的构造以及加入同步队列依靠于addWaiter和enq方法:

//添加到同步队列中
 private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

  /**
     * Inserts node into queue, initializing if necessary. See picture above.
     * @param node the node to insert
     * @return node's predecessor
        节点插入到尾节点:将并发添加节点的请求通过CAS变得”串行化“了。
     */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

在 acquireQueued 方法中,当前线程在”死循环“中尝试获取同步状态,而只有前驱节点是头结点才能够尝试获取同步状态。原因如下:

  • 头节点是成功获取到同步状态的节点,而头结点的线程线程释放了同步状态后,将会唤醒其后继节点,后继节点的线程在被唤醒后需要检查自己的前驱节点是否是头结点。
  • 维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态。
/**
     * Acquires in exclusive uninterruptible mode for thread already in
     * queue. Used by condition wait methods as well as acquire.
     *
     * @param node the node
     * @param arg the acquire argument
     * @return {@code true} if interrupted while waiting
     */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
  • 节点自选同步状态如下图


    节点自旋获取同步状态
  • 独占同步状态获取流程,也就是acquire方法调用流程,如下图


    独占式同步状态获取流程
  • 同步状态释放:当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的release(int arg)方法可以释放同步状态,该方法在释放了同步状态之后,会"唤醒"其后继节点(进而使后继节点重新尝试获取同步状态)。
public final boolean release(int arg) {  
    if (tryRelease(arg)) {//释放同步状态  
        Node h = head;  
        if (h != null && h.waitStatus != 0)//独占模式下这里表示SIGNAL  
            unparkSuccessor(h);//唤醒后继节点  
        return true;  
    }  
    return false;  
} 
private void unparkSuccessor(Node node) {  
    int ws = node.waitStatus;//获取当前节点等待状态  
    if (ws < 0)  
        compareAndSetWaitStatus(node, ws, 0);//更新等待状态  
  
    /* Thread to unpark is held in successor, which is normally just the next node.   
       But if cancelled or apparently null, 
     * traverse backwards from tail to find the actual non-cancelled successor.*/  
    Node s = node.next;  
    if (s == null || s.waitStatus > 0) {//找到第一个没有被取消的后继节点(等待状态为SIGNAL)  
        s = null;  
        for (Node t = tail; t != null && t != node; t = t.prev)  
            if (t.waitStatus <= 0)  
                s = t;  
    }  
    if (s != null)  
        LockSupport.unpark(s.thread);//唤醒后继线程  
}  
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,294评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,493评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,790评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,595评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,718评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,906评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,053评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,797评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,250评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,570评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,711评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,388评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,018评论 3 316
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,796评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,023评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,461评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,595评论 2 350

推荐阅读更多精彩内容