读写锁ReentrantReadWriteLock简介
ReentrantReadWriteLock是ReadWriteLock接口的实现,ReentrantReadWriteLock中有两个静态内部类:ReadLock读锁和WriteLock写锁,这两个锁实现了Lock接口,ReentrantReadWriteLock支持可重入,同步功能依赖自定义同步器(AbstractQueuedSynchronizer)实现,读写状态就是其同步器的同步状态。同步状态由一个整型变量表示,因为这个变量需要表示多个线程的读和写的状态,因此读写锁在实现上将该变量的高16位表示读,低16位表示写。
写锁的获取和释放
写锁WriteLock是支持重进入的排他锁。如果当前线程已经获取了写锁,则增加写状态。如果当前线程在获取读锁时,读锁已经被获取或者该线程不是已获取写锁的线程,则当前线程进入等待状态。读写锁确保写锁的操作对读锁可见。写锁释放每次减少写状态,当前写状态为0时表示写锁已背释放。
读锁的获取与释放
读锁ReadLock是支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(写状态为0)时,读锁总是能够被成功地获取,而所做的也只是增加读状态(线程安全)。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已经被获取,则进入等待状态。
锁降级
锁降级指的是写锁降级成为读锁。锁降级是指把持住当前拥有的写锁的同时,再获取到读锁,随后释放写锁的过程。以下是oracle官网的对于锁降级的示例代码:
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}
代码中声明了一个volatile类型的cacheValid变量,保证其可见性。首先获取读锁,如果cache不可用,则释放读锁,获取写锁,在更改数据之前,再检查一次cacheValid的值,然后修改数据,将cacheValid置为true,然后在释放写锁前获取读锁;此时,cache中数据可用,处理cache中数据,最后释放读锁。这个过程就是一个完整的锁降级的过程,目的是保证数据可见性,如果当前的线程C在修改完cache中的数据后,没有获取读锁而是直接释放了写锁,那么假设此时另一个线程T获取了写锁并修改了数据,那么C线程无法感知到数据已被修改,则数据出现错误。如果遵循锁降级的步骤,线程C在释放写锁之前获取读锁,那么线程T在获取写锁时将被阻塞,直到线程C完成数据处理过程,释放读锁。
示例
以上的处理过程稍显抽象,因此,我写了一个具体的案例来演示以上过程,代码如下:
public class ReadWriteLockTest {
private volatile boolean cacheValid = false;
private int currentValue = 0;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private Lock readLock = lock.readLock();
private Lock writeLock = lock.writeLock();
/**
* 测试用例
* @throws InterruptedException
*/
@Test
public void testLockDowngrading() throws InterruptedException {
CountDownLatch start = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(2);
ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 100, TimeUnit.SECONDS, new ArrayBlockingQueue
<Runnable>(10));
for (int i = 0; i < 2; i ++){
int finalI = i;
executor.execute(new Thread(new Runnable() {
@Override
public void run() {
Thread.currentThread().setName("thread-" + finalI);
try {
start.await();
TimeUnit.SECONDS.sleep(finalI);
System.out.println("after sleep " + finalI + " seconds, excute " + Thread.currentThread().getName());
cacheValid = false;
processCachedData(finalI);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
end.countDown();
}
}
}));
}
start.countDown();
end.await();
}
/**
* 锁降级过程
* @param num
*/
private void processCachedDataDownGrading(int num){
readLock.lock();
if(!cacheValid){
//必须先释放写锁
readLock.unlock();
writeLock.lock();
try{
//在更新数据之前做二次检查
if(!cacheValid){
System.out.println(Thread.currentThread().getName() + " has updated!");
//将数据更新为和线程值相同,以便验证数据
currentValue = num;
cacheValid = true;
readLock.lock();
}
}finally {
writeLock.unlock();
}
}
try{
//模拟5秒的处理时间,并打印出当前值,在这个过程中cacheValid可能被其他线程修改,锁降级保证其他线程写锁被阻塞,数据不被改变
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + ": " + currentValue);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(lock.getReadHoldCount() > 0){
readLock.unlock();
}
}
}
/**
* 无锁降级的过程
* @param num
*/
private void processCachedData(int num){
readLock.lock();
if(!cacheValid){
readLock.unlock();
writeLock.lock();
try{
if(!cacheValid){
System.out.println(Thread.currentThread().getName() + " has updated!");
currentValue = num;
cacheValid = true;
}
}finally {
writeLock.unlock();
}
}
try{
//模拟5秒的处理时间,并打印出当前值,在这个过程中cacheValid可能被其他线程修改,无锁降级过程,其他线程此时可能获取写锁,并更改书数据,导致后面的数据错误
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + ": " + currentValue);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if(lock.getReadHoldCount() > 0){
readLock.unlock();
}
}
}
}
在上述的测试代码中,有两个方法,一个是processCachedDataDownGrading,该方法模拟锁降级的过程,另一个是processCachedData,模拟无锁降级的过程。
在主测试代码中,起了两个线程,每个线程执行前先将cacheValid置为false,同时,为了模拟处理数据5秒中外部线程的想要执行的数据变更,两个线程的实际执行行为相差1秒。
如果两个线程中执行的方法是processCachedData(无锁降级过程),那么输出为:
after sleep 0 seconds, excute thread-0
thread-0 has updated!
after sleep 1 seconds, excute thread-1
thread-1 has updated!
thread-0: 1
thread-1: 1
从输出看,线程0马上执行(不是指run前,指run中我们想测试的内容),线程1过了1秒执行,两个线程中的数据均被改变,但是最终两个线程中的值均为1(实际我们期望线程0中的数据应该为0),导致这个结果的原因就是在线程0处理数据(sleep5秒)的过程中,线程1获取了写锁并更新了数据,从而线程0的数据被更新。
如果两个线程中执行的方法是processCachedDataDownGrading(锁降级过程),那么输出为:
after sleep 0 seconds, excute thread-0
thread-0 has updated!
after sleep 1 seconds, excute thread-1
thread-0: 0
thread-1 has updated!
thread-1: 1
从输出看,线程0马上执行,线程1过了1秒执行,两个线程中的数据均被改变,但是最终两个线程中的值分别为0和1,符合期望,因为这个过程是锁降级的过程,线程0在更新数据之后,释放写锁之前,获取了读锁,并在处理完数据之后才将其释放,因此线程1在线程0在处理数据时获取写锁被阻塞,从thread-1 has updated! 语句输出在thread-0: 0 之后也可看出,从而保证了数据0的数据没有问题。
以上是我个人的一些理解,如有问题,欢迎邮件至wgy@live.cn探讨,多谢!