(转载)Java并发编程之AbstractQueuedSynchronizer

节点删除原文链接:https://www.jianshu.com/p/df0d7d6571de

参考资料:https://www.cnblogs.com/sheeva/p/6472949.html

引言

AbstractQueuedSynchronizer,队列同步器,简称AQS,它是java并发用来构建锁或者其他同步组件的基础框架。

一般使用AQS的主要方式是继承,子类通过实现它提供的抽象方法来管理同步状态,主要管理的方式是通过tryAcquire和tryRelease类似的方法来操作状态,同时,AQS提供以下线程安全的方法来对状态进行操作:

protected final int getState();

protected final void setState(int newState);

protected final boolean compareAndSetState(intexpect,intupdate);

AQS本身是没有实现任何同步接口的,它仅仅只是定义了同步状态的获取和释放的方法来供自定义的同步组件的使用。

注:AQS主要是怎么使用的呢?

在java的同步组件中,AQS的子类一般是同步组件的静态内部类。

AQS是实现同步组件的关键,它俩的关系可以这样描述:同步组件是面向使用者的,它定义了使用者与组件交互的接口,隐藏了具体的实现细节;而AQS面向的是同步组件的实现者,它简化了具体的实现方式,屏蔽了线程切换相关底层操作,它们俩一起很好的对使用者和实现者所关注的领域做了一个隔离。

AQS实现分析

接下来将从实现的角度来具体分析AQS是如何来完成线程同步的。

同步队列分析

AQS的实现依赖内部的同步队列(FIFO双向队列)来完成同步状态的管理,假如当前线程获取同步状态失败,AQS会将该线程以及等待状态等信息构造成一个Node,并将其加入同步队列,同时阻塞当前线程。当同步状态释放时,唤醒队列的首节点。

Node

Node主要包含以下成员变量:

static final class Node{

    volatile int waitStatus;

    volatile Node prev;

    volatile Node next;

    volatile Thread thread;

    Node nextWaiter;

}

waitStatus:节点状态,主要有这几种状态:

Node节点状态

1.```CANCELLED```:当前线程被取消;

2.```SIGNAL```:当前节点的后继节点需要运行;

3.```CONDITION```:当前节点在等待condition;

4.```PROPAGATE```:当前场景下后续的acquireShared可以执行;

5.```0```:当前节点在sync队列中等待获取锁。

prev:前驱节点;

next:后继节点;

thread:进入队列的当前线程;

nextWaiter:存储condition队列中的后继节点。

Node是sync队列和condition队列构建的基础,AQS拥有三个成员变量:

AQS成员变量

对于锁的获取,请求形成节点将其挂在队列尾部,至于资源的转移,是从头到尾进行,队列的基本结构就出来了:

AQS同步队列结构

1.节点插入

AQS提供基于CAS的设置尾节点的方法:

CAS设置尾节点

2.节点删除

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态之后将会唤醒后继节点,后继节点将会在获取同步状态成功的时候将自己设置为首节点。

注:设置首节点是由获取同步状态成功的线程来完成,因为每次只会有一个线程能够成功的获取到同步状态,所以,设置首节点并不需要CAS来保证。

AQS源码解析

AQS提供以下接口以供实现自定义同步器:

1.protected boolean tryAcquire(int arg)

独占式获取同步状态,该方法的实现需要先查询当前的同步状态是否可以获取,如果可以获取再进行获取;

2.protected boolean tryRelease(int arg)

释放状态;

3.protected int tryAcquireShared(int arg)

共享式获取同步状态;

4.protected boolean tryReleaseShared(int arg)

共享式释放状态;

5.protected boolean isHeldExclusively()

独占模式下,判断同步状态是否已经被占用。

使用者可以根据实际情况使用这些接口自定义同步组件。

AQS提供两种方式来操作同步状态,独占式与共享式,下面就针对性做一下源码分析。

独占式同步状态获取 - acquire实现

独占式同步状态获取

具体执行流程如下:

调用tryAcquire方法尝试获取同步状态;

如果获取不到同步状态,将当前线程构造成节点Node并加入同步队列;

再次尝试获取,如果还是没有获取到那么将当前线程从线程调度器上摘下,进入等待状态。

下面我们具体来看一下节点的构造以及加入同步队列部分的代码实现。

addWaiter实现

addWaiter实现

使用当前thread构造Node;

尝试在队尾插入节点,如果尾节点已经存在,就做以下操作:

- 分配引用T指向尾节点;

- 将待插入节点的prev指针指向尾节点;

- 如果尾节点还为T,将当前尾节点设置为带待插入节点;

- T的next指针指向待插入节点。

