- 内部实现读锁,即可重入共享锁
private final ReentrantReadWriteLock.ReadLock readerLock;
- 内部实现写锁,即可重入独占锁
private final ReentrantReadWriteLock.WriteLock writerLock;
-
final Sync sync;
继承AbstractQueuedSynchronizer,支持锁功能的类。包含公平锁子类FairSync
和非公平锁子类NonfairSync
。
一 读锁
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);
}
public Condition newCondition() {//共享锁不支持条件对象
throw new UnsupportedOperationException();
}
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}
二 写锁
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock( ) {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
三 Sync
- 锁计数,低16位表示独占写锁计数,高16位表示共享读锁计数
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//读锁占用数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//写锁占用数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
3.1 读锁
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
//非本线程已经获取了写独占锁,则不可再获取读锁
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
//线程获取写锁后还可以继续获取读锁,释放写锁后则降级为读锁。
//反之先获取读锁,后获取写锁则不行。
int r = sharedCount(c);
if (r == MAX_COUNT)//读共享锁已达上限,不可再获取
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
//更新读共享锁计数+1
if (r == 0) {//更新首个读锁获取线程的信息,及获取读锁计数
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
//缓存读锁获取统计信息
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
//从threadlocal变量中获取当前线程的读锁统计数据
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;//计数+1
}
return true;
}
}
}
- 父类tryAcquireShared接口实现,获取锁的接口会调用
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
//非公平锁,或公平锁当前线程优先级最高,则先尝试获取锁
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//获取成功,更新统计数据
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
//获取失败,等待队列节点处理
return fullTryAcquireShared(current);
}
/**
* 当前线程获取了写锁或读锁,则可以继续获取读锁
* 其他线程获取了写锁,则当前线程读锁放入等待锁队列中
* 全都是读锁,若有等待写锁的线程节点,则只有已获取过读锁的线程可继续获取读锁,没有获取过读锁的线程则放入等待锁队列中
* 无等待写锁的线程节点,则所有线程都可获取读锁
*/
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
//非当前线程获取的写锁,返回失败,放入等待锁队列中
return -1;
//当前线程获取的写锁,则可以获取读锁。
} else if (readerShouldBlock()) {
//有优先级更高的等待获取锁的节点,如等待写锁的节点
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
//首个获取读锁线程,则可以继续获取读锁
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
//当前线程未获取过读锁,则放入等待锁队列。
return -1;
//当前线程有读锁,则增加读锁统计继续获取读锁
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//获取读锁,增加统计计数
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//修改锁使用计数
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))//更改锁状态
// 若锁已完全释放,返回true,唤醒等待锁队列后续节点尝试获取锁
return nextc == 0;
}
}
3.2 写锁
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
if (c != 0) {//有获取读锁或写锁
int w = exclusiveCount(c);
//读锁不可升级为写锁或其他线程获取了写锁
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
//更新写锁状态
if (!compareAndSetState(c, c + 1))
return false;
//设置锁独占线程
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// Reentrant acquire
setState(c + acquires);
return true;
}
//若写锁获取需要排队,则放入等待锁队列
//否则尝试获取锁
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
protected final boolean tryRelease(int releases) {
if (!isHeldExclusively())//只能释放本线程占用的锁
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
boolean free = exclusiveCount(nextc) == 0;
if (free)//锁占用全释放,则清空锁占用线程信息
setExclusiveOwnerThread(null);
setState(nextc);//更新锁状态
//若此时线程还有读锁,则写锁降级为读锁
return free;
}
四 总结
- 线程a获取写锁后,线程a还可以继续获取读锁和写锁,其他线程不可获取锁
- 线程a获取写锁后再获取读锁,释放写锁后则降级为读锁
- 线程a获取读锁后,所有线程还可以获取读锁,不可获取写锁。