一、简介
上一篇文章讲到,ReentrantLock方法的实现全部是依靠Sync的方法。而Sync又是继承了AQS,所以需要重点分析AQS。
AQS的设计是采用模板方法模式的。即如果要使用AQS,就需要继承AQS并重写AQS里指定的方法,以下方法可以按照需要被重写:
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire(int arg)
, 独占式获取同步状态,实现该方法需要查询当前状态并判定同步状态是否符合预期,然后再进行CAS设置同步状态。
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
tryRelease(int arg)
, 独占式释放同步状态。
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
tryAcquireShared(int arg)
, 共享式获取同步状态,返回大于等于0的值,表示获取成功,反之获取失败。
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
tryReleaseShared(int arg)
, 共享式释放同步状态。
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
isHeldExclusively()
, 当前AQS是否在独占模式下被线程占用,一般该方法表示是否被当前线程独占。
而重写这些方法的时候需要操作AQS中的state
变量。该变量是一个int型的volatile变量。具有volatile读和volatile写的内存语义。
AQS提供了以下方法来操作 state
:
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
//原子性设置state的值
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update)
}
二、同步队列及Node节点
通常情况下,当线程获取同步状态(调用lock()
)失败时,会被挂起(为什么是通常情况下,因为还有tryLock()
方法,该方法获取锁失败时会返回false)。那么AQS是怎么维持这些被阻塞线程的信息的呢?答案是,同步队列,这是一个FIFO双向队列。AQS会将当前线程以及等待状态等信息构造成一个Node节点并加入到同步队列。当同步状态(锁)被释放时,会将阻塞队列的首节点的线程唤醒,使其可以再次尝试获取同步状态。
Node节点是构成同步队列的基础,AQS拥有首节点和尾节点,没有成功获取同步状态的线程将会被构造成一个节点加入到队列的尾部。
看一下Node的定义,它是AQS的静态内部类:
static final class Node {
/**一个标记,指明节点在共享模式下等待同步状态 */
static final Node SHARED = new Node();
/** 一个标记,指明节点在独占模式下等待同步状态*/
static final Node EXCLUSIVE = null;
//以上两个值是 nextWaiter 域的取值
//以下四个值是waitStatus的可能取值。
/** 在同步队列中等待的线程由于超时或被中断,将waitStatus标记为CANCELLED,意思是取消等待同步状态 */
static final int CANCELLED = 1;
/**表明当前节点如果释放了状态或者被取消,则唤醒后继节点中的线程,使得后继节点的线程得以运行 */
static final int SIGNAL = -1;
/** 表明节点在等待一个Condition的signal,该节点处于等待队列中, 当其他线程调用signal()方法时,
*该节点将会从等待队列中转移到同步队列,加入到对同步状态的获取中*/
static final int CONDITION = -2;
/**
*表示下一次共享式同步状态的获取将会无条件的传播下去。
*表明一个共享锁的释放将会传播到其他节点,而不是仅仅后继节点。
*这个状态仅仅在 doReleaseShared()中对头节点进行设置。
*/
static final int PROPAGATE = -3;
// 取值为上面的1、-1、-2、-3,或者0(以后会讲到)
// 这么理解,只需要知道如果这个值大于 0,代表此线程取消了等待,
// 也许就是说半天抢不到锁,不抢了,ReentrantLock是可以指定timeouot的。。。
volatile int waitStatus;
//前一个节点
volatile Node prev;
//后一个节点
volatile Node next;
//线程
volatile Thread thread;
//指向下一个正在等待Condition变量的节点,或者取值为SHARED、EXCLUSIVE
Node nextWaiter;
}
了解了Node节点之后,我们来看看AQS中都有哪些重要的属性:
// 头结点,可以把它当做当前持有锁的线程,注意:阻塞队列并不包含head节点
private transient volatile Node head;
// 尾节点,每个新的节点进来,都插入到最后
private transient volatile Node tail;
// 代表当前锁的状态,0代表没有被占用,大于0代表有线程持有当前锁
// 之所以说大于0,而不是等于1,是因为重入锁,每次重入都加上1
private volatile int state;
// 代表当前持有独占锁的线程,举个最重要的使用例子,因为锁可以重入
// reentrantLock.lock()可以嵌套调用多次,所以每次用这个来判断当前线程是否已经拥有了锁
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //继承自AbstractOwnableSynchronizer
三、锁的获取
直接看代码:
public void lock(){
sync.lock();
}
其中sync
既可以是FairSync,也可以是NonfairSync。区别主要体现在:
//NonFairSync
final void lock() {
//不考虑阻塞队列中有没有其他线程在等待锁的释放,直接进行加锁,成功后则设置本线程为锁的独占线程。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); //AQS中的实现
}
//FairSync
final void lock() {
acquire(1); //AQS中的实现
}
两者都调用了 AQS 的 void acquire(int arg)
方法:
public final void acquire(int arg) { //arg = 1
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
其中tryAcquire(arg)
方法是Sync的子类FairSync
和NonfairSync
重写的方法,正如名字所示,它是尝试进行加锁,成功返回true,失败返回false,此时还没有加入阻塞队列一说。
//FairSync
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//当前没有线程占有锁的情况下
if (c == 0) {
//区别主要在这,即使没有其他线程占有锁,也要判断阻塞队列中是否有其他线程正在等待锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//否则判断是否是当前线程占有了锁,此时是可以重入的
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//NonfairSync
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
//Sync
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//在没有其他线程占有锁的情况下,直接尝试插队(加锁)
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
如果加锁成功,tryAcquire(int arg)
返回true
,就没有后续的步骤了。如果失败,则就要进入阻塞队列了:
public final void acquire(int arg) { //arg = 1
if (!tryAcquire(arg) && //此时tryAcquire返回true
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //执行此步骤
selfInterrupt();
}
看看addWaiter(Node.EXCLUSIVE)
具体做了些什么:
//addWaiter做了两件事:1、封装线程,构造出Node结点;2、入队
private Node addWaiter(Node mode) { //mode = Node.EXCLUSIVE,
//前面说过,Node.EXCLUSIVE和Node.SHARED是两个标记,
//标记Node节点获取锁的模式
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 仔细看看上面的代码,代码执行到这里,
// 说明 pred==null(队列是空的) 或者 CAS失败(有线程在竞争入队)
enq(node);
return node;
}
//采用自旋的方式加入队列,也就是在CAS设置tail过程中,竞争一次竞争不到,我就多次竞争
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
//队列为空时,初始化一个头节点,完了之后,再次进入for循环,执行下面的else语句,
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
再次回到下面这个代码,addWaiter
将由当前线程构成的Node结点加入队列之后,就执行acquireQueued(Node node, int arg)
方法,
public final void acquire(int arg) { //arg = 1
if (!tryAcquire(arg) && //tryAcquire返回true
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) //此时addWaiter已经将node结点加入了队尾
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); //当前结点的前驱结点
//如果前驱结点是头节点,则可以尝试获取锁,
// p==head 说明当前节点虽然进入了阻塞队列,但它是阻塞队列的第一个,因为它的前驱是head
// 注意,阻塞队列不包含head节点,head一般指的是占有锁的线程,head后面的才称为阻塞队列
// 所以当前节点可以去试抢一下锁,那么为什么可以试着抢一下呢
//首先,它是阻塞队列的队首,其次,前面的head有可能是一个空节点,什么意思,
//前面enq方法中说过,当发现队列为空时(tail == null),会compareAndSetHead(new Node()),
//这时会构造一个什么也没有的空节点,没有设置任何线程
// 也就是说,当前的head不属于任何一个线程,所以作为队头,可以去试一试
if (p == head && tryAcquire(arg)) {
//此时获取锁成功,将head设置为当前获取锁的结点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 到这里,说明上面的if分支没有成功,要么当前node本来就不是队头,
// 要么就是tryAcquire(arg)没有抢赢别人,此时按照我们的理解,应该将线程挂起了,因为此时别人在占着锁呢,。
// 按字面意思就是,检查在失败获取锁之后,是否应该将线程挂起,如果可以,那么就挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
下面看一下shouldParkAfterFailedAcquire(p, node)
:
// 刚刚说过,会到这里就是没有抢到锁呗,这个方法说的是:"当前线程没有抢到锁,是否需要挂起当前线程?"
// 第一个参数是前驱节点,第二个参数才是代表当前线程的节点
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
//前驱结点的waitAtatus是SIGNAL(-1),说明当前线程可以安心挂起了,
//因为前驱结点在释放锁的时候,会signal到后驱结点,将其唤醒
return true;
if (ws > 0) {
//此时说明前驱结点已经取消等待锁(因为超时或者被中断)
// 所以下面这块代码说的是将当前节点的prev指向waitStatus <= 0的节点,
// 简单说,就是为了找个好爹,因为你还得依赖它来唤醒呢,如果前驱节点取消了排队,
// 找前驱节点的前驱节点做爹,往前循环总能找到一个好爹的
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//此时waitStatus = 0、-2、-3,将前置节点的waitStatus设置为SIGNAL
//然后返回false,回到外层的acquireQueued重新进入for循环获取锁
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 返回到前面是这个方法
// if (shouldParkAfterFailedAcquire(p, node) &&
// parkAndCheckInterrupt())
// interrupted = true;
//如果shouldParkAfterFailedAcquire返回true,说明线程应该被挂起
//此时前驱节点的waitStatus==-1,是正常情况,那么当前线程需要被挂起,等待以后被唤醒
//此时再执行挂起线程的操作
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //线程在这里被挂起,等待被唤醒,在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。
return Thread.interrupted(); //此时线程被唤醒,就检查是不是被中断的
}
如果在整个等待过程中被中断过,则返回true,否则返回false。
如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断 selfInterrupt()
,将中断补上。
回头再重点看一下 acquireQueued()
的代码:
final boolean acquireQueued(final Node node, int arg) {
//标记是否成功获取同步状态,成功为false。
boolean failed = true;
try {
boolean interrupted = false;//标记等待过程中是否被中断过
for (;;) { //自旋状态
final Node p = node.predecessor();
//如果前一个节点是头节点,就在自旋过程中再次获取同步状态
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//前一节点不是头节点或者获取状态失败,就检查当前线程是否可以进入
//waiting状态,如果可以,就park当前线程,进入waiting状态,并检查中断。
//在此状态下,有两种途径可以唤醒该线程:1)被unpark();2)被interrupt()。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//如果线程是被中断唤醒的, interrupted = true
//此时线程并不响应中断,而是继续for循环获取锁,获取到锁之后
//再进行中断,具体逻辑看下面
/*
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//此时acquiredQueued返回interrupted为true,进行自我中断
selfInterrupt();
}
*/
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
线程被唤醒的时候,就去检查中断标志位有没有置位,如果置位则返回true,继而 boolean acquireQueued(final Node node, int arg)
返回true,继而在 final void acquire(int arg)
中就执行 selfInterrupt()
,进行自我中断。
四.锁的释放
unlock()
方法调用sync
的release(int arg)
方法
public void unlock() {
sync.release(1);
}
//AQS
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head; //找到头节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); //唤醒等待队列里的下一个线程
return true;
}
return false;
}
//Sync
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false; //是否完全释放锁
if (c == 0) { //锁被完全释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
head结点释放完锁时,此时没有线程占有锁,紧接着去唤醒后面的线程, unparkSuccessor(h)
就是做这样的事:
private void unparkSuccessor(Node node) {
//这里,node一般为当前线程所在的结点。
int ws = node.waitStatus;
if (ws < 0)//置零当前线程所在的结点状态,允许失败。
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;//找到下一个需要唤醒的结点s
if (s == null || s.waitStatus > 0) {//如果为空或已取消
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒
}
线程被唤醒以后,被唤醒的线程继续执行下面的代码:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 刚刚线程被挂起在这里了
return Thread.interrupted();
}
// 又回到这个方法了:acquireQueued(final Node node, int arg),这个时候,node结点的线程去争取锁,成功的话node就变为head了
五.其他
1、可中断的获取锁
线程在等待锁的时候可以被中断,抛出异常
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//AQS
public final void acquireInterruptibly(int arg) throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg)) //尝试获取锁,如果返回false,即获取锁失败,执行下面
doAcquireInterruptibly(arg);
}
//AQS
private void doAcquireInterruptibly(int arg) throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException(); //重点在这里,直接抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
它和不响应中断版本的函数很像,只是在这里,如果线程被中断过,直接抛出异常。
2、尝试获取锁
尝试获取锁,成功返回true,失败立即返回false,不挂起线程。该方法在没有其他线程占有锁的情况下就去获取锁,即使是公平锁,不会考虑阻塞队列中有没有其他线程在等待该锁。代码比较简单。
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
//AQS
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
3、带超时的获取锁
在规定的时间内获取锁,成功返回true,超时返回false,可被中断,抛出中断异常,中断标志位清空。如果时间参数小于等于0,立即返回结果,不挂起线程。
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//AQS
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) || //尝试获取锁,失败的话进入下面
doAcquireNanos(arg, nanosTimeout);
}
//AQS
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false; //直接返回失败
final long deadline = System.nanoTime() + nanosTimeout; //未来超时的时候
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime(); //剩余时间
if (nanosTimeout <= 0L)
return false; //如果剩余时间小于等于0,还没有获取到锁,返回失败
if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
//spinForTimeoutThreshold是一个阈值,剩余时间超过该阈值时才会
//去定时挂起线程知道剩余时间流逝完,否则就采取自旋的方式消耗剩余时间
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
至此,ReentrantLock所有加锁解锁相关的方法都已经分析的差不多了,其实可以看出,核心的代码也就那些,弄懂了之后其他的就不难了,主要还是掌握阻塞队列的设计思想。其他一些不常用的方法大家自己去看看就行,不是很难。就到这吧,明天继续,晚安。