初识
ReentrantLock在并发场景下经常用到,非常重要。它是对lock接口实现的可重入锁,保证多线程安全。作用是对线程进行加锁,并且锁可重入,即一个线程获取锁后可重复获取锁而不会发生阻塞,其定义的内部类Sync继承AQS抽象类。通过AQS抽象类实现大部分的功能,小部分功能由Sync类实现,如AQS类里的tryAcquire()由子类nonfairTryAcquire()实现等等
基本使用
public static ReentrantLock lock = new ReentrantLock();
private static void add() {
try {
lock.lock();
count++;
}finally {
lock.unlock();
}
}
使用new ReentrantLock即可进行实例,对需要线程安全部分首尾调用lock()进行加锁,业务代码执行完调用lock.unlock()释放锁,最好是在finally中调用解锁,以防业务代码出现问题,锁不释放带来的死锁问题。
源码解析
1.核心是AQS这个类,构造一个CLH队列,结构如图
2.ReentrantLock实现了Lock,以及Serializable接口,提供了两个构造方法,其中无参构造即实例一个非公平锁,有参构造方法里如果是true则构造公平锁
非公平锁,当前线程释放锁后,后续线程无需顺序竞争锁。
公平锁,当前线程释放锁后,后续线程按创建时间竞争锁。
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
3.当直接new一个lock实例即创建一个非公平锁
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
NonfairSync类是Sync抽象类的一个子类,Sync下有两个子类,分别对应公平和非公平,同时继承父类AbstractQueuedSynchronizer也就是AQS,AQS类完成加锁以及阻塞队列构建和运转
关系图
Sync抽象类里定义的几个方法,
其中lock方法由两个类子类FairSync和NonFairSync实现,
nonfairRtyAcquire方法即为获取锁的实现被两个子类调用,后面再说。
4.NonfairSync子类的lock()方法实现(非公平锁)
final void lock() {
//4.1.首先逻辑分支语句,第一个分支使用Sync的父类AQS类里的compareAndSetState()方法来获取锁。
//AQS类里定义一个volatile修饰的变量state;用来标识当前锁是否被占用,当没有其他线程获取锁时将state值变为1,并且调用AQS的setExclusiveOwnerThread()方法将线程所有者设置为当前线程。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
//4.2.如果获取锁没有成功调用AQS的acquire(1)方法。
acquire(1);
}
5.继上一步调用AQS的acquire(1)方法,逻辑控制语句里有两个部分,都为true时,即线程已经经历过一个完整的挂起到唤醒过程,然后再调用AQS的selfInterrupt()方法对当前线程进行标识中断(其实是对下面acquireQueued方法的一个中断补偿),具体何时中断由线程自己决定。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
5.1.第一部分:AQS类里的tryAcquire()方法调用子类Sycn的nonfairTryAcquire()方法,入参为1。
nonfairTryAcquire()方法有三个分支。
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//第一个分支,依旧是使用cas原理获取锁,如果获取成功设置所属线程,返回true
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//第二个分支,则是ReenTrantLock的可重入性,即如果当前线程和所属线程一致,则将state值加1并且返回true。
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
//第三个分支,当上面都不能成立直接返回false。
return false;
}
5.2.第二部分:acquireQueued(addWaiter(Node.EXCLUSIVE), arg)首先调用addWaiter()方法,将当前线程封装成node加入到队列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 判断尾结点是否存在,如果存在将新节点插入到尾结点,将尾结点指向修改为新节点。
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果不存在调用enq(node)方法
enq(node);
return node;
}
//将node设置为tail尾结点,此时head头节点是null
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
里面addwaiter方法执行完,接下来是acquireQueued(node,1)方法
final boolean acquireQueued(final Node node, int arg) {
//失败标记
boolean failed = true;
try {
//中断标记
boolean interrupted = false;
//自旋获取锁,失败则挂起尝试获取锁的线程(我称之为竞争线程),直到被唤醒
for (;;) {
//获取前驱节点
final Node p = node.predecessor();
//如果前驱节点是头节点,则尝试获取锁
if (p == head && tryAcquire(arg)) {
//将当前节点设置为头节点
setHead(node);
//释放内存,帮助垃圾回收
p.next = null; // help GC
failed = false;
//竞争线程获取锁后,返回false,结束循环
return interrupted;
}
//下面这段主要是竞争线程获取锁失败时执行,if逻辑里有两个重要方法主要实现线程阻塞,在5.2.x里分析
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
5.2.1.shouldParkAfterFailedAcquire()方法主要是用来修正当前节点以及前驱节点的状态,并判断当前node是否可以进行安全阻塞。如果可以安全阻塞则返回true继续调用parkAndCheckInterrupt()进行阻塞中断。
volatile int waitStatus; //node节点状态,默认是0
static final int CANCELLED = 1 // waitStatus值,指示线程已取消
static final int SIGNAL = -1 // waitStatus值,指示后续线程需要唤醒
static final int CONDITION = -2 // waitStatus值,指示线程正在等待条件
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//如果前驱节点的状态是-1,返回true,表示当前node可以挂起,即上面5.2可调用parkAndCheckInterrupt()
if (ws == Node.SIGNAL)
return true;
//如果前驱节点状态>0,则表示前驱节点已经取消,下面do while循环找到前驱节点的前驱节点,并将它设置为node的前驱节点
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//如果不等于-1,且>0,则将前驱节点的状态改为-1。这段方法第一次进来会执行此处。注意此时并没有去返回true来挂起当前node,而是返回false。
//因为前驱节点状态默认是0。此方法外部自旋第二次循环才走第一个if返回true,然后开始调用parkAndCheckInterrupt()挂起当前node,即挂起竞争线程
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
5.2.2.parkAndCheckInterrupt()只做两件事 1挂起 2检查中断状态,这里特别说明下,线程挂起和线程中断不是一回事,此方法挂起线程后,代码将不再继续执行,直到线程被中断,或者被其他线程进行unpark唤醒,才能继续return。
private final boolean parkAndCheckInterrupt() {
//挂起竞争线程。
LockSupport.park(this);
//如果是被unpark唤醒,即返回false,如果是被中断唤醒,返回true,重置竞争线程中断状态为false。
return Thread.interrupted();
}
继续parkAndCheckInterrupt()返回后进行分析。两种情况:
第一种情况,通过unpark方法唤醒,return Thread.interrupted() => false,进入acquireQueued()的自旋中,进行再次获取锁操作后。此时return的interrupted标识还是false,结束acquireQueued()后,第5步分析的selfInterrupt()的逻辑判断不成立,所以是不执行的,即没必要进行中断补偿。
第二种情况,通过中断唤醒,return Thread.interrupted() => true,进入acquireQueued()的自旋中,进行再次获取锁操作后,此时interrupted = true,第5步分析的selfInterrupt()的逻辑判断成立,进行中断补偿。
总结:到此线程的加锁流程已经分析完,如果竞争线程执行了最后一步parkAndCheckInterrupt()方法,则进入挂起状态,等待前面拥有锁的线程释放锁后,竞争线程被唤醒,再次走一遍流程去尝试获取锁,释放锁并唤醒等待线程在下一篇文章里再做源码解析。
----------------- 文章如有问题,请下方回复指出,感谢查阅😁 -----------------