Java AQS 源码解析

引言

AQS全称java.util.concurrent.locks.AbstractQueuedSynchronizer,是Java 并发包中的一个抽象类,我们一般把它叫做抽象队列同步器,如我们常用的可重入锁ReentrantLock的内部实现就是基于AQS,理解AQS的内部源码实现对于我们深入理解使用java并发包中的各个功能非常重要。

实现原理

AQS内部维护了一个双向链表队列来管理多个线程。简单介绍一下就是,当有一个新的线程去尝试获取锁,这是如果获取失败,AQS则会将此线程封装成一个Node节点,并将此节点加入到内部维护的队列中。

上面简单的描述了线程获取锁的过程,那么AQS是使用什么来确认当前线程是否可以尝试获取锁呢?

其实AQS内部为了一个volatile int state变量来表示当前资源是否已经被其他线程占有,对于独占锁来说,只有当state = 0时才说明该资源当前没有线程占有,后续线程可以开始争抢改资源,后续线程需要通过CAS操作来确保修改state变量成功才能成功抢占该资源。

image.png

重要方法

AQS内部提供了三组核心方法用来实现一个同步组件。三组方法从字面意思上区分可以分为两种类型:独占锁相关,共享锁相关

  1. 访问state有三种方法
  • getState() 获取同步状态

  • setState() 设置同步状态(非CAS操作)

  • compareAndSetState() 通过CAS操作修改

  1. 一组抽象方法,需要子类实现,主要是获取同步锁并判断是否成功的操作
方法 描述
boolean tryAcquire(int arg) 独占式获取同步状态
boolean tryRelease(int arg) 独占式释放同步状态
int tryAcquireShared(int arg) 共享式获取同步状态
boolean tryReleaseShared(int arg) 共享式释放同步状态
boolean isHeldExclusively() 检测当前线程是否获取独占锁
  1. 模板方法,用于子类直接调用
方法 描述
void acquire(int arg) 获取独占锁。会调用tryAcquire方法,如果未获取成功,则会进入同步队列等待
void acquireInterruptibly(int arg) 响应中断版的 acquire
boolean tryAcquireNanos(int arg,long nanos) 超时+响应中断版的 acquire
void acquireShared(int arg) 获取共享锁。共享式获取同步状态,同一时刻可能会有多个线程获得同步状态。比如读写锁的读锁就是就是调用这个方法获取同步状态的
void acquireSharedInterruptibly(int arg) 响应中断版的acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos) 超时+响应中断版的acquireShared
boolean release(int arg) 独占式释放同步状态
boolean releaseShared(int arg) 释放共享锁
Collection getQueuedThreads() 获取同步队列上的线程集合

源码分析

上文有提到,进入同步队列的线程会被封装成一个Node节点,下面我们先来看下Node.class


static  final  class Node {

 /**

 * 用于标记一个节点在共享模式下等待

 */

 static final Node SHARED = new Node();

 /**

 * 用于标记一个节点在独占模式下等待

 */

 static final Node EXCLUSIVE = null;

 /**

 * 等待状态:取消状态

 */

 static final int CANCELLED = 1;

 /**

 * 等待状态:通知(指示后继线程需要阻塞)

 */

 static final int SIGNAL = -1;

 /**

 * 等待状态:条件等待

 */

 static final int CONDITION = -2;

 /**

 * 等待状态:传播

 */

 static final int PROPAGATE = -3;

 /**

 * 等待状态

 */

 volatile int waitStatus;

 /**

 * 前驱节点

 */

 volatile Node prev;

 /**

 * 后继节点

 */

 volatile Node next;

 /**

 * 节点对应的线程

 */

 volatile Thread thread;

 /**

 * 等待队列中的后继节点

 */

 Node nextWaiter;

 /**

 * 当前节点是否处于共享模式等待

 */

 final boolean isShared() {

 return nextWaiter == SHARED;

 }

 /**

 * 获取前驱节点,如果为空的话抛出空指针异常

 */

