导语
开篇之前,先说清楚此篇文章要讲什么
- ReentrantLock实现原理是什么?
- 什么独占锁,什么是共享锁
- await()、single()实现原理是什么?
- concurrent包下都有什么锁,实现原理是什么?
神秘的AQS(AbstractQueuedSynchronizer)
为什么突然一下就转到AQS上面了,因为这个抽象类才是实现concurrent包下所有的锁的核心原理(核心代码都在这里,弄懂这个,所有锁就都明白了)
他是基于CAS操作的乐观并发的锁机制
什么是AQS
-
AQS制定了两种资源共享的方式
-
Exclusive :独占,当前锁只能被一个线程独自占有,如
ReentrantLock
-
Share : 共享,当前锁可以被多个线程持有(简而言之就是,多个线程同时被阻塞;当释放锁资源的时候,将共享这个锁的线程同时释放),如
Semaphore
、CountDownLatch
、ReeantrantReadWriteLock
-
Exclusive :独占,当前锁只能被一个线程独自占有,如
-
AQS定义了两个队列
-
Sync Queue:存储那些等待锁资源的线程数据的
Node
(存储锁数据的model)节点, 在独占模式和共享模式下都会使用;此队列的head指针只是用来作为头指针使用,没有实际意义 -
Condition Queue: 存储那些await的线程数据的
Node
节点,因为Sync Queue中Thread获取到锁之后,会出队列;此时如果线程await了,那么需要释放锁资源,然后进入到Condition Queue中;当此线程signal()的时候,会出队列,然后进入到Sync Queue等待锁资源;只有独占模式会使用
-
Sync Queue:存储那些等待锁资源的线程数据的
Node节点,用户存储锁相关资源的数据(如:线程),
Sync Queue
和Condition Queue
中存储的数据-
AQS重要成员变量
- state: 标识当前锁的状态,一般独占锁标识当前锁是否有线程占有,但是共享锁一般标识当前是否有线程被阻塞;比如:
ReentrantLock
还利用它实现了可重入的特性,CountDownLock
利用它实现了倒计时释放锁的特性 - exclusiveOwnerThread: 当前持有锁的线程,通过此变量可以判断一个线程是否是当前获取锁的线程
- head、tail: 头尾指针(Sync Queue队列里的),通过head == tail来判断当前锁没有等待获取的线程,也就是Sync Queue为空(
hasQueuedThreads()
方法就是这样判断)
- state: 标识当前锁的状态,一般独占锁标识当前锁是否有线程占有,但是共享锁一般标识当前是否有线程被阻塞;比如:
Node节点
数据结构如下:
static final class Node {
/** 标记:共享模式节点 */
static final Node SHARED = new Node();
/** 标记:独占模式节点 */
static final Node EXCLUSIVE = null;
/** waitStatus状态: 标识当前等待的显示是否被cancel掉了 */
static final int CANCELLED = 1;
/** waitStatus状态: 标识下一个节点是需要唤醒的阻塞线程 */
static final int SIGNAL = -1;
/** waitStatus状态: 标识当前节点的状态是await状态 */
static final int CONDITION = -2;
/**
* waitStatus 状态:标识下一个节点是需要唤醒的共享模式线程
*/
static final int PROPAGATE = -3;
/**
* 标记Node节点的wait状态
*/
volatile int waitStatus;
volatile Node prev;
volatile Node next;
/**
* 当前等待队列的线程
*/
volatile Thread thread;
/**
*
*/
Node nextWaiter;
注意:
- 对于独占模式下的节点,当前节点是否能获取锁,是由prev节点的waitStatus状态,也就是当前prev node是SIGNAL状态,才能获取锁资源
- 对于condition queue,也就是await状态的node,
源码解析
独占模式详解,以ReentrantLock为入口例子
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
public void lock() {
sync.lock();
}
public void unlock() {
sync.release(1);
}
通过上面可知:ReentrantLock的锁机制,是通过成员变量sync实现的,下面来看sync的实现
abstract static class Sync extends AbstractQueuedSynchronizer {
/**
* 下面公平锁和非公平锁实现的抽象方法
* 非公平锁:直接调用nonfairTryAcquire()即可;也就是当前线程在竞争锁资源的时候,只要当前锁是空闲的,不管Sync Queue有没有阻塞的线程;谁快,谁就可以获取,所以不公平
* 公平锁:只要当前Sync Queue不为空,那么即使当前锁空闲,当前线程也不能获取
*/
abstract void lock();
/**
* 非公平尝试获取锁的方式
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 当前锁的状态
int c = getState();
if (c == 0) { // 如果空闲,直接获取锁资源
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果当前线程就是获取锁的线程(一个带lock的方法,调用了另外一个带lock的方法),在原有的state基础上累计,代码可重入,这里不能执行获取锁操作,直接执行代码就可以了
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// 判断当前线程是否是获取锁的线程
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 获取condition,也就是实现await的对象
final ConditionObject newCondition() {
return new ConditionObject();
}
// 获取当前获取锁的线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
final boolean isLocked() {
return getState() != 0;
}
}
我们先以非公平锁为例切入
static final class NonfairSync extends Sync {
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
// 如果当前锁空闲,直接获取锁,执行就可以了
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 申请锁资源
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
下面看AQS.acquire()方法实现,也就是线程如何申请锁资源的,也是核心
public final void acquire(int arg) {
// tryAcquire()是子类实现的,用于判断当前线程是否获取到了锁资源,如果没有获取到,需要将线程加入到Sync Queue中,并且阻塞当前线程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
AQS.addWaiter()是将当前线程加入到Sync Queue中操作,并且声明了当前节点是EXCLUSIVE模式
acquireQueued() 是一个自旋操作,也就是先尝试获取锁,没有获取到,就被Lock住,等待后面的唤醒
private Node addWaiter(Node mode) {
Node node = new Node(mode);
// 将当前添加到末尾,并设置tail,然后返回当前Node
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
// CAS操作
U.putObject(node, Node.PREV, oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
// 初始化Sync Queue队列,对head赋值,并且tail=head
initializeSyncQueue();
}
}
}
核心方法acquireQueued()来了
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
// 获取node的prev节点
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { // 当前节点的前一个节点是head节点,并且tryAcquire()为true(当前锁空闲)
setHead(node); // 将当前节点设置为头节点,也就相当于将当前线程从Sync Queue删除
p.next = null; // help GC
return interrupted;
}
// 这里判断当前线程是否要进行到阻塞状态,并且进行线程阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
// 执行此方法,就代码当前线程获取了锁,并且将当前node从Sync Queue中删除
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
下面来看shouldParkAfterFailedAcquire()和 parkAndCheckInterrupt()操作
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus; // prev节点的waitStatus
// 如果前置节点是SINGNAL状态,才返回true,然后进行到阻塞状态
if (ws == Node.SIGNAL)
return true;
if (ws > 0) { // 只有CANCED状态才大于0,因此表示prev节点线程被关掉了,下面需要将当前节点删除掉
/*
* 删除prev节点,直到找到一个没有canced的node
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else { 表示waitStatus是0,也就是head的初始状态,需要将其设置为SIGNAL,表示下一个node是要被唤醒的状态
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // 将当前线程阻塞住,释放cpu资源
return Thread.interrupted();
}
通过上面两个方法,以及前面的acquireQueued()的死循环,可以得知第一次调用shouldParkAfterFailedAcquire()由于当前node的prev的waitStatus是0,需要将其设置为SINGAL状态,然后第二次循环shouldParkAfterFailedAcquire()就会返回true,然后将当前线程阻塞住。
总结:
通过上面acquire()的分析,可以得知线程获取锁实现的流程是:
- 尝试获取锁,失败,进入2
- 为当前Thread 创建Node,并且加入到Sync Queue中
- 循环自旋操作, tryAquire()失败,然后进入到阻塞状态
- 其他线程释放锁,当前线程处于Sync Queue第一个,释放线程的阻塞
- 继续tryAquire(),成功,则执行当前线程代码;失败(新线程抢占了锁资源),进入阻塞状态
下面继续看ReentrantLock.unlock()的原理, 通过上面代码可知其实现是调用了AQS.release(1)
public final boolean release(int arg) {
// 尝试释放锁(ReentrantLock实现)
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0) // head.waitStatus != 0 表示后面还有需要唤醒的线程(Sync Queue出队列,都会将head的waitStatus重置为0)
unparkSuccessor(h); // 释放Sync Queue中第一个waitStatus不是CANCELED
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0); // 重置head节点的waitStatus为0
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) { // 如果head下一个节点为null,或者waitStatus为CANCELED,则选取第一个不为CANCELED状态的node
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒这个线程
}
上面总结里面的第4点,等待其他线程唤醒
,上面的代码就是一个线程唤醒Sync Queue中阻塞线程的代码。
Condition实现原理(仅在独占锁里使用)
我们知道Condition是用来线程之间通信的,await()会阻塞当前正在执行的线程,释放锁资源;然后等待其他线程调用signal()方法来唤醒当前阻塞的线程,然后当前线程去继续竞争锁资源。
知道了如何使用,那么我们来看具体的源码吧
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 为当前节点创建Node节点,然后添加到Condition Queue中
int savedState = fullyRelease(node); // 调用release()方法释放正在Sync Queue中的线程
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 判断当前节点是否在Sync Queue里面,如果不在,那么就阻塞当前线程;当调用single()方法会释放当前线程阻塞状态,并且将当前node添加Sync Queue, 退出当前循环
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// acquireQueued() 又进入到上面我们分析的获取锁的阻塞方法里面
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
private Node addConditionWaiter() {
Node t = lastWaiter;
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters(); // 删除那些CANCELLED的Node
t = lastWaiter;
}
Node node = new Node(Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
下面我们继续single()是如何释放当前线程阻塞状态,并且添加到Sync Queue里面的
public final void signal() {
// 保证调用signal()是当前拥有当前锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
// 找到当前Condition Queue中第一个不是CANCELLED状态的Node
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
// 将当前node 的 waitStatus 重置为0
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
// 将当前node加入到Sync Queue里面, 并且返回当前node 的prev node
Node p = enq(node);
int ws = p.waitStatus;
// 如果prev node是CANCELLED状态,或者状态改变了,释放当前节点的阻塞
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
总结:
condition的操作流程
- 获取lock正在执行的线程,调用await()方法
- 调用release()唤醒Sync Queue 的线程,并且释放锁(tryRelease()修改state)
- 阻塞当前线程
- 其他线程调用signal()方法,将Condition Queue中第一个不为CANCELLED的Node添加到Sync Queue里面,等待唤醒
- 如果刚添加的node的prev node的waitStatus状态是CANCELLED或者改变,直接唤醒当前阻塞的Condition node
- 然后当前node调用acquireQueued()回归到EXCLUSIVE node获取锁的流程中
最后,来看看共享模式是如何实现
我们以CountDownLatch为入口例子分析
首先说明CountDownLatch的作用是:
- 初始化会传递一个count,表示倒数的次数,当count减少到0,唤醒阻塞的线程
- 调用await()方法阻塞当前线程,可以多个线程调用,会阻塞同时阻塞主这些线程的
- 调用countDown(),将count递减,当减少到0时,会同时释放上面所有阻塞的线程
- 共享锁的核心思想就是:多个线程同时共享这一个锁,阻塞同时阻塞,释放同时释放
public class CountDownLatch {
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {
setState(count);
}
int getCount() {
return getState();
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
}
上面的代码很简单,我就不解析了,下面直接AQS.acquireSharedInterruptibly() 和 AQS.releaseShared() 方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 获取到锁
// 这个是实现共享锁的核心,也是和独占锁实现的区别所在,下面的代码和独占锁都是相同的
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node); // 将当前节点从Sync Queue删除
// 下面的代码很简单,就是继续释放当前node的后面的Share模式的Node的阻塞状态
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared(); // 继续执行释放Share node阻塞状态的方法
}
}
通过上面可以看出,共享锁和独占锁的区别:当有一个SHARE模式的node被释放阻塞状态,Sync Queue中它后面连续的SHARE模式的node都会释放阻塞状态。
下面继续看释放阻塞状态的代码:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // Sync Queue中还有数据
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h); // 释放head后继节点的阻塞状态
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
总结:
- 调用AQS.acquireShared()会阻塞当前线程
- 调用AQS.releaseShared()会释放当前Sync Queue中第一个node
- 当SHARE模式的Node获取到锁资源,会同时释放当前SyncQueue后连续的SHARE node阻塞状态
AQS的具体应用
ReentrantLock(可重入锁)
因为上面分析独占锁的时候,就是以ReentrantLock
为例子来讲解的,因此这里就不详细赘述上面已经说过的功能了。
可重入实现
通过state的设置,每次lock,都会将state递增;当unlock的时候,也必须将state降低为0,才能释放当前锁;;;可重入
的体现:当已经获取到锁了,不会再进行任何操作,继续执行代码就可以了。
下面重点聊一聊,公平锁和非公平锁的实现,上面应该已经在代码注释中写过
- 非公平锁,只要来一个新Thread获取锁,只要当前锁空闲(state==0),当前线程就可以获取锁执行;不管Sync Queue还有没有阻塞的线程,
谁快谁用
- 公平锁:新Thread获取锁,如果当前锁空闲;此时会检测当前Sync Queue中还有没有阻塞的线程,如果有,就不允许当前线程获取锁;必须要入队Sync Queue等待
- 非公平锁的吞吐量优于公平锁
- 他们的实现都在tryAcquire()里面实现
CountDownLatch
关于CountDownLatch
,上面讲解共享锁的时候已经将原理都讲明白了。
重点说下使用场景: 当一个任务,需要等其他1个或几个异步任务都执行完毕,才能执行的场景就可以使用。
Android中SharePreference
就使用到了CountDownLatch
,还没有详细解析,后面会补上。
现在补上:
- 每一个edit()操作,都是一个任务,会开启一个线程将改动内容同步到磁盘;会将任务发到QueuedWork里面
- 在一个Activity执行stop时,也就是ActivityThread.handleStopActivity()的时候,会判断sp里面的任务是否全部更新到磁盘存储中;此时会调用
QueuedWork.waitToFinish()
,此方法会遍历还没有完成同步任务的CountDownLatch.await()
等待 - 一个任务的写入任务完成,会调用
CountDownLatch.count()
将上面阻塞释放 - 继续遍历下一个任务,直到所有任务都同步完
因此上面的wait()操作会阻塞主线程导致anr。也就是我们经常看到的SharedPreference
引发的anr。
得到什么启发:
- 尽量多个更新操作,批量执行;这样只需要同步一次本地磁盘缓存;不然一个Activity里面多次sp写操作,容易出现anr
- 不要将大量数据写入到统一文件中,因为这样会到IO很慢(读取和更新都很慢),因此推荐分多文件存储
- 额外优化:在MultiDex.install()执行
SharedPreference
的初始化操作,将本地磁盘缓存同步到内存中(因为在执行get()和set()操作前,需要wait()这个操作),因此这样能加快主进程的执行。
源码就不解析了,如果需要看详细代码,可以随便晚上搜索,有很多的
ReentrantReadWriteLock(可重入读写锁)
注意下面几点:
- 读锁是共享锁,写锁是独占锁
- 如果当前线程获取了写锁,同时获取读锁,是可以获取的;反之则不行
- 读锁和写锁是互斥的,写锁和写锁是互斥的,读锁和读锁是共享的
- state(锁的标志),前16为用来存储共享锁的标志,后16位存储独占锁的标志(重入,代表这后16为会累加)
具体源码就不解析了,有了上面分析锁的基础,看源码就很简单了。而且网上很多解析这个文章,我就不重复赘述了。