博客思路介绍
concurrent
框架的思路在AQS
中有不少体现。所以我们打算着重记录一下,记录的思路如下:
- 引入及介绍
AQS
队列通用的方法 - 介绍
AQS
预提供的各种和锁获得、释放相关的方法;及暴露出来的用来重写的方法。 - 介绍队列相关的监控方法
- 介绍
Condition
相关的方法 - 扩展、总结及展望
本文主要介绍AQS
预提供的各种和锁获得、释放相关的方法,并介绍一些定义成空实现的用来覆盖的方法和依赖这些空方法的锁操作方法。
各种获得锁的相关方法
取消“尝试获取”
源码
/**
* 取消对应节点获取锁的尝试【把这个节点从队中删了】
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// 如果节点不存在,就不用删了
if (node == null)
return;
node.thread = null;
// 找到这个节点的有效的前驱节点【顺手把waitStatus为 CANCELLED 的已取消节点摘了】
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// 获得有效前驱节点的next指向
Node predNext = pred.next;
// 这里直接采用的赋值,因为都要取消这个节点了,不管有人取消了还是激活了,
// 你要操作的结果就是取消,换句话说,这个类似restful中的可重试性,结果是
// 一定的、不变的
// 这一步之后,如果后面再有删除的节点,向前遍历时会掠过此节点
// 在此操作之前,我们的取消操作不会收到其他线程的影响【无条件赋值】
node.waitStatus = Node.CANCELLED;
// 接下来进行取消后的操作,该删删、该唤醒唤醒
// 如果被取消的节点是尾节点就移动 tail 指针
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// 如果被取消的点是一个队中间的普通节点。那么
// 设置先驱节点为 SIGNAL 并修改next指针
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
// 如果先驱节点是头节点,直接唤醒节点,
// 节点唤醒操作完成后会有后续操作来删此节点
unparkSuccessor(node);
}
node next = node; // help GC
}
}
思路
这个难理解的地方有以下:
为什么只调整pre
不调整next
在此双向队列中,只有pre
是真的指向可靠的,next
的定位是加快找后继节点的速度,即不保证可靠,只是图方便。【从上一节的入队列也能看出来】
为什么区分队头、队中、队尾
这个和他的判断逻辑相关,首先我们确定一下,队头这里指的是他是head
指向节点的后继节点,队尾是他是tail
指向的节点。
- 如果他是
head
指向节点的后继节点,那如果他被删了,后继节点就是新的no.1,直接唤醒后继节点即可,后继的线程线程获得执行后会根据 pre 检测是否队首,如果是则出队,出队方法就是设置入参node为head指向,不管中间是否还有废节点,都掠过 - 如果他是
tail
指向的节点,删除后得记得设置tail
- 如果他是队列中间的一个普通节点,删除就像正常的双向队列操作删除
为什么只有上三种情况,而没有node
为head
指向的情况
head
指向的节点是已经通过竞争或者入队排队竞争获得锁的节点。人家已经正常的执行了,不存在取消的情况。取消指的是停止竞争锁,不是线程上的取消,我们的队列也不会决定一个线程的存亡,只是协调线程的阻塞和唤醒。
待删除的节点为队中的节点,那一大串条件如何解读
pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
&& pred.thread != null
条件有三部分,要同时满足
pred != head
结合之前的if..else
,是指node
是队中的节点((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
意思是如果前驱节点有效(ws
不是CANCELLED
),就设置成SIGNAL
,方便唤醒到这里时能继续唤醒后继节点-
pred.thread != null
保证在操作之后还是有效的。因为节点被取消/作废的特点是:
ws == CANCELLED || thread == null
判断获取失败后是否应阻塞
源码
/**
* 当一个队列中的节点获得锁失败时(可能是各种原因吧,比如锁释放时唤醒后者,结果和刚来未进队的新线
* 程竞争没竞争过人家)是否应该阻塞此线程等待
*
* @param pred node的前驱,保存着pred点的状态和要对node点的操作
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* 前驱节点设置未自己得以执行后会唤醒node,所以没问题可以安心阻塞等人叫
*/
return true;
/*
* 无法阻塞,看看是啥原因并进行修改
*/
if (ws > 0) {
/*
* 前驱节点失效,被取消了,更新前驱节点
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* ws 不是 SIGNAL 不是 CALCELLED,可能是0或者 CONDITION ROPAGATE,我们认为
* 他需要信号但是还没阻塞,所以我们把他置成 SIGNAL。 调用者需要重试以确定是
* 否是真的不需要阻塞
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
思路
此函数的思路比较奇怪,因为他并不是单纯的判断此节点竞争锁失败之后是否可以阻塞等待,他也在node
的前置节点不符合的情况下对前置节点进行修正,以使得通过循环调用此函数可以使节点
对应的线程可以阻塞等待。
用中文来说,这个函数的名字叫做“为未成功获取锁的线程扫平障碍”更合适。
问题:
- 在
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
的官方源码的注解中只写了ws
可能是0或者 ROPAGATE ,没有写 CONDITION ,不清楚为何,这个问题保留。
中断当前线程
源码
/**
* 用来中断当前线程的
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
思路
为啥会抽出这个函数老实说我也有点懵逼,可能为了后面如果有什么扩展能更好的应用进去吧。
阻塞并检查中断
源码
/**
* 阻塞当前线程,然后根据当前线程是否被打断返回布尔值
*
* @return 如果被中断过,就返回 true
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
思路
注意:
-
LockSupport.park(this)
会使当前线程一直阻塞,阻塞结束后继续执行下一行的return
-
Thread.interrupted()
会返回当前线程是否被打断过,返回后会将当前线程是否打断过的状态记录成false
队列中的节点尝试获得锁
源码
/**
* 线程已经进队列并生成了节点之后
* 线程以互斥的、不可打断的方式获得锁
*
* @param node 此线程对应的node
* @param arg the acquire argument
* @return 如果被中断过就返回true
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// 如果线程对应的节点位于head的后继且尝试获得锁成功
// 就使此节点出队
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 线程对应的节点没有获得锁,排队没排到或者排到了没
// 竞争到。如果可以阻塞等待更好,不用死循环占用cpu资
// 源了,如果不能就循环呗
// 这里可以和上面的shouldParkAfterFailedAcquire()
// 结合起来,能阻塞就阻塞,不能阻塞就清除掉阻碍的条件
// 以方便阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 没有 return 就离开了循环,可能是 tryAcquire()抛出了
// 异常
// 这里可以结合之前的 cancelAcquire() 这个节点获取锁失败,
// 这个点就应该从队列中处理掉了
if (failed)
cancelAcquire(node);
}
}
思路
没啥要特殊说的。
总体来说,各种获得锁的方法中的入参都是 args
,只有这个有node
,为什么呢?
可能是因为这个最基本吧,干脆写的通用一点,方便复用。
为什么只管自己,没有唤醒后继
出队意味着获得锁,如果是互斥模式,后继肯定没法拿锁了,所以没必要唤醒。在释放锁(release
相关方法)才管后继的唤醒。
获取锁,可打断
源码
/**
* 线程未进队列,
* 以互斥状态、可打断的方式获取锁
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
// 先把此线程加进队列中,剩下的操作和 acquireQueued()
// 除了对线程打断的处理不同,其他的基本都一样了
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
思路
doAcquireInterruptibly()
和acquireQueued()
最大的区别有两个:
- 线程的不同:
-
doAcquireInterruptibly()
线程未进入队列,要现入队生成节点 -
acquireQueued()
线程已进入队列直接获得前驱即可
-
- 对线程打断的反应不同:
-
doAcquireInterruptibly()
抛异常 -
acquireQueued()
返回boolean
值
-
获取锁,在给定的n纳秒内
源码
/**
* 线程未进队列
* 再给定的时间内通过互斥模式获得锁
*
* @param arg the acquire argument
* @param 等待时间
* @return 如果在等待时间内成功获得锁就返回 true
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
// 已超时,直接返回
if (nanosTimeout <= 0L)
return false;
// 记录 deadLine
final long deadline = System.nanoTime() + nanosTimeout;
// 线程入队列排队
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
// 一样的套路
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
// 根据剩余时间选择不同的方式:
// 1. 如果剩余的时间多,则直接阻塞等待
// 2. 如果剩余的时间少,就循环检查吧,毕竟线程的唤醒
// 阻塞要消耗不少资源
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
// 日常在获取失败后处理队列中的失败节点
if (failed)
cancelAcquire(node);
}
}
思路
纳秒, 1纳秒 = 秒
时间分界: 1000纳秒
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;
获取锁,共享状态
源码
/**
* 线程未进队列
* 以共享的模式、不可打断的获得锁
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// 如果获取成功返回的为正值
int r = tryAcquireShared(arg);
if (r >= 0) {
// 出队并根据后继的情况唤醒后继共享的节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
思路
没啥特别的,共享模式和互斥模式都要求head
的后继才能释放。因为共享模式也是一个一个释放的节点
和互斥模式的区别
这里调用的是setHeadAndPropagate()
,共享模式获得锁后,后面的可能也能获得锁,所以得唤醒后继,让后面的检查一下
获取锁,共享状态,可打断
源码
/**
* 和互斥模式的能打断、不能打断的关系差不多,没啥变化
* @param arg the acquire argument
*/
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
思路
没得说
获取锁,共享状态,在给定的n纳秒内
源码
/**
* 同理
*
* @param arg the acquire argument
* @param 等待时间
* @return 在等待时间内成功获得锁就返回 true
*/
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return true;
}
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
思路
用于覆盖的方法
此处介绍的是一些用于继承的空方法,这些方法默认的实现是抛出UnsupportedOperationException
,需要子类根据自己的情况动态实现。
上面介绍的各种各样获得锁的方法,都是依赖这些方法做核心实现的。是不是很像集合类源码阅读时了解到的适配器模式?
获得锁
源码
/**
* 尝试以互斥模式获得锁,此方法应该先检查本对象的状态判断是否能以互斥
* 模式让他获得锁。
*
* 要获取锁的线程会调用这个方法,如果调用失败,就进队列阻塞直到被唤醒,
* 这个方法可以用来实现 Lock.tryLock()方法
*
* @return 获得成功就返回 true
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
思路
当然,如果你要以贪心模式进行竞争的话就该怎么写怎么写,如果你想公平一点,
那就return false;
让所有线程都默认去排队,不要插队。
释放锁
源码
/**
* 尝试释放互斥状态的锁,仅尝试释放锁,不做其他操作,例如:
* 1. 不会释放信号唤醒其他线程
* 2. 不会对队列操作
*
* 对于队列操作来说,tryRelease() 仅仅是个条件判断,判断此线程
* 的锁是否可以释放
*
* @return 如果该对象已经完成释放就返回 true ,不能释放就返回 false
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
思路
获得锁,共享模式
源码
/**
* 尝试以共享状态获得锁,此方法要先检查本对象的状态是否允许以共享状态
* 获得锁。
* 其他的和 tryAcquire() 方法一样
*
* @return 如果获取失败返回负值
* 如果获取成功且之后的共享模式不能再成功了,返回0
* 如果获取成功且之后的共享获取还能成功,返回1
*
* 返回值非负都表示此次获取是成功的。
*/
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
思路
这么设计的原因是在多线程共同读取某个文件时可以限制线程并发的数量,比如最多可以三个线程一起读取。
释放锁,共享模式
源码
/**
* 尝试以共享状态释放锁。
*
*
*
* @return 如果释放成功返回 true
* 如果释放失败返回 false
*
* 如果返回 true ,则其他等待的 共享/独占模式的节点都可以
* 参与竞争锁了。
*/
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
思路
判断锁是否被本线程以独占模式占有
源码
/**
* 判断调用此方法的线程是否以互斥状态获得此锁
* 注意,此方法要立即返回,不能阻塞
*
* 此方法被 ConditionObject 中的方法使用,如果你不用 ConditionObject
* 那么也不用实现此方法。
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
思路
向外暴露的整合功能的方法
获得锁
源码
/**
* 以互斥模式、不可打断的获得锁
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
思路
这个if
中的条件还是用了短路的特点,实现思路如下:
- 先尝试直接获得锁。通过调用
tryAcquire()
- 插队不行,就排队吧,进队列。通过调用
addWaiter(Node.EXCLUSIVE)
- 进队列了,然后阻塞线程以排队等待唤醒。通过调用
acquireQueued()
- 如果通过排队之后根据结果进行后续处理
- 调用成功了,
acquireQueued()
返回的是false
,及正常执行没有被中断 - 调用失败了,
acquireQueued()
返回的是true
,表示经历了线程中断,然后就再次中断本线程以重设中断标志位,方便本方法的调用者检测中断。
- 调用成功了,
获得锁,不可打断
源码
/**
* 以独占模式、可打断的获得锁。
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
思路
一样的套路。
获得锁,在指定的n纳秒内
源码
/**
* 以独占模式、可打断的获得锁,在指定的n纳秒内
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return {@code true} if acquired; {@code false} if timed out
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
思路
||
短路
释放锁
源码
/**
* 以独占的方式释放锁
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return 和 tryRelease() 返回的结果一样
*/
public final boolean release(int arg) {
if (tryRelease(arg)) { // 尝试释放锁
//可以释放锁并释放了,接下来进行队列的调整以及
// 信号的转播(如:唤醒后继节点)
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
思路
获得锁,共享模式
源码
/**
* 以共享模式、可打断的获得锁。
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
思路
一个套路,先插队,插队不行排队阻塞慢慢等
获得锁,共享模式,不可打断
源码
/**
* 获得锁,共享模式,可打断。 套路不变
*
* @param arg the acquire argument.
* This value is conveyed to {@link #tryAcquireShared} but is
* otherwise uninterpreted and can represent anything
* you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
获得锁,共享模式,在n纳秒内
源码
/**
* 获得锁,共享模式,可打断。
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
* @param nanosTimeout the maximum number of nanoseconds to wait
* @return 如果左后获得了锁就返回 true
* @throws InterruptedException if the current thread is interrupted
*/
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquireShared(arg) >= 0 ||
doAcquireSharedNanos(arg, nanosTimeout);
}
思考
刚开始是不是有点懵。。。。原来都是 tryAcquireShared(arg)<0
,这次怎么反了?
。。。。。。。这次和之前原理不一样,用的不是if
,是||
的短路。
思路还是一个思路滴。
释放锁,共享模式
源码
/**
* 尝试以共享模式释放,释放成功后会继续唤醒后面的线程
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
总结
本文主要介绍了三类方法:
- 各种获得锁的相关方法
- 用于覆盖的方法
- 向外暴露的整合功能的方法
其中:
- 各种获得锁的相关方法主要负责
- 队列中线程阻塞、唤醒控制
- 维护节点状态变化及双向队列节点之间的关系
- 用于覆盖的方法主要负责
- 专业业务场景业务定制【只负责此线程的状态变化,不关其他】
- 向外暴露的整合功能的方法主要负责
- 整合前两者,将队列相关操作封好不向外暴露
- 引入先尝试,失败后入队重试的机制
用一个图来表示三者的关系:
扩展
自旋锁
自旋锁是计算机科学用于多线程同步的一种锁,线程反复检查锁变量是否可用。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。
自旋锁避免了进程上下文的调度开销,因此对于线程只会阻塞很短时间的场合是有效的。因此操作系统的实现在很多地方往往用自旋锁。Windows操作系统提供的轻型读写锁(SRW Lock)内部就用了自旋锁。显然,单核CPU不适于使用自旋锁,这里的单核CPU指的是单核单线程的CPU,因为,在同一时间只有一个线程是处在运行状态,假设运行线程A发现无法获取锁,只能等待解锁,但因为A自身不挂起,所以那个持有锁的线程B没有办法进入运行状态,只能等到操作系统分给A的时间片用完,才能有机会被调度。这种情况下使用自旋锁的代价很高。
获取、释放自旋锁,实际上是读写自旋锁的存储内存或寄存器。因此这种读写操作必须是原子的。通常用test-and-set等原子操作来实现