 final Node predecessor() throws NullPointerException {

 Node p = prev;

 if (p == null) {

 throw new NullPointerException();

 } else {

 return p;

 }

 }

 Node() {

 }

 /**

 * addWaiter会调用此构造函数

 */

 Node(Thread thread, Node mode) {

 this.nextWaiter = mode;

 this.thread = thread;

 }

 /**

 * Condition会用到此构造函数

 */

 Node(Thread thread, int waitStatus) {

 this.waitStatus = waitStatus;

 this.thread = thread;

 }

}

这里解释几个重要的属性

属性 意义
prev 前置节点
next 后置节点
CANCELLED 状态值 表示节点被取消
SIGNAL 状态值 表示后置节点需要被阻塞
CONDITION 状态值 表示节点进入 CONDITION
PROPAGATE 状态值 共享锁传播状态

独占锁的获取

AQS中包含了headtail两个Node引用,其中head在逻辑上的含义是当前持有锁的线程,tail指示队尾节点。headtail可以相等


/**

* 获取独占锁,该方法会首先调用tryAcquire(由子类实现)来获取锁,

* 如果获取成功会直接返回,失败则会将当前抢锁失败的线程封装为Node节点加入队尾

 */

public final void acquire(int arg) {

 if (!tryAcquire(arg) &&

 acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

 selfInterrupt();

}

/**

* 增加一个

 *

 */

private Node addWaiter(Node mode) {

 Node node = new Node(Thread.currentThread(), mode);

  // Try the fast path of enq; backup to full enq on failure

  // 快速执行一次加入队列尾部的操作,如果失败的话则执行 enq

  // 获取队尾节点

 Node pred = tail;

 if (pred != null) {

  // 获取队尾的前驱节点

 node.prev = pred;

  // CAS设置新的队尾,有可能失败

 if (compareAndSetTail(pred, node)) {

 pred.next = node;

 return node;

 }

 }

  // 如果还没有队尾(初始状态),或则入队失败,则执行下面的方法

 enq(node);

 return node;

}

private Node enq(final Node node) {

  // 通过CAS 自旋设置新的队尾

 for (;;) {

 Node t = tail;

  // 队尾节点为空,说明是初始状态,必须初始化头尾节点

 if (t == null) { // Must initialize

 if (compareAndSetHead(new Node()))

 tail = head;

 } else {

 // 这里注意一个细节

  // node.prev = t 操作是设置当前节点的前驱节点为现在的队尾节点

  // 但是该操作没有放到compareAndSetTail操作成功以后,也就是说代码为何不这样写

  // if (compareAndSetTail(t, node)) {

  // node.prev = t;

  // t.next = node;

  // return t;

 // }

// 如果这样子写的话,队尾节点在某一瞬间会是一个孤立的节点,但是如果先

// 执行 node.prev = t 就把当前节点的前驱指向了还未设置前的队尾,这样子当我们从后往前遍历的时候,就不会出现队列断裂的情况

 node.prev = t;

 if (compareAndSetTail(t, node)) {

 t.next = node;

 return t;

 }

 }

 }

}

/**

* 节点加入队列之后操作,这里分两种情况

* 1\. 如果当前节点的前驱节点为head 头节点,则尝试获取锁,并设置新的头节点

 * 2\. 阻塞自己

 */

final boolean acquireQueued(final Node node, int arg) {

 boolean failed = true;

 try {

 boolean interrupted = false;

  // 循环尝试获取锁

 for (;;) {

 // 获取前驱节点

 final Node p = node.predecessor();

  // 前驱节点为head节点 则尝试获取锁

 if (p == head && tryAcquire(arg)) {

 setHead(node);

 p.next = null; // help GC

 failed = false;

 return interrupted;

 }

  // 判断是否需要阻塞自己

 if (shouldParkAfterFailedAcquire(p, node) &&

 parkAndCheckInterrupt())

 interrupted = true;

 }

 } finally {

  // 如果抛异常失败,则调用 cancelAcquire 取消该节点

 if (failed)

 cancelAcquire(node);

 }

}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {

 int ws = pred.waitStatus;

  // 如果前驱节点的 waitStatus 为 -1,则说明当前线程需要阻塞

 if (ws == Node.SIGNAL)

 /*

 * This node has already set status asking a release

 * to signal it, so it can safely park.

 */

 return true;

  // waitStatus大于0,说明前驱节点为取消状态,需要重新设置前驱节点

 if (ws > 0) {

  // 向前寻找waitStatus<=0的节点,并设置为新的前驱节点

 do {

 node.prev = pred = pred.prev;

 } while (pred.waitStatus > 0);

 pred.next = node;

 } else {

 /*

 * 如果前驱节点状态为 0 -2 -3,则设置前驱节点 WaitStatus = -1,并重新循环

 */

 compareAndSetWaitStatus(pred, ws, Node.SIGNAL);

 }

  return  false;

}

/**

 * 调用 LockSupport.park(this) 阻塞当前线程

 */

private final boolean parkAndCheckInterrupt() {

 LockSupport.park(this);

 return Thread.interrupted();

}

/**

* 取消节点,也就是当前线程放弃获取锁

 */

private void cancelAcquire(Node node) {

 if (node == null)

 return;

 node.thread = null;

  // 获取前驱节点,会跳过状态为取消状态的节点 Skip cancelled predecessors

 Node pred = node.prev;

 while (pred.waitStatus > 0)

 node.prev = pred = pred.prev;

  // 获取后继节点

 Node predNext = pred.next;

  // 设置节点状态为取消

 node.waitStatus = Node.CANCELLED;

  // 如果当前节点是队尾节点,则设置前驱节点为队尾,并设置前驱节点的后继节点为null

 if (node == tail && compareAndSetTail(node, pred)) {

 compareAndSetNext(pred, predNext, null);

 } else {

 int ws;

  // 如果前驱节点不是头结点,且前驱节点的 WaitStatus = -1

 if (pred != head &&

 ((ws = pred.waitStatus) == Node.SIGNAL ||

 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&

 pred.thread != null) {

 Node next = node.next;

  // 将前置节点和后继节点相连

 if (next != null && next.waitStatus <= 0)

 compareAndSetNext(pred, predNext, next);

 } else {

 // 唤醒后继节点

  // 这里为何要唤醒后继节点?

  // 因为如果当前节点的前置节点为head节点,但是此时head节点的状态为0(当前节点还未来得及设置前置节点状态为 -1),这时head节点释放了锁因为状态还为0,所以不会去唤醒后继线程。这时队列就失去活性了!所以这里需要唤醒后继线程

 unparkSuccessor(node);

 }

 node.next = node; // help GC

 }

}

/**

* 唤醒后继节点

 */

private void unparkSuccessor(Node node) {

 /*

 * 如果节点 waitStatus < 0,则尝试将状态设为0

 */

 int ws = node.waitStatus;

 if (ws < 0)

 compareAndSetWaitStatus(node, ws, 0);

 /*

 * 获取当前节点的后继节点

 * 如果 s!=null 且 s未被取消 则直接唤醒

 */

 Node s = node.next;

 if (s == null || s.waitStatus > 0) {

 s = null;

  // 从尾节点向前查找离node最近的非取消节点

  // 注意这里为何要从后向前查找

  // 在 enq方法中有段注释,如果从前往后的话,有可能出现队列断裂

 for (Node t = tail; t != null && t != node; t = t.prev)

 if (t.waitStatus <= 0)

 s = t;

 }

 if (s != null)

 LockSupport.unpark(s.thread);

}

上文基本完成了对独占锁获取锁的流程的分析,这里简单画个图来更清晰的展示整个获取锁的过程:

image.png

独占锁的释放


/**

 * 释放锁

 */

public  final  boolean release(int arg) {

  // 如果释放锁成功

 if (tryRelease(arg)) {

 Node h = head;

  // 且当前持锁的头结点状态不等于0,就去唤醒后继线程

  // 思考这里等于0 为何不行?

  // 因为当 waitStatus == 0时,说明后继线程还在运行中,还未来得及将waitStatus 状态设为-1

 if (h != null && h.waitStatus != 0)

  // 与上文unparkSuccessor方法一致

 unparkSuccessor(h);

 return true;

 }

  return  false;

}

共享锁

共享锁与独占锁最大的区别在于,共享锁允许多个线程持有锁,而独占锁在同一时间只有一个线程可以获取到锁

共享锁的获取


/**

* 获取共享锁

 */

public final void acquireShared(int arg) {

  // 如果tryAcquireShared(arg) < 0表示获取共享锁失败

  // 获取失败的原因一般是由于该资源正由独占锁占有

 if (tryAcquireShared(arg) < 0)

 doAcquireShared(arg);

}

private void doAcquireShared(int arg) {

  // 加入到队列尾部

 final Node node = addWaiter(Node.SHARED);

 boolean failed = true;

 try {

 boolean interrupted = false;

 for (;;) {

 final Node p = node.predecessor();

 if (p == head) {

 int r = tryAcquireShared(arg);

  // 一旦共享获取成功,设置新的头结点,并且唤醒后继线程

 if (r >= 0) {

 setHeadAndPropagate(node, r);

 p.next = null; // help GC

 if (interrupted)

 selfInterrupt();

 failed = false;

 return;

 }

 }

 // 判断是否需要阻塞

 if (shouldParkAfterFailedAcquire(p, node) &&

 parkAndCheckInterrupt())

 interrupted = true;

 }

 } finally {

 if (failed)

 cancelAcquire(node);

 }

}

/**

* 在获取共享锁成功后,设置head节点

* 跟据调用tryAcquireShared返回的状态以及节点本身的等待状态来判断是否要需要唤醒后继线程。

 */

private void setHeadAndPropagate(Node node, int propagate) {

 Node h = head;

 setHead(node);

 /*

 * propagate是tryAcquireShared的返回值,这是决定是否传播唤醒的依据之一。

 * h.waitStatus为SIGNAL或者PROPAGATE时也根据node的下一个节点共享来决定是否传播唤醒,

 * 思考,这里为什么不能只用propagate > 0来决定是否可以传播

 * 因为此时有可能有其他线程释放了锁,但propagate还未更新,所以还需要判断h.waitStatus < 0

 */

 if (propagate > 0 || h == null || h.waitStatus < 0 ||

 (h = head) == null || h.waitStatus < 0) {

 Node s = node.next;

  // 后继节点为共享锁类型

 if (s == null || s.isShared())

 doReleaseShared();

 }

}

/**

* 这是共享锁中的核心唤醒函数,主要做的事情就是唤醒下一个线程或者设置传播状态。

 */

private void doReleaseShared() {

 for (;;) {

 Node h = head;

  // 如果队列中存在后继线程。

 if (h != null && h != tail) {

 int ws = h.waitStatus;

 if (ws == Node.SIGNAL) {

 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))

 continue;

 unparkSuccessor(h);

 }

  // 如果h节点的状态为0,需要设置为PROPAGATE用以保证唤醒的传播。

 else if (ws == 0 &&

 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))

 continue;

 }

 if (h == head)

 break;

 }

}

共享锁的释放


public final boolean releaseShared(int arg) {

 if (tryReleaseShared(arg)) {

 // 与上文doReleaseShared一样

 doReleaseShared();

 return true;

 }

 return false;

}

总结

AQS的整体理念相对比较容易理解,但是AQS真正难懂的其实是它的细节,因为一个很简单的问题一旦被放在并发环境中就会变的非常抽象,难以理解。要理解AQS的细节实现,还是需要多看,我是前前后后看了有一周,对于有些细节还是略模糊。

本篇文章参考了

AbstractQueuedSynchronizer源码解读

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容