基于环形缓冲区的生产者消费者模式实现

一、生产者消费者模式

生产者消费者模式是一种用于解决多个模块之间数据通信问题的高效机制。通过在数据生产者和数据消费者之间设立数据缓冲区,实现地低耦合度的数据通信。

图1 生产者/消费者模式的结构

这样的一个结构就像是流水线上两道工序和他们之间的货架。前道工序上有若干工人,他们会将本工序的产品放到货架上,而后立即回归到自己的生产工作中;同样地,后道工序上的若干工人们,可以直接从货架上收取上道工序的产品,直接开始自己的生产工作。

相比于直接调用的数据通信方式,生产者/消费者模式虽然多了一个数据缓冲区,但是优势也非常明显:

  1. 支持模块并发
    使用直接调用的通信方式,最明显的弊端就是,调用关系是阻塞的,调用者必须中断自己的任务,等待被调者的返回再继续执行,这样就大大降低了程序运行效率。特别是当被调用的模块中涉及网络通信、文件读写这样的耗时操作时,主调一直在阻塞等待将是不可接受的性能损失。使用生产者/消费者模式,生产者模块(也就是原先的主调函数)只需要将数据放进缓冲区,这一个周期的任务就完成了,它可以立即返回,去执行下一个周期的任务。同样的,消费者模块(对应原先的被调函数)也不必苦苦等待上一个流程的结束,只要数据缓冲区不是空的,他就可以立即开始消费操作。
    对应到上面流水线的例子里,直接调用的通信方式好比是前道工序的工人必须苦苦等候隔壁组的兄弟忙完手上的产品,腾出手来接过去自己的完成品;又或者是后道工序的工人要跑到隔壁去打听有没有处理完的产品。无论哪一种情况,都是巨大的效率损失。而中间摆上一个货架,前道工序的工人只需要将产品摆上货架(只要货架不是满的),就可以继续生产,而对于后道工序的工人,只要货架上还有产品,他们就直接可以从上面取出产品进行操作。

  2. 支持忙闲不均
    在很多业务场景中,数据生产者的生产速度和数据消费者的消费速度有可能在一定范围内波动,如果使用直接调用的方式,则无论主调侧还是被调侧进入忙碌状态,两侧都必须将速度减慢,来协调数据同步的速度。而如果通过数据缓冲区进行交互,某一方进入忙碌状态时,可以利用缓冲区中的数据或空间进行调节(如消费者处理速度放缓,则生产者也不必随之减缓速度,产生数据可以利用缓冲区中剩余的部分存储)。
    同样也可以对应到流水线的例子里,如果前道工序的工人去一趟洗手间,如果采用的是直接交接产品的方式,那么后道工序的工人也就不得不暂时停工了;而如果双方通过货架交接,那么前道工序的工人离开的这段时间,货架上的产品也足够后道工序工人来操作了。

  3. 降低耦合程度
    如果使用直接调用的方式,如果未来双方任何一边代码改变(如收发数据的频率和数据单元大小改变),则双方的代码都需要变化。而在生产者/消费者模式中,如果一方的数据存取方法变化,只需要将他使用缓冲区的部分即可。
    对应到流水线的例子里,如果后道工序上来了个新人,新人有自己的干活节奏,习惯一次拿两块零件,那前道工序的人还得改变自己的节奏,做好两块在给隔壁送过去;而如果两边通过货架交接,不管隔壁的人是什么工作习惯,都可以根据自己的想法从货架上取任意个零件,前道工序的生产节奏不需要进行任何的调整。

二、环形缓冲区结构

图2 环形缓冲区的结构

环形缓冲区使用两个指针分别用于读操作和写操作,两者以相同的方向转动,就像在操场上相互竞逐的两个人一样。读指针身前写指针身后,这部分为数据区块,这部分区域的状态为可读不可写;而写指针身前读指针身后,这部分为空闲区块,这部分区块的状态为可写不可读。当写指针追上读指针时,缓冲区满,需要写指针暂停写入;而当读指针追上写指针时,缓冲区空,需要读指针暂停读取。
相比于普通的队列结构(FIFO),环形缓冲区的所有读写操作都在一个相对固定的存储区域内完成,这样如果程序涉及频繁的读写,就可以省去大量的空间申请释放操作。

