C++11新特性--并发、原子性、锁、条件变量

线程与C++

    在C++11之前的标准中,在C++语言层面是没有对线程的支持的,所以在特定平台编写(windows,linux等)跟线程相关的C++程序往往是要结合所在平台的线程相关API来操作线程,如在Windows上创建线程的API是CreateThread,在其他平台(mac、unix、linux等)通常可以用posixthread的API来创建线程即pthread_create。反观java语言,我们会发现Java的线程处理在不同平台是一致的,因为它在语言层面支持了线程也就是并发的一系列操作,开发者不必关心平台相关的细节同样可以编写出行为一致的跨平台程序。在C++11标准中,引入了线程以及并发操作所需的一切,这样使得C++同Java一样可以不关心平台的编写并发相关的代码,这所带来的便利我认为是C++11标准带来的最大好处。
    随着新标准而来的不仅仅是线程的支持,并发编程需要的一系列操作都有了语言层面的支持,如锁、原子性操作、条件变量。这些以往在C++编程中被视作系统能力的特性,在新的标准中统统那入了标准库的实现中。下面我们来分别看一下标准库对他们的支持以及它们的使用。

std::thread

    std::thread是一个类包装,它包装了所在平台的线程实现,隐藏了所有细节,对外提供了统一的语义。首先看一下它的构造函数和operator=函数:

default (1) 
thread() noexcept;
initialization (2)  
template <class Fn, class... Args>
explicit thread (Fn&& fn, Args&&... args);
copy [deleted] (3)  
thread (const thread&) = delete;
move (4)    
thread (thread&& x) noexcept;
///////////////////////////////////////////
move (1)    
thread& operator= (thread&& rhs) noexcept;
copy [deleted] (2)  
thread& operator= (const thread&) = delete;

    我们可以看到拷贝构造和相应的operator=函数被显示删除了,这意味着线程对象不可复制,只能在std::thread对象之间进行move语义的移动。这不难理解,在个平台上thread对象都属于资源型对象,而且具有唯一性,复制它没有意义,所以在语言层面C++11标准库中的thread类直接用语义屏蔽了复制对象的路径。其余常用的三个函数见下面

void join();
id get_id() const noexcept;
void detach();

    这里get_id()是统一对线程唯一标识的一个封装,隐藏了不同平台线程标识不同的细节。join和detach语义和posixthread中的语义相同,这里直接摘要一下函数说明。

Join thread
The function returns when the thread execution has completed.

