并发源码分析篇:
基于上篇文章分析了同步器的作用之后,接下来了解ReentrantReadWriteLock读写锁也就简单多了,ReentrantReadWriteLock比ReentrantLock性能又更佳了一些,针对读操作锁共享,针对写操做锁互斥,当有线程在对数据写入的时候读操作和写操作都会阻塞,都是读操作的时候锁共享,线程不会阻塞。读写锁的基本用法如下
public class ReadWriteLockDemo {
static ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
static ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
static ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
static HashMap<String,Object> map = new HashMap<>();
public static void main(String[] args) {
new Thread(()->{
getObject("name");
}).start();
new Thread(()->{
writeObject("name","xiaoming");
}).start();
}
public static void writeObject(String key,Object value) {
try {
writeLock.lock();
map.put(key,value);
} finally {
writeLock.unlock();
}
}
public static Object getObject(String key) {
try {
readLock.lock();
return map.get(key);
} finally {
readLock.unlock();
}
}
}
ReadLock和WriteLock的获取是通过ReentrantReadWriteLock类提供的方法获取的,可以看到ReentrantReadWriteLock的构造函数如下
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
通过ReentrantReadWriteLock构造函数初始化了Sync和readerLock,writerLock 属性,此时的Sync的实际对象是NonfairSync。并且ReadLock,WriteLock通过传参构造将ReentrantReadWriteLock本身传递进去而且将
ReadLock,WriteLock类中的Sync也赋值为ReentrantReadWriteLock类中Sync,所以ReadLock,WriteLock中Sync都是NonfairSync。
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
接下来可以先看下WriteLock中的lock方法
public void lock() {
sync.acquire(1);
}
这里走的还是AQS中的acquire方法,这里除了tryAcquire的实现有稍微的区别其他的都和ReentrantLock一样,没获取到锁就会被构成链表阻塞。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
继续走tryAcquire方法,该方法实际调用的是ReentrantReadWriteLock类中的重写方法。
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
这里,我们分两种情况来分析,先来分析假如有写锁先获取到锁,后面的读操作和写操作会是什么情况。同样线程A表示写入的线程,线程B为读线程,线程C为写入的线程。
线程A先执行这段代码,获取到state值为0,所以会执行writerShouldBlock方法,实际上执行的就是NonfairSync的writerShouldBlock方法,直接返回的false标识,所以继续会走下面的CAS将状态改为1,并且独占线程设置为线程A,返回true。线程A就这样成功的获取到锁了,并且可以正常执行逻辑。读线程B此时进来,调用readLock.lock()
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
所以这里就直接返回了-1,读线程B被阻塞。同理线程C进来也将阻塞。 而unlock方法跟ReentrantLock是一样的逻辑,都是将state设置为0并且唤醒链表中的下一个线程获取锁。
从这里可以看出,如果有写线程获取锁,其他线程都将会被阻塞,跟ReentrantLock一样都是互斥的
在来分析如果是读线程先获取到锁,后续将是一个什么情况。同样假设A为读线程,B为读线程,C为写线程。
首先,线程A调用lock方法,这里我们就要看ReadLock中的lock方法
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
tryAcquireShared同样掉调用的是ReentrantReadWriteLock中的tryAcquireShared方法,接下来继续分析该方法的实现
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
首先是获取state的值,此时state=0,接着获取独占线次数exclusiveCount(c)
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
这里先说明一下,exclusiveCount方法就是用state & (2^16-1)的值就是独占次数,为什么这样就能获得独占次数,其实从下面的compareAndSetState(c, c + SHARED_UNIT))方法就可以看出,共享锁也就是读线程是共享的,每次获取共享锁state的值就会+2^16 这就说明了如果只存在共享锁,exclusiveCount(state)的值一定会是0,所以回过头看写锁中的这段代码
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
state !=0 w==0说明只可能是有共享锁,所以获取写锁失败,如果w!=0 继续会判断是否为同一个线程,不是的话也写入失败。
接着来分析A线程,此时独占次数和共享次数都是0,程序接着走readerShouldBlock方法
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}
该方法的作用就是判断阻塞队列中第一个元素是否是独占线程,也就是写数据线程,此时链表中是没有数据的,所以放回false。所以A线程最终执行下面代码
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
}
最终返回1,获取到了锁。
这是读线程B又调用了lock方法
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
这时候c=2^16,exclusiveCount(c)=0,r=1,并且链表中依然是空的,所以代码将走else逻辑
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
用HoldCounter对象存储每个读线程获取锁的次数,一个线程读锁也是可以重入的,所以没有完全释放锁的话就获取不到写锁。
如果此时链表中有个写线程阻塞了,!readerShouldBlock()就会为false,代码将走fullTryAcquireShared逻辑
/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
方法注释就可以知道该方法的作用
读取的完整版本,处理CAS未命中*和tryAcquireShared中未处理的重入读取(直接有道翻译的)。
!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)
其实就是处理这三个状态值分别为false的情况,所以现在是链表中有写线程阻塞,代码会执行
else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
所以获取读锁失败,读线程会加到链表之后。
总结
这篇文章没有在重复链表中的各个状态,可以先看ReentrantLock源码分析,有详细的AQS分析 ReentrantLock源码分析
ReentrantReadWriteLock锁其实可以简单理解为有写线程占据锁的话,其他线程都阻塞,如果有读线程占据锁的话,写线程阻塞,读线程可以正常获取锁,如过有写线程在阻塞等待锁,读线程也需要被阻塞