一、介绍
前面我们讲解过ReentrantLock,它是一个独占锁,也就是说它是无区别的处理任何操作
但是在实际情景中,假如我们对一个共享元素同时进行读操作时,是不会出现线程安全问题的,由此我们引申出一个读写分离的锁
共存关系 | 读 | 写 |
---|---|---|
读 | 是 | 否 |
写 | 否 | 否 |
二、结构
ReentrantReadWriteLock
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private static final long serialVersionUID = -6992448646407690164L;
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;
}
ReadLock
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
//征用一个ReentrantReadWriteLock的Sync对象
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
//读锁使用的是共享锁
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
//重写tryLock()方法
public boolean tryLock() {
return sync.tryReadLock();
}
//重写unLock()方法
public void unlock() {
sync.releaseShared(1);
}
//读锁不能存在条件队列
public Condition newCondition() {
throw new UnsupportedOperationException();
}
...
WriteLock
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
//征用一个ReentrantReadWriteLock的Sync对象
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
//写锁使用的独占锁
public void lock() {
sync.acquire(1);
}
//重写tryLock()方法
public boolean tryLock( ) {
return sync.tryWriteLock();
}
//重写unLock()方法
public void unlock() {
sync.release(1);
}
//写锁可以拥有条件等待队列
public Condition newCondition() {
return sync.newCondition();
}
...
Sync
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;
//这里将为int型的state值按高低16位一分为二
//高位为读锁
//低位为写锁
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//右移16位获取高位值
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//确保拿下的只有低位16位值
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
...
}
ReadLock.lock() 获取锁
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
ReentrantReadWriteLock中重写的tryAcquireShared()尝试获取读锁
步骤:
1)通过同步状态低16位判断,如果存在写锁且当前线程不是获取写锁的线程,返回-1,获取读锁失败;否则执行步骤2)。
2)通过readerShouldBlock判断当前线程是否应该被阻塞,如果不应该阻塞则尝试CAS同步状态;否则执行3)。
3)第一次获取读锁失败,通过fullTryAcquireShared再次尝试获取读锁。
protected final int tryAcquireShared(int unused) {
/**演练:
1. 如果另一个线程持有写锁定,则失败。
2. 否则,此线程有资格进入锁定wrt状态,因此请问是否由于队列策略而应阻塞。
如果不是,请尝试按CASing状态授予许可并更新计数。
请注意,此步骤不会检查可重入获取,这会推迟到完整版本
避免在更典型的非重入情况下必须检查保留计数。
3. 如果第2步失败,或者由于线程显然不合格,或者CAS失败或饱和,请使用完全重试循环链接到版本。
*/
//获取当前线程
Thread current = Thread.currentThread();
int c = getState();
//判断队列是否存在写锁,存在且当前线程不是拥有写锁的线程则尝试获取锁失败
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
//获取读状态值
int r = sharedCount(c);
//判断当前线程是否应该阻塞
if (!readerShouldBlock() &&
//读锁取的是高16位,所以必须小于MAX_COUNT,否则会有int溢出
r < MAX_COUNT &&
//CAS操作加1
compareAndSetState(c, c + SHARED_UNIT)) {
//获取锁成功
//如果r==0说明之前没有读锁
if (r == 0) {
//第一个读锁节点为当前线程
firstReader = current;
//第一个读锁线程重入次数为1
firstReaderHoldCount = 1;
//第一个读锁线程等于当前线程
} else if (firstReader == current) {
//重入次数加1
firstReaderHoldCount++;
//若当前获取读锁线程不是第一个读锁线程
} else {
//读锁重入计数缓存器
HoldCounter rh = cachedHoldCounter;
//若缓存不属于当前线程
// 再从ThreadLocal中获取
// readHolds本身是一个ThreadLocal,里面存储的是HoldCounter
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
//rh的次数为0,则将它存入ThreadLocal
else if (rh.count == 0)
readHolds.set(rh);
//都不是则直接+1
rh.count++;
}
//成功获取锁
return 1;
}
//继续尝试获取读锁
return fullTryAcquireShared(current);
}
ReadLock.unlock()
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
ReentrantReadWriteLock中重写的tryAcquireShared()尝试释放读锁
步骤:
1)如果当前线程是第一个读线程,则直接进行释放锁或者重入次数-1操作
2)如果当前线程不是第一个读线程,则找到对饮的重入次数,进行释放或者-1操作
3)自旋更改state值
protected final boolean tryReleaseShared(int unused) {
//获取当前线程
Thread current = Thread.currentThread();
//第一线程是当前线程的情况下
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
//重入次数为1直接释放锁
if (firstReaderHoldCount == 1)
firstReader = null;
//不为1重入次数-1
else
firstReaderHoldCount--;
//第一线程不是当前线程
} else {
//找到当前线程的缓存
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
//移除缓存
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
//重入次数减1
--rh.count;
}
//自旋减1
for (;;) {
//CAS高位减1
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
WriteLock.lock()
public void lock() {
sync.acquire(1);
}
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantReadWriteLock中重写的tryAcquire()尝试获取写锁
步骤:
1)判断同步状态state是否为0。如果state!=0,说明已经有其他线程获取了读锁或写锁,执行2);否则执行5)。
2)判断同步状态state的低16位(w)是否为0。如果w=0,说明其他线程获取了读锁,返回false;如果w!=0,说明其他线程获取了写锁,执行步骤3)。
3)判断获取了写锁是否是当前线程,若不是返回false,否则执行4);
4)判断当前线程获取写锁是否超过最大次数,若超过,抛异常,反之更新同步状态(此时当前线程以获取写锁,更新是线程安全的),返回true。
5)此时读锁或写锁都没有被获取,判断是否需要阻塞(公平和非公平方式实现不同),如果不需要阻塞,则CAS更新同步状态,若CAS成功则返回true,否则返回false。如果需要阻塞则返回false。
protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
//独占锁的获取次数
int w = exclusiveCount(c);
//状态不为0,说明有锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
//如果独占锁没有被获取过,说明存在读锁。
//或者当前线程不是获取写锁的线程
//获取锁失败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//如果溢出,抛错误
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
// 到这里说明当前线程已经获取过写锁,这里是重入了,直接把state加1即可
setState(c + acquires);
return true;
}
//c==0
//如果该线程需要阻塞,或者CAS失败,则获取锁失败
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
//获取锁成功,将当前线程设为占有锁的线程
setExclusiveOwnerThread(current);
return true;
}
WriteLock.unlock()
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
ReentrantReadWriteLock中重写的tryRelease()尝试获取写锁
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}