1. 非公平锁实现原理
加锁解锁流程
先从构造器开始看,默认为非公平锁实现,NonfairSync 继承自 AQS
public ReentrantLock() {
sync = new NonfairSync();
}
没有竞争时
第一个竞争出现时
Thread-1 执行了
- CAS 尝试将 state 由 0 改为 1,结果失败
- 进入 tryAcquire 逻辑,这是 state 已经是 1,结果仍然失败
- 接下来进入 addWaiter 逻辑,构造 Node 队列
- 图中黄色三角表示该 Node 的 waitStatus 状态,其中 0 为默认正常状态
- Node 的创建是懒惰的
- 其中第一个 Node 称为 Dummy(哑元)或哨兵,用来占位,并不关联线程
当前线程进入 acquireQueued 逻辑
- acquireQueued 会在一个死循环中不断尝试获取锁,失败后进入 park 阻塞
- 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 head 的 waitStatus 改为 -1,这次返回 false
- shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
- 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
- 进入 parkAndCheckInterrupt,Thread-1 park(灰色表示)
再次有多个线程经历上述过程竞争失败,变成这个样子
Thread-0 释放锁,进入 tryRelease 流程,如果成功
- 设置 exclusiveOwnerThread 为 null
- state = 0
当前队列不为 null,并且 head 的 waitStatus = -1,进入 unparkSuccessor 流程
找到队列中离 head 最近的一个 Node(没取消的),unpark 恢复其运行,本例中即为 Thread-1
回到 Thread-1 的 acquireQueued 流程
如果加锁成功(没有竞争),会设置
- exclusiveOwnerThread 为 Thread-1,state=1
- head 指向刚刚 Thread-1 所在的Node,该 Node 清空 Thread
- 原本的head因为从链表断开,而可被垃圾回收
如果这时候有其它线程来竞争(非公平的提现),例如这时有 Thread-4 来了
如果不巧又被 Thread-4 占了先
- Thread-4 被设置为 exclusiveOwnerThread,state = 1
- Thead-1 再次进入 acquireQueued 流程,获取锁失败,重新进入 park 阻塞
加锁源码
先从 NonfairSync 的 lock 方法开始看
final void lock() {
// 第一次尝试CAS获取锁,将state的值从0改为1
if (compareAndSetState(0, 1))
// 如果加锁成功,设置exclusiveOwnerThread为当前线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果加锁失败,走下面这个流程,这是个模板方法
acquire(1);
}
AQS 里面的 acquire 方法
public final void acquire(int arg) {
// 第二次尝试CAS获取锁
if (!tryAcquire(arg) &&
// 如果再次或者锁失败,则需要创建Node节点放到阻塞队列(双向链表)中,并阻塞当前线程
// 这个后续再说,先看tryAcquire里面的逻辑
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
NonfairSync 里面的具体实现 tryAcquire 方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取state的值
int c = getState();
// 如果state为0,则去获取锁,如果成功,设置exclusiveOwnerThread为当前线程,返回true
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 如果exclusiveOwnerThread是当前线程(锁重入的实现,对state的值进行累加)
// 将累加后的state的值进行更新,返回true
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
// 如果上述流程都不满足,则获取锁失败,返回false
return false;
}
如果 tryAcquire 方法返回false,流程会继续向下走 addWaiter 方法和 acquireQueued 方法
private Node addWaiter(Node mode) {
// 根据当前线程创建一个Node节点
Node node = new Node(Thread.currentThread(), mode);
// 如果链表有节点,将新创建的当前线程节点放到链表末尾
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果链表还没有节点,将会先创建一个无线程的空节点作为占位节点,为了用于唤醒后继节点使用
// 然后再将新创建的当前线程节点放到链表末尾
// 注意这时线程还处于waitStatus为0的活跃状态,需要后续acquireQueued方法来进行park
enq(node);
return node;
}
AQS 里面的 acquireQueued 方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前节点的前置节点
final Node p = node.predecessor();
// 如果前置节点是头节点,说明当前节点是第二节点
// 第三次尝试CAS获取锁
// 如果获取成功,将当前节点作为头节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果获取失败,先走shouldParkAfterFailedAcquire方法来改变前置节点的waitStatus的值
// 如果前置节点waitStatus为0,则CAS将waitStatus改为-1,返回false,进入下次循环
// 如果前置节点waitStatus为-1,则返回true,走parkAndCheckInterrupt方法来park当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 取消获取逻辑
if (failed)
cancelAcquire(node);
}
}
解锁源码
先从 ReentrantLock 的 unlock 方法开始看
public void unlock() {
sync.release(1);
}
AQS 里面的 release 方法
public final boolean release(int arg) {
// 尝试去解锁,里面包含了锁重入对state递减操作,如果解锁成功,才会走下面的逻辑
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 找到头节点之后的最近一个被阻塞的节点,并将其线程unpark
unparkSuccessor(h);
return true;
}
return false;
}
让我们先看下 tryRelease 方法,这个需要在 Sync 的具体实现中来看
protected final boolean tryRelease(int releases) {
// 对state值进行递减,可能因为锁重入导致state值大于1
int c = getState() - releases;
// 如果当前线程不是持有锁线程,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 如果state减为0,说明已经释放了锁,将exclusiveOwnerThread置为null,返回free为true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 重新设置state的值
setState(c);
return free;
}
AQS 里面的 unparkSuccessor 方法
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
// 如果头节点的waitStatus小于0,则CAS将其修改为0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 获取后驱节点,如果后驱节点状态大于0(已取消)
// 从链表尾部开始循环,找到最近的一个后驱节点waitStatus小于等于0的来进行unpark
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 如果这个后驱节点不为空,对其线程进行unpark操作
if (s != null)
LockSupport.unpark(s.thread);
}
这时候非公平锁就和公平锁就有不同之处了,如果这时候恰巧有个线程来和刚刚被 unpark 的线程进行竞争锁,是有可能刚来的线程获取到锁,刚刚被唤醒的线程因没有获取到锁而再次被 park,这时候得跳到 AQS 里面的 acquireQueued 方法来看,因为线程一直在这死循环呢
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前节点的前置节点
final Node p = node.predecessor();
// 如果前置节点是头节点,说明当前节点是第二节点
// 再次尝试CAS获取锁,这时候就是非公平锁的体现
// 有可能刚刚被唤醒的线程再次获取锁失败,从而走下面的逻辑再次被park
// 如果获取成功,将当前节点作为头节点
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果获取失败,先走shouldParkAfterFailedAcquire方法来改变前置节点的waitStatus的值
// 如果前置节点waitStatus为0,则CAS将waitStatus改为-1,返回false,进入下次循环
// 如果前置节点waitStatus为-1,则返回true,走parkAndCheckInterrupt方法来park当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 取消获取逻辑
if (failed)
cancelAcquire(node);
}
}
2. 公平锁实现原理
公平锁的整体实现流程和非公平锁差别不是很大,但还有一些细节地方稍有不同,让我们直接从源码入手
公平锁源码
// 与非公平锁主要区别在于tryAcquire方法的实现
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 先检查AQS队列中是否有前驱节点,没有才去竞争
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// h != t 时表示队列中有Node
return h != t &&
// (s = h.next) == null 表示队列中还有没有老二
// 或者队列中老二线程不是此线程
((s = h.next) == null || s.thread != Thread.currentThread());
}
3. 可重入原理
可重入原理上面已经提到过,这里就不赘述了。
当加锁时,如果当前线程为锁持有线程,则对state的值进行累加。
当解锁时,对state的值进行递减,直至state减至0时,解锁完成。
4. 可打断原理
不可打断模式
在此模式下,即使它被打断,扔会驻留在 AQS 队列中,等获得锁后方能继续运行(是继续运行!只是打断标记被设置为 true)
// Sync继承自AQS
static final class NonfairSync extends Sync {
// ...
private final boolean parkAndCheckInterrupt() {
// 如果打断标记已经是true,则park会失效
LockSupport.park(this);
// interrupted会清除打断标记
return Thread.interrupted();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
// 还是需要获取锁后,才能返回打断状态
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果是因为interrupt被唤醒,返回打断状态为true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果打断状态为true
selfInterrupt();
}
static void selfInterrupt() {
// 重新产生一次中断
Thread.currentThread().interrupt();
}
}
可打断模式
static final class NonfairSync extends Sync {
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 如果没有获得到锁,进行(一)
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
// (一)可打断的获取锁流程
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 在park过程中如果被interrupt会进入此
// 这时候抛出异常,而不会再次进入for (;;)
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}
5. 条件变量实现原理
每个条件变量其实就对应着一个等待队列,其实现类是 ConditionObject
await 流程
开始 Thread-0 持有锁,调用 await,进入 ConditionObject 的 addConditionWaiter 流程
创建新的 Node 状态为 -2(Node.CONDITION),关联 Thread-0,加入等待队列尾部
接下来进入 AQS 的 fullyRelease 流程,释放同步器上的锁
unpark AQS 队列中的下一个节点,竞争锁,假设没有其他竞争线程,那么 Thread-1 竞争成功
park 阻塞 Thread-0
signal 流程
假设 Thread-1 要来唤醒 Thread-0
进入 ConditionObject 的 doSignal 流程,取得等待队列中第一个 Node,即 Thread-0 所在 Node
执行 transferForSignal 流程,将该 Node 加入 AQS 队列尾部,将 Thread-0 的 waitStatus 改为 0,Thread-3 的 waitStatus 改为 -1
Thread-1 释放锁,进入 unlock 流程,略