一、生产者消费者模式
生产者消费者模式是一种用于解决多个模块之间数据通信问题的高效机制。通过在数据生产者和数据消费者之间设立数据缓冲区,实现地低耦合度的数据通信。
这样的一个结构就像是流水线上两道工序和他们之间的货架。前道工序上有若干工人,他们会将本工序的产品放到货架上,而后立即回归到自己的生产工作中;同样地,后道工序上的若干工人们,可以直接从货架上收取上道工序的产品,直接开始自己的生产工作。
相比于直接调用的数据通信方式,生产者/消费者模式虽然多了一个数据缓冲区,但是优势也非常明显:
支持模块并发
使用直接调用的通信方式,最明显的弊端就是,调用关系是阻塞的,调用者必须中断自己的任务,等待被调者的返回再继续执行,这样就大大降低了程序运行效率。特别是当被调用的模块中涉及网络通信、文件读写这样的耗时操作时,主调一直在阻塞等待将是不可接受的性能损失。使用生产者/消费者模式,生产者模块(也就是原先的主调函数)只需要将数据放进缓冲区,这一个周期的任务就完成了,它可以立即返回,去执行下一个周期的任务。同样的,消费者模块(对应原先的被调函数)也不必苦苦等待上一个流程的结束,只要数据缓冲区不是空的,他就可以立即开始消费操作。
对应到上面流水线的例子里,直接调用的通信方式好比是前道工序的工人必须苦苦等候隔壁组的兄弟忙完手上的产品,腾出手来接过去自己的完成品;又或者是后道工序的工人要跑到隔壁去打听有没有处理完的产品。无论哪一种情况,都是巨大的效率损失。而中间摆上一个货架,前道工序的工人只需要将产品摆上货架(只要货架不是满的),就可以继续生产,而对于后道工序的工人,只要货架上还有产品,他们就直接可以从上面取出产品进行操作。支持忙闲不均
在很多业务场景中,数据生产者的生产速度和数据消费者的消费速度有可能在一定范围内波动,如果使用直接调用的方式,则无论主调侧还是被调侧进入忙碌状态,两侧都必须将速度减慢,来协调数据同步的速度。而如果通过数据缓冲区进行交互,某一方进入忙碌状态时,可以利用缓冲区中的数据或空间进行调节(如消费者处理速度放缓,则生产者也不必随之减缓速度,产生数据可以利用缓冲区中剩余的部分存储)。
同样也可以对应到流水线的例子里,如果前道工序的工人去一趟洗手间,如果采用的是直接交接产品的方式,那么后道工序的工人也就不得不暂时停工了;而如果双方通过货架交接,那么前道工序的工人离开的这段时间,货架上的产品也足够后道工序工人来操作了。降低耦合程度
如果使用直接调用的方式,如果未来双方任何一边代码改变(如收发数据的频率和数据单元大小改变),则双方的代码都需要变化。而在生产者/消费者模式中,如果一方的数据存取方法变化,只需要将他使用缓冲区的部分即可。
对应到流水线的例子里,如果后道工序上来了个新人,新人有自己的干活节奏,习惯一次拿两块零件,那前道工序的人还得改变自己的节奏,做好两块在给隔壁送过去;而如果两边通过货架交接,不管隔壁的人是什么工作习惯,都可以根据自己的想法从货架上取任意个零件,前道工序的生产节奏不需要进行任何的调整。
二、环形缓冲区结构
环形缓冲区使用两个指针分别用于读操作和写操作,两者以相同的方向转动,就像在操场上相互竞逐的两个人一样。读指针身前写指针身后,这部分为数据区块,这部分区域的状态为可读不可写;而写指针身前读指针身后,这部分为空闲区块,这部分区块的状态为可写不可读。当写指针追上读指针时,缓冲区满,需要写指针暂停写入;而当读指针追上写指针时,缓冲区空,需要读指针暂停读取。
相比于普通的队列结构(FIFO),环形缓冲区的所有读写操作都在一个相对固定的存储区域内完成,这样如果程序涉及频繁的读写,就可以省去大量的空间申请释放操作。
三、环形缓冲区在生产者消费者模式中的应用(代码实现)
以下代码在linux环境下编译运行,其他环境可能会略有不同。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
#define SIZE 10
int product_idx = 0; //生产者指针
int consume_idx = 0; //消费者指针
int data = 0;
int ring[SIZE]; //环形缓冲区
sem_t blankSem; //控制空闲块的信号量
sem_t dataSem; //控制数据块的信号量
pthread_mutex_t product_lock = PTHREAD_MUTEX_INITIALIZER; //各生产者之间的互斥锁
pthread_mutex_t consume_lock = PTHREAD_MUTEX_INITIALIZER; //各消费者之间的互斥锁
pthread_t product_1_tid, product_2_tid, consume_1_tid, consume_2_tid;
void* product_1(void *arg) //生产者线程1
{
while(1)
{
pthread_mutex_lock(&product_lock); //抢占生产者互斥锁
sem_wait(&blankSem); //获取一个空闲块资源
ring[product_idx] = data++;
sem_post(&dataSem); //释放一个数据块资源
printf("product_1 put data : %d\n",ring[product_idx++]);
product_idx = product_idx%SIZE; //环形缓冲区的寻址方法
pthread_mutex_unlock(&product_lock); //释放生产者互斥锁
sleep(1);
}
return NULL;
}
void* product_2(void *arg) //生产者线程2
{
while(1)
{
pthread_mutex_lock(&product_lock);
sem_wait(&blankSem);
ring[product_idx] = data++;
sem_post(&dataSem);
printf("product_2 put data : %d\n",ring[product_idx++]);
product_idx = product_idx%SIZE;
pthread_mutex_unlock(&product_lock);
sleep(1);
}
return NULL;
}
void* consume_1(void *arg) //消费者线程1
{
int consume_data = 0;
while(1)
{
pthread_mutex_lock(&consume_lock); //抢占消费者互斥锁
sem_wait(&dataSem); //获取一个数据块资源
consume_data = ring[consume_idx];
sem_post(&blankSem); //释放一个空闲块资源
printf("consume_1 get data : %d\n",consume_data);
sleep(1);
consume_idx++;
consume_idx = consume_idx%SIZE; //环形缓冲区的寻址方法
pthread_mutex_unlock(&consume_lock); //释放消费者互斥锁
sleep(2);
}
return NULL;
}
void* consume_2(void *arg) //消费者线程2
{
int consume_data = 0;
while(1)
{
pthread_mutex_lock(&consume_lock);
sem_wait(&dataSem);
consume_data = ring[consume_idx];
sem_post(&blankSem);
printf("consume_2 get data : %d\n",consume_data);
sleep(1);
consume_idx++;
consume_idx = consume_idx%SIZE;
pthread_mutex_unlock(&consume_lock);
sleep(2);
}
return NULL;
}
int main()
{
sem_init(&blankSem, 0, SIZE); //初始化信号量
sem_init(&dataSem, 0, 0);
int ret = 0;
ret = pthread_create(&product_1_tid, NULL, (void *) product_1, NULL); //创建线程
if (ret) {
printf("pthread_create product_1 error\n");
exit(0);
}
ret = pthread_create(&product_2_tid, NULL, (void *) product_2, NULL);
if (ret) {
printf("pthread_create product_2 error\n");
exit(0);
}
ret = pthread_create(&consume_1_tid, NULL, (void *) consume_1, NULL);
if (ret) {
printf("pthread_create consume_1 error\n");
exit(0);
}
ret = pthread_create(&consume_2_tid, NULL, (void *) consume_2, NULL);
if (ret) {
printf("pthread_create consume_2 error\n");
exit(0);
}
pthread_join(product_1_tid, NULL); //让主线程等待各个子线程结束
pthread_join(product_2_tid, NULL);
pthread_join(consume_1_tid, NULL);
pthread_join(consume_2_tid, NULL);
sem_destroy(&blankSem); //销毁信号量
sem_destroy(&dataSem);
}
代码编译运行之后,运行效果如下图所示:
可以看到生产者产生的数始终大于消费者取到的数,而且也不会比消费者的数大10,这体现了生产者的指针既不能被消费者超过,也不能把消费者套圈。
四、定速生产者模式下的设计思想
生产者/消费者模式中非常重要的一个原则是,需要时刻监控缓冲区的状态,当缓冲区满时,就要让生产者停止生产操作;而当缓冲区空时,就要让消费者停止消费操作。
然而在实际开发中,可能会出现生产者或消费者某一方是一个定速的状态(比如,生产者是一个数据采集模块,他采集的数据需要一个不少的被消费掉)。这种情况下流水线的例子好像就不适用了,我们可以用一个新的具象模型:天花板漏水。
天花板以固定的速度向地板漏水,地上放了一个小水桶,住户偶尔会从卧室里出来一趟把水倒掉。这里,天花板漏水充当生产者,住户是消费者,而这个小水桶就是二者之间的缓冲区。在这种情况下,生产者以固定的速度不间断的产生数据,而即便缓冲区满,他也不会停止滴水,那这种情况下要如何保护缓冲区不发生溢出呢?我的想法是,给缓冲区再加一个缓冲区,也就是在水桶下面再放个盆。实际开发中使用的是读写文件的方式,缓冲区满就去写文件。这样的话消费者策略也要变更为缓冲区非空时读缓冲区,缓冲区空时读文件。
落实到代码层面上,可以考虑使用sem_trywait(sem_t sem)替代sem_wait(sem_tsem)。当sem_wait想要获取的资源量为0时,线程会一直阻塞在这里等待其他线程释放该资源,而sem_trywait则不会等待,当资源量为0时,也会继续向下执行,通过返回值来判断是否成功获取了该资源,获取成功返回0,并将信号量减1;获取失败返回-1,信号量不变。
这样我们可以将上文中的生产者消费者代码做如下修改:
void* product(void *arg) //生产者线程
{
while(1)
{
pthread_mutex_lock(&product_lock); //抢占生产者互斥锁
if (sem_trywait(&blankSem) != 0) //非阻塞尝试获取一个空闲块资源
{
/* 此处补充写文件代码 */
}
else //获取成功
{
ring[product_idx] = data++;
sem_post(&dataSem); //释放一个数据块资源
printf("product_1 put data : %d\n",ring[product_idx++]);
product_idx = product_idx%SIZE;
}
pthread_mutex_unlock(&product_lock); //释放生产者互斥锁
sleep(1);
}
return NULL;
}
void* consume(void *arg) //消费者线程
{
int consume_data = 0;
while(1)
{
pthread_mutex_lock(&consume_lock); //抢占消费者互斥锁
if (sem_trywait(&dataSem) != 0) //非阻塞尝试获取一个数据块资源
{
/* 此处补充读文件代码 */
}
else //获取成功
{
consume_data = ring[consume_idx];
sem_post(&blankSem); //释放一个空闲块资源
printf("consume_1 get data : %d\n",consume_data);
sleep(1);
consume_idx++;
consume_idx = consume_idx%SIZE;
}
pthread_mutex_unlock(&consume_lock); //释放消费者互斥锁
sleep(2);
}
return NULL;
}