用信号量来解决同步问题
问题描述:一组生产者进程和一组消费者进程共享一个初始为空大小为n的缓冲区,只有缓冲区没满时,生产者才能给缓冲区投放信息,否则必须等待;只有缓冲区不空时,消费者才能继续取出消息,否则也必须等待。由于缓冲区是临界资源,他只允许一个进程投放资源或者一个进程取出资源。
我们先用最基本的方法来实现下这个有界缓冲区的问题。
我们假设缓冲区的数据为BUFFER_SIZE,则我们在不考虑执行并发的情况下,可以实现这样的伪代码:
typedef struct {
...
} Item;
#define BUFFER_SIZE 10
Item buffer[BUFFER_SIZE];
int count = 0; // 缓冲区中的数据量
int in = 0; // 用来标识生产者存放下一个数据位置
int out = 0; // 用来标识消费者取出下一个数据位置
// producer
while (true) {
A = producer();
// 缓冲区满,等待
while (BUFFER_SIZE == count) ;
buffer[in] = A;
in = (in + 1) % BUFFER_SIZE;
count++;
}
// cosumer
while (true) {
// 缓冲区空,等待
while (0 == count) ;
B = buffer[out];
out = (out + 1) % BUFFER_SIZE;
count--;
}
首先我们定义了,这样的一个伪代码的算法。
接下来我们考虑下加下信号量的实现方法。
首先,缓冲区是临界资源,那么不论是生产者还是消费者访问临界资源的时候都必须是互斥的访问。所以,对于访问临界资源必须有个互斥信号量———mutex,其初始值为1,表示可以访问。
可以先把临界区加上这个限制。
// 生产者临界区
wait(mutex);
buffer[in] = A;
in = (in + 1) % BUFFER_SIZE;
count++;
signal(mutex);
// 消费者临界区
wait(mutex)
B = buffer[out];
out = (out + 1) % BUFFER_SIZE;
count--;
signal(mutex)
对于临界资源的访问不分这个生产者还是消费者,谁访问都一样,都是一个进程访问临界资源的时候其他进程得等待。
接下来我们来用信号量来替换上面的忙等待部分。
生产者与消费者是互相合作的关系,我们说,为完成某种任务而建立的多个进程,这些进程因为要在某些位置上协调他们的工作次序而等待。
比如:A进程要工作必须等待B进程的一个结果,如果仅仅是A进程单方面的需要B进程的一个结果,那这张制约关系就是单方向的(此处的单方向和下面双方向是我个人的理解而用的词汇),如果同时B进程的工作也需要A进程工作的结果,那么这就是双方向的互相制约了。
而生产者-消费者问题里的同步关系我认为是双方向的,原因如下:生产者要生产的前提是缓冲区没满,而缓冲区没满是消费者运行后的结果,同样消费者要运行的前提是缓冲区不空,而缓冲区不空是生产者不断生成的结果。所以,按本人的理解就是双方向制约的关系。
也许有人好奇为什么要搞这么细,原因很简答,在指定同步关系的信号量的时候一个制约就是一个信号量,本题的同步关系需要两个信号量。一个是消费者通知生产者是否可以生产的“有空位”信号量——empty,一个是生产者通知消费者要消费的“有信息”信号量——full。
(这一部分讲解摘自:https://blog.csdn.net/m0_38041038/article/details/80958714)
所以我们看到的具体实现逻辑为两个信号量相互制约控制。
semaphore mutex=1;//互斥信号量
semaphore empty=n;//代表的是临界区的空位
semaphore full=0;//代表的是临界区的数据。空位+数据=n
producer(){ //生产者
while(1){
produce an item in nextep;
p(empty);(想要什么p一下) //获取空的缓冲区单元,有n个单元,每p一下。empty--一次
p(muetx); //进入区。也就是进入临界区,互斥的访问。
add nextep to buffer; //临界区, 将数据装入缓冲区
v(mutex); //退出区。
v(full);(提供什么v一下) //缓冲区有了数据了,没生产一个数据 full++一次。
}
}
consumer(){ //消费者
while(1){
p(full);//获取满的缓冲单元
p(muetx);//进入区
remove an item from buffer;//临界区,取出缓冲区里的数据
v(muetx);//退出区
v(empty);//获取缓冲单元里的数据,产生一个空的缓冲单元。
consume the item;//消费了数据
}
}
下面有两种方法实现了生产者消费者问题
POSIX信号量实现
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <semaphore.h>
#define BUFFER_SIZE 10
typedef struct {
int value;
}Item;
Item buffer[BUFFER_SIZE] = {0};
int count = 0;
int in = 0;
int out = 0;
sem_t mutex;
sem_t blank;
sem_t data;
void* producer(void*) {
srand(time(NULL));
while(true) {
Item item;
item.value = rand() % 100;
sem_wait(&blank);
sem_wait(&mutex);
count++;
buffer[in] = item;
printf("Produce write a item: %d to Buffer[%d].\n", item.value, in);
in = (in + 1) % BUFFER_SIZE;
sem_post(&mutex);
sem_post(&data);
sleep(1);
}
}
void* cosumer(void*) {
while (true) {
Item item;
sem_wait(&data);
sem_wait(&mutex);
count--;
item.value = buffer[out].value;
printf("cosumer read %d from buffer[%d].\n", item.value, out);
out = (out + 1) % BUFFER_SIZE;
sem_post(&mutex);
sem_post(&blank);
sleep(1);
}
}
int main(int argc, char* argv[]) {
sem_init(&mutex, 0, 1);
sem_init(&blank, 0, BUFFER_SIZE);
sem_init(&data, 0, 0);
pthread_t pro[5];
pthread_t cosu[10];
for (int i = 0; i < 5; ++i) {
pthread_create(&pro[i], NULL, producer, NULL);
}
for (int i = 0; i < 10; ++i) {
pthread_create(&cosu[i], NULL, cosumer, NULL);
}
for (int i = 0; i < 5; ++i) {
pthread_join(pro[i], NULL);
}
for (int i = 0; i < 10; ++i) {
pthread_join(cosu[i], NULL);
}
sem_destroy(&mutex);
sem_destroy(&blank);
sem_destroy(&data);
return 0;
}
使用互斥量-条件变量实现
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#define BUFFER_SIZE 10
typedef struct {
int value;
}Item;
Item buffer[BUFFER_SIZE] = {0};
int count = 0;
int in = 0;
int out = 0;
pthread_mutex_t mutex;
pthread_cond_t not_empty;
pthread_cond_t not_full;
void* producer(void*) {
// 生产者,生产数据
srand(time(NULL));
while (true) {
Item item;
item.value = rand() % 100;
pthread_mutex_lock(&mutex);
// 这里的count必须在锁里面
if (count == BUFFER_SIZE) {
printf("buffer is full, producer wait for buffer has position.\n");
pthread_cond_wait(¬_full, &mutex);
}
count++;
printf("Produce write a item: %d to Buffer[%d].\n", item.value, in);
buffer[in] = item;
in = (in + 1) % BUFFER_SIZE;
pthread_cond_signal(¬_empty);
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
void* cosumer(void*) {
while(true) {
Item item;
pthread_mutex_lock(&mutex);
if (count == 0) {
printf("Buffer is empty, cosumer wait for buffer not empty.\n");
pthread_cond_wait(¬_empty, &mutex);
}
count--;
item.value = buffer[out].value;
printf("cosumer read %d from buffer[%d].\n", item.value, out);
out = (out + 1) % BUFFER_SIZE;
pthread_cond_signal(¬_full);
pthread_mutex_unlock(&mutex);
sleep(1);
}
}
int main(int argc, char* argv[]) {
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(¬_empty, NULL);
pthread_cond_init(¬_full, NULL);
pthread_t pro[5];
pthread_t cosu[10];
for (int i = 0; i < 5; ++i) {
pthread_create(&pro[i], NULL, producer, NULL);
}
for (int i = 0; i < 10; ++i) {
pthread_create(&cosu[i], NULL, cosumer, NULL);
}
for (int i = 0; i < 5; ++i) {
pthread_join(pro[i], NULL);
}
for (int i = 0; i < 10; ++i) {
pthread_join(cosu[i], NULL);
}
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(¬_empty);
pthread_cond_destroy(¬_full);
return 0;
}