快速在队尾插入节点,失败则进入enq(Node node)方法。

enq实现

enq的逻辑可以确保Node可以有顺序的添加到同步队列中,具体的加入队列的逻辑如下:

初始化同步队列:如果尾节点为空,分配一个头结点,并将尾节点指向头结点;

节点入队,通过CAS将节点设置为尾节点,以此在队尾做节点插入。

可以看出,整个enq方法通过“死循环”来保证节点的正确插入。

进入同步队列之后接下来就是同步状态的获取了,或者说是访问控制acquireQueued。对于同步队列中的线程,在同一时刻只能由队列首节点获取同步状态,其他的线程进入等待,直到符合条件才能继续进行。

acquireQueued实现

获取当前节点的前驱节点;

如果当前节点的前驱节点是头节点,并且可以获取同步状态,设置当前节点为头结点,该节点占有锁;

不满足条件的线程进入等待状态。

在整个方法中,当前线程一直都在“死循环”中尝试获取同步状态:

节点自旋获取同步状态

从代码的逻辑也可以看出,其实在节点与节点之间在循环检查的过程中是不会相互通信的,仅仅只是判断自己当前的前驱是不是头结点,这样设计使得节点的释放符合FIFO,同时也避免了过早通知。

注:过早通知是指前驱节点不是头结点的线程由于中断被唤醒。

public final boolean release(int arg) {

    if (tryRelease(arg)) {

        Node h =head;

        if (h !=null && h.waitStatus !=0)

            unparkSuccessor(h);

        return true;

    }

    return false;

}

release方法中,若能够释放成功,则会通过unparkSuccessor()方法来重新设置头节点与其下一个节点的唤醒触发。

/**

* Wakes up node's successor, if one exists.

*

* @param node the node

*/

private void unparkSuccessor(Node node) {

/*

* If status is negative (i.e., possibly needing signal) try

* to clear in anticipation of signalling.  It is OK if this

* fails or if status is changed by waiting thread.

*/

    int ws = node.waitStatus;

if (ws <0)

compareAndSetWaitStatus(node, ws,0);

/*

* Thread to unpark is held in successor, which is normally

* just the next node.  But if cancelled or apparently null,

* traverse backwards from tail to find the actual

* non-cancelled successor.

*/

    Node s = node.next;

if (s ==null || s.waitStatus >0) {

s =null;

for (Node t =tail; t !=null && t != node; t = t.prev)

if (t.waitStatus <=0)

s = t;

}

if (s !=null)

LockSupport.unpark(s.thread);

}

最终,执行流程又回到了acquireQueued方法的自旋中,此时的头节点已经是release了锁,所以下一节点能够获取成功。(下文会详述)

acquire实现总结

1.同步状态维护:

对同步状态的操作是原子、非阻塞的,通过AQS提供的对状态访问的方法来对同步状态进行操作,并且利用CAS来确保原子操作;

2.状态获取:

一旦线程成功的修改了同步状态,那么该线程会被设置为同步队列的头节点;

3.同步队列维护:

不符合获取同步状态的线程会进入等待状态,直到符合条件被唤醒再开始执行。

整个执行流程如下:

当前线程获取同步状态并执行了相应的逻辑之后,就需要释放同步状态,让后续节点可以获取到同步状态,调用方法release(int arg)方法可以释放同步状态。

独占式同步状态释放 - release实现

1.尝试释放状态,tryRelease保证将状态重置回去,同样采用CAS来保证操作的原子性;

2.释放成功后,调用unparkSuccessor唤醒当前节点的后继节点线程。

unparkSuccessor实现

取出当前节点的next节点,将该节点线程唤醒,被唤醒的线程获取同步状态。这里主要通过LockSupport的unpark方法唤醒线程。

共享式同步状态获取

共享式获取与独占式获取最主要的区别就是在同一时刻能否有多个线程可以同时获取到同步状态。这两种不同的方式在获取资源区别如下图所示:

共享式访问资源时,其他共享式访问都是被允许的;

独占式访问资源时,在同一时刻只能有一个访问,其他的访问都被阻塞。

AQS提供acquireShared方法来支持共享式获取同步状态。

acquireShared实现

调用tryAcquireShared(int arg)方法尝试获取同步状态:

tryAcquireShared方法返回值 > 0时,表示能够获取到同步状态;

获取失败调用doAcquireShared(int arg)方法进入同步队列。

doAcquireShared实现

1.获取当前节点的前驱节点;

2.如果当前节点的前驱节点是头结点,并且获取到的共享同步状态 > 0,设置当前节点的为头结点,获取同步状态成功;

3.不满足条件的线程自旋等待。

