1 并发编程介绍
1.1 thread
C++11 标准引入了 thread,用于创建和控制线程,引入 mutex 和 condition_variable 用于线程间资源访问的互斥和同步,以及 lock_guard、unique_lock管理互斥锁的加锁解锁。C++开发者终于不用自己去封装Posix 线程库了(笑)。
1.2 jthread
在 C++20 标准中引入了 jthread(joining thread),这是对标准库中的 thread 的补充。
jthread 对 thread 做了什么补充?
-
自动 joining:
-
std::thread需要显式调用join()或detach()方法来管理线程的生命周期。如果忘记调用join(),则会导致资源泄漏,因为线程将在程序退出前一直运行。 -
jthread在对象销毁时自动调用join()方法,确保线程在其作用域结束时被正确终止。
-
-
异常安全:
-
std::thread在线程抛出异常时需要手动处理,否则异常可能会丢失。 -
jthread自动捕获并重新抛出线程中的异常,确保异常不会丢失,并且可以被主线程捕获和处理。
-
-
同步机制:
-
std::thread没有内置的同步机制,需要配合其他同步原语(如互斥锁、条件变量等)来实现线程间的同步。 -
jthread提供了内置的同步机制,使用stop_token和stop_source管理线程,使得线程之间的同步更加方便。
-
2 适用场景
2.1 thread
- 线程池,线程直接用
thread,使用C++20的stop_token和stop_source管理线程运行。 - 简单的线程,线程执行任务完成后自动退出(非循环)。
2.2 jthread
- 需要线程外部停止此线程。
- 简单的线程,线程执行任务完成后自动退出(非循环)。 // 最好还是用 thread
需要外部停止线程运行的就直接交给 jthread,不用再用 atomic变量表示线程是否停止(不用自己造轮子了)。
如果不需要外部主动停止,最好还是用 thread,用不到 jthread的同步机制.
3 使用场景代码示例
实际代码使用时,有两点需要注意:
- 如果只需要加锁和解锁,用
lock_guard即可,需要互斥量和条件变量搭配使用时,才需要unique_lock,因为条件变量的wait参数需要unique_lock。 - 如果jthread需要外部停止,最好线程函数带
stop_token参数,该参数放在线程函数的第一个位置。
3.1 线程执行完任务自动停止
两个线程交替打印a和b,各打印10次,输出结果:abababababababababab.
#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>
std::mutex mtx;
std::condition_variable cv_thread_a;
std::condition_variable cv_thread_b;
bool is_thread_a = true;
void print_a(int count)
{
for(int i = 0; i < count; ++i)
{
std::unique_lock<std::mutex> lock(mtx);
cv_thread_a.wait(lock, []{ return is_thread_a;});
std::cout << 'a';
is_thread_a = false;
cv_thread_b.notify_one();
}
}
void print_b(int count)
{
for(int i = 0; i < count; ++i)
{
std::unique_lock<std::mutex> lock(mtx);
cv_thread_b.wait(lock, []{ return !is_thread_a;});
std::cout << 'b';
is_thread_a = true;
cv_thread_a.notify_one();
}
}
int main(int argc, char *argv[])
{
int count = 10;
std::thread thread_a(print_a, count);
std::thread thread_b(print_b, count);
thread_a.join();
thread_b.join();
std::cout << std::endl;
return 0;
}
3.2 线程外部可以停止当前线程
生产者消费者模型,演示在主线程停止生产者、消费者线程。
#include <queue>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>
#include <stop_token>
#include <condition_variable>
template <typename T>
class BlockingQueue
{
public:
BlockingQueue(int size) : m_size(size) {}
void push(T&& t)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_not_full.wait(lock, [this]{return m_queue.size() < m_size;});
std::cout << "push data: " << t << std::endl;
m_queue.push(t);
m_not_empty.notify_one();
}
void pop()
{
std::unique_lock<std::mutex> lock(m_mutex);
if(m_not_empty.wait_for(lock, std::chrono::milliseconds(30), [this]{return !m_queue.empty();}))
{
T t = m_queue.front();
std::cout << "pop data: " << t << std::endl;
m_queue.pop();
m_not_full.notify_one();
}
}
bool empty()
{
std::unique_lock<std::mutex> lock(m_mutex);
return m_queue.empty();
}
private:
BlockingQueue(const BlockingQueue&) = delete;
BlockingQueue& operator=(const BlockingQueue&) = delete;
private:
int m_size;
std::queue<T> m_queue;
// 一个互斥锁,两个条件变量
std::mutex m_mutex;
std::condition_variable m_not_full;
std::condition_variable m_not_empty;
};
void producer(std::stop_token stop_token, BlockingQueue<int> &q)
{
int data = 0;
while(!stop_token.stop_requested())
{
q.push(data++);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
}
}
void consumer(std::stop_token stop_token, BlockingQueue<int> &q)
{
while(!stop_token.stop_requested())
{
q.pop();
}
// 接收到stop请求时,可能q里还有数据,现在直接丢掉不处理了
// 实际业务里, 如果消费者线程只有一个,则while(!q.empty())非空直接取完即可
// 如果有多个消费者线程,则需要把q.empty() 和 q.pop()搞成原子操作才行,不然这里取数据有数据竞争
}
int main(int argc, char *argv[])
{
BlockingQueue<int> q(10);
std::jthread p1(producer, std::ref(q));
std::jthread p2(producer, std::ref(q));
std::jthread c1(consumer, std::ref(q));
std::this_thread::sleep_for(std::chrono::seconds(5));
p1.request_stop(); // std::stop_source ssp1 = p1.get_stop_source(); ssp1.request_stop();
p2.request_stop(); // std::stop_source ssp2 = p1.get_stop_source(); ssp2.request_stop();
c1.request_stop(); // std::stop_source ssc1 = p1.get_stop_source(); ssc1.request_stop();
return 0;
}
3.3 线程池
线程池代码示例:
#include <queue>
#include <mutex>
#include <thread>
#include <chrono>
#include <vector>
#include <string>
#include <iostream>
#include <stop_token>
#include <functional>
#include <condition_variable>
class ThreadPool
{
public:
ThreadPool(int size)
{
for(int i = 0; i < size; ++i)
{
threads.emplace_back(&ThreadPool::threadFunc, this, stopper.get_token());
}
}
void threadFunc(std::stop_token token)
{
while(!token.stop_requested())
{
std::function<void()> task;
{
std::unique_lock lock(mtx);
if(cv.wait_for(lock, std::chrono::milliseconds(30), [this]{ return !tasks.empty(); }))
{
task = std::move(tasks.front());
tasks.pop();
}else
{
continue;
}
}
task();
}
dealRemainTask(); // 接收到stop请求后,处理剩余任务
}
void dealRemainTask()
{
std::function<void()> task;
{
std::unique_lock lock(mtx);
std::cout << "deal remain task!" << std::endl;
while(!tasks.empty())
{
task = std::move(tasks.front());
tasks.pop();
task(); // 处理最后的任务就不考虑锁的时间长了
}
}
}
void enqueueTask(std::function<void()> &&task)
{
if(!stopper.stop_requested())
{
std::unique_lock lock(mtx);
tasks.push(task);
}
}
bool request_stop() { return stopper.request_stop(); }
~ThreadPool()
{
if(!stopper.stop_requested())
{
stopper.request_stop();
}
cv.notify_all();
for(auto &thread: threads)
{
thread.join();
}
}
private:
ThreadPool(const ThreadPool &) = delete;
ThreadPool &operator=(const ThreadPool &) = delete;
ThreadPool(ThreadPool &&) = delete;
ThreadPool &operator=(ThreadPool &&) = delete;
private:
std::mutex mtx;
std::condition_variable cv;
std::stop_source stopper;
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;
};
void task(const std::string &name)
{
std::cout << "hello, " << name << "!" << std::endl;
}
int main(int argc, char *argv[])
{
ThreadPool threadPool(5);
for(int i = 0; i < 100; ++i)
{
std::string name = "name+" + std::to_string(i);
threadPool.enqueueTask(std::bind(task, name));
}
threadPool.request_stop();
return 0;
}