#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <random>
using namespace std;
// Upper bound on the total number of products the producer threads create.
static constexpr int g_product_max_count{100};
// Serializes writes to standard output across threads.
std::mutex stdoutMutex;
// Shared state of the bounded product buffer used by the producer and
// consumer threads. A single global instance, g_productManager, is declared
// together with the type.
struct ProductManager {
    // Products waiting to be consumed.
    std::queue<int> m_product_queue;
    // Maximum number of products the queue may hold.
    int queue_max_size{10};
    // Counter updated by the producer task.
    int m_produce_product_count{0};
    // Counter updated by the consumer task.
    int m_consume_product_count{0};
    // Protects m_product_queue.
    std::mutex m_mutex;
    // Locked by the producer task around m_produce_product_count.
    std::mutex m_produce_count_mutex;
    // Locked by the consumer task around m_consume_product_count.
    std::mutex m_consume_count_mutex;
    // Signalled when the queue is no longer full.
    std::condition_variable cv_queue_notFull;
    // Signalled when the queue is no longer empty.
    std::condition_variable cv_queue_notEmpty;
} g_productManager; // Global product store operated on by producers and consumers.
// Appends `product` to pManager's queue.
//
// Blocks on cv_queue_notFull while the queue already holds queue_max_size
// (or more) items, printing a "warehouse full" message before each wait.
// After the push, every thread waiting on cv_queue_notEmpty is notified.
void ProduceProduct(ProductManager &pManager, int product)
{
    std::unique_lock<std::mutex> queueGuard(pManager.m_mutex);
    for (;;) {
        const bool hasRoom = pManager.m_product_queue.size() < pManager.queue_max_size;
        if (hasRoom) {
            break;
        }
        {
            // Separate guard so the message is not interleaved with other output.
            std::lock_guard<std::mutex> printGuard(stdoutMutex);
            std::cout << "仓库满了,生产者等待中..." << "thread id = " << std::this_thread::get_id() << std::endl;
        }
        pManager.cv_queue_notFull.wait(queueGuard);
    }
    pManager.m_product_queue.push(product);  // store the product
    pManager.cv_queue_notEmpty.notify_all(); // tell consumers the queue is not empty
    // queueGuard releases m_mutex when it goes out of scope.
}
// Removes and returns the product at the front of pManager's queue.
//
// Blocks on cv_queue_notEmpty while the queue is empty, printing a
// "warehouse empty" message before each wait. After the pop, every thread
// waiting on cv_queue_notFull is notified.
int ConsumeProduct(ProductManager &pManager)
{
    std::unique_lock<std::mutex> queueGuard(pManager.m_mutex);
    for (;;) {
        if (!pManager.m_product_queue.empty()) {
            break;
        }
        {
            // Separate guard so the message is not interleaved with other output.
            std::lock_guard<std::mutex> printGuard(stdoutMutex);
            std::cout << "仓库空了,消费者等待中..." << "thread id = " << std::this_thread::get_id() << std::endl;
        }
        pManager.cv_queue_notEmpty.wait(queueGuard);
    }
    const int taken = pManager.m_product_queue.front();
    pManager.m_product_queue.pop();
    pManager.cv_queue_notFull.notify_all(); // tell producers there is room again
    // queueGuard releases m_mutex on return.
    return taken;
}
// Producer task.
//
// Loops: sleep a random 0-9 seconds, claim the next product number, push it
// into g_productManager via ProduceProduct, and log it. The loop ends once
// g_product_max_count products have been claimed across all producer threads.
//
// th_ID: label printed in this thread's log lines.
void ProducerTask(int th_ID)
{
    // One engine per thread, seeded once. A default-constructed engine created
    // inside the loop always starts from the same seed, so every "random"
    // delay would be the same number.
    thread_local std::default_random_engine engine{std::random_device{}()};
    std::uniform_int_distribution<unsigned> delaySeconds(0, 9);

    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(delaySeconds(engine)));

        int productId = 0;
        {
            // Keeps the produced-product counter consistent across producer threads.
            // Only the counter update is done under this mutex: ProduceProduct can
            // block on a full queue, and holding the mutex across it would stall
            // every other producer.
            std::lock_guard<std::mutex> countLock(g_productManager.m_produce_count_mutex);
            if (g_productManager.m_produce_product_count >= g_product_max_count) {
                break;
            }
            productId = ++g_productManager.m_produce_product_count;
        }

        ProduceProduct(g_productManager, productId);
        {
            std::lock_guard<std::mutex> printLock(stdoutMutex);
            std::cout << "Thread " << th_ID << " Produce the " << productId << " th product" << std::endl;
        }
    }

    std::lock_guard<std::mutex> printLock(stdoutMutex);
    std::cout << "Producer Thread " << th_ID << " exit.... " << std::endl;
}
// Consumer task.
//
// Loops: sleep a random 0-9 seconds, claim one consumption slot, take a
// product from g_productManager via ConsumeProduct, and log it. The loop ends
// once g_product_max_count slots have been claimed across all consumer threads.
//
// th_ID: label printed in this thread's log lines.
void ConsumerTask(int th_ID)
{
    // One engine per thread, seeded once. A default-constructed engine created
    // inside the loop always starts from the same seed, so every "random"
    // delay would be the same number.
    thread_local std::default_random_engine engine{std::random_device{}()};
    std::uniform_int_distribution<unsigned> delaySeconds(0, 9);

    while (true) {
        std::this_thread::sleep_for(std::chrono::seconds(delaySeconds(engine)));

        {
            // Only the counter update is done under this mutex: ConsumeProduct can
            // block on an empty queue, and holding the mutex across it would stall
            // every other consumer.
            std::lock_guard<std::mutex> countLock(g_productManager.m_consume_count_mutex);
            // Check before incrementing so the counter never exceeds the number of
            // products that are actually consumed.
            if (g_productManager.m_consume_product_count >= g_product_max_count) {
                break;
            }
            ++g_productManager.m_consume_product_count;
        }

        const int product = ConsumeProduct(g_productManager);
        {
            std::lock_guard<std::mutex> printLock(stdoutMutex);
            std::cout << "Thread " << th_ID << " Consume the " << product << " th product" << std::endl;
        }
    }

    std::lock_guard<std::mutex> printLock(stdoutMutex);
    std::cout << "Consumer Thread " << th_ID << " exit...." << std::endl;
}
// Number of producer threads started by main.
#define PRODUCTER_NUMS 10
// Number of consumer threads started by main.
#define CUSTOMER_NUMS 5
// Starts PRODUCTER_NUMS producer threads and CUSTOMER_NUMS consumer threads
// (IDs start at 1), then waits for all of them to finish.
int main()
{
    std::thread producer[PRODUCTER_NUMS];
    // Sized by the consumer count; it was previously sized by PRODUCTER_NUMS.
    std::thread consumer[CUSTOMER_NUMS];

    for (int i = 0; i < PRODUCTER_NUMS; i++) {
        producer[i] = std::thread(ProducerTask, i + 1);
    }
    for (int i = 0; i < CUSTOMER_NUMS; i++) {
        consumer[i] = std::thread(ConsumerTask, i + 1);
    }

    for (std::thread &worker : producer) {
        worker.join();
    }
    for (std::thread &worker : consumer) {
        worker.join();
    }

    // NOTE(review): "pause" is a Windows shell command; on other platforms this
    // call fails and does not pause.
    system("pause");
    return 0;
}
// References:
// https://blog.csdn.net/zy13270867781/article/details/79231775
// https://blog.csdn.net/u013390476/article/details/52067321
#include <stdio.h>
#include <mutex>
#include <queue>
#include <thread>
#include <condition_variable>
#include <atomic>
class Product; // forward declaration; the queue below only stores pointers to it

