前言
上篇介绍了AQS的概念,AQS 所支持的两种模式:独占模式和共享模式,如何去使用AQS 实现独占锁。这篇就介绍下AQS 是怎么是将线程锁住的?以及多个线程争夺时AQS 做了哪些事情?
知识点简介
volatile
volatile
是Java 中的关键字,主要有两个特性:可见性和防止重排序。在AQS主要用到第一个特性就是可见性;什么是可见性呢?学过操作系统的都知道,主内存是存放程序运行时数据的地方,但是为了速度每个CPU 都有自己的高速缓存,缓存自己从主内存加载的数据。一个线程是运行在CPU上的,当这个线程想加载某个数据时优先从自己的高速缓存里找,如果没有找到再从主内存里加载,操作完就放到高速缓存中,别的线程工作时也是优先从自己的高速缓存里读,这样就会读到旧的数据,最后导致计算出错。为了避免这种问题,保证多个CPU 的高速缓存是一致的,OS 内部有个缓存一致性协议(eg:MESI
), 而volatile 就是使用这种协议来使修饰的变量在多个线程间可见。
CAS
CAS
全称compareAndSwap,在Java中就是一个函数, 它有三个参数:修改值的内存地址,旧値,期望修改后的値;该函数大致流程如下:
- 按照内存地址取出旧的値,
- 与传进来的旧値比较是否相同:不相同则失败,否则执行步骤3
- 将内存地址的値设置为期望值
上述三个动作是具有原子性的,即不可拆分的,对应着处理器的一个原子指令
(CMPXCHG),处理器实现原子操作有两种第一种就是通过对内存总线加锁,第二种就缓存锁定只对某个缓存行进行锁定,第二种对资源的消耗低。
CLH 介绍
官方解释
CLH锁是Craig, Landin和Hagersten (CLH)锁,CLH锁是旋转锁,可以确保没有饥饿,提供公平先到先得的服务。
CLH锁是一种可伸缩的、高性能的、公平的自旋锁,基于列表,应用程序线程仅在本地变量上自旋,它不断轮询前驱状态,如果发现前驱状态释放锁则结束自旋。
简述某个线程获取锁的过程
先将自己加入到队列中,并将自己的状态
(true/false)设置为true,然后循环判断队列中前一个节点(线程)中的状态
是否为false ,这里的循环即模拟阻塞,直到队列的前一个节点释放锁,将它的状态设置为false ,则当前节点才获取锁。
实操(实现CLH)
简介
:和AQS相同只有一个状态
表示锁是否空闲,而不是队列中的每个节点都有一个状态
,此状态
有两个值1或者0: 1表示锁已经被抢占,0表示锁空闲。
原生的CLH 锁 存在一个弊端:当前节点的线程会不断轮询前一个节点的状态,它会造成CPU 使用率100%。对该弊端AQS采用阻塞和通知的手段,如果发现线程获取锁失败则将当前线程阻塞住,等前一个线程释放锁时,将它的后继线程阻塞状态解除即可。思路:
代码如下:
**
* CLH锁:一个种自旋锁,通过先进先出确保无饥饿的和公平的锁
*/
public class ClhLock {
// java 的一个不安全的帮助类,支持CAS 操作
private static Unsafe unsafe = UnsafeUtil.getUnsafe().orElse(null);
/**
* 0表示未锁住
* 1表示锁住
*/
private volatile int state = 0;
/**
* 独占锁拥有者线程
*/
private Thread exclusiveOwnerThread;
/**
* 获得锁的节点
* 开始为空
*/
private transient volatile Node head;
/**
* 尾部节点
* 开始为空
*/
private transient volatile Node tail;
/**
* 为了CAS 操作而设置的变量.
* 用下面这几个参数获取对应实体上对应字段的地址,充当CAS 第一个参数
*/
private final static long headOffset;
private final static long tailOffset;
private final static long stateOffset;
static {
try {
if (Objects.isNull(unsafe)) {
throw new IllegalStateException("Unsafe instance has not initialized");
}
headOffset = unsafe.objectFieldOffset(ClhLock.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset(ClhLock.class.getDeclaredField("tail"));
stateOffset = unsafe.objectFieldOffset(ClhLock.class.getDeclaredField("state"));
} catch (NoSuchFieldException e) {
throw new Error(e);
}
}
public ClhLock() {
}
/**
* 尝试获取锁,如果没有获取则加入等待队列
*/
public void lock() {
acquire(1);
}
private void acquire(int arg) {
//加入等待队列并返回当前节点
Node node = addWaiter(Thread.currentThread());
//轮询
for (; ; ) {
Node h = head;
//判断自己的前驱是否是占有锁的节点,若是则尝试获取锁
if (node.prev == h && tryAcquire(arg)) {
System.out.println("acquire lock thread:" + Thread.currentThread().getName());
// 获取锁成功将自己的节点设置为head
setHead(node);
return;
}
//若获取锁失败则阻塞当前线程
LockSupport.park(node.thread);
}
}
private void setHead(Node node) {
Node h = head;
// 通过cas 函数将 node 设置为head
if (compareAndSetHeadOrTail(headOffset, h, node)) {
//为了gc
node.prev = null;
node.thread = null;
}
}
/**
* 将线程入队
* @param currentThread 将要入队的线程
* @return
*/
private Node addWaiter(Thread currentThread) {
//新建一个节点代表当前线程
Node node = new Node(currentThread);
Node t = tail;
//判断尾部节点是否为空,开始时尾部节点为空
if (t != null) {
//尾部节点不为空则将尾部节点赋给当前节点的前驱
node.prev = t;
//将自己设置为尾部节点,可能不成功,会被其它线程先一步设置,若设置不了则会进入下面的enq
if (compareAndSetHeadOrTail(tailOffset, t, node)) {
t.next = node;
return node;
}
}
//若尾部节点为空(第一个线程进来),或者将当前节点设置为尾部节点失败
return enq(node);
}
protected boolean tryAcquire(int arg) {
assert arg == 1;
int tempState = getState();
if (tempState == 0 && compareAndSetState(tempState, arg)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean compareAndSetState(int oldValue, int expectValue) {
return unsafe.compareAndSwapInt(this, stateOffset, oldValue, expectValue);
}
private boolean compareAndSetHeadOrTail(long offset, Node oldValue, Node expectValue) {
return unsafe.compareAndSwapObject(this, offset, oldValue, expectValue);
}
/**
* 释放锁
*/
public void unLock() {
release();
}
protected void release() {
//尝试释放锁
if (tryRelease(1)) {
Node h = head;
if (h != null) {
//唤醒后继线程
unParkSuccessor();
}
}
}
private void unParkSuccessor() {
//唤醒后继线程
Node n = head.next;
//下面逻辑暂时不管,不会出现这种情况
if (n == null) {
for (Node prev = tail.prev; prev != null; prev = prev.prev) {
if (prev != head) {
n = prev;
}
}
}
if (Objects.nonNull(n)) {
LockSupport.unpark(n.thread);
}
}
private boolean tryRelease(int arg) {
assert arg == 1;
int tempState = getState();
if (tempState == 1 && compareAndSetState(tempState, 0)) {
setExclusiveOwnerThread(null);
return true;
}
return false;
}
/**
* 队列的一个元素
*/
class Node {
/**
* 包含哪个线程,创建时实例化
*/
private Thread thread;
/**
* 前驱
*/
private Node prev;
/**
* 后继
*/
private Node next;
public Node(Thread thread) {
this.thread = thread;
}
public Thread getThread() {
return thread;
}
public Node getPrev() {
return prev;
}
public void setPrev(Node prev) {
this.prev = prev;
}
public Node() {
}
}
public int getState() {
return state;
}
public Node getHead() {
return head;
}
public Node getTail() {
return tail;
}
public void setExclusiveOwnerThread(Thread exclusiveOwnerThread) {
this.exclusiveOwnerThread = exclusiveOwnerThread;
}
个人建议:大家在看的时候,在草稿纸上比划下。因为链表比数组要抽象的。
测试代码
测试多线程执行加一操作,根据结果值是否出现不一致现象,判断是否是实现了同步的功能,未对性能进行测试
public class ClhLockTest {
private static int i = 1;
public static void incr() {
i++;
}
@Test
public void testLock() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(100);
ClhLock clhLock = new ClhLock();
for (int i = 0; i < 100; i++) {
new Thread(() -> {
clhLock.lock();
incr();
clhLock.unLock();
countDownLatch.countDown();
}).start();
}
countDownLatch.await();
Assertions.assertEquals(101,i);
}
}
AQS 中的CLH
简述
AQS使用了CLH 先入先出的思想,但是并不是公平的,而是抢占式的,但这取决于子类的实现方式。上文也说到AQS 只维护一个状态state
,而不像原生CLH 那样每个节点都有自己的状态。
Node
类的 构造
Node 类表示队列中的某个元素
属性
属性名称 | 类型 | 修饰 | 含义 |
---|---|---|---|
waitStatus | int | volatile | 等待状态,初始时为0,常见的几个状态下文会讲到 |
prev | Node | volatile | 当前节点的前驱,若尝试获取锁失败 则当前节点会会判断它的前驱是否头节点若是头节点则会尝试获取锁 |
next | Node | volatile | 后继节点可能会为空,当前节点释放锁时,唤醒它,但是若是后继节点为空时,则会从tail 节点向前遍历找到head 的后继节点 |
thread | Thread | volatile | 当前线程 |
nextWaiter | Node | frendly | 表示下一个节点是shared 还是exclusive |
Node
类waitStatus 的几种状态介绍
名称 | 值 | 含义以及作用 |
---|---|---|
SIGNAL | -1 | 这个节点的后继是(或者将要)阻塞的, 因此当前节点当取消或者释放时必须取消 其后继的阻塞状态。为了避免争夺, 获取方法必须首先表明他们需要一个信号, 然后重试原子获取,如果失败,则挂起。 |
CANCELLED | -2 | 因为超时或者中断这个节点取消了。 处于此状态的节点 永远不会改变。 特别的是,一个带有取消节点的线程永远不会被阻塞 |
PROPAGATE | -3 | 一个释放共享锁的节点应该传播给其它节点。 这是在 doReleaseShared方法中设置(仅针对头节点), 即使别的操作已经介入。 |
初始值 | 0 | 初始值 |
获取独占锁过程
源码分析 注释
- 尝试获取锁,由子类实现
public final void acquire(int arg) {
//调用tryAcquire函数,尝试修改state,该函数有子类实现,返回true 或者false
if (!tryAcquire(arg) &&
//若获取失败 则开始入队等待获取锁
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// acquireQueued 会判断该线程在获取完锁的时候是否被中断,
//若中断则继续获取锁,获取完再中断自己
selfInterrupt();
}
- 添加到队列尾部
/**
* 为当前线程和给定的模式(独占模式或者共享模式)创建节点并将节点入队
* @param mode Node.EXCLUSIVE 或者 Node.SHARED ——独占或者共享
* @return
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 尝试最快的方式入队,失败就就进入enq,不断地轮询直到成功入队
//入队就是设置tail 节点,因为会存在竞争的情况所以会设置失败
Node pred = tail;
if (pred != null) {
//将当前节点的前驱设置为尾部节点,然后尝试将当前节点设置为尾部节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
//入队成功,将其前驱的后继设置为当前节点
pred.next = node;
return node;
}
}
//以轮询的方式入队
enq(node);
// 入队成功返回当前节点
return node;
}
/**
* 将节点插入到队列中,如果需要的话初始化tail节点和head 节点
* @param node 要插入到队列中的节点
* @return 节点的后继
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) {
//如果尾部节点为空,则需要将head 和tail 初始化,延迟加载有不有
if (compareAndSetHead(new Node()))
tail = head;
} else {
//若尾部节点不为空则
// 将当前节点的前驱设置为尾部节点,
// 然后尝试将当前节点设置为尾部节点
node.prev = t;
if (compareAndSetTail(t, node)) {
//入队成功,将其前驱的后继设置为当前节点
t.next = node;
return t;
}
}
}
}
- 已在队列中的线程以独占模式获取锁
/**
*
*已在队列中的线程以独占模式获取锁并且不可中断(不是重点)
* @param node 当前线程对应的节点
* @param arg 获取参数 与子类定义有关不用管
* @return 如果在等待的时候被中断则返回true
*/
final boolean acquireQueued(final Node node, int arg) {
//表示获取锁是否失败,若最后还是未成功则取消该节点
boolean failed = true;
try {
//标识当前线程(节点)是否中断
boolean interrupted = false;
//轮训
for (;;) {
//获取前驱
final Node p = node.predecessor();
//判断前驱是否是head 节点,若是则尝试获取锁
if (p == head && tryAcquire(arg)) {
//若获取锁成功,则将当前节点设置为head
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果获取锁失败,则设置前驱的waitStatus为SIGNAL :下一个循环若还是失败则阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
//若前驱的waitStatus 为SIGNAL 则将当前线程阻塞,被唤醒的时候返回该线程是否被中断
parkAndCheckInterrupt())
// 若被中断
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
流程
- 调用tryAcquire方法尝试获取锁,成功就代表已获取锁
- 第一步失败则入队
- 入队后,然后开始轮询尝试获取锁
- 第三步轮询两次失败就将自己阻塞住,等待前驱释放时解除自己的阻塞状态
释放独占锁过程
源码分析 注释
public final boolean release(int arg) {
//尝试释放锁由子类实现,正常返回true
if (tryRelease(arg)) {
Node h = head;
//如果head 不为空,且head的waitStatus 不为0(其后继等待时将其设置为SIGNAL)
if (h != null && h.waitStatus != 0)
//唤醒后继
unparkSuccessor(h);
return true;
}
return false;
}
/**
* 唤醒给定节点的后继
* @param node 给定的节点
*/
private void unparkSuccessor(Node node) {
/**
* 若果waitStatus 小于零则需要被唤醒
* 这里只考虑为SIGNAL
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* 唤醒后继
* 通常其后继就是下一个,但是因为可能后继取消或者为空
* 需要从尾部向前遍历寻找一个距离head最近的不为空且没有被取消的节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//唤醒后继线程
LockSupport.unpark(s.thread);
}
流程
- 调用tryRelease 释放锁
- 若head 节点不为空且waitStatus 不等于0,则唤醒后继线程
获取共享锁过程
源码分析 注释
public final void acquireShared(int arg) {
/**
* 尝试获取共享锁
* tryAcquired 返回 一个整数,为负数则获取失败,不同的子类实现方式不一样
* Semaphore 类返回的是剩余多少许可证
*/
if (tryAcquireShared(arg) < 0)
//获取失败 则入队等待获取
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
// 以共享模式入队,如何入队上文已介绍
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//判断当前节点的前驱是否是头节点
if (p == head) {
//若是头节点,则尝试获取锁
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取锁成功
/**
* 将当前节点设置为头
* 判断其后继节点是否为shared
* 若是则释放自己,因为共享模式表示多个线程都可以同时占有锁
* 注意第二个参数 是tryAcquireShared 的返回值,方法里会用到
*/
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
//和独占锁相同 具体可以上面
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* 设置队列的头,
* 并检查如果后继以共享模式等待且propagate(tryAcquireShared 返回值) >0
* 或者waiStatus 被设置为PROPAGATE
* @param node 当前节点
* @param propagate 一个tryAcquireShared 的返回值
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* 当propagate 大于 零
* 或者h.waitStatus 小于0
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
//释放共享锁,下面详解
doReleaseShared();
}
}
流程
- 尝试获取共享锁成功即返回
- 第一步尝试获取锁失败,添加到队列尾部,若需要初始化则先初始化head和tail 节点
- 然后再次尝试获取锁,若失败则将waitStatus 设置为SIGNAL,下一个循环获取锁失败,则挂起当前线程。
释放共享锁过程
源码分析 注释
/**
* 释放共享锁,唤醒它的后继并保证传播
*/
private void doReleaseShared() {
/*
* 保证一个释放会传播下去,即使有正在获取或释放。
* 如果需要,以一个通常的方式去唤醒head的后继,
* 如果不需要,就把head 的status 设置为PROPAGATE,
* 保证后面的释放传播可以继续
*/
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
// 如果h 的waitStatus 是SIGNAL,则先尝试将其设置为0
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
//唤醒后继
unparkSuccessor(h);
}
//若waitStatus 为0,则将其设置为PROPAGATE,为后续的释放保证传播这个行为
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
流程
- 解除对共享锁的占用
- 唤醒head的后继并保证传播
后记
AQS 作为Java 锁的基架,其重要性不言而喻。本文只是从获取锁释放锁的角度去分析,没有涉及到waitStatus 为条件的情况以及取消状态进行分析,下一节将从这个角度去分析下AQS,大家有啥疑问以及以及指正咱们评论区见。