与独占式获取同步状态一样,共享式获取也是需要释放同步状态的,AQS提供releaseShared(int arg)方法可以释放同步状态。

共享式同步状态释放 - releaseShared实现

调用tryReleaseShared方法释放状态;

调用doReleaseShared方法唤醒后继节点;

独占式超时获取 - doAcquireNanos

该方法提供了超时获取同步状态调用,假如在指定的时间段内可以获取到同步状态返回true,否则返回false。它是acquireInterruptibly(int arg)的增强版。

acquireInterruptibly实现

该方法提供了获取同步状态的能力,同样,在无法获取同步状态时会进入同步队列,这类似于acquire的功能,但是它和acquire还是区别的:acquireInterruptibly可以在外界对当前线程进行中断的时候可以提前获取到同步状态的操作,换个通俗易懂的解释吧:类似于synchronized获取锁时,这时候外界对当前线程中断了,线程获取锁的这个操作能够及时响应中断并且提前返回。

判断当前线程是否被中断,如果已经被中断,抛出InterruptedException异常并将中断标志位置为false;

获取同步状态,获取成功并返回,获取不成功调用doAcquireInterruptibly(int arg)排队等待。

doAcquireInterruptibly实现

构造节点Node,加入同步队列;

假如当前节点是首节点并且可以获取到同步状态,将当前节点设置为头结点,其他节点自旋等待;

节点每次被唤醒的时候,需要进行中断检测,假如当前线程被中断,抛出异常InterruptedException,退出循环。

doAcquireNanos实现

该方法在支持中断响应的基础上,增加了超时获取的特性。针对超时获取,主要在于计算出需要睡眠的时间间隔nanosTimeout,如果nanosTimeout > 0表示当前线程还需要睡眠,反之返回false。

nanosTimeout <= 0,表明当前线程不需要睡眠,返回false,不能获取到同步状态;

不满足条件的线程加入同步队列;

假如当前节点是首节点,并且可以获取到同步状态,将当前节点设置为头结点并退出,返回true,表明在指定的时间内可以获取到同步状态;

不满足条件3的线程,计算出当前休眠时间,nanosTimeout = 原有nanosTimeout + deadline(睡眠之前记录的时间)- now(System.nanoTime():当前时间):

如果nanosTimeout <= 0,返回超时未获取到同步状态;

如果nanosTimeout > 0 && nanosTimeout <= 1000L,线程快速自旋

注:为什么不直接进入超时等待呢?原因在于非常短的超时等待是无法做到十分精确的,如果这时候再进入超时等待会让nanosTimeout的超时从整体上表现的不精确,所以,在超时非常短的情况下,AQS都会无条件进入快速自旋;

- 如果nanosTimeout > 1000L,线程通过LockSupport.parkNanos进入超时等待。

整个流程可以总结如下图所示:


后记

在上述对AQS进行了实现层面的分析之后,我们就一个案例来加深对AQS的理解。

案例:设计一个AQS同步器,该工具在同一时刻,只能有两个线程能够访问,其他的线程阻塞。

设计分析:针对上述案例,我们可以这样定义AQS,设定一个初始状态为2,每一个线程获取一次就-1,正确的状态为:0,1,2,0表示新的线程获取同步状态时阻塞。由于资源数量大与1,需要实现tryAcquireShared和tryReleaseShared方法。

代码实现:

public class LockInstance implements Lock {

    private final Sync sync = new Sync(2);

    private static final class Sync extends AbstractQueuedSynchronizer {

      Sync(int state) {

            if (state <= 0) {

                throw new IllegalArgumentException("count must large than 0");

            }

            setState(state);

        }

        @Override

        public int tryAcquireShared(int arg) {

            for (;;) {

                System.out.println("try acquire....");

                int current = getState();

                int now = current - arg;

                if (now < 0 || compareAndSetState(current, now)) {

                    return now;

                }

            }

        }

        @Override

        public boolean tryReleaseShared(int arg) {

            for(;;) {

                System.out.println("try release....");

                int current = getState();

                int now = current + arg;

                if (compareAndSetState(current, now)) {

                    return true;

                }

            }

        }

    }

    @Override

    public void lock() {

        sync.acquireShared(1);

    }

    @Override

    public void lockInterruptibly() throws InterruptedException {

        sync.acquireInterruptibly(1);

    }

    @Override

    public boolean tryLock() {

        return sync.tryAcquireShared(1) >= 0;

    }

    @Override

    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

        return sync.tryAcquireSharedNanos(1, unit.toNanos(time));

    }

    @Override

    public void unlock() {

        sync.tryReleaseShared(1);

    }

    @Override

    public Condition newCondition() {

        return null;

    }

}

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容