JUC-AQS-ReentrantLock
- 前置知识
公平锁和非公平锁
可重入锁
自旋锁
LockSupport
数据结构之链表
设计模式之模板设计模式
- java.util.concurrent.locks 接口摘要
什么是AQS
AQS(Abstract Queue Synchronizor,抽象队列同步器)框架:通过一个volatile修饰的state来表示同步的状态,使用内置的一个FIFO队列来完成抢占资源的排队状态,把每个未抢占资源的线程封装成双向链表的一个节点,各线程通过CAS来修改state的值。整个过程都围绕着自旋锁、CAS和Support类提供的park/unpark来完成线程的睡眠与唤醒。
用来构建锁或者其它同步器组件的重量级基础框架及整个JUC体系的基石,通过内置的FIFO队列来完成资源获取线程的排队工作,并通过一个int类变量表示持有锁的状态.
- 抢到资源的线程直接使用处理业务,抢不到资源的必然涉及一种排队等候机制。抢占资源失败的线程继续去等待(类似银行业务办理窗口都满了,暂时没有受理窗口的顾客只能去候客区排人等候),但等候线程仍然保留获取锁的可能且获取锁流程仍在继续(候客区的顾客也在等着叫号,轮到了再去受理窗口办理业务)。
- 既然说到了排队等候机制,那么就一定会有某种队列形成,这样的队列是什么数据结构呢?
- 如果共享资源被占用,就需要一定的阻塞等待唤醒机制来保证锁分配。这个机制主要用的是CLH队列的变体实现的,将暂时获取不到锁的线程加入到队列中,这个队列就是AQS的抽象表现。它将请求共享资源的线程封装成队列的结点(Node) ,通过CAS、自旋以及LockSupport.park()的方式维护state变量的状态,使并发达到同步的效果。
Provides a framework for implementing blocking locks and related synchronizers (semaphores, events, etc) that rely on first-in-first-out (FIFO) wait queues. This class is designed to be a useful basis for most kinds of synchronizers that rely on a single atomic int value to represent state. Subclasses must define the protected methods that change this state, and which define what that state means in terms of this object being acquired or released. Given these, the other methods in this class carry out all queuing and blocking mechanics. Subclasses can maintain other state fields, but only the atomically updated int value manipulated using methods getState, setState and compareAndSetState is tracked with respect to synchronization.
AQS内部维护属性volatile int state (32位)
- state表示资源的可用状态
State三种访问方式
getState()、setState()、compareAndSetState()
AQS定义两种资源共享方式
- Exclusive-独占,只有一个线程能执行,如ReentrantLock
- Share-共享,多个线程可以同时执行,如Semaphore/CountDownLatch
AQS定义两种队列
- 同步等待队列
- 条件等待队列
不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:
- isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
- tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
- tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
- tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
- tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false。
AQS内部体系架构
同步等待队列
AQS当中的同步等待队列也称CLH队列,CLH队列是Craig、Landin、Hagersten三人发明的一种基于双向链表数据结构的队列,是FIFO先入先出线程等待队列,Java中的CLH队列是原CLH队列的一个变种,线程由原自旋机制改为阻塞机制。
条件等待队列
Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备时,这些等待线程才会被唤醒,从而重新争夺锁
abstract void lock()
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
对比公平锁和非公平锁的tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断!hasQueuedPredecessors()
hasQueuedPredecessors()中判断了是否需要排队,导致公平锁和非公平锁的差异如下:
- 公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
- 非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一- 个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)
- 模拟三个线程
public class AQSDemo {
public static void main(String[] args) {
ReentrantLock reentrantLock = new ReentrantLock();//非公平锁
// A B C三个顾客,去银行办理业务,A先到,此时窗口空无一人,他优先获得办理窗口的机会,办理业务。
// A 耗时严重,估计长期占有窗口
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("----come in A");
//暂停50分钟线程
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
} finally {
reentrantLock.unlock();
}
}, "A").start();
//B是第2个,B一看到受理窗口被A占用,只能去候客区等待,进入AQS队列,等待着A办理完成,尝试去抢占受理窗口。
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("----come in B");
} finally {
reentrantLock.unlock();
}
}, "B").start();
//C是第3个,C一看到受理窗口被A占用,只能去候客区等待,进入AQS队列,等待着A办理完成,尝试去抢占受理窗口,前面是B顾客,FIFO
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("----come in C");
} finally {
reentrantLock.unlock();
}
}, "C").start();
}
}
RUN> 🚗🚗🚗🚙🚙🚙
----come in A ----come in B ----come in C
AQS 初始状态
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
ThreadA 来来抢占业务,当前state=0
,compareAndSetState(0, 1)
设置成功
主流程-分支1-tryAcquire
接下来B线程来抢占业务 lock-->acquire(1)--> tryAcquire(1)
-->nonfairTryAcquire(1)
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
主流程-分支2-addWaiter
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
主流程-分支3-acquireQueued
addWaiter()
入队成功后,接着acquireQueued(Node(ThreadB))
Acquires in exclusive uninterruptible mode for thread already in queue. Used by condition wait methods as well as acquire.
Params:
node – the node
arg – the acquire argument
Returns:
true if interrupted while waiting
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();//前驱节点
if (p == head && tryAcquire(arg)) {//哨兵节点等于head,则再去抢占锁tryAcquire()
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) && //shouldParkAfterFailedAcquire 自旋两次,返回true
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
shouldParkAfterFailedAcquire
自旋两次,返回true,接着parkAndCheckInterrupt
线程ThreadB--> LockSupport.park
挂起成功!🚥🚥🚥
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
- C是第3个,C一看到受理窗口被A占用,只能去候客区等待,进入AQS队列,等待着A办理完成,尝试去抢占受理窗口,前面是B顾客,FIFO
new Thread(() -> {
reentrantLock.lock();
try {
System.out.println("----come in C");
} finally {
reentrantLock.unlock();
}
}, "C").start();
addWaiter(Node.EXCLUSIVE), arg)
ThreadC的Node入队列
acquireQueued
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
内部自旋两次,第一次把前置节点waitStatus修改成Node.SIGNAL
第二次,挂起当前线程Thread-C
线程Thread-C入队完毕
unlock解锁流程
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;//拿到傀儡节点的下一个节点 s;
if (s == null || s.waitStatus > 0) {//s.waitStatus 不能于-1 ,从后往前找到最前面waitStatus<=0 的节点
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
唤醒节点的处理器
参数:哨兵节点(傀儡节点) h
1.拿到傀儡节点的ws状态值-1;
2.判断ws < 0 ? CAS设置ws的值为0;
3.拿到傀儡节点的下一个节点 s;
4.判断 s == null 或者 s.waitStatus > 0
5.如果s != null,LockSupport.unpark(s.thread)唤醒线程;
state已经改为0了,阻塞的线程被唤醒,回到accquireQueued的死循环加锁的流程
经过线程A-> unparkSuccessor
线程Thread-B唤醒,return Thread.interrupted();
老哨兵退位,新哨兵上位,新的线程变成ThreadOwen