文章结构:
1)ReentrantLock介绍
2)使用demo
3)形象说明其原理
4)源码阅读分析
5)思考几个问题
一、ReentrantLock介绍:
(1)简介:
ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。
ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待)。
(2)对比synchronized归纳为三个方面:
1)可重入互斥锁:
可重入:意思就是可被单个线程多次获取。
synchronized也是可重入的
2)公平与非公平:
synchronized是非公平的。
ReentrantLock有两种模式,默认是非公平。
在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。
3)等待可中断
也就是持有锁的线程长期不释放时,正等待的线程可选择放弃。不像synchronized会导致死锁,然后交给JVM处理死锁。
二、使用demo:
我们用最经典的生产者消费者代码来演示ReentrantLock。
(1)纯ReentrantLock使用:
定义仓库
public class Depot {
private int size; // 仓库的实际数量
private Lock lock; // 独占锁
public Depot() {
this.size = 0;
this.lock = new ReentrantLock();
}
public void produce(int val) {
lock.lock();
try {
size += val;
System.out.println("生产者"+Thread.currentThread().getName()+" 生产"+val+"数量"+ " 仓库数量变为:"+ size);
} finally {
lock.unlock();
}
}
public void consume(int val) {
lock.lock();
try {
size -= val;
System.out.println("消费者"+Thread.currentThread().getName()+" 消费"+val+"数量"+ " 仓库数量变为:"+ size);
} finally {
lock.unlock();
}
}
};
定义生产者消费者
public class Customer {
private Depot depot;
public Customer(Depot depot) {
this.depot = depot;
}
// 消费产品:线程从仓库中消费产品。一个线程代表一个消费者
public void consume(final int val) {
new Thread() {
public void run() {
depot.consume(val);
}
}.start();
}
}
public class Producer {
private Depot depot;
public Producer(Depot depot) {
this.depot = depot;
}
// 消费产品:线程向仓库中生产产品。一个线程代表一个生产者
public void produce(final int val) {
new Thread() {
public void run() {
depot.produce(val);
}
}.start();
}
}
测试
public class Test {
public static void main(String[] args) {
Depot depot = new Depot();
Producer producer = new Producer(depot);
Customer customer = new Customer(depot);
producer.produce(70);
producer.produce(120);
customer.consume(60);
customer.consume(150);
producer.produce(130);
}
}
运行结果:
生产者Thread-0 生产70数量 仓库数量变为:70
消费者Thread-3 消费150数量 仓库数量变为:-80
生产者Thread-4 生产130数量 仓库数量变为:50
消费者Thread-2 消费60数量 仓库数量变为:-10
生产者Thread-1 生产120数量 仓库数量变为:110
分析:
1)Depot 是个仓库。可以通过生产者去生产货物,通过消费者去消费货物。
2)通过ReentrantLock独占锁,可以锁住仓库,每个线程的修改都是原子性质,并且是有序的。
3)但是我们可以发现一个问题。现实中,仓库的容量不可能为负数,仓库的容量是有限制的。所以基于这样的生产消费模型,有实际业务需求,我们又需要利用ReentrantLock去设计。
(2)结合Condition条件队列解决我们的实际需求:
public class Depot {
private int capacity; // 仓库的容量
private int size; // 仓库的实际数量
private Lock lock; // 独占锁
private Condition fullCondtion; // 生产条件
private Condition emptyCondtion; // 消费条件
public Depot(int capacity) {
this.capacity = capacity;
this.size = 0;
this.lock = new ReentrantLock();
this.fullCondtion = lock.newCondition();
this.emptyCondtion = lock.newCondition();
}
public void produce(int val) {
lock.lock();
try {
// left 表示“想要生产的数量”(有可能生产量太多,需多此生产)
int left = val;
while (left > 0) {
// 库存已满时,等待“消费者”消费产品。
while (size >= capacity) {
fullCondtion.await();
}
// 获取“实际生产的数量”(即库存中新增的数量)
// 如果“库存”+“想要生产的数量”>“总的容量”,则“实际增量”=“总的容量”-“当前容量”。(此时填满仓库)
// 否则“实际增量”=“想要生产的数量”
int inc = (size + left) > capacity ? (capacity - size) : left;
size += inc;
left -= inc;
System.out.println(
"生产者" + Thread.currentThread().getName() + " 本线程想生产" + val + " 本次生产后还要生产" + left + " 仓库增加" + inc + " 仓库目前实际数量"
+ size);
// 通知“消费者”可以消费了。
emptyCondtion.signal();
}
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}
public void consume(int val) {
lock.lock();
try {
// left 表示“客户要消费数量”(有可能消费量太大,库存不够,需多此消费)
int left = val;
while (left > 0) {
// 库存为0时,等待“生产者”生产产品。
while (size <= 0) {
emptyCondtion.await();
}
// 获取“实际消费的数量”(即库存中实际减少的数量)
// 如果“库存”<“客户要消费的数量”,则“实际消费量”=“库存”;
// 否则,“实际消费量”=“客户要消费的数量”。
int dec = (size < left) ? size : left;
size -= dec;
left -= dec;
System.out.printf(
"消费者" + Thread.currentThread().getName() + " 本线程想消费" + val + " 本次消费后还要消费" + left + " 仓库减少" + dec + " 仓库目前实际数量"
+ size);
fullCondtion.signal();
}
} catch (InterruptedException e) {
} finally {
lock.unlock();
}
}
public String toString() {
return "capacity:" + capacity + ", actual size:" + size;
}
}
生产者消费者:
public class Customer {
private Depot depot;
public Customer(Depot depot) {
this.depot = depot;
}
// 消费产品:线程从仓库中消费产品。一个线程代表一个消费者
public void consume(final int val) {
new Thread() {
public void run() {
depot.consume(val);
}
}.start();
}
}
public class Producer {
private Depot depot;
public Producer(Depot depot) {
this.depot = depot;
}
// 消费产品:线程向仓库中生产产品。一个线程代表一个生产者
public void produce(final int val) {
new Thread() {
public void run() {
depot.produce(val);
}
}.start();
}
}
测试:
public class Test {
public static void main(String[] args) {
Depot depot = new Depot(100);
Producer producer = new Producer(depot);
Customer customer = new Customer(depot);
producer.produce(70);
producer.produce(120);
customer.consume(60);
customer.consume(150);
producer.produce(130);
}
}
结果:
生产者Thread-0 本线程想生产70 本次生产后还要生产0 仓库增加70 仓库目前实际数量70
消费者Thread-2 本线程想消费60 本次消费后还要消费0 仓库减少60 仓库目前实际数量10
生产者Thread-4 本线程想生产130 本次生产后还要生产40 仓库增加90 仓库目前实际数量100
消费者Thread-3 本线程想消费150 本次消费后还要消费50 仓库减少100 仓库目前实际数量0
生产者Thread-4 本线程想生产130 本次生产后还要生产0 仓库增加40 仓库目前实际数量40
消费者Thread-3 本线程想消费150 本次消费后还要消费10 仓库减少40 仓库目前实际数量0
生产者Thread-1 本线程想生产120 本次生产后还要生产20 仓库增加100 仓库目前实际数量100
消费者Thread-3 本线程想消费150 本次消费后还要消费0 仓库减少10 仓库目前实际数量90
生产者Thread-1 本线程想生产120 本次生产后还要生产10 仓库增加10 仓库目前实际数量100
从结果来看,解决了仓库的容量不可能为负数,仓库的容量是有限制的这两个实际业务需求。
(3)对比一下使用synchronized如何解决我们的这两个实际业务需求。
三、形象说明其原理:
背景:一群人排队领钱。一个家庭的人同属一个线程。
(1)公平锁:
1)非重入情况
2)重入情况:
一家人,就是同一个线程,那就优先。
(2)非公平锁:
直接尝试插队。
尝试失败,那就去队尾
(3)如果加入条件队列呢?
1)公平锁:
2)非公平锁:
(4)如果多个条件队列呢?
1)公平锁下:
然后变成这样的队列
2)非公平锁下:
去争夺锁的占有权
如果成功占有
四、源码阅读分析:
//实现Lock接口
public class ReentrantLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = 7373984872572414699L;
/** Synchronizer providing all implementation mechanics */
// 同步队列
private final Sync sync;
//抽象同步器Sync继承AQS,子类可以非公平或者公平锁
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
// 获取锁
abstract void lock();
// 非公平锁的尝试获取
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
// 获取状态
int c = getState();
if (c == 0) {
// 跟公平锁获取的区别时,这里少了判断队列是否为空的函数hasQueuedPredecessors
// 即不管排队队列是否为空,该线程都将直接尝试获取锁
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果c != 0,表示该锁已经被线程占有,则判断该锁是否是当前线程占有,若是设置state,否则直接返回false
else if (current == getExclusiveOwnerThread()) {// 当前线程拥有该锁
int nextc = c + acquires;// 增加重入次数
if (nextc < 0) // 超过了int的表示范围
throw new Error("Maximum lock count exceeded");
// 设置状态
setState(nextc);
return true;
}
return false;
}
//tryRelease方法获取状态并减去releases的值,如果为0表示锁完全被释放
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
//只有持有锁的线程才能进行释放动作
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
//锁被释放,独占线程设置为null
setExclusiveOwnerThread(null);
}
//更改状态
setState(c);
return free;
}
// 判断资源是否被当前线程占有
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
// 新生一个条件
final ConditionObject newCondition() {
return new ConditionObject();
}
// 返回资源的占用线程
final Thread getOwner() {
return getState() == 0 ? null : getExclusiveOwnerThread();
}
// 返回状态
final int getHoldCount() {
return isHeldExclusively() ? getState() : 0;
}
// 资源是否被占用
final boolean isLocked() {
return getState() != 0;
}
/**
* Reconstitutes the instance from a stream (that is, deserializes it).
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
setState(0); // reset to unlocked state
}
}
// 非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
// 获得锁
final void lock() {
// 比较并设置状态成功,状态0表示锁没有被占用
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
// 锁已经被占用,或者set失败;以独占模式获取对象
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 公平锁
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
// 以独占模式获取对象,忽略中断
acquire(1);
}
// 尝试公平获取锁
protected final boolean tryAcquire(int acquires) {
// 获取当前线程
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {// 状态为0,锁没被占用
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
// 设置当前线程独占
setExclusiveOwnerThread(current);
return true;
}
}
//如果c != 0,表示该锁已经被线程占有,则判断该锁是否是当前线程占有,若是设置state,否则直接返回false
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)// 超过了int的表示范围
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
public ReentrantLock() {
// 默认非公平策略
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
//可以传递参数确定采用公平策略或者是非公平策略,参数为true表示公平策略
sync = fair ? new FairSync() : new NonfairSync();
}
//委托给Sync同步器
public void lock() {
sync.lock();
}
//委托给Sync同步器
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
//委托给Sync同步器
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}
//委托给Sync同步器
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
//委托给Sync同步器
public void unlock() {
sync.release(1);
}
//委托给Sync同步器
public Condition newCondition() {
return sync.newCondition();
}
//委托给Sync同步器
public int getHoldCount() {
return sync.getHoldCount();
}
//委托给Sync同步器
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
//委托给Sync同步器
public boolean isLocked() {
return sync.isLocked();
}
//委托给Sync同步器
public final boolean isFair() {
return sync instanceof FairSync;
}
//委托给Sync同步器
protected Thread getOwner() {
return sync.getOwner();
}
//委托给Sync同步器
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
//委托给Sync同步器
public final boolean hasQueuedThread(Thread thread) {
return sync.isQueued(thread);
}
//委托给Sync同步器
public final int getQueueLength() {
return sync.getQueueLength();
}
//委托给Sync同步器
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
//委托给Sync同步器,管理等待队列
public boolean hasWaiters(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.hasWaiters((AbstractQueuedSynchronizer.ConditionObject)condition);
}
//委托给Sync同步器,管理等待队列
public int getWaitQueueLength(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitQueueLength((AbstractQueuedSynchronizer.ConditionObject)condition);
}
//委托给Sync同步器,管理等待队列
protected Collection<Thread> getWaitingThreads(Condition condition) {
if (condition == null)
throw new NullPointerException();
if (!(condition instanceof AbstractQueuedSynchronizer.ConditionObject))
throw new IllegalArgumentException("not owner");
return sync.getWaitingThreads((AbstractQueuedSynchronizer.ConditionObject)condition);
}
public String toString() {
Thread o = sync.getOwner();
return super.toString() + ((o == null) ?
"[Unlocked]" :
"[Locked by thread " + o.getName() + "]");
}
}
五、思考几个问题
(1)关键的CAS方法的设计
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
对应unsafe类compareAndSwapInt方法的四个参数分别是:
1)var1:需要改变的对象
2)var2:偏移量(即之前求出value offset的值--在本对象中变量位置,对象中要修改变量的偏移量)
3)var3:期待的值
4)var4:更新后的值
如果this.value的值与expect这个值相等,那么则将value修改为update这个值,并返回一个true。
如果调用该方法时,this.value的值与expect这个值不相等,那么不做任何操作,并返回一个false。
(2)为什么要自己产生中断?
就是线程等候可以自己中断也可以别人中断。
即使在等待队列中,线程在阻塞中被中断唤醒而获得CPU的执行权,但是如果此线程前面还有线程,根据公平性,依旧不能拿到锁,再次阻塞。所以要维持中断状态,重新产生中断。
(3)理解下流程图中所说的waitStatus
1)设计原因:
如果获取锁失败,判断是否应该挂起当前线程,可见,挂起线程是有条件的。
这是AQS静态内部类Node的成员变量,用于记录Node对应的线程等待状态。
2)waitSatus值含义:
1- SIGNAL
从前面的代码状态转换可以看得出是前面有线程在运行,需要前面线程结束后,调用unpark()方法才能激活自己,值为:-1
2- CANCELLED
当AQS发起取消或fullyRelease()时,会是这个状态。值为1,也是几个状态中唯一一个大于0的状态,所以前面判定状态大于0就基本等价于是CANCELLED的意思。
通俗来说,因超时或中断,节点会被设为取消状态,被取消状态则不该去竞争锁,只能保持取消状态不变,不能转换状态。这个状态的节点会踢出队列,被GC收回。
3- CONDITION
线程基于Condition对象发生了等待,进入了相应的队列,自然也需要Condition对象来激活,值为-2。
4- PROPAGATE
读写锁中,当读锁最开始没有获取到操作权限,得到后会发起一个doReleaseShared()动作,内部也是一个循环,当判定后续的节点状态为0时,尝试通过CAS自旋方式将状态修改为这个状态,表示节点可以运行。
5- 状态0
初始化状态,也代表正在尝试去获取临界资源的线程所对应的Node的状态。