一、ReentrantLock
从jdk发行1.5版本之后,在原来synchronize的基础上,增加了重入锁 ReentrantLock。
首先来看一个实例:
class ReentrantLockTestDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2
, 10
, 1
, TimeUnit.HOURS
, new ArrayBlockingQueue<>(4)
, Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
MyRunnable myRunnable = new MyRunnable();
threadPoolExecutor.submit(myRunnable);
threadPoolExecutor.submit(myRunnable);
}
}
未使用ReentrantLock:
static class MyRunnable implements Runnable {
@Override
public void run() {
for (int i = 0; i < 3; i++) {
System.out.println("index=" + i + " thread=" + Thread.currentThread().getName());
}
}
}
未使用ReentrantLock打印的结果是没有顺序,杂乱无章的
index=0 thread=pool-1-thread-1
index=0 thread=pool-1-thread-2
index=1 thread=pool-1-thread-1
index=1 thread=pool-1-thread-2
index=2 thread=pool-1-thread-1
index=2 thread=pool-1-thread-2
使用ReentrantLock加入锁:
static class MyRunnable implements Runnable {
@Override
public void run() {
ReentrantLock reentrantLock = new ReentrantLock();
// 加锁
reentrantLock.lock();
try {
for (int i = 0; i < 3; i++) {
System.out.println("index=" + i + " thread=" + Thread.currentThread().getName());
}
} finally {
// 解锁
reentrantLock.unlock();
}
}
}
打印出的结果,是有顺序的
index=0 thread=pool-1-thread-1
index=1 thread=pool-1-thread-1
index=2 thread=pool-1-thread-1
index=0 thread=pool-1-thread-2
index=1 thread=pool-1-thread-2
index=2 thread=pool-1-thread-2
这就是锁的作用,它是互斥的,当一个线程持有锁的时候,其他线程只有等待,待待线程执行结束,释放锁,等待的线程再通过竞争得到锁。
二、Condition
通常在开发并发程序的时候,会碰到需要停止正在执行业务A,来执行另一个业务B,当业务B执行完成后,业务A继续执行。就可以通过ReentrantLock和Condtion等待/唤醒来完成这样的操作。在LinkedBlockingQueue的put/take操作中就有使用到。
相较于synchronize的wait()、notify()/notifyAll()则更有针对性、灵活性。可以唤醒符合某个条件线程去执行,而notify/notifyAll()则是随机通知的,具有很大的不可控性。
1、使用Condition实现线程等待和唤醒
class ConditionTestDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3
, 10
, 1
, TimeUnit.HOURS
, new ArrayBlockingQueue<>(4)
, Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
MyService service = new MyService();
// 线程1、2是符合条件A的
threadPoolExecutor.submit(new RunnableA(service));
threadPoolExecutor.submit(new RunnableA(service));
// 线程3是符合条件B的
threadPoolExecutor.submit(new RunnableB(service));
// 主线程sleep2s后,主动唤醒符合条件B的线程。再由线程B去唤醒符合条件A的两个线程。
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "等待2s 去唤醒符合条件B的所有线程" + "--------" + DateUtils.INSTANCE.getCurrDataStr());
service.signalB();
}
static class RunnableA implements Runnable {
private MyService service;
public RunnableA(MyService service) {
this.service = service;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "符合条件A--------" + DateUtils.INSTANCE.getCurrDataStr());
service.awaitA();
}
}
static class RunnableB implements Runnable {
private MyService service;
public RunnableB(MyService service) {
this.service = service;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "符合条件B--------" + DateUtils.INSTANCE.getCurrDataStr());
service.awaitB();
}
}
static class MyService {
ReentrantLock reentrantLock = new ReentrantLock();
Condition threadACondition = reentrantLock.newCondition();
Condition threadBCondition = reentrantLock.newCondition();
/**
* 符合添加A的线程进入等待
*/
public void awaitA() {
reentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "获取到锁。被要求等待--------" + DateUtils.INSTANCE.getCurrDataStr());
threadACondition.await();
System.out.println(Thread.currentThread().getName() + "被唤醒--------" + DateUtils.INSTANCE.getCurrDataStr());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
/**
* 符合添加B的线程进入等待
*/
public void awaitB() {
reentrantLock.lock();
try {
System.out.println(Thread.currentThread().getName() + "获取到锁,被要求等待--------" + DateUtils.INSTANCE.getCurrDataStr());
threadBCondition.await();
System.out.println(Thread.currentThread().getName() + "被唤醒--------" + DateUtils.INSTANCE.getCurrDataStr());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
/**
* 唤醒符合条件A的所有线程
*/
public void signalA() {
reentrantLock.lock();
try {
threadACondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public void signalB() {
reentrantLock.lock();
try {
// 唤醒符合条件B的所有线程
threadBCondition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
System.out.println(Thread.currentThread().getName() + "等待2s 再去唤醒符合条件A的所有线程" + "--------" + DateUtils.INSTANCE.getCurrDataStr());
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 在唤醒符合条件B的所有线程后,2s再去唤醒符合条件A的所有线程
signalA();
}
}
}
打印结果:
pool-1-thread-1符合条件A--------------22/04/01 21:37:21
pool-1-thread-3符合条件B--------------22/04/01 21:37:21
pool-1-thread-1获取到锁。被要求等待-----22/04/01 21:37:21
pool-1-thread-2符合条件A--------------22/04/01 21:37:21
pool-1-thread-2获取到锁。被要求等待-----22/04/01 21:37:21
pool-1-thread-3获取到锁,被要求等待-----22/04/01 21:37:21 // 三个线程一开启,就被要求等待
main等待2s 去唤醒符合条件B的所有线程-----22/04/01 21:37:23 // 主线程等待2s后,主动去唤醒符合条件B的线程
pool-1-thread-3被唤醒----------------22/04/01 21:37:23 // 符合条件B的线程被唤醒
main等待2s 再去唤醒符合条件A的所有线程---22/04/01 21:37:25 // 符合条件B的线程被唤醒后,再等待2s,去唤醒符合条件A的所有线程
pool-1-thread-1被唤醒----------------22/04/01 21:37:25
pool-1-thread-2被唤醒----------------22/04/01 21:37:25 // 线程1、2符合条件A,被同一时间唤醒
分别实例化了两个Condition对象,都是使用同一个lock注册。注意 conditionA对象的等待和唤醒只对使用了conditionA的线程有用,同理 conditionB对象的等待和唤醒只对使用了conditionB的线程有用。
2、模拟生产/消费者
static class MyService {
private ReentrantLock mReentrantLock = new ReentrantLock();
private Condition mCondition = mReentrantLock.newCondition();
private boolean isFull;
private int index;
public void put() {
mReentrantLock.lock();
try {
// 如果队列已满,则进入等待中
if (isFull) {
System.out.println("队列已满,生产者进入等待中----" + DateUtils.INSTANCE.getCurrDataStr());
mCondition.await();
}
System.out.println("开始生产,index=" + index + "需要2s----" +DateUtils.INSTANCE.getCurrDataStr());
// 每隔2s放一个元素
index++;
Thread.sleep(2000);
// 通知取
isFull = true;
mCondition.signalAll();
System.out.println("结束生产,index=" + index + "唤醒消费者进行生产----" + DateUtils.INSTANCE.getCurrDataStr());
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
public void take() {
mReentrantLock.lock();
try {
// 如果队列已空,则进入等待中
if (!isFull) {
System.out.println("队列已空,消费者进入等待中----" + DateUtils.INSTANCE.getCurrDataStr());
mCondition.await();
}
System.out.println("开始消费,index=" + index + "需要3s----" + DateUtils.INSTANCE.getCurrDataStr());
index--;
Thread.sleep(3000);
isFull = false;
// 提醒生产者
mCondition.signalAll();
System.out.println("结束消费,index=" + index + "唤醒生产者进行生产----" + DateUtils.INSTANCE.getCurrDataStr());
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
}
生产者类:
static class PutRunnable implements Runnable {
MyService myService;
public PutRunnable(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.put();
}
}
}
消费者类:
static class TakeRunnable implements Runnable {
MyService myService;
public TakeRunnable(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.take();
}
}
}
启动类:
class ConditionDemo {
public static void main(String[] args) {
MyService myService = new MyService();
ExecutorService executorService = Executors.newFixedThreadPool(4);
executorService.execute(new PutRunnable(myService));
executorService.execute(new TakeRunnable(myService));
}
}
打印结果:
开始生产,index=0需要2s----22/04/02 13:21:15
结束生产,index=1唤醒消费者进行生产----22/04/02 13:21:17
队列已满,生产者进入等待中----22/04/02 13:21:17
开始消费,index=1需要3s----22/04/02 13:21:17
结束消费,index=0唤醒生产者进行生产----22/04/02 13:21:20
队列已空,消费者进入等待中----22/04/02 13:21:20
开始生产,index=0需要2s----22/04/02 13:21:20
结束生产,index=1唤醒消费者进行生产----22/04/02 13:21:22
队列已满,生产者进入等待中----22/04/02 13:21:22
开始消费,index=1需要3s----22/04/02 13:21:22
结束消费,index=0唤醒生产者进行生产----22/04/02 13:21:25
队列已空,消费者进入等待中----22/04/02 13:21:25
开始生产,index=0需要2s----22/04/02 13:21:25
...
3、顺序执行线程
充分发掘Condition的灵活性,可以用它来实现顺序执行线程。
class MyService {
private ReentrantLock mReentrantLock = new ReentrantLock();
// 有三个线程,所有注册三个Condition
private Condition mConditionA = mReentrantLock.newCondition();
private Condition mConditionB = mReentrantLock.newCondition();
private Condition mConditionC = mReentrantLock.newCondition();
// 通过index控制下一个执行的线程
private int index;
public void actionA() {
mReentrantLock.lock();
try {
// 只有index 不等于 0,则进入等待中
if (index != 0) {
mConditionA.await();
}
System.out.println("A执行");
Thread.sleep(1000);
index = 1;
mConditionB.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
public void actionB() {
mReentrantLock.lock();
try {
// 只有index 不等于 1,则进入等待中
if (index != 1) {
mConditionB.await();
}
System.out.println("B执行");
Thread.sleep(1000);
index = 2;
mConditionC.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
// 只有index==2时,才执行下面操作,否则休眠
public void actionC() {
mReentrantLock.lock();
try {
// 只有index 不等于 2,则进入等待中
if (index != 2) {
mConditionC.await();
}
System.out.println("C执行");
Thread.sleep(1000);
index = 0;
mConditionA.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
mReentrantLock.unlock();
}
}
}
业务类:
class RunnableA implements Runnable {
MyService myService;
public RunnableA(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.actionA();
}
}
}
class RunnableB implements Runnable {
MyService myService;
public RunnableB(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.actionB();
}
}
}
class RunnableC implements Runnable {
MyService myService;
public RunnableC(MyService myService) {
this.myService = myService;
}
@Override
public void run() {
while (true) {
myService.actionC();
}
}
}
启动类:
class ConditionDemo {
public static void main(String[] args) {
MyService myService = new MyService();
ExecutorService executorService = Executors.newFixedThreadPool(4);
// 这边故意打乱启动顺序
executorService.execute(new RunnableB(myService));
executorService.execute(new RunnableA(myService));
executorService.execute(new RunnableC(myService));
}
}
打印结果:
A执行
B执行
C执行
A执行
B执行
C执行
...