ReentrantLock
ReentrantLock的类型
ReentrantLock,即并发下常用的可重入锁,它分为两种锁策略类型:公平锁和非公平锁.
先来看一下ReentrantLock的构造函数:
代码1:ReentrantLock的构造函数
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
由ReentrantLock构造函数可知,其锁策略默认为非公平锁.若想自定义获取锁的策略类型,可以采用带有一个boolean类型参数fair的构造方法来创建ReentrantLock锁对象,fair为true时,获取的是公平锁;为false时,获取的是非公平锁.
ReentrantLock加锁的实现原理
AQS
ReentrantLock中有一个属性sync,通过该属性来持有公平与非公平的锁竞争方式的实现,这两种实现分别是FairSync和NonfairSync.首先来看一下这两个类的继承体系结构:
图1:FairSync和NonfairSync继承体系
可以看到这两个类均是AbstractQueuedSynchronizer的子类实现,AbstractQueuedSynchronizer简称AQS,是JDK1.5提供的一个基于FIFO等待队列实现的一个用于实现同步器的基础框架,JCU包里面几乎所有的有关锁、多线程并发以及线程同步器等重要组件的实现都是基于AQS这个框架.AQS的核心思想是基于volatile int state这样的一个属性同时配合Unsafe工具对其原子性的操作来实现对当前锁的状态进行修改.ReentrantLock内部也使用了AQS去维护锁的获取与释放.
另外,AbstractQueuedSynchronizer继承自AbstractOwnableSynchronizer,AbstractOwnableSynchronizer主要提供一个exclusiveOwnerThread属性,用于记录当前持有该锁的线程.
此处先举个例子来大体描述一下AQS的等待队列大概长什么样子,以便后面锁实现原理的分析.
假设目前有三个线程Thread1、Thread2、Thread3同时去竞争锁,如果结果是Thread1获取了锁,Thread2和Thread3进入了等待队列,那么他们的样子如下:
图2:AQS等待队列示例
该AQS队列为一个双向链表,其中的每个节点node中都一个一个prev属性指向前一节点,next属性指向后一节点.
NonfairSync
以NonfairSync为例,研究下锁的实现原理.ReentrantLock获取锁的入口方法如下:
代码2:ReentrantLock的lock()方法
/**
* Acquires the lock.
*
* <p>Acquires the lock if it is not held by another thread and returns
* immediately, setting the lock hold count to one.
*
* <p>If the current thread already holds the lock then the hold
* count is incremented by one and the method returns immediately.
*
* <p>If the lock is held by another thread then the
* current thread becomes disabled for thread scheduling
* purposes and lies dormant until the lock has been acquired,
* at which time the lock hold count is set to one.
*/
public void lock() {
sync.lock();
}
方法中直接调用sync的lock方法,也就是sync持有的NonfairSync对象的lock()方法,跟进看一下NonfairSync具体实现.
代码3:NonfairSync
/**
* Sync object for non-fair locks
*/
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
上面便是NonfairSync类的全部代码,在lock方法中首先会执行if分支,尝试对state属性置位,通过compareAndSetState()方法来实现,该方法中是通过Unsafe工具以原子操作的方式来执行的,当state的值为0的时候,标识改Lock不被任何线程所占有,尝试将其设置为1,如果设置成功,代表加锁成功,执行setExclusiveOwnerThread()方法,将exclusiveOwnerThread属性的值设置为当前线程,后面的可重入功能,正式通过这个属性判断竞争锁的线程与当前占用锁的线程是否是同一个来实现的.
如果上述if分支的逻辑执行失败,未能立即获取到锁,就会进入到else分支去执行acquire()方法,该方法是实现锁竞争的关键.
代码4: acquire方法
public final void acquire(int arg) {
// 先尝试抢占,第一处体现非公平的地方
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个方法由父类AbstractQueuedSynchronizer中实现,下面依次对该方法下包含的几个方法进行分析.
tryAcquire()方法
具体的tryAcquire()方法,是交由子类实现.子类NonfairSync中则是通过直接调用ReentrantLock中定义的nonfairTryAcquire()方法来实现.
代码5: nonfairTryAcquire方法
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
// 取得当前想要获取锁的线程
final Thread current = Thread.currentThread();
// 取得取得当前AQS队列也就是锁的state
int c = getState();
if (c == 0) {
// 执行到这里表示之前持有锁的线程刚好执行完毕,将锁资源释放了,锁当前未被任何线程持有.
// 这里尝试一次获取锁,如果获取成功,将改线程设为当前锁的持有线程,返回true
// 进行一次抢占,这里是非公平的第二处体现
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果state不为0,表示锁当前正在被某一线程持有,判断持有锁的线程和当前线程是否为同一线程
else if (current == getExclusiveOwnerThread()) {
// 如果持有锁的线程与当前线程为同一线程,增加一次获取锁的次数,即state加1
int nextc = c + acquires;
// 由此可知这里锁对同一个线程的可重入次数最大为int类型的最大值,超过这个值将会抛出异常
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
acquireQueued()方法
addWaiter()方法
acquireQueued()方法中的第一个参数是通过addWaiter()方法获取的,先来看一下这个方法的具体实现
代码6
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
代码7
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
上面两个方法,addWaiter方法负责将想要获取锁失败的线程所在的node串联成AQS队列,并返回当前想要获取锁的线程所在的node;enq方法负责以循环和CAS的方式设置队列头尾节点.具体逻辑步骤如下:
一.以当前线程和排他模式Node.EXCLUSIVE创建一个当前节点,获取当前AQS队列的尾节点.
二.如果当前AQS队列尾节点不为null
1.将当前节点的prev指向尾节点
2.以CAS方式将当前节点设置为新的尾节点
A.如果设置成功,将之前的尾节点的next指向当前节点,并返回当前节点,此时最新的节点已经正确串联进了AQS队列中
B.未设置成功,说明在此期间有其他线程先一步将自己设为了AQS队列的尾节点,执行步骤三
三.执行enq方法,死循环+CAS
1.获取AQS队列尾节点
A.尾节点为空,以CAS方式将当前AQS队列的头节点设置为一个新建的空的头结点,该节点中不包含线程,若成功,将AQS队列的尾节点属性tail指向头结点head,此时AQS队列中只有一个空节点,tail和head均指向该节点.执行下一次循环 三.1
B.尾节点不为空
a.将当前节点的prev指向尾节点
b.以CAS方式将当前节点设置为AQS队列尾节点,若设置成功将之前的尾节点的next指向当前节点,并返回当前节点<font color=red>(此处是唯一能跳出循环的地方)</font>;若失败,执行下一次循环 三.1
acquireQueued方法
代码8
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 无限循环(一直阻塞),直到node的前驱节点p之前的所有节点都执行完毕,p成为了head且当前线程获取锁成功
for (;;) {
final Node p = node.predecessor();
// 抢占,第三处提现非公平的地方
if (p == head && tryAcquire(arg)) {
// 将当前节点设为AQS的head,清空当前节点的thread和prev,清空之前的头结点的next指向,这里是唯一能跳出循环的地方
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 检测当前节点是否可以被安全的挂起(阻塞)并挂起线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
再来看一下shouldParkAfterFailedAcquire方法
代码9
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取前驱节点(即当前线程的前一个节点)的等待状态
int ws = pred.waitStatus;
// 如果前驱节点的等待状态是SIGNAL,表示当前节点将来可以被唤醒,那么当前节点就可以安全的挂起了
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
// 1. 当ws>0(即CANCELLED==1),前驱节点的线程被取消了,我们会将该节点之前的连续几个被取消的前驱节点从队列中剔除,返回false(即不能挂起)
// 2. 如果ws<=0&&!=SIGNAL,将当前节点的前驱节点的等待状态设为SIGNA
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
/*
* 尝试将当前节点的前驱节点的等待状态设为SIGNAL
* 1/这为什么用CAS,现在已经入队成功了,前驱节点就是pred,除了node外应该没有别的线程在操作这个节点了,那为什么还要用CAS?而不直接赋值呢?
* (解释:因为pred可以自己将自己的状态改为cancel,也就是pred的状态可能同时会有两条线程(pred和node)去操作)
* 2/既然前驱节点已经设为SIGNAL了,为什么最后还要返回false
* (因为CAS可能会失败,这里不管失败与否,都返回false,下一次执行该方法的之后,pred的等待状态就是SIGNAL了)
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
至此,已经分析出了锁的获取,AQS排队,挂起等加锁逻辑.
锁的释放逻辑unlock方法比较简单,就不贴代码了,主要逻辑就是:
- 锁的重入数,即state减1;
- 唤醒头结点的下一个节点
另外,公平锁与费公平锁的实现机制类似,主要区别就在于去掉了抢占插队的过程,因此都需要排队,以此来实现公平锁机制.
参考:
http://www.cnblogs.com/java-zhao/p/5131544.html
http://www.importnew.com/24006.html