生产者和消费者问题是线程模型中的经典问题:生产者和消费者在同一时间段内共用同一个存储空间,生产者往存储空间中添加产品,消费者从存储空间中取走产品,当存储空间为空时,消费者阻塞,当存储空间满时,生产者阻塞。
举例说明:
- 你把信写好——相当于生产者制造数据
- 你把信放入邮筒——相当于生产者把数据放入缓冲区
- 邮递员把信从邮筒取出——相当于消费者把数据取出缓冲区
- 邮递员把信拿去邮局做相应的处理——相当于消费者处理数据
具体实现方式
为什么要使用生产者和消费者模式
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者和消费者问题的不同实现方式
- 不完善的实现(会导致死锁)
int itemCount = 0;//总数量
procedure producer() {//生产者
while (true) {
item = produceItem();//生产一个
if (itemCount == BUFFER_SIZE) {//生产满则睡眠
sleep();
}
putItemIntoBuffer(item);//缓冲区放入一个
itemCount = itemCount + 1;
if (itemCount == 1) {
wakeup(consumer);//唤醒消费者
}
}
}
procedure consumer() {//消费者
while (true) {
if (itemCount == 0) {//消费完则睡眠
sleep();
}
item = removeItemFromBuffer();//缓冲区减少一个
itemCount = itemCount - 1;
if (itemCount == BUFFER_SIZE - 1) {
wakeup(producer);//唤醒生产者
}
consumeItem(item);//消费一个
}
}
上面代码中的问题在于它可能导致竞争条件,进而引发死锁。考虑下面的情形:
消费者把最后一个 itemCount 的内容读出来,注意它现在是零。消费者返回到while的起始处,现在进入 if 块;
就在调用sleep之前,CPU决定将时间让给生产者,于是消费者在执行 sleep 之前就被中断了,生产者开始执行;
生产者生产出一项数据后将其放入缓冲区,然后在 itemCount 上加 1;
-由于缓冲区在上一步加 1 之前为空,生产者尝试唤醒消费者;
-遗憾的是,消费者并没有在休眠,唤醒指令不起作用。当消费者恢复执行的时候,执行 sleep,一觉不醒。出现这种情况的原因在于,消费者只能被生产者在 itemCount 为 1 的情况下唤醒;生产者不停地循环执行,直到缓冲区满,随后进入休眠。
由于两个线程都进入了永远的休眠,死锁情况出现了。因此,该算法是不完善的。
2. 使用信号灯的算法
semaphore fillCount = 0; // 生产的项目 总存量
semaphore emptyCount = BUFFER_SIZE; // 剩余空间
procedure producer() {
while (true) {
item = produceItem();//生产
down(emptyCount);//减少剩余空间
putItemIntoBuffer(item);//缓冲区增加
up(fillCount);//增加存量
}
}
procedure consumer() {
while (true) {
down(fillCount);//减少存量
item = removeItemFromBuffer();//缓冲区减少
up(emptyCount);//增加剩余空间
consumeItem(item);//消费
}
}
上述方法在只有一个生产者和一个消费者时能解决问题。对于多个生产者或者多个消费者共享缓冲区的情况,该算法也会导致竞争条件,出现两个或以上的进程同时读或写同一个缓冲区槽的情况。
为了解决这个问题,需要在保证同一时刻只有一个生产者能够执行 putItemIntoBuffer()。也就是说,需要寻找一种方法来互斥地执行临界区的代码。为了达到这个目的,可引入一个二值信号灯 mutex,其值只能为 1 或者 0。如果把线程放入 down(mutex) 和 up(mutex) 之间,就可以限制只有一个线程能被执行。多生产者、消费者的解决算法如下
semaphore mutex = 1;
semaphore fillCount = 0;
semaphore emptyCount = BUFFER_SIZE;
procedure producer() {
while (true) {
item = produceItem();
down(emptyCount);
down(mutex);//获取锁
putItemIntoBuffer(item);
up(mutex);//释放锁
up(fillCount);
}
}
procedure consumer() {
while (true) {
down(fillCount);
down(mutex);
item = removeItemFromBuffer();
up(mutex);
up(emptyCount);
consumeItem(item);
}
}
3. 使用管程的算法
monitor ProducerConsumer {
int itemCount
condition full;
condition empty;
procedure add(item) {
while (itemCount == BUFFER_SIZE)
wait(full);
putItemIntoBuffer(item);
itemCount = itemCount + 1;
if (itemCount == 1)
notify(empty);
}
procedure remove() {
while (itemCount == 0)
wait(empty);
item = removeItemFromBuffer();
itemCount = itemCount - 1;
if (itemCount == BUFFER_SIZE - 1)
notify(full);
return item;
}
}
procedure producer() {
while (true) {
item = produceItem()
ProducerConsumer.add(item)
}
}
procedure consumer() {
while (true) {
item = ProducerConsumer.remove()
consumeItem(item)
}
}
注意代码中 while 语句的用法,都是用在测试缓冲区是否已满或空的时候。当存在多个消费者时,有可能造成竞争条件的情况是:某一消费者在一项数据被放入缓冲区中时被唤醒,但是另一消费者已经在管程上等待了一段时间并移除了这项数据。如果 while 语句被改成 if,则会出现放入缓冲区的数据项过多,或移除空缓冲区中的元素的情况。