生产消费者模型

condition_variable条件变量可以用来实现线程同步,它必须与互斥量mutex配合使用。
条件变量适用场景:一个线程先对某一条件进行判断, 如果条件不满足则进入等待, 条件满足的时候, 该线程被通知条件满足, 继续执行任务
在wait()之前,必须先lock相关联的mutex, 因为假如目标条件未满足,wait()实际上会unlock该mutex, 然后block,在目标条件满足后再重新lock该mutex, 然后返回

使用条件变量实现生产者消费者的简单例子如下:

#include <iostream>
#include<thread>
#include<mutex>
#include<condition_variable>
#include<queue>

using namespace std;

//任务队列
queue<int>products ;
mutex m ;
condition_variable cond ;
bool notify = false ;
bool done = false ;

void producer() ;
void consumer(){
    
    while(!done){
        
        //上锁保护共享资源,unique_lock一次实现上锁和解锁
        unique_lock<mutex>lk(m);
        //等待生产者者通知有资源
        while(!notify){
            
            cond.wait(lk);
        }

        //要是队列不为空的话
        while(!products.empty()){

            cout<<"consumer..."<<products.front()<<endl;
            products.pop();
            //通知生产者仓库容量不足,生产产品
            notify = false ;
            cond.notify_one();
        }
    }
}

void producer(){
    
    int i ;
    for(i=0;i<10;i++){
        //主动让出cpu,不参与cpu 的本次调度,让其他线程使用,等一秒后再参与调度
        //      this_thread::sleep_for(chrono::seconds(1));
        unique_lock<mutex>lk(m);
        cout<<"producer..."<<i<<endl;
        //如果仓库中有产品,就等待消费者消费完后在生产
        while(notify||!products.empty()){
            cond.wait(lk);
        }
        //当前仓库里面没有东西了,就将产品装入仓库
        products.push(i);
        //设置有产品的通知
        notify = true ;
        //通知消费者可以取产品了
        cond.notify_one();
        
    }   
    

    //通知消费者端不生产了
    done = true ;
    cond.notify_one();
}   

int main()
{

    thread t1(producer);
    thread t2(consumer);
    t1.join();
    t2.join();
    return 0;
}


下面实现了维护了缓冲区的结构体,并每次返回相应的位置,可以循环写入的生产者消费者模型:

#include <unistd.h>

#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

static const int kItemRepositorySize  = 10; // Item buffer size.
static const int kItemsToProduce  = 1000;   // How many items we plan to produce.

struct ItemRepository {
    int item_buffer[kItemRepositorySize]; // 产品缓冲区, 配合 read_position 和 write_position 模型环形队列.
    size_t read_position; // 消费者读取产品位置.
    size_t write_position; // 生产者写入产品位置.
    std::mutex mtx; // 互斥量,保护产品缓冲区
    std::condition_variable repo_not_full; // 条件变量, 指示产品缓冲区不为满.
    std::condition_variable repo_not_empty; // 条件变量, 指示产品缓冲区不为空.
} gItemRepository; // 产品库全局变量, 生产者和消费者操作该变量.

typedef struct ItemRepository ItemRepository;


void ProduceItem(ItemRepository *ir, int item)
{
    std::unique_lock<std::mutex> lock(ir->mtx);
    while(((ir->write_position + 1) % kItemRepositorySize)
        == ir->read_position) { // item buffer is full, just wait here.
        std::cout << "Producer is waiting for an empty slot...\n";
        (ir->repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.
    }

    (ir->item_buffer)[ir->write_position] = item; // 写入产品.
    (ir->write_position)++; // 写入位置后移.

    if (ir->write_position == kItemRepositorySize) // 写入位置若是在队列最后则重新设置为初始位置.
        ir->write_position = 0;

    (ir->repo_not_empty).notify_all(); // 通知消费者产品库不为空.
    lock.unlock(); // 解锁.
}

int ConsumeItem(ItemRepository *ir)
{
    int data;
    std::unique_lock<std::mutex> lock(ir->mtx);
    // item buffer is empty, just wait here.
    while(ir->write_position == ir->read_position) {
        std::cout << "Consumer is waiting for items...\n";
        (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.
    }

    data = (ir->item_buffer)[ir->read_position]; // 读取某一产品
    (ir->read_position)++; // 读取位置后移

    if (ir->read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位.
        ir->read_position = 0;

    (ir->repo_not_full).notify_all(); // 通知消费者产品库不为满.
    lock.unlock(); // 解锁.

    return data; // 返回产品.
}


void ProducerTask() // 生产者任务
{
    for (int i = 1; i <= kItemsToProduce; ++i) {
        // sleep(1);
        std::cout << "Produce the " << i << "^th item..." << std::endl;
        ProduceItem(&gItemRepository, i); // 循环生产 kItemsToProduce 个产品.
    }
}

void ConsumerTask() // 消费者任务
{
    static int cnt = 0;
    while(1) {
        sleep(1);
        int item = ConsumeItem(&gItemRepository); // 消费一个产品.
        std::cout << "Consume the " << item << "^th item" << std::endl;
        if (++cnt == kItemsToProduce) break; // 如果产品消费个数为 kItemsToProduce, 则退出.
    }
}

void InitItemRepository(ItemRepository *ir)
{
    ir->write_position = 0; // 初始化产品写入位置.
    ir->read_position = 0; // 初始化产品读取位置.
}

int main()
{
    InitItemRepository(&gItemRepository);
    std::thread producer(ProducerTask); // 创建生产者线程.
    std::thread consumer(ConsumerTask); // 创建消费之线程.
    producer.join();
    consumer.join();
    return 0;
}

condition_variable条件变量线程同步与mutex互斥变量配合使用

每个线程的同步互斥控制流程如下:
A. 进入后加互斥锁
unique_lock<mutex> lck(mtx);
B.判断此时是否能进行读写,能则立刻进行生产或消费,如不能则等待且释放互斥锁,等到能够生产消费时,再加锁进行生产消费操作。操作结束后通知生产者或者消费者,然后进入D。
while(q.size() == maxSize) {produce.wait(lck);} task();consume.notify_all();
D.释放互斥锁
lck.unlock()

  • C++11中加入了新的atomic原子性,可以用来进行互斥操作。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。