概述
在编码中常使用ReentrantLock时候,它可以实现线程在获取锁时候公平与非公平。所谓公平在排队者挨个获取锁,非公平排队者第一个可能和插队者争抢锁。我们想来上一个类图了解他们之间关系。
源码分析
我们先从ReentrantLock构造方法开始分析。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
从代码可以看出,默认采用非公平锁。接下来了解,公平锁和非公平锁他们代码具体差异在哪里。
//公平锁
static final class FairSync extends Sync {
final void lock() {
//1参数为1 表示要去获得锁
acquire(1);
}
//非公平锁
static final class NonfairSync extends Sync {
final void lock() {
//2 通过CAS直接加锁,如果锁未被他人使用,
if (compareAndSetState(0, 1))
//3设置当前执行的线程
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
- 注释1,公平锁,直接独占方式获取锁。
- 注释2,非公平锁,通过CAS直接加锁,如果锁未被他人使用,修改成功获得锁。如果没有获取到锁然后调用 acquire(1)方法。
- 注释3,设置当前执行的线程。
我们继续看上面类是如何尝试获取锁的。
//公平锁
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//1 此方法保证获得锁根据队列来 先来先获得锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 可重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//非公平锁
static final class NonfairSync extends Sync {
//2 尝试在去获得锁
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
// 尝试获取锁
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//3 尝试获取锁,如果锁未被使用。先进入的线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//4 如果锁被自己占用,自己获取锁可重入锁
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
//5 判断头结点是否是当前线程
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer{
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
}
- 注释1,公平锁,hasQueuedPredecessors方法判断头结点(AQS后面讲,线程转换成双向链表才存储)是否是当前线程。如果是(按照链表顺序执行)当前线程,直接尝试获取锁。
- 注释2,非公平锁,尝试获取锁
- 注释3,发起线程尝试获取锁,如果锁未被使用则获取到搜,这里和链表的head存在竞争关系,锁被争抢。
- 注释4,如果锁被自己占用,自己获取锁可重入锁。
- 注释5,判断头结点是否是当前线程。
接下来我们来分析AQS(AbstractQueuedSynchronizer)源码,不管是公平锁还是非公平锁,以下实现机制都一样。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
尝试获取锁,如果没有获取到锁,把当前线程封装成Node节点,存入链表中。
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
把线程封装成Node,且有自己的状态waitStatus。
//1 为获取锁的线程,添加等待节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 2 tail 尾结点
Node pred = tail;
// 尾结点不为空
if (pred != null) {
//3 在尾结点添加新节点
node.prev = pred;
//4 修改尾结点的内存地址内存,tail = node
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
// 创建tail节点内容,第一个节点为哨兵节点,第二个节点为node
private Node enq(final Node node) {
for (;;) {
Node t = tail;
//5 尾结点为空,创建哨兵节点(Node都是一个空对象)
if (t == null) { // Must initialize
//6 tail head 都为哨兵节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
//7 为node 指定 prev节点,首先初始化哨兵节点,然后把哨兵节点作为node的prev。
node.prev = t;
//8 尾结点变化成 node
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
- 注释1,为获取锁的线程,添加等待节点。
- 注释2,拿到尾结点。
- 注释3,如果pre节点不为空,添加新节点到尾结点处。
- 注释4,修改尾结点的内存地址内存的值,tail = node。
- 注释5,尾结点为空,创建哨兵节点(Node都是一个空对象)。
- 注释6,给tail和head 赋值哨兵节点。
- 注释7,为node 指定 prev节点,首先初始化哨兵节点,然后把哨兵节点作为node的prev。
- 注释8,尾结点变化成 node。
总结,上面两个方法主要做的事情是,创建哨兵节点,下一个节点为node。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
//1 获得node的上一个节点
final Node p = node.predecessor();
//2 p节点是头结点,通过调用tryAcquire方法获得锁
if (p == head && tryAcquire(arg)) {
//3 把node节点变成哨兵节点
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//4 Node状态变成等待,删除线程取消的节点。parkAndCheckInterrupt等待获取锁
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
// 进入阻塞状态
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
//5 等待获取锁状态
if (ws == Node.SIGNAL)
return true;
// 6 线程被取消
if (ws > 0) {
do {
//7 删除被取消的线程节点
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
- 注释1,获得node(新线程)的上一个节点。
- 注释2,p节点是头结点,通过调用tryAcquire方法获得锁。
- 注释3,把node节点变成哨兵节点。
- 注释4,Node状态变成等待,删除线程取消的节点。parkAndCheckInterrupt等待获取锁。
- 注释5,等待获取锁状态。
- 注释6,被标识为取消的节点。
- 注释7,删除被取消的线程节点。
我们了解到通过AQS给需要加锁的线程,封装成Node双向链表。接下来我们看看如何唤醒阻塞的线程。
//ReentrantLock.java
// 释放锁
public void unlock() {
sync.release(1);
}
//AbstractQueuedSynchronizer.java
public final boolean release(int arg) {
if (tryRelease(arg)) {
//1 获取得到头结点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//2 拿到头结点下一个节点,都是等待获取锁的节点
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
// 给节点s,发一张允许票
if (s != null)
LockSupport.unpark(s.thread);
}
- 注释1,获取得到头结点。
- 注释2,拿到头结点下一个节点,都是等待获取锁的节点。
以上是所有线程使用ReentrantLock原理,如有描述错误地方请大家多多指正。