This synchronizes the moment this function returns with the completion of all the operations in the thread: This blocks the execution of the thread that calls this function until the function called on construction returns (if it hasn't yet).

After a call to this function, the thread object becomes non-joinable and can be destroyed safely.
Detach thread
Detaches the thread represented by the object from the calling thread, allowing them to execute independently from each other.

Both threads continue without blocking nor synchronizing in any way. Note that when either one ends execution, its resources are released.

After a call to this function, the thread object becomes non-joinable and can be destroyed safely.

    下面直接看一段代码,很直观的可以明白std::thread的用法。

class STDThreadTest
{
public:
    static void test_thread(void *pVoid)
    {
        std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
        STDThreadTest::g_thread_unum = 2;
        std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
    }

    static void execute()
    {
//        auto lambdafunc = [](void *param) -> void
//        {
//            std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
//            STDThreadTest::g_thread_unum = 2;
//            std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
//        };

        std::thread thread_test([](void *param) -> void
                                {
                                    std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
                                    STDThreadTest::g_thread_unum = 2;
                                    std::cout << "work thread:" << STDThreadTest::g_thread_unum << std::endl;
                                }, (void *) nullptr);
        thread_test.join();//如果没有join或者detach线程的话,会报异常
        std::cout << "main thread:" << STDThreadTest::g_thread_unum << std::endl;
    }

    static thread_local int g_thread_unum;
};

thread_local int STDThreadTest::g_thread_unum = 1;

    上面代码展示了std::thread的基本用法,在它构造的时候传入相应的回调函数(可以是函数、functor、lambda)和回调函数所需要的参数(任意数量)。可以看出来结合lambda函数来使用std::thread异常的方便。这个例子还用到了C++11引入的thread_local关键字,它直接封装了平台相关的TLS实现,简单好用,用法一目了然不再做解释了。

std::atomic

    这个类封装了原子操作所需要的所有操作,通过模板参数对所有基本类型进行了封装

typedef atomic<bool>               atomic_bool;
typedef atomic<char>               atomic_char;
typedef atomic<signed char>        atomic_schar;
typedef atomic<unsigned char>      atomic_uchar;
typedef atomic<short>              atomic_short;
typedef atomic<unsigned short>     atomic_ushort;
typedef atomic<int>                atomic_int;
typedef atomic<unsigned int>       atomic_uint;
typedef atomic<long>               atomic_long;
typedef atomic<unsigned long>      atomic_ulong;
typedef atomic<long long>          atomic_llong;
typedef atomic<unsigned long long> atomic_ullong;
typedef atomic<char16_t>           atomic_char16_t;
typedef atomic<char32_t>           atomic_char32_t;
typedef atomic<wchar_t>            atomic_wchar_t;

typedef atomic<int_least8_t>   atomic_int_least8_t;
typedef atomic<uint_least8_t>  atomic_uint_least8_t;
typedef atomic<int_least16_t>  atomic_int_least16_t;
typedef atomic<uint_least16_t> atomic_uint_least16_t;
typedef atomic<int_least32_t>  atomic_int_least32_t;
typedef atomic<uint_least32_t> atomic_uint_least32_t;
typedef atomic<int_least64_t>  atomic_int_least64_t;
typedef atomic<uint_least64_t> atomic_uint_least64_t;

typedef atomic<int_fast8_t>   atomic_int_fast8_t;
typedef atomic<uint_fast8_t>  atomic_uint_fast8_t;
typedef atomic<int_fast16_t>  atomic_int_fast16_t;
typedef atomic<uint_fast16_t> atomic_uint_fast16_t;
typedef atomic<int_fast32_t>  atomic_int_fast32_t;
typedef atomic<uint_fast32_t> atomic_uint_fast32_t;
typedef atomic<int_fast64_t>  atomic_int_fast64_t;
typedef atomic<uint_fast64_t> atomic_uint_fast64_t;

typedef atomic< int8_t>  atomic_int8_t;
typedef atomic<uint8_t>  atomic_uint8_t;
typedef atomic< int16_t> atomic_int16_t;
typedef atomic<uint16_t> atomic_uint16_t;
typedef atomic< int32_t> atomic_int32_t;
typedef atomic<uint32_t> atomic_uint32_t;
typedef atomic< int64_t> atomic_int64_t;
typedef atomic<uint64_t> atomic_uint64_t;

typedef atomic<intptr_t>  atomic_intptr_t;
typedef atomic<uintptr_t> atomic_uintptr_t;
typedef atomic<size_t>    atomic_size_t;
typedef atomic<ptrdiff_t> atomic_ptrdiff_t;
typedef atomic<intmax_t>  atomic_intmax_t;
typedef atomic<uintmax_t> atomic_uintmax_t;

    所以平时我们常用的基本类型的原子操作都可以通过标准库中的定义找到。来看一下它提供的函数:

Operations supported by certain specializations (integral and/or pointer, see below)
fetch_add
Add to contained value (public member function )
fetch_sub
Subtract from contained value (public member function )
fetch_and
Apply bitwise AND to contained value (public member function )
fetch_or
Apply bitwise OR to contained value (public member function )
fetch_xor
Apply bitwise XOR to contained value (public member function )
operator++
Increment container value (public member function )
operator--
Decrement container value (public member function )
atomic::operator (comp. assign.)
Compound assignments (public member function )

    绝大多数编程场景中我们只需调用operator++和operaotr--完成原子递增递减即可。这里虽然语义上是保证原子性操作的,但是还涉及到不同平台的内存模型,即所谓的“顺序一致性(memory_order)”,std::atomic提供了不同内存模型下memory_order的参数选择,不过默认值即可达到大部分我们预期的效果,有兴趣可以查阅相关的文档。更复杂的多线程变量读写可以应用锁来实现。下面来看一下C++11中的锁。

std::mutex、std::lock_guard、std::unique_lock

    std::mutex是一个互斥量的包装类,它来封装平台相关的互斥量实现,它还有一些高级实现如timed_mutex等,我们这里只介绍一下mutex。它相当于一个锁头,但是锁上它以及开启它还需要其它类的帮忙,它们组合在一起就能实现对程序某些代码进行锁互斥的作用。锁定以及开锁的工具类就是std::lock_guard和std::unique_lock。我们直接看一段代码看看他们是如何工作的。

#include <thread>
#include <mutex>
#include <iostream>


class MutexLock
{
private:
    std::mutex m_mutex;
    int m_count;
public:
    MutexLock() : m_count(0)
    {

    }
    ////////
#define MAX_COUNT 50

    static void execute()
    {
        MutexLock lockobj;
        std::thread tobj(
                [&]() -> void
                {
                    while (lockobj.m_count < MAX_COUNT)
                    {
                        try
                        {
                            std::lock_guard<std::mutex> lock(lockobj.m_mutex);
                            std::cout << "thread1" << std::endl;
                            if (lockobj.m_count >= MAX_COUNT)
                            {
                                break;
                            }
                            ++lockobj.m_count;
                        }
                        catch (...)
                        { std::cout << "lock_guard exp" << std::endl; }
                    }
                }
        );

        std::thread tobj2(
                [&]() -> void
                {
                    while (lockobj.m_count < MAX_COUNT)
                    {
                        try
                        {
                            std::unique_lock<std::mutex> lock(lockobj.m_mutex);
                            std::cout << "thread2" << std::endl;
                            if (lockobj.m_count >= MAX_COUNT)
                            {
                                break;
                            }
                            ++lockobj.m_count;
                            lock.unlock();
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                            std::cout << "unlock" << std::endl;
                        }
                        catch (...)
                        { std::cout << "unique_lock exp" << std::endl; }
                    }
                }
        );
        tobj.join();
        tobj2.join();
    }
};

    上面代码很简单,两个线程同时竞争同一个锁,得到锁的线程会检查退出条件确保退出条件在无锁判断的时候没有被其他线程改变,然后对计数器进行加1。这里两个线程用到了两种锁,首先看一下std::lock_guard。它是一个模板类,接受一个模板参数指明用的是哪一种锁(前面提到的mutex或者timed_mutex等),它不提供加锁解锁函数,加锁解锁完全靠其对象的声明周期,上面使用的例子就是while循环体内。如果想自己控制它的加锁范围可以人为加代码块({})。后面一个线程用到的是std::unique_lock,用法和std::lock_guard一样,只不过再它的基础上加入了lock和unlock函数,更灵活的控制加锁解锁的时机。

条件变量std::condition_variable

    熟悉posixthread的人应该很清楚条件变量的用法,C++11中的条件变量使用跟posix中的基本一直,不过更简单一些,有了它就可以实现对某一种状态的等待以及对满足某一种状态的唤醒。它需要跟锁一起搭配使用,这点跟posixthread是一致的。我们直接看一段代码来了解它的使用。

class ConditionalTest
{
private:
    std::mutex m_mtx;
    std::condition_variable m_cv;
    int n_counter = 0;
public:
#define COUNTER 5

   void execute()
    {
        auto lambdafunc1 = [this]() -> void
        {
            try
            {


                std::unique_lock<std::mutex> lock(m_mtx);
//            while (n_counter < 5)
//            {
//                m_cv.wait(lock);
//            }
                m_cv.wait(lock, [this]
                {
                    if (n_counter >= COUNTER)
                    {
                        return true;
                    } else
                    {
                        std::cout << "aweek but condition does not satisfied" << std::endl;
                        return false;
                    }
                });
                std::cout << "waitthread runs" << std::endl;
                std::this_thread::sleep_for(std::chrono::seconds(2));
                std::cout << "waitthread runs end" << std::endl;
            }
            catch (...)
            {}
        };

        auto lambdafunc2 = [this]() -> void
        {
            try
            {
                std::unique_lock<std::mutex> lock(m_mtx, std::defer_lock_t());
                do
                {
                    lock.lock();
                    ++n_counter;
                    std::cout << "plusthread plus one " << std::endl;
                    m_cv.notify_all();
                    lock.unlock();
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                } while (n_counter < COUNTER);

                std::cout << "plusthread notify end" << std::endl;
                std::cout << "plusthread notify end" << std::endl;
                std::cout << "plusthread notify end" << std::endl;
                std::cout << "plusthread notify end" << std::endl;
                std::cout << "plusthread notify end" << std::endl;
                std::cout << "plusthread notify end" << std::endl;
                std::cout << "plusthread notify end" << std::endl;
                std::cout << "plusthread notify end" << std::endl;
                std::cout << "plusthread notify end" << std::endl;
                std::cout << "plusthread notify end" << std::endl;
            } catch (...)
            {}
        };

        std::thread t1(lambdafunc1), t2(lambdafunc2);
        t1.join();
        t2.join();
        std::cout << "all done counter is:" << n_counter << std::endl;
    }
};

    上面代码需要注意的地方是每次条件变量notify的时候需要unlock锁,否则锁是不会立即释放的,这样等待的条件变量就不会即时的拿到锁执行下面的语句。另外线程1条件变量等待的地方是有一层额外的判断注释的部分和两个参数版本的wait部分,这里的语义跟posix是完全一致的,就是有可能存在虚假的唤醒,所以要加一层额外的判断,C++11库提供了我们现在使用的双参数版本进行条件比对,如果不满足会继续释放锁等待条件的满足。

Generally, the function is notified to wake up by a call in another thread either
 to member notify_one or to member notify_all. But certain implementations 
may produce spurious wake-up calls without any of these functions being 
called. Therefore, users of this function shall ensure their condition for 
resumption is met.

    上面是wait函数的解释的一部分,这个防御完全同posixthread中的条件变量防御完全一致,我猜应该就是适应posixthread中条件变量的这个问题。总之我推荐总是使用wait函数两个参数的版本,这样就不会出现问题了。

最后

    通过对前面一些C++11标准库提供的关于并发编程的类的介绍,我们应该可以看出在语言层面上C++已经完全具备了并发编程需要的一切要素,在它们的基础上编写跨平台代码将更加便利,操作线程进行并发编程将变得更加容易。结合前面的介绍,最后看一段典型的生产者消费者的代码作为结束。

class ConsumerAndProducer
{
private:
    std::deque<std::time_t> m_deque; //things to be consumed
    std::condition_variable m_cv;
    std::mutex m_mutex;
    bool m_quit;
public:
    ConsumerAndProducer() : m_quit(false)
    {

    }

    static std::time_t getTimeStamp()
    {
        std::chrono::time_point<std::chrono::system_clock, std::chrono::milliseconds> tp =
                std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now());
        auto tmp = std::chrono::duration_cast<std::chrono::milliseconds>(tp.time_since_epoch());
        std::time_t timestamp = tmp.count();
        //std::time_t timestamp = std::chrono::system_clock::to_time_t(tp);
        return timestamp;
    }

    static void execute()
    {
        ConsumerAndProducer obj;

        auto consumer = [&]() -> void
        {
            while (!obj.m_quit)
            {
                try
                {
                    std::this_thread::sleep_for(std::chrono::milliseconds(1));
                    std::unique_lock<std::mutex> lock(obj.m_mutex);
                    obj.m_cv.wait(lock, [&]
                    {
                        return obj.m_deque.size() > 0;
                    });
                    while (obj.m_deque.size() > 0)
                    {
                        std::cout << "Consumed :" << obj.m_deque.front() << std::endl;
                        obj.m_deque.pop_front();
                    }
                }
                catch (...)
                {
                    std::cout << "consumer exp" << std::endl;
                }
            }
        };

        auto producer = [&]() -> void
        {
            while (!obj.m_quit)
            {
                try
                {
                    std::unique_lock<std::mutex> lock(obj.m_mutex);
                    std::time_t time = getTimeStamp();
                    std::cout << "Produce :" << time << std::endl;
                    obj.m_deque.push_back(time);
                    while (time % 3)
                    {
                        std::this_thread::sleep_for(std::chrono::milliseconds(100));
                        time = getTimeStamp();
                        std::cout << "Produce :" << time << std::endl;
                        obj.m_deque.push_back(time);
                    }

                    obj.m_cv.notify_all();
                    lock.unlock();
                }
                catch (...)
                {
                    std::cout << "producer exp" << std::endl;
                }
            }
        };

        auto timer = [&]() -> void
        {
            std::this_thread::sleep_for(std::chrono::seconds(3));
            obj.m_quit = true;
        };

        std::thread t1(consumer), t2(producer), t3(timer);
        t1.join();
        t2.join();
        t3.join();
        std::cout << "all done remain:" << obj.m_deque.size() << std::endl;
    }
};

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,723评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,003评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,512评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,825评论 1 290
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,874评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,841评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,812评论 3 416
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,582评论 0 271
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,033评论 1 308
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,309评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,450评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,158评论 5 341
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,789评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,409评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,609评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,440评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,357评论 2 352

推荐阅读更多精彩内容