三、环形缓冲区在生产者消费者模式中的应用(代码实现)

以下代码在linux环境下编译运行,其他环境可能会略有不同。

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <semaphore.h>
#include <pthread.h>
#define SIZE 10

int product_idx = 0;        //生产者指针
int consume_idx = 0;        //消费者指针

int data = 0;
int ring[SIZE];             //环形缓冲区

sem_t blankSem;             //控制空闲块的信号量
sem_t dataSem;              //控制数据块的信号量
pthread_mutex_t product_lock = PTHREAD_MUTEX_INITIALIZER;       //各生产者之间的互斥锁
pthread_mutex_t consume_lock = PTHREAD_MUTEX_INITIALIZER;       //各消费者之间的互斥锁

pthread_t product_1_tid, product_2_tid, consume_1_tid, consume_2_tid;   

void* product_1(void *arg)                      //生产者线程1
{
    while(1)
    {
        pthread_mutex_lock(&product_lock);      //抢占生产者互斥锁
        sem_wait(&blankSem);                    //获取一个空闲块资源
        ring[product_idx] = data++;
        sem_post(&dataSem);                     //释放一个数据块资源
        printf("product_1 put data : %d\n",ring[product_idx++]);      
        product_idx = product_idx%SIZE;         //环形缓冲区的寻址方法
        pthread_mutex_unlock(&product_lock);    //释放生产者互斥锁
        sleep(1);
    }
    return NULL;
}

void* product_2(void *arg)                      //生产者线程2
{
    while(1)
    {
        pthread_mutex_lock(&product_lock);  
        sem_wait(&blankSem);                    
        ring[product_idx] = data++;
        sem_post(&dataSem);
        printf("product_2 put data : %d\n",ring[product_idx++]);      
        product_idx = product_idx%SIZE;
        pthread_mutex_unlock(&product_lock);
        sleep(1);
    }
    return NULL;
}

void* consume_1(void *arg)                      //消费者线程1
{
    int consume_data = 0;
    while(1)
    {
        pthread_mutex_lock(&consume_lock);      //抢占消费者互斥锁
        sem_wait(&dataSem);                     //获取一个数据块资源
        consume_data = ring[consume_idx]; 
        sem_post(&blankSem);                    //释放一个空闲块资源
        printf("consume_1 get data : %d\n",consume_data);
        sleep(1);
        consume_idx++;
        consume_idx = consume_idx%SIZE;         //环形缓冲区的寻址方法
        pthread_mutex_unlock(&consume_lock);    //释放消费者互斥锁
        sleep(2);
    }
    return NULL;
}

void* consume_2(void *arg)                      //消费者线程2
{
    int consume_data = 0;
    while(1)
    {
        pthread_mutex_lock(&consume_lock);
        sem_wait(&dataSem);
        consume_data = ring[consume_idx]; 
        sem_post(&blankSem);
        printf("consume_2 get data : %d\n",consume_data);
        sleep(1);
        consume_idx++;
        consume_idx = consume_idx%SIZE;
        pthread_mutex_unlock(&consume_lock);
        sleep(2);
    }
        return NULL;
}

int main()
{
    sem_init(&blankSem, 0, SIZE);       //初始化信号量
    sem_init(&dataSem, 0, 0);
    
    int ret = 0;
    
    ret = pthread_create(&product_1_tid, NULL, (void *) product_1, NULL);   //创建线程
    if (ret) {
        printf("pthread_create product_1 error\n");
        exit(0);
    }
    
    ret = pthread_create(&product_2_tid, NULL, (void *) product_2, NULL);
    if (ret) {
        printf("pthread_create product_2 error\n");
        exit(0);
    }
    
    ret = pthread_create(&consume_1_tid, NULL, (void *) consume_1, NULL);
    if (ret) {
        printf("pthread_create consume_1 error\n");
        exit(0);
    }
    
    ret = pthread_create(&consume_2_tid, NULL, (void *) consume_2, NULL);
    if (ret) {
        printf("pthread_create consume_2 error\n");
        exit(0);
    }
    
    pthread_join(product_1_tid, NULL);      //让主线程等待各个子线程结束
    pthread_join(product_2_tid, NULL);
    pthread_join(consume_1_tid, NULL);
    pthread_join(consume_2_tid, NULL);
    
    sem_destroy(&blankSem);     //销毁信号量
    sem_destroy(&dataSem);
}

