ReentrantLock 实现了隐式锁synchronized基本的行为和语义,并且提供了额外的一些能力。
一个ReentrantLock被最后一次成功获取到锁的线程持有,直到线程释放他。
可重入的意思是一个线程如果成功调用lock()方法并且返回,
成功的获取了锁,当前线程未释放锁时,如果当前线程再次获取锁将会立即返回。
在调用线程内部可以使用isHeldByCurrentThread检查是是否被当前线程持有。
通过getHoldCount查看当前线程持有锁有多少个状态,即重入次数。
ReentrantLock有公平和非公平两种,即独占模式也分为两种。ReentrantLock内部类中分别都对其进行了实现。
默认构造函数是非公平模式,也可通过传入可选的参数指定公平实现。公平模式下锁更偏向将执行权授予最长-等待线程。
非公平模式下不保证任何访问顺序。
公平模式下的锁具有更低的吞吐量(通常来说就是很慢)比非公平锁,但是这样保证了用较少时间上的差异来获得锁以及保护线程不会饥饿。
注意:公平锁不保证线程调度的公平性,因此,多个线程使用同一把锁,其中一个线程可能获取成功了多次而其他活动线程还没什么进展也也没持有锁。
这句话的意思是尽管当前线程多次获取锁经历了排队等待这些过程,但是依然完成了任务并且成功获取了多次锁,说明被线程调度到了,而其他活动的线程的任务没有进展,尽管他们不需要锁,这是由于线程调度导致的。
同样注意没有时间参数的tryLock()方法也不满足公平性。尽管有其他线程在等待,如果他获取的时候这个锁是空闲的,仍然会获取成功。因为tryLock直接调用的是nonfairTryAcquire。
ReentrantLock有三个内部类:
Sync:锁的基类实现继承AQS,子类实现是下面的公平锁FairSync和锁NonfairSync
FairSync:公平锁实现
NonfairSync:非公平锁实现
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
/**非公平的tryLock。tryAcquire被子类实现,但是tryLock和tryAcquire都需要nonfair所以放在基类中,这样即便指定模式是公平锁,其tryLock也是非公平的
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
//获取AQS状态,getState 具有volatile读语义
int c = getState();
//c=0 表示锁是空闲的
if (c == 0) {
//原子性的修改状态为acquires(共享资源),acquires根据实现不同而表现不同,在ReentrantLock 0表示空闲大于1表示已经被获取和重入的次数,在semaphore表现为共享资源的数量。
if (compareAndSetState(0, acquires)) {
//成功设置排他拥有线程为当前线程
setExclusiveOwnerThread(current);
//成功直接返回表示直接获取到锁
return true;
}
}
//已经获取到锁则增加重入次数并立即返回:这就是被称为重入锁的原因
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
、**首先进行lock 。失败则调用AQS的acquire,acquire会调用NonfairSync#tryAcquire最后调用nonfairTryAcquire
* 可以发现非公平锁首先进行了两次的lock,如果成功则直接返回。不会经过入队,park,unpark线程上下文切换,这是是非公平锁快的原因。
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
/**
* Sync object for fair locks
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
//直接调用父类的acquire,acquire首先调用FairSync#tryAcquire
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//与非公平锁不同的是公平锁首先会判断是否有线程已经在排队等待,没有才会直接尝试修改状态。
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//AbstractQueuedSynchronizer#hasQueuedPredecessors
/**
查询是否已经有线程在排队,并且等待时间比当前线程长
调用这个方法等价于 (但是这个方法可能更有效率)getFirstQueuedThread() != Thread.currentThread() &&hasQueuedThreads()
注意:因为线程中断和超时可能发生在任何时候,返回true,即队列中有线程,但是其他线程也可能在当前线程之前进行aquire操作
返回false,即队列为空,也有可能其他线程在这个线程之前进行了入队操作
这个方法被设计成公平锁去避免冲突。一个同步器的方法应该返回false,tryAcquireShared应该返回一个负数,如果这个方法返回true(除非他是一个重入的acquire操作)。
例如tryAcquire方法在一个公平的可重入的独占锁应该像下面这种形式(和ReentrantLock#FairSync#tryAcquire语义一抹一样):
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}</pre>
* 返回 true:如果当前线程前有排队的线程
false:如果当前线程时队列的头部或者队列是空的
**/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//1、h !=t 只有当队列尚未被初始化,AQS被初始化,head和tail都没被赋值时返回false,即非公平锁第一次进入tryAquire,hasQueuedPredecessors返回false,接着tryAquire调用compareAndSetState成功。
//第二个线程进入 同样hasQueuedPredecessors返回false,compareAndSetState失败,调用addWaiter,初始化head和tail,head和tail都不相等。
/**
2、当头结点和尾节点不相等并且 如果head节点的next节点s不为空,并且s不是当前线程。代表头节点执行完后s节点有可能被执行。当前线程无法参与竞争,所以返回true。这种情况发生在当前节点是第一次入队的情况。
3、当头结点和尾节点不相等并且 如果head节点的next节点s不为空,并且s是当前线程。代表头节点执行完后s节点(当前线程)有可能被执行所以返回fasle。(代表上一层可以去竞争独占状态)。这种情况发生在当前节点已经入队的情况下。
4、当头结点和尾节点不相等并且 如果head节点的next节点s为空,那么直接返回true,代表头节点后面没有节点。即当前线程不是头节点的next节点。返回true。不直接参与竞争,这种情况发生在(有可能头节点后面的节点被取消,或者新入队的节点next尚未赋值)
**/
return h != t && ((s = h.next) == null || s.thread != Thread.currentThread());
}
总结下这个方法:以上四点保证了如果有其他的线程在头节点的next上等待并且head节点的next节点不是当前节点时,那么这个线程不能直接参与竞争,只能加入同步队列,否则如果当前head节点的next节点指向自己,将直接执行compareAndset设置状态。
这样保证了除第一个线程外其他线程都必须经过排队等待head的next指向自己,所以要实现公平锁的首要条件就是使用hasQueuedPredecessors 作为判断条件之一。
总结下:
1、公平锁和非公平锁最大的差异在tryAcquire是否会检查队列中是否有线程在等待
2、非平锁首先会尝试修改2次,进行抢占而不管是否已经有线程在排队。
3、公平锁的第一个线程将会被直接获取到锁执行,当且仅当第一个线程没有执行完毕,第二个线程才会进入排队。执行addWaiter和acquireQueued
先顺着acquire方法分析下去,然后再逐个分析ReentrantLock的方法。
``` /**
只能在独占模式下使用acquire方法(因为addWaiter传入的mode是exclusive,忽略中断)。并且至少调用一次tryAcquire。
调用acquire成功的话会立即返回否则,线程会被入队,
可能重复的阻塞和解阻塞,调用tryAcquire直到成功。(第一次进来无法获取锁,阻塞。后面有可能其他节点线程获取锁发生异常,cancelAquire会调用unparkSuccessor,
当前节点有可能被唤醒,判断head不是自己前驱,进行取消节点裁剪后,则再次阻塞,直到head是自己的前驱并且调用release方法重置状态,唤醒自己,线程再次被唤醒)
这个方法可以用来实现Lock接口的lock语义
**/
public final void acquire(int arg) {
//tryAcquire失败后添加到队列末尾并且排队等待,如果tryAcquire成功立即返回,表示成功获取到锁。
//tryAcquire失败:1.head的next没有指向当前线程,hasQueuedPredecessors返回false 。
//2.如果tryAcquire返回true,acquire立即返回 2.无线程排队,hasQueuedPredecessors返回false。
//前一个线程未释放状态,compareAndSetState失败,tryAcquire返回false,这才开始第一次入队。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
总结:tryAcquire成功将立即返回获取到锁,失败则进行排队。下面说下tryAcquire成功失败的几种情况:
tryAcquire成功:
1、锁刚刚新建完成初始化,第一个线程来请求锁,毫无疑问 hasQueuedPredecessors中 head=null,tail=null ,hasQueuedPredecessors中h != t返回false,tryAcquire调用compareAndSet成功获取到锁。
2、档head节点的next指向一个节点,这个节点hasQueuedPredecessors返回false,因为自己就是等得最久的 ,于是不停的调用tryAcquire,获取到锁。
tryAcquire失败:
1、新线程加入:判断hasQueuedPredecessors为true,加入等待队列进行等待。
2、等待队列中的线程虽然head的next节点指向自己单身compareAndset设置失败。
//添加新的节点到等待队列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//快速尝试入队,否则使用自旋入队
//将tail复制给pred
Node pred = tail;
if (pred != null) {
node.prev = pred;
//设置尾节点为node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);//确保队列为空的情况下
return node;
}
//自旋入队操作,
private Node enq(final Node node) {
for (;;) {
//第一个节点首次入队首先执行,初始化空节点
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//入队
node.prev = t;//首先保证将前驱节点赋值给prev,保证前驱稳定。所以在遍历next=null情况下使用prev向前遍历是安全可靠的。
if (compareAndSetTail(t, node)) {
t.next = node;//这一步操作是,非线程安全 可能节点已经在队列上了 t.next仍然为null,对其他线程不可见
return t;
}
}
}
}
总结下:enq在第二个线程入队时才延迟初始化head节点和当且线程节点。后续入队操作直接执行else后面的操作,并且t.next = node 不能保证已经入队的节点next不为null,这也是hasQueuedPredecessors判断next==null的原因之一。
/**
* Convenience method to interrupt current thread.设置中断状态
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
/**
在独占且不可中断模式下已经排队的节点acquire方法。
同样也被用于条件等待方法await
**/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//判断当前节点的前驱是否是头节点,是才进行tryAcquire调用子类的方法去判断并修改状态,且成功后设置队列Head为当节点并且将head节点出队。
//当且仅当当前线程被唤醒后,重新循环,判断p==head==true,再进行tryAquire
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;//标记失败状态
return interrupted;//acquire 中根据返回的中断状态设置值当前线程的中断。
}
//设置signal状态,在下次自旋中判断返回true,调用parkAndCheckInterrupt设置中断状态,并park。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//执行到这里表明已经被前驱节点唤醒,并继续自旋获取锁
//如果被中断设置中断状态,但是并不处理
interrupted = true;
}
} finally {
//
if (failed)
cancelAcquire(node);
}
}
/**
检查和更新acquire失败的节点的状态,如果一个节点应该阻塞则返回true。
主要控制acquireQueued的死循环。
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//已经设置了前驱节点的,SIGNAL 直接返回true,park这个线程
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
* 前驱节点已经被设置为取消状态,需要修改prev节点,这里 有几个问题
* 1、为什么是在这里修改? 因为在线程即将进入park的时候需要保证自己能被唤醒,通常是谁唤醒自己呢?
前驱节点,但是不总是前驱节点。所以需要保证前驱节点是活着的。这里仅仅是尝试,就算重新设置了前驱节点,
设置后的前驱节点也会发生异常而取消,所以在唤醒的时候还会有一次查找判断
* 2、为什么在取消的时候修改了一次prev,next属性这里还要设置一次?
next属性已经由线程取消的时候被设置了,发生在这个步骤前。cancelAcquire方法可以同时被多个方法调用发送竞争,所以要使用CAS修改,但是不保证一定成功。所以在每个线程之中需要再进行保证。
* 3、修改其他节点的prev,next属性是线程安全的吗?
是的。进入这个方法时判断的是node节点前驱节点的状态。假设这个节点的前面也有个node判断前驱节点是取消状态,然后尝试修改。那么这个节点的线程找到的是他的前驱节点的前面的第一个没有被取消的节点。
并不是我们这个线程所找到的第一个未取消的节点,如果本节点后面有线程在检查也一样,最终大家都只负责修改自己的前驱,删除掉的是自己那一块区域,所以并不会发生竞争。
总结:观察cancelAcquire和unparkSuccessor最终完成将取消节点出队的都是这部分操作
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/* 获取独占锁的线程在进来的时候waitStatus都是0。表示这个线程需要设置信号,
然后在下一次自旋中被park。下次自旋中仍然会查看是否自己是head节点是的话就尝试修改状态。这样尽最大努力去尝试然后再park
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//park 并且返回当前线程是否被中断
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
/**
* Cancels an ongoing attempt to acquire.
* 取消正在进行的acquire,cancelAcquire总是在finally语句块中被调用
* 发生异常进行取消是基本算法的健壮性目标之一。cancelAcquire 首先应满足,取消当前节点后应能不影响其他线程的执行,也就是应该唤醒下一个线程。
*并且应该重新设置AQS队列,将被取消的节点进行清理,以提高队列的性能,因为在唤醒一个线程时也会首先判断队列的状态并删除取消的节点。
*但是这一操作不应该由释放锁的线程来做因为他的事情已经做完了,清理队列应该由发生异常的线程进行清理。所以cancelAcquire总是在finally语句块中。
*
* @param node the node
*/
private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
//获取到当节点前第一个状态没有设置为取消的节点,并且跳过取消的节点
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
//predNext 即为第一个找到的没有设置为取消状态的节点的下一个节点。
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);//完成一部分出队,仍保留prev引用
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);//完成一部分出队,仍保留prev引用
} else {
//
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
1.当前节点是tail节点时设置tail为pred,自己未获取到锁,不需要唤醒next节点。
注意:当前节点不可能是head节点
2、pred是head节点时,node节点故障了直接唤醒他后面的线程 unparkSuccessor(node);unpark之后进入aquiredQueued判断当前前驱是head(已经设置node.prev=pred)成功。CAS的时候分为两种情况:
2.1 CAS成功:head恰好释放锁调用release方法,当然head线程也在unparkSuccessor观察到head.next.waitStatus>0 ,
并且整个替换了,而不是只替换next属性。然后这个节点就变成了head节点,其实这就是正常的release流程。
这里只是说明不管CAS成功还是失败都不会影响到删除取消的节点,都是做的一样的操作对结果本身没有影响。
在head线程删除取消状态的节点的时候,也只删除离自己最近的节点,直到找到waitstatus<0 的节点。
2.2 CAS失败,因为,于是进入shouldParkAfterFailedAcquire,由正常节点的线程去设置新的prev属性和新前驱节点的后next属性。因为那样做是线程安全的(查看shouldParkAfterFailedAcquire)。
3、pred不是首节点,当前节点也不是tail。在pred的status为SIGNAL并且pred的thread不为空(判断pred是否已经取消),尝试设置 pred的next指向 node的next属性。
这里都是使用的CAS不保证成功设置。并且就算compareAndSetNext(pred, predNext, next);成功,也只是完成了删除节点的一部分操作,prev属性并没有没设置。
这里只是尽自己最大努力做一点事情,真正的将已经取消节点(或者说完成一半出队)出队发生在,unparkSuccessor的时候,head节点会检查next节点是否已经取消,最后将离自己最近的一些取消节点出队。
4、为什么这里只设置next属性,而不修改prev属性呢?为什么会在这里设置next属性呢?
我认为有以下原因:
1、prev被设计为可靠的属性,在unparkSuccessor,shouldParkAfterFailedAcquire,cancelAcquire都在被遍历,如果此时对prev进行修改可能导致竞争,唤醒线程、其他线程的取消都可能出问题。
而在shouldParkAfterFailedAcquire已经提到修改prev和next是按段修改的。每段都由一个线程负责,不会引起遍历修改这样的竞争情况发生。假设shouldParkAfterFailedAcquire
在修改的时候另外一个node正在unparkSuccessor,并且这个node的next已经被取消,那么最终被unpark的线程也会执行shouldParkAfterFailedAcquire,最终修改prev和next来释放取消的节点。
2、只是尽最大努力修改
总结:cancelAcquire 只是设置取消状态,保证最大努力交付(设置next的值,设置prev整个队列的状态将受到影响)。真正做事的是shouldParkAfterFailedAcquire和unparkSuccessor,因为他两才能保证这么做是安全的。
shouldParkAfterFailedAcquire直接进行整个节点的赋值,将取消节点踢出去,cancelAcquire只是设置next的值,或者交给shouldParkAfterFailedAcquire,shouldParkAfterFailedAcquire同时将prev和next都修改了。
作用域:cancelAcquire作用于发生异常的node,shouldParkAfterFailedAcquire入队的等待的线程,unparkSuccessor 作用于唤醒当前节点的下个节点的时候。
所以: 应该是是三个步骤 1.设置状态(cancelAcquire) 2.shouldParkAfterFailedAcquire(park前检查) 3.unparkSuccessor(unpark前检查)。三步来减少取消节点对队列性能的影响。
``` //ReentrantLock#unlock
public void unlock() {
sync.release(1);
}
//AQS#release,独占锁释放。
public final boolean release(int arg) {
if (tryRelease(arg)) {//首先由子类释放状态和OwnerThread
Node h = head;//AQS 更关注的是队列
/**
只有当head不为空的情况下并且head的状态不为0才唤醒继任者线程。
1、h为空表明只有一个一个线程使用过锁并且没有发生任何竞争,
队列的head和tail都没初始化,直接返回true,释放成功。
1.1当这种情况发生时有第二个线程进行争抢锁,hasQueuedPredecessors为true,compareAndSetState为false,调用addWaiter,进入acquireQueued,
判断p == head ==true,并且再次tryAcquire进行compareAndSetState失败,然后被park,最后第一个线程释放状态,也会调用unparkSuccessor,但是当前head节点是一个空节点并没有Thread在其中。即使是这样
unparkSuccessor唤醒的是head的next节点,而释放锁的线程不是head,thread为null的线程才是当前head,所以第二个线程依然可以被唤醒并成为head。这里说明了当前执行的线程不总是head线程,
唤醒下一个节点的线程也不总是head节点的线程。当前释放锁的线程可能不是head节点,也没有进入队列中。
2、h不为空,h的waitStatus =0 意味着没有后续节点需要被唤醒否则waitStatus=Node.SINGAL,表示有继任节点,当前的节点的waitStatus=Node.SINGAL只能由后续节点设置
**/
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
//false 表示释放锁失败,可能当前释放锁的线程和获取锁的线程不是同一个,当然tryRelease 可选抛出还是不抛出异常。这里返回false只是稳妥的方式。
//以便其他方法在调用时可以使用这个判断标志作为依赖而抛出其他异常或者设置新的标志,例如后面讲到fullRelease的时候会说到
return false;
}
//ReentrantLock#Sync#tryRelease
protected final boolean tryRelease(int releases) {
//state是共享变量为什么这里不用加锁呢?很简单只有获取到锁的线程才能释放,这里其实只当前锁线程才能释放
int c = getState() - releases;
//持有锁的线程是当前线程,否则抛出异常。
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//尝试在unpark前清除node当前的等待状态(由Node.SIGNAL 变为0),失败和被等待线程修改都没有关系,这个操作不重要。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
//下一个节点为null或者被取消则,从tail节点向前,进行扫描直到找到waitStatus<=0 (最后一个,离head最近的那一个)即需要被唤醒,
//同时将找到的节点赋值给node.next 实际上对其中状态>0被取消的节点进行了裁剪,但是只设置了node的next属性并未设置prev属性。
//只有一个地方是用来彻底释放取消节点的。即在shouldParkAfterFailedAcquire中。
//这里首先进行next属性的尝试,但是因为next节点不总是可靠,有可能节点被取消或者next属性值还没被赋值。所以使用next属性实际上是一种优化
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//找到next节点后唤醒他
if (s != null)
LockSupport.unpark(s.thread);
}
好了公平锁完完整的流程就走完了。下面继续分析ReentrantLock其他的方法。
获取锁除非当前线程被中断
如果当前锁没有被其他线程持有,则立即返回并且设置hold count 为1
如果当前线程持有这个锁,hold count加1 并且立即返回
如果当前锁被其他线程持有,当前线程将变得不会被线程调度,假死直到下面两种情况之一发生:
1.当前线程主动获取锁(被前驱节点唤醒)
2.其他线程调用 Thread.interrupt 中断这个线程(在线程获取锁的时候被中断)
主动获取锁成功和lock方法效果是一样的,如果线程在等待的过程中被中断将抛出 InterruptedException,并且当前线程的中断状态被重新设置
此方法具有优先检测中断的能力
**/
//#ReentrantLock#lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
独占式的获取锁,中断的时候终止。
通过首先检查中断状态,然后调用至少一次的tryAquire,在成功的时候返回。否则这个线程被入队,可能重复的阻塞或者解阻塞,(第一次进来无法获取锁,阻塞。后面有可能其他节点线程获取锁发生异常,cancelAquire会调用unparkSuccessor,
当前节点有可能被唤醒,判断head不是自己前驱,进行取消节点裁剪后,则再次阻塞,直到head是自己的前驱并且调用release方法重置状态,唤醒自己,线程再次被唤醒)
然后调用tryAcquire, 直到成功获取到锁或者这个线程被中断。
**/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//首先判断中断状态
if (Thread.interrupted())
throw new InterruptedException();
//tryAquire不再分析,公平锁和非公平锁的区别在于是调用hasQueuedPredecessors,此处是公平锁
//tryAquire 判断队列是否有节点,CAS设置状态等不再前面已经说过。
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
独占获取并且响应中断
和acquireQueued 对比
acquireQueued 有返回值 参数为Node 忽略中断 入队操作在外层aquire中(方便被重用)
doAcquireInterruptibly 无返回值 参数为aquire的数量 显示抛出InterruptedException 入队操作在自己方法体中
**/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//注意:被唤醒之后如果检测中断状态被设置,才接着抛出异常
throw new InterruptedException();
}
} finally {
if (failed) //抛出InterruptedException 后 failed=true,执行取消流程。
cancelAcquire(node);
}
}
**总结:doAcquireInterruptibly尝试获取锁时会检测中断状态,抛出异常。
在unpark之后会检测中断状态抛出异常。这都是在获取锁的过程中最大努力的处理中断,
但还是有延迟,线程被park住了,即使有中断也无法抛出。acquireQueued也可以完全设置的和doAcquireInterruptibly参数
返回值都一样,acquireQueued只是为了方便重用。同样都用try finally 取消线程(或者说设置线程的取消状态)。其他流程都和acquireQueued一样。**
/**
请求这个锁并立即返回。如果当前锁未被其他线程持有返回true,否则false。
注意:即使构造ReentrantLock时设置公平状态为公平锁,tryLcok仍然是调用的非公平的。
这个行为在某些情况下非常用于,即使你使用了公平锁,仍然可以不判断队列状态,尝试获取锁,只要当前锁CAS尝试修改状态成功。
**/
//#ReentrantLock#tryLock
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
/**
获取锁如果在给定的等待时间内,并且当前线程没有被中断
如果ReentrantLock已经被设置为公平模式,那么这个方法必须遵守排队策略。这个是和tryLock方法相反的。
可以像下面这样使用组合非公平和公平锁同时利用两种锁的优点:
if(lock.tryLock() || lock.tryLock(timeout,unit)){
}
和不带参数的tryLock比较:
tryLock(): 非公平锁 立即返回 可检测中断
tryLock(timeout,unit):公平 (超时)等待阻塞 可检测中断
**/
//#ReentrantLock#tryLock(long timeout, TimeUnit unit)
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
/**
排他模式下使用,可响应中断,超时失败返回false。
也是首先检查中断状态,然后调用至少一次的tryAquire,成功就直接返回。否则
将排队等待,可能重复的park或者unpark。知道最后调用tryAquire成功,或者中断、超时。
第一个参数值是 :aquire的数量 第二个参数值是:等待的纳秒
返回true:tryAquire成功,false:超时
**/
//#AQS#tryAcquireNanos
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
//首先检测中断状态
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
/**
排他模式下超时方法使用
**/
//#AQS#doAcquireNanos
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)//时间<=0 直接返回false,即失败
return false;
final long deadline = System.nanoTime() + nanosTimeout;//等待截止时间
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) //等待时间到了直接返回false
return false;
//可以这么理解截止时间减去第一次尝试tryAquire的时间=剩下的时间<自旋阀值的话将一直自旋,否则剩下的时间将park住。
//然后自己醒来,再判断中断状态,再进行尝试获取锁,最后重新判断剩余时间<=0 返回false.
//返回false之后因为failed=true,执行cancelAcquire,取消节点。
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed) //这里的cancelAquire需要处理两种情况的异常: 1.返回false,即超时,可认为超时也是一种异常。2.中断 cancelAquire流程见前面分析:cancelAcquire
cancelAcquire(node);
}
}
######总结:doAcquireNanos 通过spinForTimeoutThreshold,自己进入等待,防止传入等待时间过长,自旋时间太长,过多浪费CPU资源,限制了最长自旋时间不超过1000纳秒。LockSupport.parkNanos(this, nanosTimeout);则让线程在随后的时间睡眠,直到给定的时间,自动唤醒自己。当然也可以被头节点唤醒。无论被谁唤醒线程被unblock后.首先检查下中断状态,然后判断下是否是head节点,截止获取下锁。如果不成功则,就超时。所以即便线程超时他还是有最后一次机会去证明自己。
/**
查询是否有任何线程在排队等待获取锁。
注意:因为取消可能发生在任何时候,返回true并不保证任何其他线程将一定获取到这个锁。
这个方法主要被设计用来监控系统的状态。
**/
//#ReentrantLock#hasQueuedThreads
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
/**
只是判断头结点不等于尾节点。
**/
//#AQS#hasQueuedThreads
public final boolean hasQueuedThreads() {
return head != tail;
}
/**
给定一个线程判断他是否在队列中等待获取锁。
注意:取消可能发生在任何时候,返回true,不保证这个线程将一定获取到这个锁。
这个方法主要被设计用来监控系统的状态。
**/
//#ReentrantLock#hasQueuedThread
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
/**
从后向前遍历找到这个线程,找到返回true,未找到返回false。
**/
//AQS#isQueued
public final boolean isQueued(Thread thread) {
if (thread == null)
throw new NullPointerException();
for (Node p = tail; p != null; p = p.prev)
if (p.thread == thread)
return true;
return false;
}
#### AQS和ReentrantLock (公平锁和非公平锁部分)的总结:
AQS做的事情是管理队列,然后提供一些监控方法,子类只需要实现 protected 标记未exclusive的方法,其中有关独占模式的AQS中最重要的管理队列的方法:
- 实现AQS核心算法排队自旋有关:
acquireQueued:提供最基本的锁获取方式。
doAcquireInterruptibly:在acquireQueued中增加可响应中断
doAcquireNanos:在doAcquireInterruptibly的基础上增加可设置超时
acquire**类方法一般形如如下模板:
try{
for(;;;){
//判断是否头节点&&tryAcquire成功{设置头节点为当前节点,前头节点出队,最后返回成功}
//判断是否需要park
//park
//超时处理
//中断处理
}
}finally{
if(failed)
cannelAcquire;
}
shouldParkAfterFailedAcquire:节点状态是实现前驱节点唤醒后驱节点设置状态为Node.SIGNAL的唯一地方其他方法中也可以设置如cancelAcquire,但是不保证成功。
unparkSuccessor:根据shouldParkAfterFailedAcquire设置的waitStatus来唤醒后驱节点
release:释放锁(修改状态),可能调用unparkSuccessor。
- 管理队列性能、优化相关:
cancelAcquire:取消无效节点,设置状态为Node.CANCELLED
shouldParkAfterFailedAcquire:也会清除被取消的节点
unparkSuccessor:也会清除被取消的节点
-原始队列原子操作相关:
enq、addWaiter
- 监控相关:
hasQueuedThreads、hasQueuedThread
子类只需要实现:
tryAcquire():通过setState、getState设置修改状态
release():子类可能不叫releas叫 unlock,同样也是设置状态,
- 公平非公平锁实现相关:
hasQueuedPredecessors:获取队列中是否有比自己排队更久的节点。