在讲AQS之前,我们先熟悉一下,多线程开发中要注意的问题,这也是AQS致力于要解决的问题
在多线程开发的过程中会涉及到两类问题:
- 多个线程之间资源竞争的问题
- 线程之间的通信问题
java最原始的解决方案:
- 使用synchronized关键字给资源加锁,以解决资源竞争问题
- 使用Object.wait,notify,以解决线程之间的通信问题 调用之前必须先获得锁
下面使用最经典的生产者和消费者模式,来解释这两个问题:
需求:
1.多个生产者同时生产,多个消费者同时消费
2.一共可以生产5000个商品,生产完成之后,生产者自动停止
3.一共可以消费5000个商品,消费完成之后,消费者自动停止
4.存储商品的容器大小为100
商品:
public class Product {
/**
* 生产者名称
*/
private String producer;
/**
* 序列号
*/
private Integer serialNumber;
public Product(String producer, Integer serialNumber) {
this.producer = producer;
this.serialNumber = serialNumber;
}
@Override
public String toString() {
return "生产者:" + producer + ", 序列号:" + serialNumber;
}
}
生产者:
当容器满的时候,调用lock.wait,当前线程会阻塞,直到被其他线程唤醒,才会继续执行;
当生产了一件之后,调用lock.notifyAll,唤醒其他等待的线程
public class Producer {
/**
* 生产的数量
*/
private AtomicInteger produceNum = new AtomicInteger(0);
private final Object lock;
private List<Product> products;
public Producer(Object lock, List<Product> products) {
this.lock = lock;
this.products = products;
}
/**
*
* @return 生产了几个
* @throws Exception
*/
public int produce() throws Exception {
synchronized(lock) {
// 如果已经生产了5000个,则直接终端当前线程,并通知其他线程
if(produceNum.get() >= 5000) {
lock.notifyAll();
throw new InterruptedException(Thread.currentThread().getName() + "已经生产了5000件,无需再生产了");
}
// 如果容器中的数量大于或者等于100,等待消费者消费
if(products.size() >= 100) {
System.out.println("#####################生产者开始等待#####################");
lock.wait();
return 0;
}
// 生产一件,并通知所有的等待者
produceNum.getAndIncrement();
Product product = new Product(Thread.currentThread().getName(), produceNum.get());
products.add(product);
System.out.println("++++++++++++++++++生产[" + product.toString() + "]++++++++++++++++++,size=" + products.size());
lock.notifyAll();
return 1;
}
}
}
消费者:
当容器为空时,调用lock.wait,当前线程会阻塞,直到被其他线程唤醒,才会继续执行;
当消费完一件后,需要调用lock.notifyAll去通知其他等待的线程
public class Consumer {
/**
* 生产的数量
*/
private AtomicInteger consumeNum = new AtomicInteger(0);
private final Object lock;
private List<Product> products;
public Consumer(Object lock, List<Product> products) {
this.lock = lock;
this.products = products;
}
public int consumer() throws Exception {
synchronized(lock) {
// 如果已经消费了5000个,则直接终端当前线程,并通知其他线程
if(consumeNum.get() >= 5000) {
throw new InterruptedException(Thread.currentThread().getName() + "已经消费了5000件,无需再消费了");
}
// 如果容器中的没有产品了
if(products.size() <= 0) {
System.out.println("********************消费者开始等待********************");
lock.wait();
return 0;
}
// 消费一件,并通知所有的等待者
consumeNum.getAndIncrement();
Product product = products.remove(0);
System.out.println("----------------------消费[" + product.toString() + "]----------------------,size=" + products.size());
lock.notifyAll();
return 1;
}
}
}
执行逻辑:
@Test
public void producerAndConsumerTest() {
// 用于装产品的容器
List<Product> products = Lists.newArrayList();
Object lock = new Object();
final Producer producer = new Producer(lock, products);
final StringBuilder msg = new StringBuilder();
Thread p1 = new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (true) {
try {
i += producer.produce();
} catch (Exception e) {
msg.append("*********************p1共生产了" + i + "件产品\n");
System.out.println(e.getMessage());
break;
}
}
}
}, "p1");
Thread p2 = new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (true) {
try {
i += producer.produce();
} catch (Exception e) {
msg.append("*********************p2共生产了" + i + "件产品\n");
System.out.println(e.getMessage());
break;
}
}
}
}, "p2");
final Consumer consumer = new Consumer(lock, products);
Thread c1 = new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (true) {
try {
i += consumer.consumer();
} catch (Exception e) {
msg.append("*********************c1共消费了" + i + "件产品\n");
System.out.println(e.getMessage());
break;
}
}
}
}, "c1");
Thread c2 = new Thread(new Runnable() {
@Override
public void run() {
int i = 0;
while (true) {
try {
i += consumer.consumer();
} catch (Exception e) {
msg.append("*********************c2共消费了" + i + "件产品\n");
System.out.println(e.getMessage());
break;
}
}
}
}, "c2");
p1.start();
p2.start();
c1.start();
c2.start();
try {
Thread.sleep(1000);
} catch (Exception e) {
// ignore
}
System.out.println(msg.toString());
}
运行结果截图:
图片.png
其实使用java原生的方法,存在一个小的问题,当生产者生产了一件商品之后,需要调用notifyAll方法去唤醒所有的等待线程,我们无法对等待的线程做一个分组,好在AQS解决了这个问题,以ReentrantLock为例,来解释一下AQS如何解决这个问题的
使用ReentrantLock实现生产者和消费者模型
未完待续。。。