基于条件变量的生产者消费者问题

本文摘录至《操作系统导论》第30章的相关内容

生产者消费者问题

所谓的生产者消费者(producer consumer)问题,也被称为有界缓存区(bounded buffer)问题,生产者将生成的数据放入缓冲区,消费者从缓冲区取走数据,以某种方式消费。

在很多实际的系统中存在这样的场景,比如多线程网络服务器中,一个生产者将HTTP请求放入工作队列,多个消费者线程从队列中取走请求进行处理。再比如在shell中使用管道连接不同程序的输入与输出,比如grep foo file.txt | wc -l命令,grep是生产者,而wc是消费者。

因为有界缓冲区是共享资源,所以我们必须通过某种同步机制来访问有界缓冲区,以避免产生竞态条件。

有问题的方案

我们首先从一个简陋的方案开始,需要一个共享缓冲区,以便生产者存放数据,消费者取出数据,这里以一个整数作为缓冲区,两个函数分别实现将数值放入缓冲区,以及从缓冲区取走数值,相关代码为:

int buffer
int count = 0;

void put(int value)
{
    assert(count == 0);
    count = 1;
    buffer = value;
}

int get()
{
    assert(count == 1);
    count = 0;
    return buffer;
}

在以上的代码中,count为1表示缓冲区已满,count为0表示缓冲区为空。

现在我们需要一些操作,知道何时可以访问缓冲区,以便将数据放入缓冲区,或者从缓冲区读取数据。条件很明显,仅在count为0时,即缓冲区为空时,才将数据放入缓冲区,仅在count为1时,即缓冲区已满时,才从缓冲区取走数据,如果让生产者将数据放入已满的缓冲区,或者让消费者从已空的缓冲区获取数据,将发生错误。

生产者线程与消费者线程分别向缓冲区放入数据与取走数据,以下代码模拟了生产者线程与消费者线程:

void *producer(void *arg)
{
    int i;
    int loops = (int)arg;
    for(i = 0; i < loops; i++)
    {
        put(i);
    }
}

void *consumer(void *arg)
{
    int i;
    while(1)
    {
        int tmp = get();
        printf("%d\n", tmp);
    }
}

以上的生产者线程与消费者线程,显然没有考虑缓冲区作为共享资源会存在临界区的问题,给代码加锁是没有用的,需要条件变量,修改后的代码为:

cond_t cond;
mutex_t mutex;

void *producer(void *arg)
{
    int i;
    int loops = (int)arg;
    for(i = 0; i < loops; i++)
    {
        pthread_mutex_lock(&mutex);
        if(count == 1)
        {
            pthread_cond_wait(&cond, &mutex);
        }
        put(i);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
    }
}

void *consumer(void *arg)
{
    int i;
    int loops = (int)arg;
    for(i = 0; i < loops; i++)
    {
        pthread_mutex_lock(&mutex);
        if(count == 0)
        {
            pthread_cond_wait(&cond, &mutex);
        }
        int tmp = get();
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        printf("%d\n", tmp);
    }
}

以上的方案,存在两个严重的问题:

问题1

假设有两个消费者(消费者1与消费者2),一个生产者。

首先,消费者1开始执行,获取锁后,检查缓冲区为空,然后执行pthread_cond_wait等待,进入睡眠,注意在此函数中会释放锁。

接着,生产者开始运行,其获取锁,检查缓冲区,发现缓冲区未满,则向缓冲区放入数值,然后生产者执行pthread_cond_signal,发出信号,表示缓冲区已满,此时这个信号使得之前的消费者1不再睡眠,消费者1进入就绪队列,生产者发现缓冲区满后,进入睡眠。

此时,如果消费者2如果抢先得到运行,消费了缓冲区中的数据,然后消费者2进入睡眠,消费者1得到运行,消费者1则从pthread_cond_wait中返回(注意,从此函数返回时,会获得锁),执行get操作,但是此时缓冲区已经为空了,问题出现了!

显然,应该设法阻止消费1去读取缓冲区的数据,因为消费者2抢先将缓冲区的数据消费掉了。

产生问题的原因也很简单,在消费者1被生产者唤醒后,但是在消费者1运行之前,缓冲区的状态已经发生了改变(消费者2抢先消费了数据),因此生产者发出信号,只是唤醒了消费者1,暗示缓冲区的状态发生了变化,但是并不会保证在消费者1运行之前的状态一直都是期望的情况。

对此问题的解决方案很简单,将if语句改为while语句,表示当消费者1被唤醒后,应该立即再次检查共享变量,如果缓冲区此时为空,则消费者1应该回去继续睡眠,生产者需同样的修改,代码为:

cond_t cond;
mutex_t mutex;

void *producer(void *arg)
{
    int i;
    int loops = (int)arg;
    for(i = 0; i < loops; i++)
    {
        pthread_mutex_lock(&mutex);
        while(count == 1)
        {
            pthread_cond_wait(&cond, &mutex);
        }
        put(i);
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
    }
}

void *consumer(void *arg)
{
    int i;
    int loops = (int)arg;
    for(i = 0; i < loops; i++)
    {
        pthread_mutex_lock(&mutex);
        while(count == 0)
        {
            pthread_cond_wait(&cond, &mutex);
        }
        int tmp = get();
        pthread_cond_signal(&cond);
        pthread_mutex_unlock(&mutex);
        printf("%d\n", tmp);
    }
}

问题2

假设两个消费者先运行,缓冲区为空,因此消费者1与消费者2都进入睡眠,生产者开始运行,在缓冲区放入数值后,发现信号,生产者进入睡眠。发出的信号唤醒了一个消费者,假设唤醒的是消费者1,此时消费者1处于就绪队列,而消费者2与生成者都处于睡眠中,并且都等待在同一个条件变量上。

消费者1得到运行,发现缓冲区为满,则消费者1消费了数据,然后消费者1发出了信号,在其睡眠之前,唤醒了一个正在等待的线程,即消费者2或者生产者。

问题的关键是,唤醒的是哪个线程?因为消费者1已经消费掉了缓冲区的数据,理论上应该唤醒生产者,但是如果唤醒的是消费者2呢?此时问题出现了,消费者2被唤醒后,发现缓冲区为空,又回去睡眠了,此时,生产者,消费者1,消费者2都处于睡眠状态了,大家都睡着了,显然这是个缺陷。

通过以上分析可知,消费者不应该唤醒消费者,而应该只唤醒生产者。对此问题的解决方案,就是使用两个条件变量,而不是一个。代码为:

cond_t empty;
cond_t fill;
mutex_t mutex;

void *producer(void *arg)
{
    int i;
    int loops = (int)arg;
    for(i = 0; i < loops; i++)
    {
        pthread_mutex_lock(&mutex);
        while(count == 1)
        {
            pthread_cond_wait(&empty, &mutex);
        }
        put(i);
        pthread_cond_signal(&fill);
        pthread_mutex_unlock(&mutex);
    }
}

void *consumer(void *arg)
{
    int i;
    int loops = (int)arg;
    for(i = 0; i < loops; i++)
    {
        pthread_mutex_lock(&mutex);
        while(count == 0)
        {
            pthread_cond_wait(&fill, &mutex);
        }
        int tmp = get();
        pthread_cond_signal(&empty);
        pthread_mutex_unlock(&mutex);
        printf("%d\n", tmp);
    }
}

以上代码就可以解决了基于条件变量的生产者消费者问题。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容