1、ReentrantLock 可重入锁
2、ReadWriteLock 读写锁或共享锁
3、StampedLock 独占锁或排它锁
具体源码讲解
1.1 ReentrantLock 四种加锁方式
lock 没拿到锁无限等待
trylock 没拿到锁立即返回
trylock(long timeout, TimeUnit unit) 获取锁超时即失败
lockInterruptibly 没拿到锁无限等待,但能被中断
以上都支持公平锁,可重入
a、以非公平锁加锁实现为例解读,其他方式只是个别不同
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 加锁
final void lock() {
// CAS获取锁
if (compareAndSetState(0, 1))
// CAS获取成功则记录当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 没有成功则再进行尝试,不成功则入队
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
public final void acquire(int arg) {
// 进行重试加锁 或 进行重入
if (!tryAcquire(arg) &&
// 重试或重入失败,则进行入队
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
// 重试获取锁
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 判断是否可重入
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 可重入即增加重入次数
setState(nextc);
return true;
}
return false;
}
// 入队
private Node addWaiter(Node mode) {
// 为当前加锁线程创建节点
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 如果已有尾节点,则CAS尝试入队
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// CAS入队失败 或 队列未初始化,则进入
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 队列未初始化,则进行创建空节点,进行初始化
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
// CAS 尝试入队,不成功则重试
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 判断当前线程是否是队列头结点(即是空节点的下一个节点)
// 如果是头结点,tryAcquire一定会拿到锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 这里会尝试2次进入阻塞状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 如果前驱节点是等待信号状态,则返回true进入阻塞
if (ws == Node.SIGNAL)
return true;
// 如果前驱节点大于0,则是取消状态,
if (ws > 0) {
// 向前移除已取消的节点,直到头节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 不是前两种情况,则进入等待信号状态
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
b、trylock
// 只是判断了是否可以获取锁,或者是否可重入,否则直接返回失败
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
c、tryLock(long timeout, TimeUnit unit)
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return true;
}
nanosTimeout = deadline - System.nanoTime();
// 规定时间内未获取到锁,则返回失败
if (nanosTimeout <= 0L)
return false;
if (shouldParkAfterFailedAcquire(p, node) &&
// 这里规定了锁等待超时时间要大于1s,不然不会进行阻塞,
// 这里考虑两点,1、时间太短,进行重试即可,2、调用阻塞的性能损耗较高,但是CAS重试也有性能损耗
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
if (Thread.interrupted())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
d、lockInterruptibly
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
// 如果改线程已被中断,则会抛出异常,即获取不到锁,且会抛错
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
e、公平锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 获取锁之前会判断是否当前线程是否是头结点,不然不会尝试获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
2.1 ReentrantReadWriteLock
读写锁有这几种性质,读读共享,读写互斥,写写互斥
下面分析下读锁,写锁加锁过程
a、读锁加锁
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
// 低16位表示读锁次数,高16位表示写锁次数
int c = getState();
// 如果存在写锁且占用线程不是当前线程,则返回失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 判断当前读锁是否需要阻塞,因为当前线程存在已经加了写锁未释放情况
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 读锁次数为0,表示第一次加读锁,记录第一读锁信息
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
// 如果是第一次读锁,则++
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 判断当前线程是否已存在加读锁,存在则++
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// CAS重试加读锁
return fullTryAcquireShared(current);
}
// 这个方法就是CAS重试,逻辑和tryAcquireShared没什么区别
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}