// Protects g_queue_product.
std::mutex g_mutex;
// Producers wait on this while the queue is full; consumers notify it after popping.
std::condition_variable g_producter_cv;
// Consumers wait on this while the queue is empty; producers notify it after pushing.
std::condition_variable g_customer_cv;
// Products waiting to be consumed.
std::queue<Product *> g_queue_product;
// Serializes printf output across threads.
std::mutex m_mutex_print;
// Source of product IDs. Brace-initialized because `std::atomic<int> g_id = 0;`
// is copy-initialization, which is ill-formed before C++17 (std::atomic is not
// copyable).
std::atomic<int> g_id{0};
// Queue size at which producers start waiting.
int MAX_SIZE = 10;
// A product identified by an integer ID. Construction and print() each write
// one line to stdout while holding m_mutex_print.
class Product {
public:
    // Stores `id` and logs that the product was produced.
    Product(int id) : c_id(id) {
        std::lock_guard<std::mutex> lock(m_mutex_print);
        // c_id is an int, so the conversion must be %d; %ld with an int argument
        // is a format/argument mismatch.
        printf("%d Product is product \n", c_id);
    }

    // Logs that the product was consumed.
    void print() const {
        std::lock_guard<std::mutex> lock(m_mutex_print);
        printf("%d Product is custom \n", c_id);
    }

private:
    int c_id; // product ID given at construction
};
class Producter {
public:
void product() {
while (true) {
std::unique_lock<std::mutex> lock(g_mutex); //此处不能使用lock_guard
while (g_queue_product.size() >= MAX_SIZE) {
{
std::lock_guard<std::mutex> lock(m_mutex_print);
printf("queue is full wait \n");
}
g_producter_cv.wait(lock);
}
Product *product = new Product(++g_id);
g_queue_product.push(product);
g_customer_cv.notify_all();
lock.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
};
class Customer {
public:
void custom() {
while (true) {
{
std::unique_lock<std::mutex> lock(g_mutex);
while (g_queue_product.empty()) {
{
std::lock_guard<std::mutex> lock(m_mutex_print);
printf("queue is empty wait \n");
}
g_customer_cv.wait(lock);
}
Product *product = g_queue_product.front();
g_queue_product.pop();
if (nullptr != product) {
product->print();
delete product;
product = nullptr;
}
g_producter_cv.notify_all();
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
};
// Starts two producer threads and one consumer thread and waits on them.
// The worker loops never end, so in practice this blocks forever.
int main() {
    // Worker objects are declared before the threads so they outlive them.
    Producter producter1;
    Producter producter2;
    Customer customer;

    std::thread producterThread1(&Producter::product, &producter1);
    std::thread producterThread2(&Producter::product, &producter2);
    std::thread customerThread(&Customer::custom, &customer);

    // Join every thread: destroying a std::thread that is still joinable
    // calls std::terminate.
    producterThread1.join();
    producterThread2.join();
    customerThread.join();
    return 0;
}