在实际的软件开发过程中,经常会碰到如下场景:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者;
生产者和消费者之间通过缓冲区(通常是一个阻塞队列)实现通讯, 生产者将产生的数据放入缓冲区,消费者从缓冲区中获取数据。
举个栗子
去食堂吃饭,食堂的叔叔阿姨会先将饭做好,放到食堂窗口,同学们会去食堂打饭。
生产者(食堂的叔叔阿姨) -> 生产数据(做饭) -> 缓冲区(食堂窗口) -> 消费数据(打饭) -> 消费者(同学)
生产者消费者实现思路
生产者和消费者的任务很明确,生产者只管生产数据,然后添加到缓冲队列。而消费者只管从缓冲队列中获取数据
可以说生产者消费者都很无脑,而缓冲队列则要忙一些,他起到了一个平衡生产者和消费者的作用。
如果生产者生产速度过快,消费者消费的很慢,并且缓冲队列达到了最大长度时。缓冲队列会阻塞生产者,让生产者停止生产,等待消费者消费了数据后,再唤醒生产者
同理,当消费者消费速度过快时,队列为空时。缓冲队列则会阻塞消费者,待生产者向队列添加数据后,再唤醒消费者
实现
通过上述的分析后,我们来用最基本的Java代码实现一下
我们先来定义一下Consumer 和Producer ,他们的逻辑比较简单,这里我们只循环十次模拟一下生产消费的场景。Buffer 为缓冲区,我们待会再看
/**
* 消费者
*/
class Consumer extends Thread {
private Buffer buffer;
private int number;
public Consumer(Buffer b, int number) {
buffer = b;
this.number = number;
}
public void run() {
int value;
for (int i = 0; i < 10; i++) {
// 从缓冲区中获取数据
value = buffer.get();
try {
// 模拟消费数据
sleep(1000);
} catch (InterruptedException e) {
}
System.out.println("消费者 #" + this.number + " got: " + value);
}
}
}
/**
* 生产者
*/
class Producer extends Thread {
private Buffer buffer;
private int number;
public Producer(Buffer b, int number) {
buffer = b;
this.number = number;
}
public void run() {
for (int i = 0; i < 10; i++) {
try {
// 模拟生产数据
sleep(500);
} catch (InterruptedException e) {
}
// 将数据放入缓冲区
buffer.put(i);
System.out.println("生产者 #" + this.number + " put: " + i);
}
}
}
可以看到 Consumer 和Producer 没有什么逻辑,只是对缓冲区的读写操作,下面我们来重点看一下 Buffer的实现
/**
* 缓冲区
*/
class Buffer {
private List<Integer> data = new ArrayList<>();
private static final int MAX = 10;
private static final int MIN = 0;
public synchronized int get() {
while (MIN == data.size()) {
try {
wait();
} catch (InterruptedException e) {
}
}
Integer i = data.remove(0);
notifyAll();
return i;
}
public synchronized void put(int value) {
while (MAX == data.size()) {
try {
wait();
} catch (InterruptedException e) {
}
}
data.add(value);
notifyAll();
}
}
-
分析
put(): 生产者向缓冲区写入数据的操作,当MAX == data.size()
时,就是我们刚刚所说的生产者速度过快,消费者速度过慢的情况,这个时候身为 "缓冲区" 要平衡一下,调用 Object.wait(),让当前生产者线程进入挂起状态,等待消费者消费数据后将其唤醒当
MAX > data.size()
时,向ArrayList中添加数据,并尝试唤醒正在等待的消费者(这一步是必须的)get(): 消费者向缓冲区读数据的操作,和上述逻辑相反,当
MIN == data.size()
时,这时消费者速度太快,生产者太慢,队列中已经没有数据了。"缓冲区" 再一次站了出来,通过wait(),让当前消费者线程进入挂起状态,等待生产者生产数据后将其唤醒当
MIN < data.size()
时,取出ArrayList中第一条数据,并尝试唤醒正在等待的生产者
上述案例完整代码 ProducerConsumer.java
上述使用最基本的Java代码实现生产者消费者模式,实际开发中我们可能会使用BlockingQueue
、ReentrantLock
、ThreadPoolExecutor
这些更成熟的轮子,但是一通百通
关于上述案例的思考
为什么缓冲区的判断条件是
while(condition)
而不是if(condition)
?
答:防止线程被错误的唤醒
举例:当有两个消费者线程wait() 时,此时生产者在队列里放入了一条数据,并调用notifyAll(), 两个消费者线程被唤醒,第一个消费者成功取出队列中数据,而第二个消费者此时就是被错误的唤醒了,程序抛出异常,所以此处使用 while(condition)循环检查Java中要求wait()方法为什么要放在同步块中?
答:防止出现Lost Wake-Up
举例:如果队列没有同步限制,消费者和生产者并发执行,很可能出现这种情况,消费者这时候检查了条件正准备wait(),这时候上下文切换到了生产者,生产者咔咔一顿操作向队列中添加了数据,并唤醒了消费者,而此时消费者并没有wait(),这个通知就丢掉了,然后消费者wait() 就这样睡去了...为什么缓冲区一定要使用阻塞队列实现?
同理就是为了防止出现Lost Wake-Up
为什么要使用生产者消费者模式
顺序执行不就可以了吗?生产者消费者到底有什么意义?
并发 (异步)
生产者直接调用消费者,两者是同步(阻塞)的,如果消费者吞吐数据很慢,这时候生产者白白浪费大好时光。而使用这种模式之后,生产者将数据丢到缓冲区,继续生产,完全不依赖消费者,程序执行效率会大大提高。解耦
生产者和消费者之间不直接依赖,通过缓冲区通讯,将两个类之间的耦合度降到最低。