什么是读写锁
在同一时刻可以允许多个读线程访问,或者写线程访问时,所有的读线程和其他写线程均被阻塞的锁。读写锁一分为二,分为一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有提升。
读写锁的继承与实现关系
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable
读写锁自定义同步器源码
自定义同步器的实现类:
abstract static class Sync extends AbstractQueuedSynchronizer
自定义同步器的关键变量和方法
//移位的偏移常量
static final int SHARED_SHIFT = 16;
/* 读锁增加的数量级
* 读锁使用高16位
* 所以读锁增加1就相当于增加了2*16
*/
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
/* 写锁的可重入的最大次数
* 读锁允许的最大数量
*/
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
/*
* 写锁的掩码
* 写锁使用低16位
* 这个掩码为了便于与运算去掉高16位
*/
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/**
* 保存当前线程持有的重入读锁的数目,在读锁重入次数为 0 时移除
*/
private transient ThreadLocalHoldCounter readHolds;
/*
* 最近一个成功获取读锁的线程的计数。
*/
private transient HoldCounter cachedHoldCounter;
/* firstReader是一个在锁空闲的时候将最后一个把共享计数从0改为1的线程,
* 并且从那开始还没有释放读锁。
* 如果不存在则为null。
*/
private transient Thread firstReader = null;
/*
* firstReaderHoldCount 是 firstReader 的重入计数。
*/
private transient int firstReaderHoldCount;
//当前持有读锁的线程数
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//写锁重入次数的计数
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
/**
* 每个线程读取次数的计数
* 缓存在HoldCounter中
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = Thread.currentThread().getId();
}
/**
* 初始化HolderCounter
* 返回每个线程初始化的局部变量的值
*/
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}
Sync() {
//实例化当前线程持有的重入读锁的数目
readHolds = new ThreadLocalHoldCounter();
// 确保 readHolds 的内存可见性,利用 volatile 写的内存语义。
setState(getState());
}
/**
* 如果当前线程在试图获取读锁时返回true
*/
abstract boolean readerShouldBlock();
/**
* 如果当前线程在试图获取写锁时返回true
*/
abstract boolean writerShouldBlock();
自定义同步器的实现方法
tryRelease() 方法:是否允许它在独占模式下释放锁资源
/*
* 是否允许它在独占模式下释放锁资源
*/
protected final boolean tryRelease(int releases) {
// 如果当前线程不是以独占式的方式进行,则抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 计算最新的同步状态值
int nextc = getState() - releases;
// 写锁重入次数的计数是否为0,就是线程是否已经处于初始化状态了
boolean free = exclusiveCount(nextc) == 0;
// 如果free为true,说明当前线程已经处于初始化状态了
if (free)
// 设置拥有独占访问权限的线程为null
setExclusiveOwnerThread(null);
// 原子方式更新最新同步状态值
setState(nextc);
return free;
}
步骤解析:
(1)先判断当前线程是不是以独占的方式进行,计算最新的同步状态值。
(2)如果是独占方式运行的话,那么通过之前计算的最新的同步状态值来判断写锁的重入次数是否为0了。
(3)如果成功释放了写锁,那么设置独占访问权限线程不存在为null。
(4)原子方式更新最新同步状态值,返回是否释放写锁。
tryAcquire() 方法:是否允许它在独占模式下获取同步状态
// 是否允许它在独占模式下获取对象状态
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取当前同步状态
int c = getState();
// 获取写锁重入的次数
int w = exclusiveCount(c);
// 如果当前的同步状态不是初始化状态
if (c != 0) {
// 如果写重入的次数为0或者当前线程不是拥有独占访问权限的线程,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果写重入的次数加上当前线程请求获取得写重入次数大于了最大写重入次数,抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 原子的方式设置新的同步状态值
setState(c + acquires);
return true;
}
// 如果当前线程尝试获取写锁或者当前同步状态值不等于c,则返回false
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
// 设置拥有独占访问权限的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
步骤解析:
(1)如果有线程占据着锁,那么原子的方式更新同步状态值。
(2)如果当前尝试获取写锁或者同步状态值不为最新的同步状态值,则返回false。(写锁是互斥的,已经存在写锁了,如果还有线程尝试获取写锁,会无法保证原子性和一致性的问题)。
(3)设置独占访问权限的线程为当前线程,返回true。
tryAcquireShared()方法:是否允许在共享模式下获取同步状态值
protected final int tryAcquireShared(int unused) {
// 获取当前线程
Thread current = Thread.currentThread();
// 获取同步状态值
int c = getState();
// 如果写锁重入的次数不为0并且当前线程不是独占线程
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
// 获取持有读锁的线程数
int r = sharedCount(c);
// 如果当前线程试图获取读锁并且持有读锁的线程数小于最大线程数并且当前同步状态值为c
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
// 如果当前持有读锁的线程数为0
if (r == 0) {
// 将当前线程设置为第一个读线程
firstReader = current;
// 读线程持有计数设置为1
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 如果当前线程是第一个读线程
// 读线程持有计数加1
firstReaderHoldCount++;
} else {
// 最近一个成功获取读锁的线程的计数器
HoldCounter rh = cachedHoldCounter;
// 如果最近一个成功获取读锁的线程的计数为null或者线程不是当前线程
if (rh == null || rh.tid != current.getId())
// 最近一个成功获取读锁的线程计数器被设置为当前读计数器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)// 如果读计数器的计数为0
// 设置读计数器
readHolds.set(rh);
// 读计数器加1
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current);
}
解释:
① 获取当前线程和同步状态值。
② 判断写锁重入次数不为0并且是当前线程继续向下执行,获取持有读锁的线程数,cas方式更新持有读锁的线程数加1。
③ -- 如果当前持有读锁的线程数为0,那么直接设置当前线程为第一个读线程,读线程持有计数加1,。
--如果当前持有读锁I的线程数不为0,但并且第一个读线程为当前线程,就是读锁重入了,直接读线程持有计数加1。
--如果当前持有读锁的线程数不为0并且第一个读线程不是当前线程,设置最近一个成功获取读锁的线程为空或者对应的线程ID不是当前的线程ID,那么最近一个成功获取读锁线程被设置为当前线程。最近一个成功获取读锁线程的计数未0,那么直接设置最近一个成功获取读锁线程作为当前线程读计数器。
tryReleaseShared() 方法:
// 是否允许它在共享模式下释放锁资源
protected final boolean tryReleaseShared(int unused) {
// 获取当前的线程
Thread current = Thread.currentThread();
// 如果当前线程是第一个读线程
if (firstReader == current) {
// 如果第一个读线程的计数为1的情况
if (firstReaderHoldCount == 1)
// 设置读线程为null
firstReader = null;
else
// 否则就将读线程计数减一
firstReaderHoldCount--;
} else {
// 获取最近一个成功获取读锁的线程的计数。
HoldCounter rh = cachedHoldCounter;
// 如果最近一个成功获取读锁的线程的计数为null或者线程不是当前线程
if (rh == null || rh.tid != current.getId())
// 赋值当前线程持有的读锁的数目
rh = readHolds.get();
// 获取当前线程持有的读锁的数目
int count = rh.count;
// 如果读锁的数目小于等于1,就将当前线程持有的读锁移除
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 死循环
for (;;) {
// 原子方式获取当前同步状态
int c = getState();
// 计算最新的同步状态值
int nextc = c - SHARED_UNIT;
// 如果当前线程的同步状态值等于c,那么就将当前线程的同步状态值更新为nextc
if (compareAndSetState(c, nextc))
// 最新同步状态值为0,说明共享释放成功
return nextc == 0;
}
}
步骤解析:
(1)首先查看当前线程是不是第一个获取读锁的线程,如果第一个获取读锁的线程计数为1,那么就是当前线程,直接将第一个获取读锁的线程设置为null,否则将计数减1。
(2)如果当前线程不是第一个获取读锁的线程,获取最近一个成功获取读锁的线程,如果为null或者不是当前线程,就将重入锁的次数赋值给最近一个成功获取读锁的线程,然后将重入锁的次数减1。
(3)死循环+cas方式来设置最新同步状态。
tryWriteLock() 方法:
/**
* 是否允许尝试获取写锁
*/
final boolean tryWriteLock() {
// 获取当前线程
Thread current = Thread.currentThread();
// 原子方式获取同步状态
int c = getState();
// 如果当前同步状态不是初始化状态
if (c != 0) {
// 获取线程写锁重入的次数
int w = exclusiveCount(c);
// 如果写锁重入的次数为0或者独占访问的权限的线程不是当前线程,返回false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
// 如果写锁重入的次数等于了最大次数,抛出异常
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
// 如果当前线程的同步状态不为c,则返回false
if (!compareAndSetState(c, c + 1))
return false;
// 设置拥有独占访问权限的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
步骤解析:
(1)如果有线程占据着锁,获取写锁重入的次数。
(2)注意:
---- 如果写锁重入的次数为0,那么说明此时没有线程占据写锁
---- 如果写锁重入的次数不为0,那么说明有线程占据写锁,但是此线程不是当前线程作为独占线程。
以上都情况都是不符合同步状态值不为0的情况。
(3)如果写锁重入的次数大于了临界值,将抛出异常。
(4)设置拥有独占访问权限的线程为当前线程。
tryReadLock() 方法:是否允许尝试获取读锁
/**
* 是否允许尝试获取读锁
*/
final boolean tryReadLock() {
// 获取当前线程
Thread current = Thread.currentThread();
// 死循环
for (;;) {
// 原子方式获取同步状态
int c = getState();
// 如果写锁重入的次数不为0并且拥有独占访问线程的权限不为当前线程,返回false
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
// 获取读锁的线程数量
int r = sharedCount(c);
// 如果读锁的线程数量等于最大数量,抛出异常
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
// 如果当前同步状态是c,那么就将当前同步状态更新为c+SHARED_UNIT
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 如果读锁的线程数量为0
if (r == 0) {
// 设置第一个读线程为当前线程
firstReader = current;
// 读线程持有的计数为1
firstReaderHoldCount = 1;
} else if (firstReader == current) {// 如果第一个读线程为当前线程
// 第一个读线程的计数加1
firstReaderHoldCount++;
} else {
// 最近一个成功获取读锁的线程的计数器
HoldCounter rh = cachedHoldCounter;
// 如果计数器为null或者计数器的线程不为当前线程
if (rh == null || rh.tid != current.getId())
// 最近一个成功获取读锁的线程计数器被设置为当前读计数器
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)//如果读计数器的计数为0
// 设置读计数器
readHolds.set(rh);
// 读计数器的计数加1
rh.count++;
}
return true;
}
}
}
步骤解析:
(1)如果写锁重入的次数不为0,并且当前线程没有独占访问权限,返回false。
(2)如果当前同步状态为c:如果读线程的数量为0,设置第一个读线程为当前线程,读线程的计数为1,如果第一个读线程是当前线程,就将计数加1,否则将最近一个成功获取读锁的线程的计数器取出,判断是否为null或者是否不为当前线程,将重新设置最近一个成功获取读锁的线程的计数器,判断如果计数器的计数值为0,那么设置读计数器。最后将读计数器加1。
(3)如果当前同步状态不为c,那么就进行死循环。继续执行(2)。
读写锁中读锁源码
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;
/**
* 使用实现类构造读锁
*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/**
* 获取读锁
*/
public void lock() {
sync.acquireShared(1);
}
/**
* 仅当写入锁在调用期间未被另一个线程保持时,再获取读锁
*/
public boolean tryLock() {
return sync.tryReadLock();
}
/**
* 尝试释放读锁资源
*/
public void unlock() {
sync.releaseShared(1);
}
}
读写锁中写锁源码
public static class WriteLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -4992448646407690164L;
private final Sync sync;
/**
* 使用实现类构造写锁
*/
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
/**
* 获取写锁
*/
public void lock() {
sync.acquire(1);
}
/**
* 仅当写入锁在调用期间未被另一个线程保持时获取该锁。
*/
public boolean tryLock( ) {
return sync.tryWriteLock();
}
/**
* 释放写锁
*/
public void unlock() {
sync.release(1);
}
}
阅读总结
(1)读写锁中的自定义同步器也是基于AQS实现的。
(2)读写锁的读写设计:
采用了一个同步状态值分成读锁和写锁两瓣来使用。读锁使用高16位进行读操作,写锁使用低16位进行写操作。
(3)读写锁的读写策略:
由于读写锁中的写锁采用AQS的独占锁实现的,所以只要有写线程在写,其他线程的写和读都是阻塞的;由于读写锁中的读锁采用AQS的共享锁实现的,所以有读线程在读,那么其他线程也可以读。
(4)读写锁的重入:
读锁重入:根据tryReadLock中compareAndSetState方法可以看出使用cas更新当前同步状态值为c + SHARED_UNIT(SHARED_UNIT = 1<< 16 读锁使用高16位,相当于高位加1)。
写锁重入:根据tryWriteLock中compareAndSetState方法可以看出使用cas更新当前同步状态值为c + 1(写锁使用低16位,直接加1就可以了)。
读线程在读,其他线程也可以读,但是如果其他线程获取了写锁,那么更新的数据对本身以外其他读锁的线程是不可见的,所以锁不可以升级。
(5)锁降级(写锁降级为读锁):
根据tryAcquireShared中的写锁重入次数不为0并且是当前线程,仍然可以获取持有读锁的线程数,并且还可以cas的方式更新读锁的同步状态值高位加1。
---------------------------该源码为jdk1.7版本的