ReentrantLock类
- 可重入性
即当该子程序正在运行时,可以再次进入并执行它。如果进入同一个线程,该线程的锁的计数器就是增加1,只有等到锁的计数器降为0时才会被释放。 - 独占锁
java在并发模式下提供两种加锁模型,共享锁和独占锁。在独占模式下,只有一个线程可以拥有该锁,其他线程只有在该线程释放这个锁后才能申请拥有该锁。而共享锁是允许多个锁同时拥有该锁,并发访问共享资源,ReentrantLock是独占锁。 - 悲观锁
悲观锁总是假设是最坏的情况,每次去拿数据的时候都会认为会被修改,所以每次在拿数据的时候都会上锁,别人想拿这个锁的时候就会发生阻塞现象。乐观锁总是假设最好的情况,每次去拿数据的时候都会认为别人不会修改,所以不会上锁,但是在更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号机制和CAS算法去实现。ReentrantLock是悲观锁。独占锁是一种悲观锁,它避免了读、读冲突,共享锁是一种乐观锁。 - 公平锁与非公平锁
是指线程在请求获取锁的过程中,是否允许插队。在公平锁中,线程会按照它们发出的请求来获得锁,而非公平锁则允许在线程发出请求后立即尝试获取锁,尝试失败才进行排队等待。ReentrantLock默认采用非公平模式
锁的类Node
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;
#因超时或中断,该线程被取消
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
#该线程的后继线程已阻塞,当该线程release或cancel要重启这个后继线程
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
#表明该线程被处于条件队列,就是因为调用了Condition.await而被阻塞
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
#传播共享锁
static final int PROPAGATE = -3;
volatile int waitStatus;
volatile Node prev;
volatile Node next;
volatile Thread thread;
Node nextWaiter;
private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
在上面代码中,有一个参数state,初始化为0,表示未锁定状态。A线程Lock时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其他线程才有机会获取该锁。再A线程释放该锁之前,A线程可以重复获取该锁,并执行state+1,即为可重入。但要注意的是,获取多少次就要释放多少次,这样才能保证state是能回到零态的。
公平锁FairSync类
FairSync主要有两个方法 lock()和tryAcquire(),
lock方法中会调用AbstractQueuedSynchronizer类中的acquire()方法。
AbstractQueuedSynchronizer是一个抽象类,tryAcquire()的具体方法过程在FairSync类和NonfairSync类中实现。
与下文的非公平锁相比较,tryAcquire()多了一个判断条件hasQueuedPredecessors()
如果hasQueuedPredecessors返回true,表示有其他线程先于当前线程等待获取锁,此时为了实现公平,保证等待时间最长的线程先获取到锁,不能执行CAS
public final void acquire(int arg) {
//尝试获取锁
if (!tryAcquire(arg) &&
//获取不到,则放到等待队列中去,返回是否中断
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
//返回中断,调用该方法
selfInterrupt();
}
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
protected final boolean tryAcquire(int acquires) {
//获取当前线程
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
//没有线程获取锁
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断当前线程是否是获取锁的线程
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
上面用到了compareAndSetState(0, acquires)方法,是一种CAS(compare and swap),即比较和替换
CAS(compare and Swap)
<font color = "red" >该部分内容引用了简书大神占小狼的内容https://www.jianshu.com/p/fb6e91b013cc </font>
它有三个参数,一个为当前内存值V,旧的预期值A和即将更新的值B,当且仅当旧的预期值A和内存值V相等时,才能将内存值修改为B并返回true,其他情况不执行任何操作并返回false.
Unsafe是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过本地(native)方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this,
#变量stateOffset表示该变量值在内存中的偏移地址,以为Unsafe就是根据内存偏移地址获取数据的
stateOffset, expect, update);
}
- 使用场景
使用于简单的对象的操作,比如布尔值,整型值
适合冲突较少的情况,比如太多线程在同时自旋,那么长时间自旋会导致CPU开销很大 - CAS的ABA问题
ABA问题就是如下所示:
- 线程1获取该位置的数值为A,之后执行CAS操作,期望值为A,只有当前的值为A才能被修改
- 线程2将数据修改为B
- 线程3将数据改为A
- 线程1检测当前的值为A,与预期值相同,则执行操作。
在上述并发条件下,线程1最后执行了修改操作,但此时的A与初始的A不一样了,中间经历了其他修改,CAS就是防止在执行操作时预防有别的线程在操作。
- CAS的ABA问题的解决办法
新增了AtomicStampedReference类来解决该问题,为值增加了修改版本号,每次修改时,都会修改其版本号。首先检查当前引用是否等于预期引用,并且当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
public class AtomicStampedReference<V> {
private static class Pair<T> {
//对象引用
final T reference;
//标志版本
final int stamp;
private Pair(T reference, int stamp) {
this.reference = reference;
this.stamp = stamp;
}
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
# expectedReference:更新之前的期望值
# newReference 要更新的新值
# expectedStamp 期待更新的标志版本 newStamp: 将要更新的标志版本
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
非公平锁NonfairSync类
非安全锁在上锁时会先通过compareAndSetState(0, 1) 判断如果没有线程拥有该锁时就直接添加到该线程中
如果有线程拥有该锁的话,就调用acquire()方法,同上面的acquire()方法一致
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
同步队列添加元素
先获取同步队列的尾结点,然后将插入的结点的前驱结点连接到原尾结点,之后通过CAS方式设置同步结点的尾结点的地址为插入的结点位置。当尾结点为空或者设置tail指向新插入的结点失败时执行enq方法。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
//自旋操作
for (;;) {
Node t = tail;
//如果尾结点为空
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
//通过CAS添加到尾部
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
锁的释放
首先调用unlock方法,然后调用AbstractQueuedSynchronizer类中的release()方法
调用tryRelese()方法释放当前线程并唤醒启动等待队列里的头结点
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
#尝试释放当前线程
if (tryRelease(arg)) {
#启动等待线程队列里的头结点,h为当前执行的线程,在unparkSuccessor方法中会唤醒当前线程的后继结点
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
在tryRelease()方法中只有当线程的state为0时才会释放该线程,否则说明是重入锁,需要多次释放将state改为0才能该线程释放掉。
protected final boolean tryRelease(int releases) {
#线程计数器减1
int c = getState() - releases;
#只有锁的拥有者是当前线程时才能继续执行,否则会抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
#唤醒该进程
LockSupport.unpark(s.thread);
}
才疏学浅,文章如果有不足请多多指教
该文章也发布在了我的个人博客 https://spurstong.github.io