ReentrangLock与synchronized对比:
ReentrantLock:
- 依赖AQS实现
- 支持响应中断,超时
- 需要显式调用unlock解锁
- 支持公平锁 非公平锁
- 支持多个条件队列
Synchronized:
- jvm内部实现
- 自动释放锁,不需要手动调用
- 只支持非公平锁
- 支持一个条件队列
公平锁与非公平锁
ReentrantLock 支持公平与非公平两种方式,通过构造函数中的boolean参数,即可选择不同的类型,内部通过Sync来做加锁解锁操作,而Sync就是AQS的一个实现。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
FairSync
lock 方法:
// ReentrantLock.java - FairSync
final void lock() {
acquire(1);//直接调用acquire方法 参数可以理解为要获取的锁
}
// AbstractQueuedSynchronizer.java
public final void acquire(int arg) {// acquire方法位于AQS中,是一个模版方法
//这里主要分为三步骤
// 1 tryAcquire尝试拿锁
// 2 tryAcquire 拿锁失败 则通过addWaiter进行Node创建以及入队操作
// 3 通过acquireQueued方法 尝试拿锁,拿不到锁的时候 阻塞自己
if (!tryAcquire(arg) && //针对不同的模式 有不同的traAcquire实现
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// acquireQueued会返回是否中断标示,因为内部会擦除中断标志位,
// 所以这里重新设置下,方便用户捕获中断并执行对应操作
selfInterrupt();
}
// ReentrantLock.java - FairSync
protected final boolean tryAcquire(int acquires) { // 公平锁的拿锁操作
final Thread current = Thread.currentThread(); // 获取当前线程
int c = getState(); // 获取当前状态
if (c == 0) { // 0则没有线程上锁
if (!hasQueuedPredecessors() && // 队列中没有先驱
compareAndSetState(0, acquires)) { // 设置状态位(拿锁)
// 拿锁成功
setExclusiveOwnerThread(current); // 设置独占线程为当前线程
return true;
}
// 已经被上锁 但是当前线程是上锁的线程(重入)
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires; // state 进一步加1
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc); // 这里不需要通过cas操作了 因为都是一个线程
return true;
}
return false;
}
unlock方法:
// ReentrantLock.java - FairSync
public void unlock() {
sync.release(1); // 借助sync的release方法完成释放锁的操作
}
// AbstractQueuedSynchronizer.java
public final boolean release(int arg) { // AQS中的模版方法
if (tryRelease(arg)) { // 释放锁
Node h = head; // 拿到AQS队列头节点
if (h != null && h.waitStatus != 0) // 头节点不为空并且状态不为0(初始状态)
unparkSuccessor(h); // 唤醒后继节点(后继节点拿锁)
return true;
}
return false;
}
// ReentrantLock.java - FairSync
protected final boolean tryRelease(int releases) { // 释放锁操作
int c = getState() - releases; // 重入数量减1
// 如果当前线程不是记录的独占的线程 则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 所有加的锁都被释放(如:lock 10次 也要unlock 10次)
free = true;
setExclusiveOwnerThread(null); // 设置独占线程为null
}
setState(c); // 修改状态
return free;
}
可以看到 ,整个流程通过state字段表示当前线程加锁的次数,state为0的时候就是没有线程上锁,通过exclusiveOwnerThread表示当前拿到锁的唯一的线程(因为这里是互斥锁),通过state+n支持重入操作。
NonfairSync
NonfairSync 与FairSync在整个加锁流程上并没有什么本质上的区别,唯一的区别就是,加锁的时候,不先去判断队列是否有等待的线程,而是直接尝试拿锁,拿不到锁才进行入队操作。
lock 方法:
final void lock() {
// 主要区别:进来直接尝试加锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
// 主要区别:没有执行hasQueuedPredecessors方法判断队列是否有值
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
上面主要讲了ReentrantLock 加锁与解锁的一些流程,还有一些涉及AQS操作的方法我们没有深入,下面来深入看一下这些方法:
AQS内部维护了一个双向链表,用来记录所有等待的线程,其链表节点就是Node:
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
// 节点的状态 枚举就是上面这些 这里的状态其实记录的是应该对下一个节点中的线程采取的措施
// 比如如果该节点状态为-1 那么下一个节点中的线程就是可被唤醒的)
volatile int waitStatus;
volatile Node prev; // 上一个节点
volatile Node next; // 下一个节点
volatile Thread thread; // 正在等待的线程
Node nextWaiter; // 为condition准备
}
然后是 上面acquire方法中的几个关键方法:
首先会创建节点并将节点入队:
addWaiter:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 这里调用addWaiter会传入节点模式 这里可以看到是EXCLUSIVE模式
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 创建一个对应模式的node
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) { // 如果队列中已经有节点 则先尝试下入队 否则直接直接enq()
node.prev = pred; // 节点入队
if (compareAndSetTail(pred, node)) { // 可能别的线程也在尝试入队 所以这里用cas
pred.next = node;
return node; // 如果cas成功 世界返回
}
}
enq(node); // 如果上面入队失败 这里开始自旋入队
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize 队列必须被初始化
if (compareAndSetHead(new Node())) // 设置头节点为空节点 (链表基本操作)
tail = head;
} else { // 本节点入队
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
然后会通过acquireQueued方法 执行抢锁或者阻塞逻辑:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
// 通过addWaiter拿到当前节点后 执行acquireQueued
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false; // 标识该线程是否被阻塞
for (;;) {
final Node p = node.predecessor(); // 拿到上一个节点
// 上一个节点是头节点 就可以进行加锁操作了
if (p == head && tryAcquire(arg)) {
// 加锁成功则设置当前节点为头节点 并清除记录的线程
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 这里拿锁失败则通过shouldParkAfterFailedAcquire判读是否要park阻塞线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 走到这里说明 当前节点有被中断过 所以将标示返出去 方便外界补操作(再次中断)
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 判断前驱节点的状态来决定 当前线程是否应该被park
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 前驱节点为SIGNAL 则线程可以被park
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) { // ws大于0 则该节点已被废弃
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { // 更新前驱节点状态为SIGNAL
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() { // 挂起当前线程
LockSupport.park(this);
return Thread.interrupted(); // 将当前线程是否被中断return出去
}
整个AQS等待队列大概就这样,不过这里只分析了独占的状态,还有一些放在后面的文章分析。
后记
Thread t1 = new Thread(() -> {
while (true){
System.out.println("prepare");
LockSupport.park();
System.out.println("continue");
}
});
t1.start();
Thread.sleep(1000);
// LockSupport.unpark(t1);
t1.interrupt();
t1.join();
输出:
prepare
continue
prepare
continue
prepare
continue
prepare
continue
prepare
continue
prepare
continue
...
interrupt后线程会记录标识,后面不会被park了 除非调用 Thread.interrupted()擦除标识位:
Thread t1 = new Thread(() -> {
while (true){
System.out.println("prepare");
LockSupport.park();
System.out.println("continue");
Thread.interrupted();
}
});
t1.start();
Thread.sleep(1000);
// LockSupport.unpark(t1);
t1.interrupt();
t1.join();
输出:
prepare
continue
prepare
可以看到 这样就会再次park了。