Lock锁
java.util.concurrent.locks
包定义了Lock锁的使用与规范
常用的类或接口主要有ReentrantLock,ReentrantReadWriteLock,Condition
ReentrantLock
所处位置:
java.util.concurrent.locks.ReentrantLock
类结构:
public class ReentrantLock implements Lock, java.io.Serializable
ReentrantLock 实现了Lock接口
- 内部类:
class Sync extends AbstractQueuedSynchronizer
AQS - 内部类:
class NonfairSync extends Sync
非公平锁 - 内部类:
class FairSync extends Sync
公平锁
- 内部类:
构造函数
ReentrantLock 类中带有两个构造函数,一个是默认的构造函数,不带任何参数;一个是带有 fair 参数的构造函数
//无参构造函数 默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();// 默认 NonfairSync 非公平锁
}
//带参数构造函数 是否是公平锁的条件
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();//FairSync 公平锁 NonfairSync 非公平锁
}
synchronized
和 ReentrantLock
锁
ReentrantLock 具有与 synchronized
关键字相同的含有隐式监视器锁(monitor
)的基本行为和语义。
synchronized
和 lock
锁的实现 ReentrantLock
区别:
类型 | ReentrantLock | synchronized |
---|---|---|
存在层面 | Java 中的一个接口的实现,是juc提供的API | Java 中的一个关键字,存在于 JVM 层面 |
锁的释放条件 | 支持响应中断,超时,尝试获取锁<br />必须在 finally 关键字中释放锁,不然容易造成线程死锁 | 使用自动释放监视器<br />1.获取锁的线程执行完同步代码后,自动释放<br />2.线程发生异常时,JVM会让线程释放锁 |
获取锁的状态 | 可以判断锁的状态 | 无法判断锁的状态 |
条件队列 | 可关联多个条件队列 | 关联一个条件队列 |
锁实现机制 | 依赖AQS | 监视器模式 |
锁的类型 | 可重入,可中断,可公平锁 | 可重入,不可中断,非公平锁 |
使用场景 | 适用于大量同步阶段 | 适用于少量同步的情况下,性能开销比较大 |
简化版的加锁流程
如果 lock 加锁设置成功,设置当前线程为独占锁的线程;
-
如果 lock 加锁设置失败,还会再尝试获取一次锁数量,
如果锁数量为0,再基于 CAS 尝试将 state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;
如果锁数量不为0或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。
lock()与unlock()源码分析
获取锁 lock()
java.util.concurrent.locks.ReentrantLock#lock
//1.获取锁
public void lock() {
sync.lock();
}
//2.sync 有ReentrantLock的构造方法创建
//无参构造函数 默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();// 默认 NonfairSync 非公平锁
}
//带参数构造函数 是否是公平锁的条件
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();//FairSync 公平锁 NonfairSync 非公平锁
}
//3.非公平锁获取
final void lock() {
if (compareAndSetState(0, 1))// 借助 unsafe 原子性对 state 加1。如果初始值为0,表示没有线程占有锁
setExclusiveOwnerThread(Thread.currentThread());// 设置当前线程独占锁
else
acquire(1);// 公平锁获取方式 尝试获取锁
}
//4.compareAndSetState(0, 1)在没有线程占有锁时为true
//将调用时的当前线程设置为独占锁,其他线程再执行lock()时会调用acquire(1)尝试获取锁
public final void acquire(int arg) {//尝试获取锁
if (!tryAcquire(arg) && // 1.尝试获取锁 tryAcquire返回false表示失败,代表着锁争抢失败,进入排队竞争阶段
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//排队竞争阶段
selfInterrupt();//Thread.currentThread().interrupt(); 当前线程中断
}
tryAcquire(arg)方法详解:
//-----------------非公平锁tryAcquire()---------------
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//当前线程
// 获取当前 AQS 内部状态量
int c = getState();//AQS内部使⽤了⼀个volatile的变量state来作为资源的标识。
// 0 表示无线程占有,直接用 CAS 修改
if (c == 0) {
// 不检查排队情况,直接争抢
if (compareAndSetState(0, acquires)) {// 借助 unsafe 原子性对 state 加 acquires。
setExclusiveOwnerThread(current);// 设置当前线程独占锁
return true;//抢到锁
}
}
// 可重入锁情况
else if (current == getExclusiveOwnerThread()) {
// state 计数增加
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);// state会累加,这就是可重入
return true;//获取到锁
}
return false;
}
//-----------------公平锁tryAcquire()---------------
static final class FairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();//当前线程
int c = getState();//AQS内部使⽤了⼀个volatile的变量state来作为资源的标识。
if (c == 0) {//0 表示无线程占有
if (!hasQueuedPredecessors() && // 判断有没有别的线程排在了当前线程的前面。hasQueuedPredecessors()返回false表示没有
compareAndSetState(0, acquires)) {// 借助 unsafe 原子性对 state 加acquires。如果初始值为0,表示没有线程占有锁
setExclusiveOwnerThread(current);// 设置当前线程独占锁
return true;
}
}
// 可重入锁情况
else if (current == getExclusiveOwnerThread()) {//如果当前线程已是独占锁,再次使用tryAcquire(),state会累加,这就是可重入
int nextc = c + acquires;
if (nextc < 0)// overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);// state会累加,这就是可重入
return true;//获取到锁
}
return false;
}
}
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 方法解读
// 当前线程被包装成EXCLUSIVE排他模式的节点,通过addWaiter方法添加到队列中
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 无限循环
for (;;) {
// 当前节点的前一个节点
final Node p = node.predecessor();
// 如果前一个节点是头结点,表示当前节点适合去 tryAcquire
if (p == head && tryAcquire(arg)) {
// if 获取成功,设置当前节点为头节点,出队列
setHead(node);
// 将前面节点对当前节点的引用清空
p.next = null; // help GC
failed = false;
return interrupted;
}
// 如果返回true,需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
// 借助sun.misc.Unsafe#park 执行阻塞
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
//sun.misc.Unsafe#park
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
// 普通挂起。直到另一个持有锁线程释放锁后,触发下一个线程的 sun.misc.Unsafe#unpark
UNSAFE.park(false, 0L);
setBlocker(t, null);
}
释放锁 unlock()
//释放锁
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒下一个节点,触发 sun.misc.Unsafe#unpark
unparkSuccessor(h);
return true;
}
return false;
}
//java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
protected final boolean tryRelease(int releases) {//可重入系数
int c = getState() - releases;//当前线程对该锁的可重入次数重新计算
if (Thread.currentThread() != getExclusiveOwnerThread())//如果当前线程没有拥有锁
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//为0,表示没有线程占有锁 清空持有线程
free = true;
setExclusiveOwnerThread(null);
}
setState(c);//设置当前线程对该锁的可重入次数为当前值-releases(1)
return free; //free默认是false,只有c==0时才被置为true,否则都是false代表没有获取到锁
}
ReentrantLock与AQS
AQS(java.util.concurrent.locks.AbstractQueuedSynchronizer
)内提供的接口,由接口实现下面方法
// 尝试获取 独占锁
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
// 尝试释放 独占锁
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
//尝试获取 共享锁
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
//尝试释放 共享锁
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
//判断是否时当前线程在持有锁
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
ReentrantLock如何与AQS关联的?(以非公平锁为例)
加锁 | 解锁 | |
---|---|---|
ReentrantLock api | lock | unlock |
AQS核心方法 | acquire | release |
自定义同步器实现的方法 | tryAcquire、nonfairTryAcquire | tryRelease |
加锁
- 通过ReentrantLock的加锁方法(Lock)进行加锁的操作
- 调用内部类Sync的Lock方法,由于Sync的lock方法是抽象的,根据ReentrantLock初始化选择的公平锁与非公平锁执行相关的内部类的Lock方法,本质上都会执行AQS中的acquire()方法
- AQS的acquire()方法会执行tryAcquire()方法,但是由于tryAcquire()需要自定义同步器实现,因此执行了ReentrantLock中的tryAcquire方法,由于ReentrantLock是通过公平锁与非公平锁内部类实现的tryAcquire()方法,所以会根据锁类型的不同,来执行不同的tryAcquire()方法
- tryAcquire是获取锁逻辑,获取失败后,会执行AQS框架的后续逻辑,与ReentrantLock自定义的同步器无关
解锁
- 通过ReentrantLock的解锁方法(unlock)进行解锁
- unlock方法会调用内部类Sync的release方法,该方法来自于继承的AQS
- release中会调用tryRelease方法,tryRelease方法需要自定义同步器实现,tryRelease只在ReentrantLock中的Sync实现,因此可以看出,释放锁的过程,并不区分是否为公平锁
- 释放成功后,所有处理都由AQS完成,与自定义同步器无关
ReentrantLock使用
使用ReentrantLock实现同步的主要方法:
- lock():阻塞模式来获取锁
- lockInterruptibly:阻塞式获取锁,支持中断
- tryLock():非阻塞模式尝试获取锁
- tryLock(long timeout, TimeUnit unit):同上,支持时间设置
- unlock():释放锁
- newCondition():创建条件变量
- getHoldCount():当前线程对该锁的计数次数
- isHeldByCurrentThread():锁是否被当前线程持有
- isLocked():锁是否已经被某个线程持有
- getQueuedThreads():获取排队的线程列表
ReentrantLock使用示例
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
lock()与unlock()
package com.thread.study;
import java.util.concurrent.locks.ReentrantLock;
public class TestLock1 {
private static ReentrantLock lockParent = new ReentrantLock();//唯一锁
static class IncreTest extends Thread {
private ReentrantLock lock = new ReentrantLock();// 锁实例对象
private static ReentrantLock lockStatic = new ReentrantLock();// 锁类对象
public static int COUNT = 0;
public static int SIZE = 0;
public static int NUM = 0;
@Override
public void run() {
try {
lock.lock();
for (int i = 0; i < 1000000; i++) {
COUNT++;
}
} finally {
lock.unlock();
}
try {
lockParent.lock();
for (int i = 0; i < 1000000; i++) {
SIZE++;
}
} finally {
lockParent.unlock();
}
try {
lockStatic.lock();
for (int i = 0; i < 1000000; i++) {
NUM++;
}
} finally {
lockStatic.unlock();
}
}
}
public static void main(String[] args) throws InterruptedException {
IncreTest increTest1 = new IncreTest();
IncreTest increTest2 = new IncreTest();
increTest1.start();
increTest2.start();
increTest1.join();
increTest2.join();
System.out.println("COUNT:"+IncreTest.COUNT);
System.out.println("SIZE:"+IncreTest.SIZE);
System.out.println("NUM:"+IncreTest.NUM);
}
}
//打印结果
COUNT:1126569
SIZE:2000000
NUM:2000000
总结: ReentrantLock使用lock()与unlock()时使用同一个锁才起效果。
死锁情况
package com.thread.study;
import java.util.concurrent.locks.ReentrantLock;
public class TestLock2 {
private static ReentrantLock lock1 = new ReentrantLock();
private static ReentrantLock lock2 = new ReentrantLock();
static class IncreTest extends Thread {
private boolean flag;
IncreTest(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
try {
if (flag) {
lock1.lock();
System.out.println(Thread.currentThread().getName() + "线程获取lock1");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock2.lock();
System.out.println(Thread.currentThread().getName() + "线程获取lock2");
} else {
lock2.lock();
System.out.println(Thread.currentThread().getName() + "线程获取lock2");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock1.lock();
System.out.println(Thread.currentThread().getName() + "线程获取lock1");
}
} finally {
if (lock1.isHeldByCurrentThread()) {
lock1.unlock();
}
if (lock2.isHeldByCurrentThread()) {
lock2.unlock();
}
System.out.println(Thread.currentThread().getName() + "线程退出");
}
}
}
public static void main(String[] args) throws InterruptedException {
IncreTest increTest1 = new IncreTest(true);
IncreTest increTest2 = new IncreTest(false);
increTest1.start();
increTest2.start();
}
}
lockInterruptibly()
通过lockInterruptibly()方法解决使用ReentrantLock造成线程死锁问题
package com.thread.study;
import java.util.concurrent.locks.ReentrantLock;
public class TestLock2 {
private static ReentrantLock lock1 = new ReentrantLock();//唯一锁
private static ReentrantLock lock2 = new ReentrantLock();//唯一锁
static class IncreTest extends Thread {
private boolean flag;
IncreTest(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
try {
if (flag) {
lock1.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + "线程获取lock1");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock2.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + "线程获取lock2");
} else {
lock2.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + "线程获取lock2");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock1.lockInterruptibly();
System.out.println(Thread.currentThread().getName() + "线程获取lock1");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock1.isHeldByCurrentThread()) {
lock1.unlock();
}
if (lock2.isHeldByCurrentThread()) {
lock2.unlock();
}
System.out.println(Thread.currentThread().getName() + "线程退出");
}
}
}
public static void main(String[] args) throws InterruptedException {
IncreTest increTest1 = new IncreTest(true);
IncreTest increTest2 = new IncreTest(false);
increTest1.start();
increTest2.start();
Thread.sleep(3000);
if (increTest2.isAlive()) {
increTest2.interrupt();
System.out.println(increTest2.getName() + "中断");
}
}
}
//打印结果
Thread-0线程获取lock1
Thread-1线程获取lock2
Thread-1中断
java.lang.InterruptedException
Thread-1线程退出
Thread-0线程获取lock2
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireInterruptibly(AbstractQueuedSynchronizer.java:898)
Thread-0线程退出
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireInterruptibly(AbstractQueuedSynchronizer.java:1222)
at java.util.concurrent.locks.ReentrantLock.lockInterruptibly(ReentrantLock.java:335)
at com.thread.study.TestLock2$IncreTest.run(TestLock2.java:38)
tryLock
tryLock(long timeout,TimeUnit unit):如果在给定的等待时间内没有被另一个线程占用 ,并且当前线程尚未被保留,则获取该锁( interrupted)。
package com.thread.study;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class TestLock3 {
static class TryLockTest implements Runnable {
private static ReentrantLock lock1 = new ReentrantLock();
@Override
public void run() {
try {
if (lock1.tryLock(3, TimeUnit.SECONDS)) {
System.out.println(Thread.currentThread().getName() + "线程获取lock1");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + "线程没有获取到锁");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock1.isHeldByCurrentThread()) {
lock1.unlock();
}
System.out.println(Thread.currentThread().getName() + "线程退出");
}
}
}
public static void main(String[] args) {
TryLockTest t1 = new TryLockTest();
new Thread(t1).start();
new Thread(t1).start();
}
}
//打印结果
Thread-0线程获取lock1
Thread-0线程退出
Thread-1线程获取lock1
Thread-1线程退出
//将执行时间改为
Thread.sleep(4000);
//打印结果
Thread-0线程获取lock1
Thread-1线程没有获取到锁
Thread-1线程退出
Thread-0线程退出
//将main改为
public static void main(String[] args) {
TryLockTest t1 = new TryLockTest();
new Thread(t1).start();
TryLockTest t2 = new TryLockTest();
new Thread(t2).start();
}
//打印结果
Thread-0线程获取lock1
Thread-1线程获取lock1
Thread-0线程退出
Thread-1线程退出
总结:tryLock()在调用时锁定未被其他线程持有的锁,如果调用方法时,锁对象被其他线程持有则放弃。tryLock(long timeout,TimeUnit unit)则是在等待时间内锁没有被其他线程持有,并且当前线程没有中断,则获取到锁。
注意:tryLock()在使用时针对的是同一个锁。
使用tryLock()避免死锁
package com.thread.study;
import java.util.concurrent.locks.ReentrantLock;
public class TestTryLock4 {
private static ReentrantLock lock1 = new ReentrantLock();//唯一锁
private static ReentrantLock lock2 = new ReentrantLock();//唯一锁
static class IncreTest extends Thread {
private boolean flag;
IncreTest(boolean flag) {
this.flag = flag;
}
@Override
public void run() {
if (flag) {
while (true) {
try {
if (lock1.tryLock()) {
System.out.println(Thread.currentThread().getName() + "线程获取lock1");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (lock2.tryLock()) {
System.out.println(Thread.currentThread().getName() + "线程获取lock2");
return;
}
}
} finally {
if (lock1.isHeldByCurrentThread()) {
lock1.unlock();
}
if (lock2.isHeldByCurrentThread()) {
lock2.unlock();
}
System.out.println(Thread.currentThread().getName() + "线程退出");
}
}
} else {
while (true) {
try {
if (lock2.tryLock()) {
System.out.println(Thread.currentThread().getName() + "线程获取lock2");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (lock1.tryLock()) {
System.out.println(Thread.currentThread().getName() + "线程获取lock1");
return;
}
}
} finally {
if (lock1.isHeldByCurrentThread()) {
lock1.unlock();
}
if (lock2.isHeldByCurrentThread()) {
lock2.unlock();
}
System.out.println(Thread.currentThread().getName() + "线程退出");
}
}
}
}
}
public static void main(String[] args) {
IncreTest increTest1 = new IncreTest(true);
IncreTest increTest2 = new IncreTest(false);
increTest1.start();
increTest2.start();
}
}
//打印结果 是可以自动退出的
Thread-0线程获取lock1
Thread-1线程获取lock2
Thread-0线程退出
Thread-0线程获取lock1
Thread-1线程退出
Thread-1线程获取lock2
Thread-1线程退出
Thread-0线程获取lock2
Thread-0线程退出
Thread-1线程退出
Thread-1线程获取lock2
Thread-1线程获取lock1
Thread-1线程退出