在多线程场景中使用生产者消费者模式较为流行,一般主要的实现方式有以下几种
1.最古老的等待通知方式 wait/notify/notifyAll,这3个方法需要跟synchronized配套
/***
* 生产者消费者模式之wait/notify/notifyAll
*/
public class ProducerConsumer1 {
private LinkedList<String> products = new LinkedList<>();
private static final int MAX_SIZE = 10; // 最大库存
private Object lock = new Object();
/**
* 生产商品,添加到库存
*/
public void produce(String e) {
synchronized (lock) {
try {
while (products.size() >= MAX_SIZE) {
lock.wait();
}
products.add(e);
System.out.println(" 放入一个商品: " + e + ",总库存为:" + products.size());
lock.notifyAll();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
/**
* 消费商品,从库存中移除
*/
public void consume() {
synchronized (lock) {
try {
while (products.size() == 0) {
lock.wait();
}
String product = products.removeFirst();
System.out.println(" 消费一个商品: " + product + ", 总库存为: " + products.size());
lock.notifyAll();
} catch (Exception e) {
e.printStackTrace();
}
}
}
/***
* 生产者线程
*/
private class Producer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) {
produce(" 商品 " + i);
}
}
}
/***
* 消费者线程
*/
private class Customer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) {
consume();
}
}
}
public static void main(String[] args) {
ProducerConsumer1 lc = new ProducerConsumer1();
new Thread(lc.new Producer()).start();
new Thread(lc.new Customer()).start();
}
}
2.lock condition的await/signall/signalAll,这3个方法需要跟lock/unlock配套
await/signal/signalAll是在JAVA进程中实现的,无需切换到内核态
wait/notify/notifyAll 是通过monitor管程来实现的,有可能会切换到内核态
一般不推荐使用wait/notify/notifyAll
/***
* 生产者消费者模式之await/signal/signalAll
*/
public class ProducerConsumer2 {
private LinkedList<String> products = new LinkedList<>();
private static final int MAX_SIZE = 10; // 最大库存
private Lock lock = new ReentrantLock();// 资源锁
private Condition condition = lock.newCondition();// 库存非满和非空条件
/**
* 生产商品,添加到库存
*/
public void produce(String e) {
lock.lock();
try {
while (products.size() >= MAX_SIZE) {
condition.await();
}
products.add(e);
System.out.println(" 放入一个商品: " + e + ",总库存为:" + products.size());
condition.signalAll();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
lock.unlock();
}
}
/**
* 消费商品,从库存中移除
*/
public void consume() {
lock.lock();
try {
while (products.size() == 0) {
condition.await();
}
String product = products.removeFirst();
System.out.println(" 消费一个商品: " + product + ", 总库存为: " + products.size());
condition.signalAll();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
/***
* 生产者线程
*/
private class Producer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) {
produce(" 商品 " + i);
}
}
}
/***
* 消费者线程
*/
private class Customer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) {
consume();
}
}
}
public static void main(String[] args) {
ProducerConsumer2 lc = new ProducerConsumer2();
new Thread(lc.new Producer()).start();
new Thread(lc.new Customer()).start();
}
}
3.通过BlockingQueue的实现方式最简单直接,也是最常用的方式
/***
* 生产者消费者模式之BlockingQueue
*/
public class ProducerConsumer3 {
private LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
/**
* 生产商品,添加到库存
*/
public void produce(String e) {
try {
queue.put(e);
System.out.println(" 放入一个商品: " + e + ",总库存为:" + queue.size());
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* 消费商品,从库存中移除
*/
public void consume() {
String product = null;
try {
product = queue.take();
System.out.println(" 消费一个商品: " + product + ", 总库存为: " + queue.size());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/***
* 生产者线程
*/
private class Producer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) {
produce(" 商品 " + i);
}
}
}
/***
* 消费者线程
*/
private class Customer implements Runnable {
public void run() {
for (int i = 0; i < 20; i++) {
consume();
}
}
}
public static void main(String[] args) {
ProducerConsumer3 lc = new ProducerConsumer3();
new Thread(lc.new Producer()).start();
new Thread(lc.new Customer()).start();
}
}