代码编译运行之后,运行效果如下图所示:


图3 demo程序的运行效果

可以看到生产者产生的数始终大于消费者取到的数,而且也不会比消费者的数大10,这体现了生产者的指针既不能被消费者超过,也不能把消费者套圈。

四、定速生产者模式下的设计思想

生产者/消费者模式中非常重要的一个原则是,需要时刻监控缓冲区的状态,当缓冲区满时,就要让生产者停止生产操作;而当缓冲区空时,就要让消费者停止消费操作。
然而在实际开发中,可能会出现生产者或消费者某一方是一个定速的状态(比如,生产者是一个数据采集模块,他采集的数据需要一个不少的被消费掉)。这种情况下流水线的例子好像就不适用了,我们可以用一个新的具象模型:天花板漏水。
天花板以固定的速度向地板漏水,地上放了一个小水桶,住户偶尔会从卧室里出来一趟把水倒掉。这里,天花板漏水充当生产者,住户是消费者,而这个小水桶就是二者之间的缓冲区。在这种情况下,生产者以固定的速度不间断的产生数据,而即便缓冲区满,他也不会停止滴水,那这种情况下要如何保护缓冲区不发生溢出呢?我的想法是,给缓冲区再加一个缓冲区,也就是在水桶下面再放个盆。实际开发中使用的是读写文件的方式,缓冲区满就去写文件。这样的话消费者策略也要变更为缓冲区非空时读缓冲区,缓冲区空时读文件。

落实到代码层面上,可以考虑使用sem_trywait(sem_t sem)替代sem_wait(sem_tsem)。当sem_wait想要获取的资源量为0时,线程会一直阻塞在这里等待其他线程释放该资源,而sem_trywait则不会等待,当资源量为0时,也会继续向下执行,通过返回值来判断是否成功获取了该资源,获取成功返回0,并将信号量减1;获取失败返回-1,信号量不变。
这样我们可以将上文中的生产者消费者代码做如下修改:

void* product(void *arg)                        //生产者线程
{
    while(1)
    {
        pthread_mutex_lock(&product_lock);      //抢占生产者互斥锁
        if (sem_trywait(&blankSem) != 0)        //非阻塞尝试获取一个空闲块资源
        {
            /* 此处补充写文件代码 */
        }
        else                                    //获取成功
        {
            ring[product_idx] = data++;
            sem_post(&dataSem);                 //释放一个数据块资源
            printf("product_1 put data : %d\n",ring[product_idx++]);
            product_idx = product_idx%SIZE;
        }                   
        pthread_mutex_unlock(&product_lock);    //释放生产者互斥锁
        sleep(1);
    }
    return NULL;
}

void* consume(void *arg)                        //消费者线程
{
    int consume_data = 0;
    while(1)
    {
        pthread_mutex_lock(&consume_lock);      //抢占消费者互斥锁
        if (sem_trywait(&dataSem) != 0)         //非阻塞尝试获取一个数据块资源
        {
            /* 此处补充读文件代码 */
        }
        else                                    //获取成功
        {
            consume_data = ring[consume_idx]; 
            sem_post(&blankSem);                //释放一个空闲块资源
            printf("consume_1 get data : %d\n",consume_data);
            sleep(1);
            consume_idx++;
            consume_idx = consume_idx%SIZE;
        }
        pthread_mutex_unlock(&consume_lock);    //释放消费者互斥锁
        sleep(2);
    }
    return NULL;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 218,284评论 6 506
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,115评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,614评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,671评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,699评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,562评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,309评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,223评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,668评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,859评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,981评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,705评论 5 347
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,310评论 3 330
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,904评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,023评论 1 270
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,146评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,933评论 2 355

推荐阅读更多精彩内容