1.ReentrantLock和AQS
ReentrantLock中使用了AbstractQueuedSynchronizer也就是AQS,完成了锁的获取和释放等。从ReentrantLock类中进入,我们看到它的默认构造方法:
public ReentrantLock() {
sync = new NonfairSync();
}
默认使用的是非公平锁,而它的lock方法中,使用的sync.lock()是非公平锁的实现。
public void lock() {
sync.lock();
}
2.获取锁
我们进入NonfairSync的lock方法中,查看内部实现如下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
stateOffset = unsafe.objectFieldOffset
(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
AQS中的锁是可重入的,即同一个线程可以多次获取锁
AQS中的锁也是独占的,即一个线程没有释放锁时,其他的线程不能获取锁
这里,对state的偏移量中的内存进行CAS操作,设置为1,表明第一次获取锁。AQS中,state是一个volatile变量,表示线程获取锁的次数,state为0,表示没有获取锁,state大于1,则表示重入锁获取的次数。执行完compareAndSetState第一次获取锁之后,执行setExclusiveOwnerThread,在AbstractOwnableSynchronizer类中设置成员变量Thread exclusiveOwnerThread为当前线程,表示这个线程独占了这个锁。
如果第二次调用lock()获取锁,由于内存中state不为0,cas失败,则进入else执行acquire(1);
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
该方法执行逻辑如下,如果tryAcquire为true,则直接返回,否则执行acquireQueued(addWaiter(Node.EXCLUSIVE), arg),tryAcquire是一个抽象方法,NonfairSync中的实现如下:
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
// 得到state数量
int c = getState();
// 没有获取过锁
if (c == 0) {
// 获取一次,cas设置state为acquires,也就是1
if (compareAndSetState(0, acquires)) {
// 设置拥有者线程
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
// 增加重入次数
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 设置state,这里不需要同步,因为已经是统一线程第二次获取锁,也是在本线程里获取锁,没有线程安全问题,第一次调用的state和ownerThread一定是可见的
setState(nextc);
return true;
}
return false;
}
3.队列中添加元素
acquires参数为每次获取锁的增量,传入1,每次递增1。其中的判断c==0的这段逻辑其实在NonfairSync的lock方法中又实现了一遍,个人认为重复了,lock方法中可以去掉这一块。解释完这段代码,回到acquire方法中,执行
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
private Node addWaiter(Node mode) {
// mode=Node.EXCLUSIVE,排他模式 Node = null
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 获取AQS的尾节点
Node pred = tail;
// 尾节点不为null,在后方插入新的node,也就是null节点
if (pred != null) {
node.prev = pred;
// 设置AQS的tail为node节点
if (compareAndSetTail(pred, node)) {
// 连接上一个尾节点和node
pred.next = node;
// 返回新建的节点,封装了本线程
return node;
}
}
// 尾节点为null,则在队列中加入本节点
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果其他线程在之前往队列中添加了节点,则tail可能不为null,需要再次判断
if (t == null) { // Must initialize
// 尾节点为null则需要新建Node,cas设置头节点为node,head为冗余节点
if (compareAndSetHead(new Node()))
// 设置尾节点为head
tail = head;
} else {
// 在队列后面添加节点,完成双向链表添加节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
Node(Thread thread, Node mode) { // Used by addWaiter
// Node = null
this.nextWaiter = mode;
// thread = Thread.currentThread();
this.thread = thread;
}
addWaiter方法中主要作用是在当前AQS维护的队列尾部,添加一个节点,如果队列为null,则需要创建一个冗余节点head,否则在队列尾部加入封装了新创建线程的Node,完成双向链表的节点添加。
4.acquireQueued
执行完该操作,返回新创建的Node节点,调用acquireQueued(addWaiter(Node.EXCLUSIVE), 1)方法,该方法细节如下:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取node的前继节点
final Node p = node.predecessor();
// 是头节点的话,也就是node为第二个节点,尝试获取锁
if (p == head && tryAcquire(arg)) {
// 获取锁成功,则更新头节点为当前节点,消除冗余节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// node不是头节点或者获取锁失败,执行判断是否需要阻塞等待
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
本方法中,判断了当前节点的前一个节点是否是head节点,如果是的话,则node节点是第二个节点,可以获取锁,获取锁成功,则设置头节点为node,消除了原来初始化产生的冗余节点,并且返回中断状态为false。
5.shouldParkAfterFailedAcquire
否则进行判断是否需要阻塞等待,如果需要阻塞等待,则执行parkAndCheckInterrupt()。
/**
* 判断未获取到锁的节点是否需要阻塞等待
* @param pred 前一个节点
* @param node 当前节点
**/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 移除所有前继节点,直到pred.waitStatus不为>0
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
节点的状态再AQS中有四种,在AQS构造新节点时,使用的是默认值0,方法进入else中,设置pred节点的status为Node.SIGNAL。在AQS#acquireQueued()中,如果还是没有成功获取锁,则第二次进入本方法,则进入第一个if判断。上一轮循环中,已经将pred.waitStatus设置为`Node.SIGNAL = -1,表示应该阻塞。
ws > 0的情况是当pred所维护的获取请求被取消时,pred.waitStatus会被设置为CANCELLED = 1,此时,移除所有pred.waitStatus > 0的前继节点,并且直接返回false。这里不需要判断pred是否为null,因为队列中前面的节点可能是冗余节点,waitStatus = 0;或者是已经加入的节点,waitStatus = 0;或者是已经阻塞的节点,waitStatus = SIGNAL=-1,在遍历到头节点时,一定会跳出循环。
6.parkAndCheckInterrupt
回到AQS#acquireQueued(),判断前继节点是否为头节点,进行处理,并执行parkAndCheckInterrupt()。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
该方法中使用LockSupport的park方法阻塞线程,进入等待状态。并使用Thread.interrupted()判断当前线程是否被中断,并清除中断标记。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
如果该线程被中断,则标记中断标记为true;
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
回到acquire中,如果acquireQueued也返回true则会执行selfInterrupt。
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
设置当前线程中断状态。
最后的队列状态如下:
(1) 除了头节点非阻塞,其余节点全部为阻塞状态
(2) 除了尾节点,其余节点都满足waitStatus==SIGNAL,表示释放后需要唤醒后继节点
7.释放锁
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
释放锁的代码如上,先调用tryRelease()尝试释放锁。成功则获取头节点,如果头节点不为空并且状态不为0,则调用unparkSuccessor(h)。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
* status为负,则把预期的SIGNAL状态设置为0
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
* 当前node是释放锁的node,则在队列中后面的节点,一般是需要解除阻塞的节点
* 但是这些节点也可能是CANCELED状态或者为空,
* 所以需要从尾部开始遍历,找到距离node节点最近,并且waitStatus<= 0的节点
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
8.为什么要从尾部遍历?
为什么不能从尾节点向前遍历?如上注释中可以看出,队列中的节点会出现CANCELLED状态或者null状态
(1) null状态的产生
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
在enq时,如果走到下面的else中,cas操作可能失败,这时,t.next = node这句代码不会被执行,旧的尾节点没有指向新的节点。
队列变成如下状态:
在上图中,可能需要释放的节点为node2,如果向后遍历,该节点的next为null,将找不到节点,但是实际tail节点是可以唤醒的。
(2) CANCELED状态
中间节点可能被状态改变成了CENCELED
node1的状态为CANCELED,这种状态从tail往前遍历也是能够找到最近的state<=0,并且不为node本身,这样的节点的。
根据上述(1),(2)两种状态,需要从后开始遍历。这步操作主要还是为了高效地找到node节点后面,离node节点最近的可以唤醒的节点。
9.整体调试
使用如下代码进行debug:
public class AQS {
private static Lock lock = new ReentrantLock();
public static void main(String[] args) {
for (int i = 0; i < 10; i++) {
new Thread(new ReentrantLockTask(), "thread" + i).start();
}
}
public static class ReentrantLockTask implements Runnable{
@Override
public void run() {
System.out.println("Thread " + Thread.currentThread().getName() + " started. ");
try {
lock.lock();
System.out.println("Thread " + Thread.currentThread().getName() + "get lock.------");
} finally {
lock.unlock();
}
}
}
}
总的来说,流程如下:
假设thread0首先获得锁
1.thread0获取锁,执行任务中
2.thread1调用NonfairSync#lock()尝试compareAndSetState(0,1)失败,调用NonfairSync#acquire(1)
3.tryAcquire(1)失败,调用acquireQueued(addWaiter(Node.EXCLUSIVE), 1),先调用addWaiter(Node.EXCLUSIVE),将添加Node封装本线程,到AQS的队列中
4.执行acquireQueued(node, 1),轮询判断前一个节点是否头节点,尝试获取锁
成功:设置头节点为当前节点
失败:shouldParkAfterFailedAcquire(p, node)判断是否需要阻塞线程,内部通过状态判断,waitStatus为-1返回true,执行parkAndCheckInterrupt(),阻塞当前线程LockSupport.park(this)。
代码wait在wait标记注释。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
// wait处
return Thread.interrupted();
}
5.thread1执行完成,则执行unlock(),tryRelease(1)释放锁成功,获取队列头节点h,调用unparkSuccessor(h)设置状态为0默认状态。调用LockSupport.unpark(s.thread)解除队列头部状态为可以唤醒的线程的阻塞。
6.阻塞的线程从wait的代码行往下执行,判断Thread.interrupted()是否为true,回到acquireQueued,继续获取锁。设置头节点为本节点,依次执行下面代码,完成同步和锁的获取。
最后,附上我参考过的大佬文章,写的比较深入: