C++并发编程之 thread

1 并发编程介绍

1.1 thread

C++11 标准引入了 thread,用于创建和控制线程,引入 mutex 和 condition_variable 用于线程间资源访问的互斥和同步,以及 lock_guard、unique_lock管理互斥锁的加锁解锁。C++开发者终于不用自己去封装Posix 线程库了(笑)。

1.2 jthread

在 C++20 标准中引入了 jthread(joining thread),这是对标准库中的 thread 的补充。

jthreadthread 做了什么补充?
  • 自动 joining
    • std::thread 需要显式调用 join()detach() 方法来管理线程的生命周期。如果忘记调用 join(),则会导致资源泄漏,因为线程将在程序退出前一直运行。
    • jthread 在对象销毁时自动调用 join() 方法,确保线程在其作用域结束时被正确终止。
  • 异常安全
    • std::thread 在线程抛出异常时需要手动处理,否则异常可能会丢失。
    • jthread 自动捕获并重新抛出线程中的异常,确保异常不会丢失,并且可以被主线程捕获和处理。
  • 同步机制
    • std::thread 没有内置的同步机制,需要配合其他同步原语(如互斥锁、条件变量等)来实现线程间的同步。
    • jthread 提供了内置的同步机制,使用 stop_tokenstop_source管理线程,使得线程之间的同步更加方便。

2 适用场景

2.1 thread

  • 线程池,线程直接用 thread,使用C++20的 stop_tokenstop_source管理线程运行。
  • 简单的线程,线程执行任务完成后自动退出(非循环)。

2.2 jthread

  • 需要线程外部停止此线程。
  • 简单的线程,线程执行任务完成后自动退出(非循环)。 // 最好还是用 thread

需要外部停止线程运行的就直接交给 jthread,不用再用 atomic变量表示线程是否停止(不用自己造轮子了)。
如果不需要外部主动停止,最好还是用 thread,用不到 jthread的同步机制.

3 使用场景代码示例

实际代码使用时,有两点需要注意:

  • 如果只需要加锁和解锁,用 lock_guard即可,需要互斥量和条件变量搭配使用时,才需要 unique_lock,因为条件变量的 wait参数需要 unique_lock
  • 如果jthread需要外部停止,最好线程函数带 stop_token 参数,该参数放在线程函数的第一个位置。

3.1 线程执行完任务自动停止

两个线程交替打印a和b,各打印10次,输出结果:abababababababababab.

#include <mutex>
#include <thread>
#include <iostream>
#include <condition_variable>

std::mutex mtx;
std::condition_variable cv_thread_a;
std::condition_variable cv_thread_b;
bool is_thread_a = true;

void print_a(int count)
{
    for(int i = 0; i < count; ++i)
    {
        std::unique_lock<std::mutex> lock(mtx);
        cv_thread_a.wait(lock, []{ return is_thread_a;});
        std::cout << 'a';
        is_thread_a = false;
        cv_thread_b.notify_one();
    }
}

void print_b(int count)
{
    for(int i = 0; i < count; ++i)
    {
        std::unique_lock<std::mutex> lock(mtx);
        cv_thread_b.wait(lock, []{ return !is_thread_a;});
        std::cout << 'b';
        is_thread_a = true;
        cv_thread_a.notify_one();
    }
}

int main(int argc, char *argv[])
{
    int count = 10;
    std::thread thread_a(print_a, count);
    std::thread thread_b(print_b, count);
    thread_a.join();
    thread_b.join();

    std::cout << std::endl;
    return 0;
}

3.2 线程外部可以停止当前线程

生产者消费者模型,演示在主线程停止生产者、消费者线程。

#include <queue>
#include <mutex>
#include <thread>
#include <chrono>
#include <iostream>
#include <stop_token>
#include <condition_variable>

template <typename T>
class BlockingQueue
{
public:
    BlockingQueue(int size) : m_size(size) {}

    void push(T&& t)
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_not_full.wait(lock, [this]{return m_queue.size() < m_size;});
        std::cout << "push data: " << t << std::endl;
        m_queue.push(t);
        m_not_empty.notify_one();
    }

    void pop()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        if(m_not_empty.wait_for(lock, std::chrono::milliseconds(30), [this]{return !m_queue.empty();}))
        {
            T t = m_queue.front();
            std::cout << "pop data: " << t << std::endl;
            m_queue.pop();
            m_not_full.notify_one();
        }
    }

    bool empty()
    {
        std::unique_lock<std::mutex> lock(m_mutex);
        return m_queue.empty();
    }

private:
    BlockingQueue(const BlockingQueue&) = delete;
    BlockingQueue& operator=(const BlockingQueue&) = delete;

private:
    int m_size;
    std::queue<T> m_queue;
    // 一个互斥锁,两个条件变量
    std::mutex m_mutex;
    std::condition_variable m_not_full;
    std::condition_variable m_not_empty;
};

void producer(std::stop_token stop_token, BlockingQueue<int> &q)
{
    int data = 0;
    while(!stop_token.stop_requested())
    {
        q.push(data++);
        std::this_thread::sleep_for(std::chrono::milliseconds(300));
    }
}

void consumer(std::stop_token stop_token, BlockingQueue<int> &q)
{
    while(!stop_token.stop_requested())
    {
        q.pop();
    }
    // 接收到stop请求时,可能q里还有数据,现在直接丢掉不处理了
    // 实际业务里, 如果消费者线程只有一个,则while(!q.empty())非空直接取完即可
    // 如果有多个消费者线程,则需要把q.empty() 和 q.pop()搞成原子操作才行,不然这里取数据有数据竞争
}

int main(int argc, char *argv[])
{
    BlockingQueue<int> q(10);
    std::jthread p1(producer, std::ref(q));
    std::jthread p2(producer, std::ref(q));
    std::jthread c1(consumer, std::ref(q));

    std::this_thread::sleep_for(std::chrono::seconds(5));
    p1.request_stop(); // std::stop_source ssp1 = p1.get_stop_source(); ssp1.request_stop();
    p2.request_stop(); // std::stop_source ssp2 = p1.get_stop_source(); ssp2.request_stop();
    c1.request_stop(); // std::stop_source ssc1 = p1.get_stop_source(); ssc1.request_stop();

    return 0;
}

3.3 线程池

线程池代码示例:

#include <queue>
#include <mutex>
#include <thread>
#include <chrono>
#include <vector>
#include <string>
#include <iostream>
#include <stop_token>
#include <functional>
#include <condition_variable>

class ThreadPool
{
public:
    ThreadPool(int size)
    {
        for(int i = 0; i < size; ++i)
        {
            threads.emplace_back(&ThreadPool::threadFunc, this, stopper.get_token());
        }
    }

    void threadFunc(std::stop_token token)
    {
        while(!token.stop_requested())
        {
            std::function<void()> task;
            {
                std::unique_lock lock(mtx);
                if(cv.wait_for(lock, std::chrono::milliseconds(30), [this]{ return !tasks.empty(); }))
                {
                    task = std::move(tasks.front());
                    tasks.pop();
                }else
                {
                    continue;
                }
            }
            task();
        }
        dealRemainTask(); // 接收到stop请求后,处理剩余任务
    }

    void dealRemainTask()
    {
        std::function<void()> task;
        {
            std::unique_lock lock(mtx);
            std::cout << "deal remain task!" << std::endl;
            while(!tasks.empty())
            {
                task = std::move(tasks.front());
                tasks.pop();
                task(); // 处理最后的任务就不考虑锁的时间长了
            }
        }
    }

    void enqueueTask(std::function<void()> &&task)
    {
        if(!stopper.stop_requested())
        {
            std::unique_lock lock(mtx);
            tasks.push(task);
        }
    }

    bool request_stop() { return stopper.request_stop(); }

    ~ThreadPool()
    {
        if(!stopper.stop_requested())
        {
            stopper.request_stop();
        }
        cv.notify_all();
        for(auto &thread: threads)
        {
            thread.join();
        }
    }

private:
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;
    ThreadPool(ThreadPool &&) = delete;
    ThreadPool &operator=(ThreadPool &&) = delete;

private:
    std::mutex mtx;
    std::condition_variable cv;
    std::stop_source stopper;
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
};

void task(const std::string &name)
{
    std::cout << "hello, " << name << "!" << std::endl;
}

int main(int argc, char *argv[])
{
    ThreadPool threadPool(5);

    for(int i = 0; i < 100; ++i)
    {
        std::string name = "name+" + std::to_string(i);
        threadPool.enqueueTask(std::bind(task, name));
    }

    threadPool.request_stop();
    return 0;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容