前言
在实现生产者消费者问题时,可以采用三种方式:
1.使用Object的wait/notify的消息通知机制;
2.使用Lock的Condition的await/signal的消息通知机制;
3.使用BlockingQueue实现。本文主要将这三种实现方式进行总结归纳。
一.wait/notify实现生产者和消费者模式
public class ProductAndConsumer{
/*
1.生产者不断的向里面加东西
2.消费者不断的取东西
3.生产者发现容器里满了就wait
4.消费者发送容器里没东西了就wait
5.生产者生产东西唤醒消费者
6.消费者消费东西唤醒生产者
7.锁对象要是同一个对象才行*/
public static void main(String[] args){
ProductAndConsumer ps = new ProductAndConsumer();
Resources resources = ps.new Resources(10,new LinkedList<Product>());
new Thread(ps.new Producer(resources)).start();
new Thread(ps.new Consumer(resources)).start();
}
class Product{
String name;
public Product(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
//生产者
class Producer implements Runnable{
Resources resources;
@Override
public void run() {
while (true){
String name = (System.currentTimeMillis()+">"+(int)(Math.random()*100))+"号产品";
System.out.println("生产者生产了:"+name);
resources.put(new Product(name));
}
}
public Producer(Resources resources) {
this.resources = resources;
}
}
//消费者
class Consumer implements Runnable{
Resources resources;
@Override
public void run() {
while (true){
Product product = resources.pop();
System.out.println("消费者消费了:"+product.name);
}
}
public Consumer(Resources resources) {
this.resources = resources;
}
}
class Resources{
int maxLength;
private List<Product> productList;
synchronized void put(Product product){
while (productList.size()==maxLength){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
productList.add(product);
this.notifyAll();
}
synchronized Product pop(){
while(productList.isEmpty()){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = productList.remove(0);
this.notifyAll();
return product;
}
public Resources(int maxLength, List<Product> productList) {
this.maxLength = maxLength;
this.productList = productList;
}
}
}
二.使用Lock的Condition的await和signal的方式
Lock&Condition
Lock用于控制多线程对同一状态的顺序访问,保证该状态的连续性。
Condition用于控制多线程之间的、基于该状态的条件等待。
重点:
Condition它更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。
public class ProductAndConsumerLock {
class Resources{
int maxLength;
private List<Product> productList;
ReentrantLock lock;
Condition full;
Condition empty;
void put(Product product){
lock.lock();
while (productList.size()==maxLength){
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
productList.add(product);
empty.signalAll();
lock.unlock();
}
Product pop(){
lock.lock();
while(productList.isEmpty()){
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Product product = productList.remove(0);
full.signalAll();
lock.unlock();
return product;
}
public Resources(int maxLength, List<Product> productList, ReentrantLock lock, Condition full, Condition empty) {
this.maxLength = maxLength;
this.productList = productList;
this.lock = lock;
this.full = full;
this.empty = empty;
}
}
public static void main(String[] args) {
ProductAndConsumerLock ps = new ProductAndConsumerLock();
ReentrantLock lock = new ReentrantLock();
Condition full = lock.newCondition();
Condition empty = lock.newCondition();
Resources resources = ps.new Resources(10, new LinkedList<Product>(), lock, full,empty);
new Thread(ps.new Producer(resources)).start();
new Thread(ps.new Consumer(resources)).start();
}
}
三.使用BlockingQuene实现
BlockingQueue即阻塞队列,它是基于ReentrantLock,依据它的基本原理,我们可以实现生产者与消费者模式,大致如下图所示:
在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。
BlockingQueue的入队和出队的方法有很多,下面是各种方法的区别
1.入队
offer(E e):如果队列没满,立即返回true; 如果队列满了,立即返回false-->不阻塞
put(E e):如果队列满了,一直阻塞,直到队列不满了或者线程被中断-->阻塞
offer(E e, long timeout, TimeUnit unit):在队尾插入一个元素,,如果队列已满,则进入等待。
2.出队
poll():如果没有元素,直接返回null;如果有元素,出队
take():如果队列空了,一直阻塞,直到队列不为空或者线程被中断-->阻塞
poll(long timeout, TimeUnit unit):如果队列不空,出队;如果队列已空且已经超时,返回null;如果队列已空且时间未超时,则进入等待。
在这里,我们使用put和take的方式实现,生产者消费者模式
public class ProductAndConsumerBlockQuene {
public static void main(String[] args){
ProductAndConsumerBlockQuene ps = new ProductAndConsumerBlockQuene();
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();
BlockingQueue<Product> queue = new LinkedBlockingQueue<>(15);
new Thread(ps.new Producer(queue)).start();
new Thread(ps.new Consumer(queue)).start();
}
class Product{
String name;
public Product(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
//生产者
class Producer implements Runnable{
private BlockingQueue<Product> queue;
@Override
public void run() {
while (true){
String name = (System.currentTimeMillis()+">"+(int)(Math.random()*100))+"号产品";
System.out.println("生产者生产了:"+name);
try {
queue.put(new Product(name));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public Producer(BlockingQueue<Product> queue) {
this.queue = queue;
}
}
//消费者
class Consumer implements Runnable{
private BlockingQueue<Product> queue;
@Override
public void run() {
while (true){
Product product = null;
try {
product = queue.take();
System.out.println("消费者消费了:"+product.name);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public Consumer(BlockingQueue<Product> queue) {
this.queue = queue;
}
}
}