c++多线程03

async

异步

  • 它的概念这里就不谈了, 本质上就是开线程, 一般 <font color=red>启动一个线程的入口函数</font>是==没有返回值的==, 但是并不是说不可以有
int t(){
    cout << "work...\n";

    return 4;
}

int main(int arg, char** args){
    // t函数被当作成了 子线程的入口函数, 在main函数中不可能
    /// 直接拿到 t的返回值, 因为t不是main调用的, 是os调用的
    thread t_a(t);
    
    t_a.join();
    return 0;
}
/** 
    如果想使 t作为一个线程函数, 并在main中拿到t()的返回值
    则可以使用全局变量
        在t()被调度后, 在该函数内部用全局变量记录
        然后在main函数中的join后, 获取全局变量

    
    但这一切不用自己来编写, STL已经写好了, 而且写的更好

    下面介绍std::async和std::future
*/


std::async的基本用法

#include<future>            //引用头文件
using namespace std;            

int func() {
    char buffer[8]{};
    sprintf(buffer, "%d\n", this_thread::get_id());
    cout.write(buffer,strlen(buffer));
    this_thread::sleep_for(chrono::milliseconds(4000));
    cout.write(buffer,strlen(buffer));

    return 88;
}


int main() {
    auto result = std::async(func);
    cout << typeid(result).name() << endl;
    cout << "main over\n";
    return 0;
}

/** 
    class std::future<int>          _1
    3224                            _2
    main over                       _3      等待4秒
    3224                            _4

    
    解析:
        _1:
            std::future是 std::async(func)返回的类型
            可以看出返回的是 future<int>
            
        _2:
            cpu调度到子线程(func)

        _3:
            回到主线程, 但func在睡眠

        _4:
            func睡眠后, 继续打印
     
     从打印可以看出, 虽然main执行到了 _3, 但还是在被cpu阻塞住,等待func完成


     问题:
        明明没有在 "main over" 后面对线程加上 join, 为什么主线程会等待子线程?

     下面看下源码(部分)  
*/


std::async的源码探究

/** 
    用户调用代码
*/
int func() {
    char buffer[8]{};
    sprintf(buffer, "%d\n", this_thread::get_id());
    cout.write(buffer,strlen(buffer));
    this_thread::sleep_for(chrono::milliseconds(4000));
    cout.write(buffer,strlen(buffer));

    return 88;
}


int main() {
    auto result = std::async(func);
    cout << typeid(result).name() << endl;
    cout << "main over\n";
    return 0;
}



/**
    开始源码(VS2019)
*/


/** 
    async函数模板
    
    需要说明的是, 这个函数是被 对外接口的async调用的
        如上面的 async(func)后, 会在内部继续调用 async(launch::async | launch::deferred, func);
*/
template <class _Fty, class... _ArgTypes>
//返回类型
////// 通过传递进来的模板参数 traits出callable的返回类型
///////     上面func的返回类型是int, 所以async, 返回的是 future<int>
_NODISCARD future<_Invoke_result_t<decay_t<_Fty>, decay_t<_ArgTypes>...>>       

////参数列表, _ArgTypes对应到上面的调用, 此时已经是空了
async(
    launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args) {

    // traits 出返回类型(int)
    using _Ret   = _Invoke_result_t<decay_t<_Fty>, decay_t<_ArgTypes>...>;

    // 其实就是int
    using _Ptype = typename _P_arg_type<_Ret>::type;

    /// 构造_Promise<int>对象 _Pr
    _Promise<_Ptype> _Pr(_Get_associated_state<_Ret>
                            (   _Policy, 
                                _Fake_no_copy_callable_adapter<_Fty, _ArgTypes...>
                                (
                                    _STD forward<_Fty>(_Fnarg), 
                                    _STD forward<_ArgTypes>(_Args)...
                                )
                            )
                        );
    /** 
        构造_Pr对象时, 先后调用了:
            1. _Fake_no_copy_callable_adapter(func)      _tuple_adapter(对象)
            
            2. _make_tuple_adapter 作为_Get_associated_state()的第2个参数




        _Fake_no_copy_callable_adapter()的作用:(它的源码就不看了)
            它是一个类, 构造函数会将可变参数做成一个tuple, 用来记录 callable和所有的参数
                这一点很像之前 thread的ctor, 过程是一样的

            在这里 形成了 tuple<int(void)>(func)  
                其中模板指定的类型(int())是 int func_name(void) 形式的函数,这个在c++中是允许的, functinal<type> 中就有类似的用法

            同时重载了 (), 函数内部会在一系列的调用,并将tuple<int()>(func) 传递给 invoke函数
                invoke在thread在构造函数中探究过
            这要样的目的是生成 func() 的调用代码

        也就是说 可以通过 _tuple_adapter这个对象来直接  _tuple_adapter(), 就可以间接调用 func, 并且有返回值(int)



        
        _Get_associated_state()的作用:
            它是一个函数, 根据第1个参数_Policy, 决定要不要开线程
            返回一个类的实例的指针, 在这里返回的是  _Associated_state<int>*

            ps: 在这个函数的调用过程中会开线程(可能开,原因后面说), 执行func(), 获取返回值, 下面看它的源码
            
    */

    return future<_Ret>(_Pr._Get_state_for_future(), _Nil());
}









/** 
    _Get_associated_state
*/
template <class _Ret, class _Fty>
// 返回类型, 实际上是 _Associated_state<int>*
_Associated_state<typename _P_arg_type<_Ret>::type>*        

// 第2个参数是上面的 _tuple_adapter(tuple<int()>(func))
_Get_associated_state(launch _Psync, _Fty&& _Fnarg) { 
    switch (_Psync) { 
    case launch::deferred:
        return new _Deferred_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
    case launch::async: 

    // 从前面传递过来的是 deferred|async, 所以会到这里来创建线程, 接着下面的源码
    ////    实际上返回的是 _Task_async_state<int> *
    default:
        return new _Task_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
    }
}










/** 
    _Task_async_state<>
*/
template <class _Rx>        //_Rx是int
class _Task_async_state : public _Packaged_state<_Rx()> {
public:

    /// 即 _Packaged_state<int()>
    ////// 这里取的是 返回类型(), 和 用户的 func(类型是 int()), 没有关系
    //////// 前面是将 func的返回类型传递到这里, 直接手动加上()
    using _Mybase     = _Packaged_state<_Rx()>; 
    using _State_type = typename _Mybase::_State_type;

    /** 
        构造函数
            将 _tuple_adapter 传递给了父类, 最后的结果是 引用了 _tuple_adapter对象
            
            接着ctor的内部 调用了 全局的 异步任务, 问题就在这里

            这里并没有用 c++11本身的thread类, ::Concurrency::create_task 这个函数是调用
            内核(这里是windows)的接口, 所以在上面一节中的问题, 就是这个函数造成的
                会让main函数在结束前等待子线程


        从这里可以看出, 外界调用async后, 会调用内核的异步接口, 内核在创建任务后回调时
            会 this->_Call_immediate(); 
        这个函数是在 父类中定义的, 即 _Packaged_state<int()>中的_Call_immediate()函数
        该函数内部就会 通过 _tuple_adapter() 间接调用到 fun(), 并取得返回值
        
        ps: create_task这个函数是当前os根据整个cpu的并发环境来决定要不要开线程, 后面都是认定会开新线程

        下一个函数是_Packaged_state的_Call_immediate()函数
    */




    template <class _Fty2>
    _Task_async_state(_Fty2&& _Fnarg) : _Mybase(_STD forward<_Fty2>(_Fnarg)) {
        _Task = ::Concurrency::create_task([this]() { // do it now
            this->_Call_immediate();
        });

        this->_Running = true; 
    }

    /** 
        注意这里返回了一个 _Task, 它是一个类, 源码就不看了, 它有一个函数 wait()
        当其他线程通过当前类的对象(_Task_async_state<int>)获取 异步任务的返回值(即上面的 int func()的返回值)
        时, 可能 _Task还没有执行完毕, 所以用_Tast.wait()可以让主线程等到异步任务完成
    */
    ...

};








/** 
    _Packaged_state的_Call_immediate()函数

    注意在前面调用的时候是没有传递参数的
*/
void _Call_immediate(_ArgTypes... _Args) { // call function object
    try{        //_TRY_BEGIN
        
        /** 
            _Fn就是引用的 _tuple_adapter, 这里:
                    _Fn(_STD forward<_ArgTypes>(_Args)...)
           就是通过 _tuple_adapter来间接调用到 用户的 int func() 
           
           并且将返回值 传递给_Set_vlaue这个函数

           _Set_value又在当前类(_Packaged_state<int>)的父类(_Associated_state<int>)中定义
           下面继续源码
                
        */
        this->_Set_value(_Fn(_STD forward<_ArgTypes>(_Args)...), false);

    }catch(..){         //_CATCH_ALL

        this->_Set_exception(_STD current_exception(), false);

    }                   //_CATCH_END
}












/** 
    _Associated_state<int>的 _Set_value()
    第2个参数由上面传递的是false
    第1个参数是在本质上是 用户的 func()的返回值(int)
*/
void _Set_value(const _Ty& _Val, bool _At_thread_exit) {
    unique_lock<mutex> _Lock(_Mtx);
    _Set_value_raw(_Val, &_Lock, _At_thread_exit);
}

/** 
    由上面的_Set_value调用, 可以看到在当前异步任务中
    对 用户结果存储时, 会加锁

    并且只会存储一次

    该类有一个public的方法 
        virtual _Ty& _Get_value(bool _Get_only_once) 
    将返回值给外界(引用)
*/
void _Set_value_raw(const _Ty& _Val, unique_lock<mutex>* _Lock,
    bool _At_thread_exit) { // store a result while inside a locked block

    /// 只存储一次
    if (_Has_stored_result) {
        _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
    }

    // 存储返回值
    _Result = _Val;

    // 设置 _Has_stored_result 的true, 表示值已经存储, 通知get_value()
    _Do_notify(_Lock, _At_thread_exit);
}











/** 
    接下来回到_Get_associated_state(在前面), 为了方便, 再将源码放出来
*/
template <class _Ret, class _Fty>
_Associated_state<typename _P_arg_type<_Ret>::type>*        
_Get_associated_state(launch _Psync, _Fty&& _Fnarg) { 
    switch (_Psync) { 
    case launch::deferred:
        return new _Deferred_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
    case launch::async: 

    default:
        /// 经过上面的_Task_async_state过程, 目的就是创建_Task_async_state<int>*
        ////// 只是这个过程中, 内核创建了异步线程可能开始调度了
        return new _Task_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));
    }
}










/** 
    再回到最初的async函数, 再将源码放出来
*/
template <class _Fty, class... _ArgTypes>
_NODISCARD future<_Invoke_result_t<decay_t<_Fty>, decay_t<_ArgTypes>...>> async(
    launch _Policy, _Fty&& _Fnarg, _ArgTypes&&... _Args) {

    ....


    /** 
        构造 _Promise对象:
             _Promise(_Associated_state<int>* _State_ptr) : _State(_State_ptr, false), 
                                                            _Future_retrieved(false) {}
        
             其中_Assoc_state<int*>就是上面接着的_Task_async_state<int>*
                明显是用了多态, 父类指针接收子类的指针

             _State的类型是 _State_manager<_Ty> _State, 也是future的父类
             在构造_State时, 第2个参数表示可以获取多次
    */
    _Promise<_Ptype> _Pr(
        _Get_associated_state<_Ret>(_Policy, _Fake_no_copy_callable_adapter<_Fty, _ArgTypes...>(
                                                 _STD forward<_Fty>(_Fnarg), _STD forward<_ArgTypes>(_Args)...)));

    


    /** 
        通过 _Promise对象取出 future<int>的父类_State_manager<int>, 会调用 future的拷贝构造:
            future(const _Mybase& _State, _Nil) : _Mybase(_State, true) {}

        可以看出, 第2个参数是不允许再次获取的
        future内部会引用到上面说的_Associated_state<int>*(实际是指向的是子类_Task_async_state<int>)
        
        ps:第2个参数好像在说外界使用future的get()不能超过2次, 事实的确如此, 连续调用get()会有异常, 但
        异常的原因并不是第2个参数为false, 具体看后面的 std::future章节中的成员函数get()
    */
    return future<_Ret>(_Pr._Get_state_for_future(), _Nil());
}









/** 
    当外界拿到 future后, 可以调用 get()去获取值

        future自己提供了 get(), 但内部是调用父类(_State_manager<int>)的 _Get_value()
        _State_manager<int>的_Get_value内部调用:
            _Task_async_state<int>* -> _Get_value()
        这个 _Get_value是一个virtual, 所以会先调用到子类(_Task_async_state<int>), 再调用到父类(_Associated_state<int>)
*/
/// 子类_Task_async_state<int> 的 _Get_value
virtual _State_type& _Get_value(bool _Get_only_once) override {
    /** 
        调用 异步任务的 wait(), 等待异步任务结束
        也就是说, _Task.wait()后, 返回值已经拿到
        
        ps:如果异步任务已经结束, 这里wait()就直接过去了
    */
    _Task.wait();  

    /// 调用父类_Associated_state<int>的_Get_value
    return _Mybase::_Get_value(_Get_only_once);
}

/// 父类_Associated_state的 _Get_value  _Ty& 是int
virtual _Ty& _Get_value(bool _Get_only_once) { // 参数由future传递过来的为true

    unique_lock<mutex> _Lock(_Mtx);

    // 如果已经获取过(_Retrieved), 并且只能获取一次(_Get_only_once), 则抛异常
    if (_Get_only_once && _Retrieved) {
        _Throw_future_error(make_error_code(future_errc::future_already_retrieved));
    }

    //存储的过程出现问题
    if (_Exception) {
        _Rethrow_future_exception(_Exception);
    }

    _Retrieved = true;
    _Maybe_run_deferred_function(_Lock);
    while (!_Ready) {
        _Cond.wait(_Lock);
    }

    if (_Exception) {
        _Rethrow_future_exception(_Exception);
    }

    return _Result;

    /** 
        需要说明的是:
            在当前的案例中, 是通过异步任务(也就是当前类 _Associated_state<int>的子类_Task_async_state<int>)
            调用 _Get_value, _Ready一定为true, 因为在子类中调用了 _Task.wait()拿到了返回值 
            拿到返回值时, _Ready已经在存储时被设置成了true, 所以这里并不会 进入while中

            但为什么这段父类的代码要加锁, 并且要判断wait呢?

            原因是当前类(_Associated_state)可能被单独拿出去用, 而外界可能是多线程调用
            所以_Associated_state有自己的 互斥锁机制
    */
}








/** 
    总结:
        使用std::async并不是使用 thread 类去创建一个线程
        而是直接使用了内核, 创建异步任务(os自己决定要不要开,其实本质还是os的线程), 在异步任务中调用用户的函数
        将值存储起来, 返回给外界的是future对象, 可以通过future的get()获取, 但:
            > 可能阻塞, 并且获取到值后, 不能再获取


    ps: 本人就不写程序验证了, 后面是 std::futrue和std::async() 的用法
*/


验证

int func(int a) {
    char buffer[8]{};
    sprintf(buffer, "%d\n", this_thread::get_id());
    cout.write(buffer,strlen(buffer));
    this_thread::sleep_for(chrono::milliseconds(2000));
    cout.write(buffer,strlen(buffer));

    return 88;
}


int main() {
    auto result = std::async(func, 2);
    this_thread::sleep_for(chrono::milliseconds(4000));

    // 到这里来基本可以肯定, 异步任务已经返回了值, 并存储好了

    /// 第1次获取, 没有问题
    result.get();   
    cout << "main over\n";

    /// 第2次获取, 直接异常, 异常的原因在 future章节中的成员函数get()
    result.get();
    return 0;
}


指定参数的async

  • 上面调用中, 没有指定 <font color=red>async的==模式==参数(第1个)</font>, 默认是开线程的, 可以指定相关的值:
    • launch:async
    • launch:defer
    • launch:async | lauch:deferred
/** 
    其中 async | deferred是默认的
        单独传递async的效果和默认时一样
        ps: async 和 async | deferred 才会调用create_tsak
            而create_task本身开不开线程是由os自己决定的,但这里探讨的是会开新线程

    下面来看 deferred时的情况
*/
template <class _Ret, class _Fty>
_Associated_state<typename _P_arg_type<_Ret>::type>* _Get_associated_state(launch _Psync, _Fty&& _Fnarg) {
    switch (_Psync) { 
    case launch::deferred:
        return new _Deferred_async_state<_Ret>(_STD forward<_Fty>(_Fnarg));

        ...
    }
}

/** 
    代码的流程和上面的async的源码分析一样
    可以看到, 返回指针指向的是 _Deferred_async_state<int>
    _Deferred_async_state<int> 继承_Packaged_state<int>
    _Packaged_state<int> 继承_Associated_state<int>
    
    async返回的future中引用的是_Deferred_async_state<int>*
    
    当future调用 get(), 则会通过_Deferred_async_state<int>*调用
    _Get_value
    _Deferred_async_state<int>并没有_Get_value, 所以调用到父类
    即_Associated_state<int>的_Get_value
*/
virtual _Ty& _Get_value(bool _Get_only_once) {
    unique_lock<mutex> _Lock(_Mtx);
    if (_Get_only_once && _Retrieved) {
        _Throw_future_error(make_error_code(future_errc::future_already_retrieved));
    }

    if (_Exception) {
        _Rethrow_future_exception(_Exception);
    }

    _Retrieved = true;


    /** 
        这个方法中会调用到 _Deferred_async_state<int>的相关方法
        然后在当前线程中调用 func, 获取返回值
    */
    _Maybe_run_deferred_function(_Lock);




    while (!_Ready) {
        _Cond.wait(_Lock);
    }

    if (_Exception) {
        _Rethrow_future_exception(_Exception);
    }

    return _Result;
}


/** 
    总结:
        当async传递 deferred的时候
            1. 不会开线程
            2. 不会主动调用 用户的方法
            3. 外界只有通过 furture.get()后, 才会在当前线程中调用 用户的方法去获取返回值 
            4. 同样只能获取1次
*/


std::future的wait方法

  • 直接看源码吧
/** 
    当是异步任务时: 
        根据上面的源码探究, 这里的_Assoc_state实际上就是
        _Task_async_state<int>*, 所以会调用 _Task.wait()
        阻塞等待异步任务完成

    当是 deferred时:
        由于_Assoc_state指向的是_Deferred_async_state<int>*
        但_Deferred_async_state并没有实现wait(), 所以调用到了
        父类_Associated_state<int>的wait

        在父类中的wait会调用 用户的函数获取返回值, 所以后续的get()会立即返回
*/
void wait() const { // wait for signal
    if (!valid()) {
        _Throw_future_error(make_error_code(future_errc::no_state));
    }

    _Assoc_state->_Wait();
}


async的callable

/** 
    在上面传递的是普通的函数, 实际上也可以传递
        lambda, functor, 成员函数
    格式和 thread的ctor一样
*/


packaged_task

概念

  • 用来 <font color=red>包装callable</font>, 作用是 <font color=red>存储异步调用中的值(==返回值==)</font>


  • 既然它是用来 <font color=red>包装callable</font>的, 则 <font color=red>它本身就可以当作callable</font>(==不用想又是个adapter==)


  • 从上面的描述中, 它很像==std::function==, 但它内部是 <font color=red>将结果封装在 future中</font>


  • 一个<font color=red>packaged_task</font>对象包含==2部分==的内容:
    • callable
    • 异步任务的值(==通过future==)


成员函数

构造函数
    packaged_task(){}
    
    tempalte<typename Callable>
    explicit packaged_task(Callable&& callable);
    packaged_task(packaged_task&& other); 

    // 拷贝构造(左值), 已经delete了


拷贝赋值
    packaged_task& operator= (packaged_task&& rhs);
    同样的没有左值的重载


重要的成员函数
    get_future()    获取future对象
    operator()      内部通过callable调用函数
    reset()         重置future(可以再次用future的get)


源码

/** 
    外界调用
*/
int func(const A& a) {
    printf("func: %p\n", &a);

    char buffer[8]{};
    sprintf(buffer, "%d\n", this_thread::get_id());
    cout.write(buffer,strlen(buffer));

    this_thread::sleep_for(chrono::milliseconds(2000));

    cout.write(buffer,strlen(buffer));

    return 88;
}


int main() {
    printf("main thread:%d \n", this_thread::get_id());

    packaged_task<int(const A&)> callable(func);
    printf("main callable:%p \n",&callable);

    A a;
    printf("main: %p\n", &a);
    
    callable(std::move(a)); 
    //callable(std::move(a));  //连续调用error

    auto f = callable.get_future();
    cout << f.get() << endl;

    return 0;
}






class packaged_task<_Ret(_ArgTypes...)> //<int(consnt A&)>
{ 

public:
    /// traits 出返回值类型, 即int
    using _Ptype              = typename _P_arg_type<_Ret>::type;   
    using _MyPromiseType      = _Promise<_Ptype>;

    using _MyStateManagerType = _State_manager<_Ptype>;

    // 引用callable, 即 <int(const A&)>
    using _MyStateType        = _Packaged_state<_Ret(_ArgTypes...)>;




    /** 
        构造函数
            上面的main将 callable对象传递到这里, 由当前类的成员变量 _MyPromise来负责创建packaged_state来引用用户的函数

            _MyPromise内部有一个 _State_manager<int(const A&)>, 这个成员变量内部有一个_Associated_state<int(const A&)>指针
                    实际这个指针指向的是packaged_state<int(const A&)>*

            _Associated_state就是前面详细说过的类, 它负责 存储 用户的func的返回值
    */
    template <class _Fty2, enable_if_t<!is_same_v<_Remove_cvref_t<_Fty2>, packaged_task>, int> = 0>
    explicit packaged_task(_Fty2&& _Fnarg) : _MyPromise(new _MyStateType(_STD forward<_Fty2>(_Fnarg))) {}


    _NODISCARD future<_Ret> get_future() {
        return future<_Ret>(_MyPromise._Get_state_for_future(), _Nil());
    }


    /** 
        重载的 (), 调用形式和引用的 callable(即上面的 int func(const A&))的调用形式是一样的
    */
    void operator()(_ArgTypes... _Args) {


        if (_MyPromise._Is_ready()) {
            _Throw_future_error(make_error_code(future_errc::promise_already_satisfied));
        }


        
        _MyStateManagerType& _State = _MyPromise._Get_state_for_set();

        // _Ptr具体的类型是_Packaged_state<int(const A&)>*, 这个类就很清楚了, 在async中已经探讨过了
        ////// 它的作用是 调用用户的 func函数, 并继续调用父类(_Associated_state)的 _Set_value(), 将值存储起来
        ////// 存储完毕后, _Associated_state中有一个标记是_Ready, 会设置为true, 所以如果再次通过当前类的实例调用重载的(), 就会异常
        ///////////////////_MyPromise._Is_ready()访问到 _Ready的true, 就直接异常了
        _MyStateType* _Ptr          = static_cast<_MyStateType*>(_State._Ptr());

        // 会调用到_Packaged_state的 _Call_immediate函数, 继而调用 用户的func, 进而存储返回值
        _Ptr->_Call_immediate(_STD forward<_ArgTypes>(_Args)...);
    }

    /** 
        从这里可以看出, 可以直接用 当前类的实例对象调用 重载的(), 可以间接调用到用户的函数
            1. 只能调用一次
            2. 在当前线程中调用

        
        总结一下:
            前面的 async异步任务对应了 _Task_async_state(继承 _packaged_state)
            这里的 packaged_task 对应了 _packaged_state

            而 _packaged_state 继承了 _Associated_state

            所以可以将 _packaged_task 传递给用户的线程(async是os内核的线程), 而
    */

    ...

private:
    _MyPromiseType _MyPromise;
};






/** 
    packged_task传递给用户的子线程
*/
int main() {
    printf("main thread:%d \n", this_thread::get_id());

    packaged_task<int(A&&)> callable(func);
    printf("main callable:%p \n",&callable);

    A a;
    printf("main: %p\n", &a);
    
    /// 线程引用了 callable, 会生成 callable(A&&) 的调用格式, 而callable重载了()
    ///// 内部通过 packaged_task调用用户的函数, 并且将值存储起来
    thread t(std::move(callable), a);
    
    //由于是用户的线程, 所以不像async中内核线程那样, 这里必须手动调用join(), async中的线程不用管
    t.join();

    //代码运行到这里error
    auto f = callable.get_future();
    cout << f.get() << endl;
    
    return 0;



    /** 
        解释一下为什么会error
            在创建线程对象t时, 传递的参数是 std::move(callable)
            前面在说thread的ctor时, 说过thread的ctor会对所有的参数做copy
            而且thread在拷贝时, 传递的是右值, 所以 callable会被右值拷贝

            在 packaged_task中, 有一个右值拷贝ctor
            将内部的promise对象移动到了新的对象中, 所以在运行时:
                main函数中的callable内部的promise已经空了, 所以报的是野指针
            
            解决方案:
                1. callable.get_future()的调用写在 thread t(...) 前面
                    但这样不好, 不推荐

                2. 不要传递callable本身到线程对象中, 用 std::ref
                    thread t(std::ref(callable))

    */
}




/** 

    思考一个问题:
        当子线程在运行过程中时, 调用future的get()时, 会是什么情况?
*/
int main() {
    printf("main thread:%d \n", this_thread::get_id());

    packaged_task<int(A&&)> callable(func);
    printf("main callable:%p \n",&callable);

    A a;
    printf("main: %p\n", &a);
    
    thread t(std::ref(callable), a);

    auto f = callable.get_future();
    cout << f.get() << endl;    /// 此时子线程正在sleep, 这里调用get()没有问题

    t.join();
    
    return 0;
}

/** 
    上面的这段测试并没有问题

    当调用 f.get()获取函数的返回值时, 因为_packaged_state还没执行完子线程
    所以 值还没有存储到_packaged_state中(实际是它的父类_Associated_state)执行的存储

    f.get()会调用到_Associated_state的_Get_value中, 发现值没有存储, 于是就wait()
    当子线程执行完毕, 返回到_packaged_state时, 它在调用父类(_Associated_state)的set_value
    set_value会上锁存储, 存储完毕后, 会notify_all, 所以此时f.get()会收到通知, 醒过来取值, 最后返回
*/


promise

概念

  • 内部 <font color=red>存储一个值(==set_value==)</font>, 这个值是给 <font color=red>future(==get_future==)</font>使用
    • future的get()会阻塞住当前线程, 等待 <font color=red>promise的set_value()</font>


使用和探究

void print_int (future<int>& fut) {
    cout << "thread enter\n";

    // 会阻塞当前线程, 等待唤醒
    //// 因为此fut是main中的promise关联的future, 所以会等待promise的set_value
    int x = fut.get();
    cout << "value: " << x << '\n';
}

int main ()
{
  // 定义一个 能存储 int的promise
  promise<int> prom;            


  // 通过 promise对象关联一个 future<int>
  future<int> fut = prom.get_future();  

  // 将关联的future发送给别的线程
  thread th1 (print_int, ref(fut)); 

  // promise的set_value会, 并将值存储起来,
  ////// 并唤醒关联的future对象的get()方法
  prom.set_value (10);
                     
  th1.join();

  return 0;
}


/** 
    猜想:
        类比前面的 async_task_state 以及 packaged_state
        所以这里的promise也会有一个 xxx_state
        并且 xxx_state 继承 _Associated_state

    原理:
        promise的无参构造函数会在内部创建一个指针_State(类型是_Associated_state<int>*)

        promise的 get_future会将 _State 传递给 future的构造函数
        所以关联的future内部会引用 _State

        当 future.get() 时, 会调用到 _Associated_state<int>*->_Get_value
        这个函数上面已经提到过很多次了, 内部发现
            _Ready = false(还没有获取到用户函数的运回值)
        就wait

        当调用 promise的set_value()时, 会调用到_Associated_state<int>*->
        set_value()
        这个函数已经在前面提过多次了, 所以会上锁将值存储起来
        然后设置_Ready=true, 并notify_all()
            所以 _Associated_state<int*> -> _Get_value()会被唤醒
            所以future.get()会被唤醒获取到返回值
*/


promise的注意

  • 不可以在同一个线程中 调用 future.get(), 因为会一直阻塞,没有别的线程会唤醒
void print_int(future<int>& fut) {
    cout << "enter...\n";
    int x = fut.get();
    cout << "value: " << x << '\n';
}

int main()
{
    promise<int> prom;  

    future<int> fut = prom.get_future();    

    // 在当前线程中调用 print_int()
    ///// fut会调用get(), 从而等待其他线程唤醒, 但prom的set_value没有在其他线程中调用
    /// 解决方案: set先调用, 但体现不出promise的作用, 所以promise用在多线程中比较适合
    print_int(fut);

    cout << "main sleep\n";

    this_thread::sleep_for(chrono::milliseconds(2333));

    prom.set_value(10);                      
                                            
    return 0;
}


std::future

继承体系

  • 至于为什么到这里才来说 <font color=red>future</font>,明明前面的章节中已经使用多次了?


  • 放到这里是因为, 前面的重点是 <font color=red>async, packaged_task, promise</font>的使用, 也暗示了 <font color=red>future</font>==不要自己创建==, 由对应的==async, packaged_task,promise==来返回


  • future继承自_State_Manager, 而_State_manager的作用是 <font color=red>管理 XXX_state类型的指针的</font>
    • XXX_state就是前面的:
      • _Task_async_state对应==async==
      • _Packaged_state对应==packaged_task==
      • _Associated_state对应==promise==

      ps: 这3个从上到下是依次继承的关系, 注意 <font color=red>以==下划线开头的类都是STL内部辅助类,不对外提供接口调用==</font>


  • future内部会有一个 _Associated_state* 的指针, 前面讲过多次:
    • _Associated_state 用来 ==set_value==, 存储用户函数的返回值
    • _Packaged_state 用来==调用用户的函数==, 所以==对应的packaged_task==重载了()来<font color=red>间接调用用户的函数</font>调用完后, 由_Associated_state存储值
    • 至于_Task_async_state是用异步的方式来调用用户的函数, 只不过线程是调用内核, 不是用户自己创建的
      • 所以==可以将packaged_task包装到用户的线程中==(<font color=red>上面有例子</font>)


  • future的目的是获取==用户任务的返回值==, 但本质上是通过内部的指针(==XXX_state==)来操作的,因为可能是异步的方式执行用户的任务,所以==XXX_state==内部有自己的==锁机制==
    • 所以future会有 <font color=red>condition_variable相关的调用接口</font>
      • wait
      • wait_for
      • wait_until

      下面开始介绍它的成员方法

成员方法探究

/** 
    future是一个类模板, 它有2个特化版本的类模板
*/
template <class R&> future<R&>;     // specialization : T is a reference type (R&)
template <>         future<void>;   // specialization : T is void 

/** 
    构造方法
*/
/// 无参的ctor, 会调用到父类_State_manager, 内部的指针将是nullptr
future() noexcept {}        

// 移动ctor, 会调用到父类, 从而将内部的XXX_state指针指向this, other将会是空
future(future&& _Other) noexcept : _Mybase(_STD move(_Other), true) {}

/// 拷贝ctor
future(const _Mybase& _State, _Nil) : _Mybase(_State, true) {}


/**
    future本身没有成员变量, 所以对应实例的大小是父类_State_manager的大小
    也就是说, 通过 async, promise, packaged_task的get_future()实际上真正
    的类型是 _State_manager<retType>* 
*/









/** 
    成员方法 get()
*/
generic template (1)            T get();
reference specialization (2)    R& future<R&>::get();      
void specialization (3)         void future<void>::get();
/** 
    这里说一下get()的处理
        首先future的get源码是:
            Type& get() {
                /// 构造一个临时的 future对象, 会调用到移动构造
                //// 之后this内部的 xxx_state就为空了
                future _Local{_STD move(*this)};

                // 获取值, _Get_value后续会通过 xxx_state*->_Get_value()去拿值
                return *_Local._Get_value();
            }
        
        很明显, 这里用了c++11的移动语义, 结合前面的理论, 所以:
            会将内部的XXX_state* 赋值给 _Local

        什么意思呢?
            假设现在有一个promise对象, 则:  
                auto f = pormise.get_future();      _code_a
                auto value = f.get();               _code_b
                value = f.get();                    _code_c

                code_a时, f内部的指针 ptr = xxx_state*
                code_b时, f.get()会在内部将 f的ptr赋值给 _Local的ptr, 然后f的ptr就为空了
                code_c时, f.get()由于内部的ptr的空, 所以后续报的错误会是野指针

            ps: 在前面说过, future连续调用get(), 会出错, 原因就是这里的野指针
*/










/** 
    阻塞等待 xxx_state* 调用 set_value后notify_all
    它内部会通过 xxx_state* 调用到xxx_state的mutex对象的wait()

    和get()相同的是, 它会等待任务完成
    但不同的是, 它没有返回值, 并且可以多次wait, 但get()只能1次
*/
wait()


/** 
    底层会调用到 xxx_state*的condition_variable的wait_for()

    future.wait_for(time)会立即返回,返回值是 枚举类型 std::future_status
        timeout:
            在time时间内没有返回, 视为超时

        ready:
            拿到了值, 后续调用get()不会等待, 但也只能调用1次

        deferred:
            这个值用的场景是 async()的第1个参数是launch::deferred时
            这个时候, async返回的future内部的指针是_Deferred_async_state*
                _Deferred_async_state是_Packaged_state的子类
                并且async并不会开线程
            
            当:
                auto f = async(..).get_future();
            时, 有这几种调用会触发用户的函数被调用:
                1. f.get()

                2. f.wait() 没有返回值
                
            当:
                f.wait_for(xxx)
            返回deferred时, 表示没有调用过get(), 所以可以手动get()获取值

*/
wait_for(time)



/** 
    阻塞到date这个日期到来时

    返回值和返回值的意义和wait_for一样
*/
wait_until(date)



/** 
    返回值是bool
        true: 可以调用get获取返回值 
        false: 不可以再调用get
*/
valid()


/** 
    将自己的内部的指针(xxx_state*)移动到 临时的shared_future<>中
    自己将不能再用, shared_future在后面的小节中
*/
_NODISCARD shared_future<_Ty&> share() noexcept {
        return shared_future<_Ty&>(_STD move(*this));
}


future的一个问题

  • future的get()只能用一次, 但是在开发中, 可能future被当作参数传递给其他函数, 其他函数可能都会调用get(), 这个时候就会出问题
void print_a(std::future<int>& fut) {
    cout << "enter print_a ...\n";
    printf("print_a fun: %p\n", &fut);
    int x = fut.get();
    std::cout << "print_a value: " << x << '\n';
}

void print_b(future<int>& fut) {
    cout << "enter print_b ...\n";
    printf("print_b fun: %p\n", &fut);
    this_thread::sleep_for(chrono::milliseconds(4000));

    /// 异常
    int x = fut.get();
    std::cout << "print_b value: " << x << '\n';
}

int main(){
    std::promise<int> prom;                      
    std::future<int> fut = prom.get_future();    
    printf("main fun: %p\n", &fut);
    thread t_a(print_a, std::ref(fut));
    thread t_b(print_b, std::ref(fut));
    
    this_thread::sleep_for(chrono::milliseconds(2222));
    prom.set_value(22);

    t_a.join();
    t_b.join();
    return 0;
}

/** 
main fun: 0055F96C
enter print_a ...
print_a fun: 0055F96C
enter print_b ...
print_b fun: 0055F96C
print_a value: 22

    上面的测试程序从侧面说明了future的不足(只能一次get)
    STL提供了 shared_future
    
*/


shared_future

继承

  • 它并不继承 <font color=red>future</font>, 而是和==future==一样, 继承_State_manager


  • 它的作用和future一样, 但可以多次get
/** 
    一般通过一个 future&&(右值) 来构造 shared_future
        这个构造函数会将 原future中的指针(xxx_state*)移动到shared_future中
        所以调用完后, 原future不能再使用

    _Mybase就是shared_future的类型, 父类本来的构造函数有2个参数
    但这里只传递了第1个, 第2个参数默认是false
        false的意义在前面也说过, 表示 _get_once
        也就是说, 可以 xxx_state*->_Get_value多次

        而future则只能获取1次:
            一方面是future的get()函数的移动语义机制(上面讲过)
            另一方面就是 future构造时, _get_once的true(只能 _Get_value一次)
*/
shared_future(future<_Ty>&& _Other) noexcept : _Mybase(_STD forward<_Mybase>(_Other)) {}


/** 
    不同于future的get, 这里的get直接去拿值, 可以拿任意次
    但获取的值是 const, 不能改
*/
const _Ty& get() const {
    return this->_Get_value();
}











/** 
    使用shared_future修改上面future程序的问题
*/

void print_a(shared_future<int>& fut) {
    cout << "enter print_a ...\n";

    printf("print_a fun: %p\n", &fut);

    int x = fut.get();
    cout << "print_a value: " << x << '\n';
}

void print_b(shared_future<int>& fut) {
    cout << "enter print_b ...\n";
    printf("print_b fun: %p\n", &fut);

    this_thread::sleep_for(chrono::milliseconds(4000));

    int x = fut.get();
    cout << "print_b value: " << x << '\n';
}

int main() {
    promise<int> prom;
    future<int> fut = prom.get_future();

    printf("main fun: %p\n", &fut);

    /// 必须是右值, fut中的指针会移动到fut_s中, 后续不能再用fut
    shared_future<int> fut_s(std::move(fut));

    thread t_a(print_a, std::ref(fut_s));
    thread t_b(print_b, std::ref(fut_s));

    this_thread::sleep_for(chrono::milliseconds(2222));
    prom.set_value(22);

    t_a.join();
    t_b.join();
    return 0;
}

atomic引导

一点汇编知识

  • 这里只给出几条汇编指令(==后面讲解用==), 使用的是 <font color=red>AT&T(32位)</font>
/// 以下的操作大小都是 4个字节(.long)

movl $2, %eax   // 移动字面量2到寄存器eax中
movl $2, b      // 将字面量2移动到变量b中
movl %eax, b    // 将寄存器eax中的值移动到变量b中

incl %eax       //寄存器eax加1
incl b          //对变量b加1

add $5, b       //变量b加5
add b, %eax     //将变量b的值加到eax中

movl $b, %eax   //将变量b的内存地址copy到寄存器eax中
movl (%eax), b  //将eax中的值当作内存, 然后将对应内存的值加到变量b中
incl (b)        // 将b的值当作内存, 找到内存后, 在这个位置+1
add $4, (b)     // 将字面量4加到 b指向的内存(b中的值是内存地址)


引导

int g_a, g_b = 4;

void test(){
    g_a = 2;
    g_b = a;
    
    assert(2 == g_b);  //当g_b!=2的时候会 异常
}

/** 
    如果我们自己来生成汇编, 则大多数人会编译成下面:
*/
movl $2, (g_a)          _code_1
movl (g_a), %eax        _code_2
movl %eax, (g_b)        _code_3
... call assert
/** 
    需要3条指令, 那有人就会问:
        为什么不直接将 _code_2改成
            movl (g_a), (g_b)

        原因是mov指令中的操作数必须有一个是寄存器或字面量, 所以必须有3条指令
*/



/**********************************************************************/
/**********************************************************************/
/**********************************************************************/


/**
    另一种编译
*/
movl $2, %eax
movl %eax, (g_b)
movl %eax, (g_a)

... call assert
/** 
    这段汇编和源码中的逻辑顺序虽然不一样, 但结果是一样的
*/



/**********************************************************************/
/**********************************************************************/
/**********************************************************************/


/** 
    需求改变
    
    初始值:
        g_a = 2;
        g_b = 4;
*/
void test_a(){
    g_b = 0;
    g_a = 0;
}

void test_b(){
    while(g_a)
        continue;
    assert(0 == g_b)
}

/** 
    汇编
*/
_test_a:
    movl $0, (g_b)        _code_1
    movl $0, (g_a)        _code_2

_test_b:
_loop:
    cmpl $0, (g_a)
    jne _loop               _code_3  //如果g_a不为0, 就循环

    call assert             _code_4

/** 
    这里的意思是:
        当g_a为0时才断言 assert, 愿意是通过 g_a来监听g_b被赋值为0



    场景:
        有2个线程, t_a, t_b
        t_a执行test_a
        t_b执行test_b


    先来说一下结论
        如果是单核cpu, 没有问题
        如果是多核cpu, core_a执行t_a, core_b执行t_b, 可能就有问题

    我们代码的逻辑:
        从代码流程上看, t_b中assert的执行必然是g_a已经为0了
        而g_a为0, 则一定是t_a已经执行了 g_a=0的操作

    在讲解问题前先简单介绍一下关于现代多核cpu体系的内容
        明白这些理论是说明上述程序存在问题的关键, 也是后续深刻理解atomic的前提
*/


多核体系结构

为什么要有cache

现代的cpu, 访问内存时已经不再是直接访问, 原因是cpu本身执行的速度 远远大于 内存io速度
造成的现象是 cpu秒速完成任务, 却要无限等待总线返回

举例:
    whlie(1000000次的循环)
        a = a + 1;
    
    正常的逻辑是:
            movl $1000000 ecx   ;循环次数
        _Loop:
            movl (a), %eax      ;将a的值copy到eax中, 假设a是int             _code_a
            addl $1, %eax       ; 对eax+1                                   _code_b
            movl %eax, (a)      ; 加完后copy到a地址                         _code_c
            loop _Lopp          ; ecx = ecx - 1; if(ecx != 0) goto _Loop;   _code_d
        
        大量访问内存(_code_a, _code_c)消耗时间, 但cpu内的操作(_code_b, code_d)相比前面的可以忽略

    
    那可以做编译优化
            movl (a), %eax      ;将a的值copy到eax中, 假设a是int             _code_a
            movl $1000000 ecx   ;循环次数
        _Loop:
            addl $1, %eax       ; 对eax+1                                   _code_b
            loop _Lopp          ; ecx = ecx - 1; if(ecx != 0) goto _Loop;   _code_d
            movl %eax, (a)      ; 加完后copy到a地址                         _code_c
        

    其中只访问内存2次(_code_a, code_c, 注意和上面汇编的位置是不同的)
    寄存器中做1000000的加法, 大大节约时间


优化后还存在的问题:
    cpu不可能一直独占eax,因为这个core可能执行多个线程, 所以eax有上下文切换
    
解决
    基于各种原因, cpu出现了缓存(cache)
        将内存a地址的data放到缓存中, 和缓存进行交互, 虽然没有register快, 但也差不了多少
        目的是减少core对memory的访问次数



cache结构

  • 这里没有用图来阐述硬件结构, 原因是本人觉得, 如果能用文字来描述清楚, 则理解的更为深入, 当然前提是有耐心认真看, 在脑海中有逐渐构建自己的蓝图
定位:
    介于 register和memory之间

core访问速度:
    远大于访问memory的速度
    略小于访问regiter的速度

大小:
    远小于 memory
    大于   register



目前cache的架构
    当前架构的已经发展到了L1,L2,L3 ...
    
    访问速度:
        L1 > L2 > L3

    大小:
        L1 < L2 < L3
  

多核core的cache(这里以 4核)
    core0 有自己的 L1   c_0_L_1
    core1 有自己的 L1   c_1_L_1

    core0, cor1 共用一个L2  c_01_L_2

    core2 有自己的 L1   c_2_L_1
    core3 有自己的 L1   c_3_L_1
    
    core3, cor4 共用一个L2  c_23_L_2

    core0, core1, core2, core3 共用L3

    L3连接到memory


也就是说, core0和core1是一组, core2, core3是一组, 所有的core之间是连通的, 可以互相通信


多core访问内存的过程

    当core0需要访问一个0x44(内存地址)时:
        1. 先到c_0_L_1中查找有没有0x44
            1.1 有就copy到c_0_L_1中, 然后直接返回到 core0

            1.2 没有, 就到第2步

        2. 到c_01_L_2中找
            2.1 有就copy到c_0_L_1中, 然后直接返回到 core0
            2.2 没有就到第3步

        3. 到c_1_L_1中找 
            3.1 有就copy到c_0_L_1中, 然后直接返回到 core0
            3.2 没有就到第4步

        4. 接着到 core_2, core_3重复同样的步骤

        5. 如果上面4步没有找到, 则core0会向总线请示, 到memory的0x44中
            将0x44和 0x44后面数据 copy到 c_0_L_1这个cache中


细节:
    1. core在copy时, 也就是在缓存中找到0x44地址, 不是随便就copy, 面是看当前这个地址的状态
        这涉及到一致性协议(后面说)

    2. 从内存读取时, 并不是只copy一个字节, 而是copy一段内存
        这涉及到cacheline
        同时, 也有cache的一致性机制


cacheline

    cache大致长的是这样:
        cache = N_g * group
        group = N_c * cacheline
        cacheline = N * Byte
    即: 有N_g个组, 每个组有N_c个cacheline, 每个cacheline有N个字节

    至于memory的中地址是怎么映射到cache中,这里就不谈了
    需要说明的是, 每次从memory中copy的时候是以cacheline的大小为单位
    一般cacheline的大小是2的幂,常见的是64字节


一致性

因为memory中的同一个地址会存在不同的 L1中, 所以必须有同步数据的机制
    毕竟从理论上内存的数据应该是正确的, 因为它是数据的来源和目的地

但现实是 core会优先访问cache, rw也会在cache并不时时同步到memory, 所以core之间有相同内存的data时
读取时要保持数据的有效性


操作
    local read(LR):读本地cache中的数据;
  local write(LW):将数据写到本地cache;
  remote read(RR):读取内存中的数据;
  remote write(RW):将数据写通到主存;



失效(Invalid)cacheline
    要么已经不在缓存中,要么它的内容已经过时。
    为了达到缓存的目的,这种状态的段将会被忽略。一旦缓存段被标记为失效,那效果就等同于它从来没被加载到缓存中。

共享(Shared)cacheline
    它是和主内存内容保持一致的一份拷贝,在这种状态下的缓存段只能被读取,不能被写入。
    多组缓存可以同时拥有针对同一内存地址的共享缓存段,这就是名称的由来。

独占(Exclusive)cacheline
    和S状态一样,也是和主内存内容保持一致的一份拷贝
    区别在于,如果一个处理器持有了某个E状态的缓存段,那其他处理器就不能同时持有它,所以叫“独占”
    这意味着,如果其他处理器原本也持有同一缓存段,那么它会马上变成“失效”状态。

已修改(Modified)cacheline
    属于脏段,它们已经被所属的处理器修改了。
    如果一个段处于已修改状态,那么它在其他处理器缓存中的拷贝马上会变成失效状态,这个规律和E状态一样。
    此外,已修改缓存段如果被丢弃或标记为失效,那么先要把它的内容回写到内存中——这和回写模式下常规的脏段处理方式一样上面的状态也是缓存一致性协议, 

ps:
    这里只需要理解到多core的cache一致性在逻辑上 相当于多个core在互斥读写 同一块memory
    至于更细节的,可以看:https://kb.cnblogs.com/page/504824/

需要说明的是, 数据的一致性都是cpu自动完成的, 属于cpu架构方面, 与编程无关
也就是说 cashline 和 后面的 atomic没有必然关系
对于一个变量, 它的值在 cache中的过程是一致的
之所以多线程操作中共享变量的值表现的不一致是软件层面的, 原因是 共享变量的读写操作
在汇编层面是分步的, cpu调度可能发生在 分步的中间(造成整个操作没有一下子完成, 从而导致其他线程访问出现不加预料的结果) 


指令重排

  • 了解这个, 是深入理解atomic原理的前提
举一个直白的例子:(只有举例可能有这种情况)
    有2个顺序操作:
        operator 1
        operator 2

        ...
    
    流程:   
        cpu可能先执行operator 2, 再执行operator 1
    
    原因可能有:
        1. 其中operator2操作可以直接从cache中抓取到
            cpu讲究效率, 所以先执行operator 2, 再空闲时回过头执行operator 1
        
        2. 编译器在编译时做了优化, operator 2 的操作不依赖 operator 1
            同时编译器认为operator 1更花时间, 所以生成汇编的时候, operator 2到前面了
    

ps: cpu本身有自己的优化机制, 编译器在编译的时候也有自己的优化
    编译器指令重排的优化是分析源码, 并不会在改变当前线程中的执行结果, 其他线程就可能会受到影响


好了重排就到这里, 只要明白cpu在执行时, 顺序可能和源码不一致


解释上面 引导 中问题的可能原因:
    core_b先执行t_b线程
        1. 访问了g_a变量, g_a被cache
        
        这个时候 core_a执行t_a线程, 比core_b快, 毕竟是多核, 这里只有一种情况

    core_a执行t_a
        1. 执行 g_b = 0, 但先寻址g_b时并不在cache中

        2. 于是先去执行 g_a = 0
            因为g_a在cache中, 所以cpu为了效率可能就乱序执行了, 这个是cpu动态乱序

        3. 就在这时, core_b发现g_a为0, 跳过while, 执行assert
                g_b从内存读到4, 直接异常
    



防止某些指令重排
    cpu有提供指令, 控制重排, 这也是后面atomic中的内存模型底层原理



总线锁定和cashline锁
    当多核访问同一块内存时, 为了保证数据的一致性
    有2种方案
        1. 总线上锁
            此时, 只有发出寻址的core能访问, 其他的core等待释放, 它们不能访问任何内存
            这样效率太低, 相当于锁了所有的内存

        2. cacheline锁
            后来cpu的优化, 效果是只对共享内存上锁
            如果要操作的数据没有在cacheline中, 则会调用总线锁

    cpu中有一部分指令在形式上是1句, 但执行的时候是分步的, 所以在多core的情况下,或多线程时
    操作就个被分开, 所以cpu提供了对这种操作锁定的汇编指令, 保证整个操作在执行的过程中不会被中断


atomic

原子操作

  • 通俗的说就是 <font color=red>独占操作</font>, 以最常见的后++为例
int i = 0;
i++;

/** 
    这种写法天天在写, 但它实际是有2步流程
*/
movl (i), %ebx          // int tmp = i
incl (i)                //i++, 本人也忘记了incl是否可以直接操作内存, 暂时就这样


/** 
    一个常见的线程安全问题
*/
int count = 0;
void test(){
    for (int i = -1; ++i < 100;)
        count++;
}
int main(int arg, char** args){
    thread t_a(test);
    thread t_b(test);

    t_a.join();
    t_b.join();

    cout << cout << endl;
    return 0;
}
/** 
    这里有2个线程同时对 count进行后++,
    至于它的结果为什么有时候不是 200, 就不说了

    本质原因是 count++两步的操作可能会被打断

     
     如果count++的2步能被cpu锁定, 执行的时候要么++成功返回
     要么失败重新加,++的过程不允许中断

     这种操作就是所谓的原子操作

     原子性:
        每一条指令本身是 atomic, 中断不会发生在 指令执行期间
        
     某些指令虽然形式上只有1句, 但实际cpu内部有多步
        cmpxchgl (g_a), %ebx
        g_a与 ax类寄存器(ah,ax,eax,rax)比
            如果相等, 则g_a = ebx
            如果不等, 则 ax类寄存器 = g_a
        这一种指令就不是atomic(因为该指令分多步), 并发或并行时它可能在执行的过程中, 就被中断

      cpu提供的有指令(lock 前缀)来使 cmpxchgl 变为所谓的atomic操作 
      也就是cacheline锁机制(同一块内存访问是互斥的)

      基于这一点, count++也可以被变为atomic操作, 这就是高级语言中
      封装的,如接下来c++中的atomic类模板的特化类
*/


atomic_flag

  • 它是将bool类型的操作封装为原子操作
/** 
    原子性读取操作:
        虽然是read, 但根据前面的 cacheline lock理论: 
            read的值一定一致性的, 也就是read的时候一定是最新的值, 
            并且read的过程不会其他core在write

    以下的操作都是 atomic操作

    bool test_and_set()
        设置新值为true, 并返回旧值
        原子性:
            1. read旧值
            2. write
            3. store
            4. test并jmp
                test相当于if判断返回值,并跳转
            也就是说这4步是不可分割的, 执行时不会中断
                当前core应该独占共享内存, 其他core不能同时在test_and_set或clear

        
    clear();
        设置为false
        原子性:
            1 read
            2 write
            3 store
        这3步是不可分割的, 同理其core不可能同时在 test_and_set()或clear()

    不能:
        1. copy ctor
        2. copy assignment
        3. assignment
*/
void test(){
    // 或 atomic_flag tmp ATOMIC_FLAG_INIT; 
    /// ATOMIC_FLAG_INIT 是一个macro: {}
    atomic_flag tmp{}; 

    void* b = &tmp;

    cout << *(bool*)(b) << endl;            //原始值 false
    cout << tmp.test_and_set() << endl;     //设置为true, 返回原始值false
    cout << *(bool*)(b) << endl;            //打印test后的值(true)

    cout << tmp.test_and_set() << endl;     //再设置true, 但返回是原始值(true)
    cout << *(bool*)(b) << endl;            //打印设置后的值

    tmp.clear();                            //设置false
    cout << *(bool*)(b) << endl;            //打印clear后的值

    cout << tmp.test_and_set() << endl;     //设置true, 返回原始值(false)
    cout << *(bool*)(b) << endl;            //打印设置之后的值(true)
}


运用场景---同步锁

  • 可以利用 atomic的特性, 模拟lock, 但先要说一点的是, <font color=red>memory order要有约束(==后面会说==这里先不管)</font>
#include<atomic>
atomic_flag g_b ATOMIC_FLAG_INIT;
void t_a() {
    while (g_b.test_and_set())      
        this_thread::yield();
_lock_start:
    cout << "thread: " << this_thread::get_id() << " ";
    cout << "共享代码...\n";
    this_thread::sleep_for(chrono::seconds(2));
    
    g_b.clear();
_lock_end:
}
int main()
{
    thread a(t_a);
    thread b(t_a);
    thread c(t_a);
    thread d(t_a);
    
    a.join(), b.join(), c.join(), d.join();
}
/** 

    先不考虑cpu或编译器的指令排序问题

    结果是:
        在 _lock_start 和 _lock_end之间只会有一个线程在运行

    下面来说一下原理:
        g_b.test_and_set()中
            1. read g_b     旧值
            2. g_b = true   设置新值为true
            3. test并jmp    就是whlie条件是否成立
        所以while()相当于上了锁, 当前的core独占了, 其他的core只能等待它做完这3步操作

        流程:
            当g_b为false时, while(g_b.test_and_set()) 这句代码
            只有1个core会判断通过, 通过后g_b已经是true, 其他线程再抢到时再test_and_set()
            执行完后, 返回的是旧值(true)判断是不通过的, 只有已经通过的core再clear, 后面的core才可以再
            互斥抢到再判断

        ps: 这里所谓的锁其实是cpu级别的, 比线程的mutex效率高, 但它们的
            意义是一样的, 就算是线程并发, 那么 test_and_set() 本身还是atomic
            当cache命中率高的时候效率才会高
*/


基于atomic_flag的自旋锁

class spinlock_mutex
{
  std::atomic_flag flag;
public:
  spinlock_mutex():
    flag(ATOMIC_FLAG_INIT)
  {}
  void lock()
  {
    while(flag.test_and_set(std::memory_order_acquire));
  }
  void unlock()
  {
    flag.clear(std::memory_order_release);
  }
};

/** 
    这里就不多做解释了, 其实上面的同步锁就是自旋锁的原理
    test_and_set()和clear中有参数
        这个参数涉及到指令重排, 后面说
*/


atomic整型 ++

  • atomic是一个模板, STL特化了许多版本, 这里就是int为例
/** 
    下面的例子中:
        如果用普通的int, 则最后的值基本上不是30000000

    用 atomic<int> 则对 ++g_c的操作:
        read, write, store
    是不可分割的, 所以整个++的过程是线程安全的(别的线程此时不能操作g_c这一块内存)

*/
atomic<int> g_c;
//int g_c;
void test() {
    for (int i = -1; ++i < 10000000; )
        ++g_c;
}


int main(){
    thread t_a(test);
    thread t_b(test);
    thread t_c(test);
    
    t_a.join();
    t_b.join();
    t_c.join();
    
    cout << g_c << endl;
    return 0;
}






/** 
    不可以用 ++ 来做线程间的同步锁(和atomic_flag的test_and_set不一样)
    因为虽然 ++ 的过程是独占内存的, 但++完后, if的判断不是独占的
*/
atomic<int> g_c{-1};
void t_a() {
    cout << "enter thread: " << this_thread::get_id() << " \n";

    /** 
        ++g_c的过程是atomic, 但while判断的过程是线程竞争的
            所以当有一个线程将++g_c变为0, 但自己可能没有抢到if立即判断
            又被别的线程++, 就导致死循环 
    */
    while (++g_c);
    cout << "thread: " << this_thread::get_id() << " ";
    cout << "共享代码...\n";
    this_thread::sleep_for(chrono::seconds(2));
    g_c = -1;
}
int main(){
    thread a(t_a);
    thread b(t_a);
    thread c(t_a);
    thread d(t_a);

    a.join(), b.join(), c.join(), d.join();

    //编译器会自己加上 return 0
}


atomic整型存取

/** 
    原子操作赋值
        store(value)    写
        type load()     读

    注意, 只是演示, 并不是线程安全的
    store本身的 r-w-s是atomic
*/
void test(int arg) {
    for (int i = -1; ++i < 10000000;)
        g_c.store(g_c.load() + 1);
}


atomic整型交换

/**
    exchange用于 交接数据, 和 atomic_flag的test_and_set一样的过程
    返回旧值, 设置新值
    可以用作同步锁
    
    加10亿次, 结果没有问题
*/
atomic<int> g_c{0};
long count_c = 0;
void t_a() {
    for (long i = 0; i < 500000000;) {
        if (g_c.exchange(1)) {
            this_thread::yield();
            continue;
        }
        ++count_c;
        g_c.exchange(0);
        ++i;
    }
}


int main(){
    thread a(t_a);
    thread b(t_a);

    a.join();
    b.join();

    cout << count_c << endl;
}


atomic比较交换

  • 其实它的底层是==cmpxchg==指令(前面也讲过), 只有要加lock前缀, 它是所有原子操作的基石
    • 它们是bit比较, 不是 <font color=red>重载的对象比较方法</font>


  • 这种新型操作叫做“比较/交换”,它的形式表现为==compare_exchange_weak()==和==compare_exchange_strong()==。

    • “比较/交换”操作是原子类型编程的基石,它比较原子变量的当前值和期望值:
      • 当两值相等时,存储所提供值。
      • 当两值不等,期望值就会被更新为原子变量中的值。
      • 返回bool变量,当返回true时执行存储操作,false则更新期望值。也就是说操作成功是返回true,失败时返回false。

      ps: weak在某些平台上性能更好(==只是某些平台==), 当自定义的atomic类型时, 如果类型"更容易copy(即本身的结构不复杂)", 使用weak

  • 对于compare_exchange_weak(),当原始值与预期值一致时,存储也可能会不成功。

    • 在这种情况中变量的值不会发生改变,并且compare_exchange_weak()的返回值是false。

    这最可能发生==没有cmpxchg指令的cpu中==,当处理器不能保证这个操作能够原子的完成——可能因为线程的操作执行到必要操作的中间时被切换,并且另一个线程将会被操作系统调度(这里线程数多于处理器数量),称为“伪失败”(spurious failure),因为造成这种情况的是时间,而不是变量值。

广义的atomic

  • 编译器并不总是将atomic整型都用cpu级别的锁来实现(==看架构是否支持==), 所以还可能内部使用的mutex机制, 所以STL提供了 <font color=red>is_lock_free()</font>成员函数
    • 返回==true==, 表示使用的cpu级别的锁
    • 返回==false==, 表示==内部用的锁结构==


  • 通常情况下==自定义的atomic==, 则内部是由mutex的机制实现的
struct A{
    char a,b,c,d;
};  

structe B{
    char a[8];
}

struct C{
    int a,b;
}

// 如果 atomic<int>.is_lock_free是true, 则下面的模板类会是cpu锁
// 基本大小 == int 或 int的2倍, 但如果想在运行时确定模板类是不是无锁(不是mutex实现的), 则调用 is_always_lock_free()
atomic<A> a;  
atomic<B> b;    
atomic<C> c;


内存模型

  • 这一部分是针对 <font color=red>指令重排</font>的, 可以在汇编层面通过cpu的指令(==内存屏障==)来防止某些指令重排, 所以c++在高级语言层面也提供了对应的语法
/** 
    回顾一下场景
*/
int b = 0;
void test(){
    cout >> b;      //_code_b
    int c = 2;      //_code_c
    int d = c + 3;  //_code_d

    ..
}
/** 
    编译器可能优化为:
        int d = 5;
        cout >> b;

     删除了 int c = 2
     d直接赋值为常量5
     b的cout操作在最后执行
        因为这是一个耗时操作, 而且 d 的操作不依赖b, 速度快, 所以
        编译器重排了指令

     至于cpu内部的动态重排这里就不探讨了

     单线程中没有问题, 问题是多线程
*/



/*********************************************************/
int b = 1;
int d = 1;
void f_a(){
    for(int i = -1; ++i < 3000;)
        b = 0;      //_code_a

    d = 0;          //_code_b
}

void f_b(){
    while(d);       //_code_c

    assert(b == 0); //_code_d
}
int main(int arg, char** args){
    thread t_a(f_a);
    thread t_b(f_b);


    t_a.join();
    t_b.join();
}

/** 
    这里先假设 _code_a, _code_b, code_c, _code_d 操作是atomic的
    

    上面的程序可能会error在 _code_d

    因为 编译器可能会对t_a的代码重排
        d = 0;
        for(..) ...

    所以 t_a线程被cpu先执行 d = 0后, 可能刚好被挂起(此时b为1), t_b执行跳过while, assert(1==0) 直接error 

    解决方案是:
        在源码d = 0这一行加上内存屏障, 保证 for(...).. 的操作在 _code_b前完成

        这样虽然是多线程, 但逻辑上的顺序是没有问题的, 在assert前,b一定为0
    

    c++在语言上提供了参数, 来控制指令的重排, 但这些参数是配合atomic来使用的

    ps:这里不会说一大堆的专业术语, 什么 happen before, 什么 synchronize with, 个人感觉没什么用
        只要知道这些参数是怎么指定 指令的顺序
        因为你一定能在代码层面上知道你的代码运行的流程, 接下来就是防止编译器或cpu来破坏你的代码顺序
*/









/** 
    所有模型的一个测试程序, 网上都用这个, 那就是这个来说明
*/
std::atomic<bool> x, y;
std::atomic<int> z;
void write_x(){
    x.store(true);          _code_a
}
void write_y(){
    y.store(true);          _code_b
}
void read_x_then_y(){
    while (!x.load());      _code_c

    if (y.load()) ++z;      _code_d
}

void read_y_then_x(){
    while (!y.load();       _code_f

    if (x.load()) ++z;      _code_g
}

int main(int arg, char** args){
    x = false;
    y = false;
    z = 0;
    std::thread a(write_x);
    std::thread b(write_y);
    std::thread c(read_x_then_y);
    std::thread d(read_y_then_x);
    a.join();
    b.join();
    c.join();
    d.join();
    assert(z.load() != 0);      _code_h
}




/** 
    上面的程序依照代码的顺序, 即使是并行或并发也不会在 _code_h的地方 异常
    原因就不说了事实上也不会有异常(默认是顺序一致性)

    
        但根据前面的指令重排, _code_f, code_g这2个操作可能被编译器调换了位置, 导致执行的时候顺序
        和代码逻辑顺序不一样, 最终可能产生 z在整个进程中没有被++的情况

        这里就不分析因为指令重排可能产生的结果了, 你只要明白, 代码在逻辑上是没有问题的
        出问题的原因是 指令位置被改变了
*/



/** 
    以下是个人理解, 因为所有的资料都没说清楚
    这里是参考网上,cpprerence, 以及c++并发的作者相关资料总结出的
    个人理解, 也只能这样解释
*/







/** 
    memory_order_seq_cst
        1. 严格顺序

    上面的程序 _code_h不会error

    因为 laod()有一个默认参数值是memory_order_seq_cst
         store()第2个默认参数值是memory_order_seq_cst

    memory_order_seq_cst的意义是:
        宏观上是代码怎么走, 指令怎么走, 所以代码逻辑是对的, 则执行就没有问题

        从微观上是
            它前面的所有指令(这里可以理解为代码), 不会排到它后面
            它后面的所有指令, 不会排到它前面
            至于它前面的指令怎么排, 它不管
            至于它后面的指令怎么排, 它不管
                个是理解, 网络上各说各的
            
            如一个代码的顺序:
                _code_a
                _code_b
                _code_c (memory_order_seq_cst)
                _code_d
                _code_e
                
            则 _code_a, _code_b不会排到 _code_c后面, _code_d,_code_e不会排到_code_c前面
            至于 _code_a和_code_b怎么排,_code_c不管
                _code_d, code_e怎么排序, _code_c不管

            所以上面的 _code_c不会在_code_d后面, _code_f不会在_code_g后面
        
        ps: 暂时就这么总结
            
*/












/** 
   memory_order_release 和 memory_order_acquire

    memory_order_release修饰write(即 store())
    memory_order_acquire修饰read (即 load())

    ps:如果对一个读(load())操作施加 release, 则没有同步效果
           对一个写(write())施加 acquire, 则没有同步效果


    memory_order_release表示:
        它前面的指令(代码), 不能排到它后面
        至于 它后面的指令能不能排到它前面, 它不管
        至于 它前面的指令怎么排, 它不管
   

    memory_order_acquire表示:
        它后面的指令(代码), 不能排到它前面
        至于 它前面的指令能不能排到它后面, 它不管
        至于 它后面的指令怎么排, 它不管
   
    这2个组合可以做同步锁
*/
list<int> g_msg;
atomic<bool> g_f{ false };
void t_a() {
    for (; ;) {
        if (!g_f.load(memory_order_acquire)) {      //_code_a
            this_thread::yield();
            continue;
        }       
        auto tmp = g_msg.front();
        cout << tmp << endl;
        g_msg.pop_front();
        g_f.store(false, memory_order_release);     //_code_b
    }
}


void t_b() {
    for (int i = 1; i < 10000001;)
    {
        if (g_f.load(memory_order_acquire))         //_code_c
            continue;
        g_msg.push_back(i++);
        g_f.store(true, memory_order_release);      //_code_d
    }
}


int main(){
    thread a(t_a);
    thread b(t_b);

    a.join();
    b.join();
}
/** 
    上面的
        _code_a是 acquire, 表示它后面的代码 不能重排到 _code_a前面 
        _code_b是 release, 表示它前面的代码 不能重排到 _code_b后面
        所以相当于 _code_a 和 _code_b之间的代码上了一层锁(当多个线程访问t_a函数时)
            ps: 这里是对一个内存做atomic, 所以 load并且if的操作是一个整体, 操作时不会对这块内存store
                同理store时, 不可能同时load
                t_a, t_b2个线程会因为 if被互斥, 只要有一个过了if, 另1个会被阻塞在自己的if处(等待store后再抢)



     将上面的例子改成 acquire-release
*/
include <atomic>
#include <thread>
#include <assert.h>

std::atomic<bool> x,y;
std::atomic<int> z;
void write_x()
{
  x.store(true,std::memory_order_release);
}
void write_y()
{
  y.store(true,std::memory_order_release);
}
void read_x_then_y()
{
  while(!x.load(std::memory_order_acquire));
  if(y.load(std::memory_order_acquire))  // 1
    ++z;
}
void read_y_then_x()
{
  while(!y.load(std::memory_order_acquire));
  if(x.load(std::memory_order_acquire))  // 2
    ++z;
}
int main()
{
  x=false;
  y=false;
  z=0;
  std::thread a(write_x);
  std::thread b(write_y);
  std::thread c(read_x_then_y);
  std::thread d(read_y_then_x);
  a.join();
  b.join();
  c.join();
  d.join();
  assert(z.load()!=0); // 3
}
/** 
    写c++并发的 大神说z可能最后为0, 但我始终没搞明白为什么z可能会是0?
    难道前面的 内存参数理解都是错的?
    本人在学习这段内存相关的知识时, 翻阅了大量的资料
    acquire和relesae是从cppreference的网站上总结来的, 而且上面的
    同步锁(1千万次)的并发, 测试也没有问题, 解释的原理就是基于对 acquire-release的定义
*/










/** 
    memory_order_acq_rel
    其中 acq表示 读取的 acquire
         rel表示 存储的 release
    所以这个内存排序参数, 是来为 (read-write-store)的原子操作添加
    acquire和release两种内存约束
*/
std::atomic<int> sync(0);
void thread_1(){
  // ...
  sync.store(1,std::memory_order_release);
}

void thread_2(){
  int expected=1;
  while(!sync.compare_exchange_strong(expected,2,
              std::memory_order_acq_rel))
    expected=1;


    ...
}
void thread_3(){
  while(sync.load(std::memory_order_acquire)<2);
  // ...
}
/** 
    在thread_2中, compare_exchange_strong 函数
    对 原子变量的 sync 同时施加了 acquire 和 release两种内存约束
    编译器会在 这个函数内部 发生写的动作的地方加上release, 
                            发生读的动作的地方加上acquire

    这样的目的是形成 2对 acquire-relase, 如果难以理解,则上面thread_2的代码
    相当于:
*/
std::atomic<bool> sync1(false),sync2(false);
void thread_1(){
  sync1.store(true,std::memory_order_release);  // 1.设置sync1
}

void thread_2(){
  while(!sync1.load(std::memory_order_acquire));  
  sync2.store(true,std::memory_order_release);  
}
void thread_3(){
    while(!sync2.load(std::memory_order_acquire));   
}
/** 
    也就是说 acq_rel可以用同一个原子变量
    利用 compare_exchange_weak 或 compare_exchange_strong
    取代只有2种情况的bool的判断
    
    compare_exchange_weak要注意:
        当 sync的值和 expected相同时, 并不一定成功(前面说过)
        所以用 weak的时候,代码应该是:
        while(!sync.compare_exchange_weak(expected,2,
              std::memory_order_acq_rel) && (expected == 1))
            expeted = 1;

*/











/** 
    memroy_order_consume
    这里就不说了, 很少用, 而是也没明白, 以后有时间再来完善
*/


内存栅栏

  • 这个就比较容易理解了, 在 <font color=red>当前代码的地方</font>画了一条任何代码都无法跨越的线一样
#include <atomic>
#include <thread>
#include <assert.h>

std::atomic<bool> x,y;
std::atomic<int> z;

void write_x_then_y()
{
  x.store(true,std::memory_order_relaxed);  // 1
  std::atomic_thread_fence(std::memory_order_release);  // 2
  y.store(true,std::memory_order_relaxed);  // 3
}

void read_y_then_x()
{
  while(!y.load(std::memory_order_relaxed));  // 4
  std::atomic_thread_fence(std::memory_order_acquire);  // 5
  if(x.load(std::memory_order_relaxed))  // 6
    ++z;
}

int main()
{
  x=false;
  y=false;
  z=0;
  std::thread a(write_x_then_y);
  std::thread b(read_y_then_x);
  a.join();
  b.join();
  assert(z.load()!=0);  // 7
}

/** 
    注意, 1, 3, 4, 6 的内存排序参数都是relaxed, 表示
    不限制cpu或编译器重排指令
    但 2, 5 相当于 划开了一条线:
        1不可以越过2
        3不可以在2的前面
        4不可以越过5
        6不可以在5的前面
    并且 有acquire-release配对, acquire在读线程中适当的位置划线
                                release在写线程中适当的位置划线
*/


时间操作

概念

  • 时间是一个类, 提供了4种不同的信息:
    • 当前时间
    • 时间类型
    • 时钟节拍
    • 稳定时钟


时钟周期(节拍)

  • 所谓周期就是 <font color=red>一次动作要多长时间</font>, 在STL中用 <font color=red>std::ratio</font>表示
#include<chrono>
/** 
    ratio<_count_a, _count_b>
        两个模板参数是 非类型参数, 所以必须是一个常量(整数型)

    它只是单纯的表示 _count_a与_count_b的比率关系, 并不表示任何具体的意义
*/ 

/** 
    可以表示:
        > 1分钟60秒
        > 1小时60分
        > 1秒60次
        > 1个人60本书
        > 1块钱60辆奔驰
        ...
*/
ratio<1,60> 


CSL关于时间的ratio

#include<ratio>

using atto  = ratio<1, 1000000000000000000LL>;
using femto = ratio<1, 1000000000000000LL>;
using pico  = ratio<1, 1000000000000LL>;
using nano  = ratio<1, 1000000000>;
using micro = ratio<1, 1000000>;
using milli = ratio<1, 1000>;
using centi = ratio<1, 100>;
using deci  = ratio<1, 10>;
using deca  = ratio<10, 1>;
using hecto = ratio<100, 1>;
using kilo  = ratio<1000, 1>;
using mega  = ratio<1000000, 1>;
using giga  = ratio<1000000000, 1>;
using tera  = ratio<1000000000000LL, 1>;
using peta  = ratio<1000000000000000LL, 1>;
using exa   = ratio<1000000000000000000LL, 1>;


时间段

  • 在c++中是一个类, 直接上代码
#include<chrono>
/** 
    表示时间段的类是 std::duration<_Integer, ratio<_second,_unit>>

    其中 _Integer 可以是:
        int
        long
        long long 
        float
        dauble
        ...

        ratio用作第2个模板, 就有了具体的含义, 表示
            每_unit个单元,要用多少秒(_second)
*/
// 如1分钟, short表示储存的类型
// 表示每1个单位(分钟)要用60秒
duratoin<short, ratio<60,1>>        
//当然也可以用 ratio<120,2>(每2个单位(分钟)要用120秒), 只要比例一样, 并且足够存储


/// 如1毫秒, 不能用short(不够存储)
//// 表示 1000个单位(毫秒) 要用 1秒
duration<double, ratio<1,1000>>




/** 
    duration的值存储的多少个单位

    所以tmp在构造的时候的实参2表示 对应的 ratio中的 有2个单位(即2分钟), 也就是120秒
*/
std::chrono::duration<short, std::ratio<60,1>> tmp(2);

void test(){
    //以下 ratio具体成 分钟 来演示

    using _DD = std::chrono::duration<short, std::ratio<60, 1>>;
    _DD tmp(2);
    cout << _DD::period::den << endl;       //获取ratio中的单位, 即1
    cout << _DD::period::num << endl;       //获取ratio中的秒  , 即60

    cout << tmp.count() << endl;            //tmp有多少个单位(2个)

    // 获取多少秒(120)
    cout << tmp.count() * _DD::period::num / _DD::period::den << endl;
}


标准库提供的便利类(duration-time)

using nanoseconds  = duration<long long, nano>;
using microseconds = duration<long long, micro>;
using milliseconds = duration<long long, milli>;
using seconds      = duration<long long>;
using minutes      = duration<int, ratio<60>>;
using hours        = duration<int, ratio<3600>>;


c++14中的chrono_literals

  • 这个类是为了方便表示时间
#include<typeinfo>

void test(){
    using namespace std::chrono_literals;

    auto one_day=24h;
    cout << typeid(one_day).name() << endl;     
    //类型是hours  duration<int, ratio<3600>> ratio第2个模板参数省略了,默认是1


    auto half_an_hour=30min;
    auto max_time_between_messages=30ms;
}
/** 
    这c++真是强大, 这个居然不是编译器特性, 是操作符重载

    _NODISCARD constexpr chrono::hours operator"" h(unsigned long long _Val) noexcept {
        return chrono::hours(_Val);
    }
*/


duration_cast

  • duration可以转换
/** 
    就像int 和 short之间, 可以隐式转换, 也可以显示转换
    但转换方向不一样

    duration从大到小转换是隐式的
        时变秒
        分变秒
        时变分
        秒变毫秒/微秒/纳秒
*/

/** 
    其实第1就相当于隐式类型转换
        从 seconds到milliseconds
*/
std::chrono::milliseconds s(std::chrono::seconds(2));    //正确
//std::chrono::seconds s(std::chrono::milliseconds(2000));    //编译错误, 不能转换过去



/** 
    强制转换
*/
using _S = std::chrono::seconds;
using _M = std::chrono::milliseconds;
_S s(std::chrono::duration_cast<std::chrono::seconds>(_M(2000)));


c++17

/** 
    c++17后, 重载了算术操作符, 可以对 duration 作算术操作

    本质是内部存储的单位在运算
*/
std::chrono::duratoin<short,ratio<120,2>> tmp_a(4);
std::chrono::duratoin<short,ratio<120,2>> tmp_b(2);
cout << (tmp_a + tmp_b).count() << endl; // 6个单位
tmp_a++;    //tmp_a变为5个单位


colck

  • c++中有3个类来获取当前时间
    • system_clock
    • steady_clock
    • high_resolution_clock(==其实就是steady_clock==)

    <font color=red>这2个类(==最后1个不算==)都不是模板</font>

/** 
    它们内部都有
        duration
        time_point(下一节内容)

    system_clock的 周期(ratio)不稳定(因为它可以调整系统时间)
    steady_clock的 周期是稳定的精确到纳秒, 它们的用法是不一样的 
*/

/** 
    获取当前os的时间戳(秒)
*/
auto t_a = std::chrono::system_clock::now();

// 通过函数调用获取
cout << std::chrono::system_clock::to_time_t(t_a) << endl;

// 自己计算
auto c_a = (t_a.time_since_epoch().count() * chrono::system_clock::period::num / chrono::system_clock::period::den);

cout << c_a << endl;





/** 
    获取程序运行的时间(秒)
*/
auto begin = std::chrono::steady_clock::now();
this_thread::sleep_for(chrono::seconds(2));
auto end = std::chrono::steady_clock::now();
cout << std::chrono::duration_cast<chrono::microseconds>(end - begin).count() << endl;
/// 测试结果是 2000671 微秒


时间点

  • 对应的类是 <font color=red>std::chrono::time_point</font>


  • 时间点就是时间戳
/** 
    std::time_point<_Clock, _duration = typename _Clock::druation>
    
    第1个模板参数是:
        system_clock
        steady_clock
        high_resolution_clock
    第2个模板参数是:
        duration


    一般直接使用 clock_XXX的now()
*/


自定义的atomic

成为atomic的条件

  • 必须满足以下全为true
    • is_trivially_copyable<T>::value
    • is_copy_constructible<T>::value
    • is_move_constructible<T>::value
    • is_copy_assignable<T>::value
    • is_move_assignable<T>::value
class A {
public:
    int a;
};

int main(int arg, char** args){
    cout << is_trivially_copyable<A>::value;    
    cout << is_copy_constructible<A>::value;
    cout << is_move_constructible<A>::value;
    cout << is_copy_assignable<A>::value;
    cout << is_move_assignable<A>::value;

    atomic<A> tmp;  // _code_a  
    /** 
        只有上面的输出全部为true的时候_code_a才不会报错
    */
}


通用的原子操作

成员函数
    atomic_obj.store()
    atomic_obj.load()
    atomic.exchange()
    atomic_obj.compare_exchange_weak
    atomic_obj.compare_exchange_strong

全局函数:
    atomic_XXX
        eg:
            atomic_store(atomic_address, value)
            atomic_load(atomic_address)
            ...


自定义的atomic的锁

class A {
public:
    // 类型转换构造函数
    A(int _a) :a(_a) {}
private:
    int a;
};
int g_c = 0;
atomic<A> g_a{1};
void f_a() {
    A e{1};
    for (int i = 1;i < 20000001;) {

        /** 
            这里想用 compare_exchange_weak 来实现锁
            设计 if条件是:
                调用函数设置g_a的值为0时(当然也可以设置为其他的值)
            所以g_a的初始值必须与e相同, 当独占的线程抢到设置为0时, 其他线程调用com..._weak时
            就不可能设置成功
                因为第1个进入的线程已经存储了g_a为1, 其他线程调用时g_a与期望值(e)不同,所以会返回false
                当第1个进入的线程对共享操作完毕时, 会调用g_a.store设置g_a为1, 后续的线程才能跳过if

            ps: g_a.compare_exchange_weak(...) 是atomic操作, 它本身有返回值, 返回值虽然不是共享的
                但每个线程对返回值的判断其实是合理的, 也就是说 ! 的操作不需要atomic
        */
        if (!g_a.compare_exchange_weak(e, 0, memory_order_acq_rel)) {
            e = 1;  //atomic<A> 有 = 的重载, 1会调用类型转换ctor, 本身是atomic操作(重载中调用的是store)
                    // 但本身是局部变量, 其实没有必要atomic
            this_thread::yield();
            continue;
        }
        ++g_c;
        g_a.store(1);
        ++i;
    }
}




int main(int arg, char** args){
    {

        cout << atomic_is_lock_free(&g_a) << endl;  //返回true, 所以内部用的是atomic
        thread t_a(f_a);
        thread t_b(f_a);
        thread t_c(f_a);

        t_a.join();
        t_b.join();
        t_c.join();

        cout << g_c << endl;
        return 0;
    }
}


mutex数据结构

mutex的栈

/**
    基于mutex
*/
struct empty_stack: std::exception{
  const char* what() const throw();
};

template<typename T>
class threadsafe_stack{
private:
  std::stack<T> data;
  mutable std::mutex m;
public:
  threadsafe_stack(){}

  /** 
        拷贝构造的时候, 要保证 other只能有1个线程在操作
        所以要 锁住other这个对象
  */
  threadsafe_stack(const threadsafe_stack& other){
    std::lock_guard<std::mutex> lock(other.m);
    data=other.data;
  }

  threadsafe_stack& operator=(const threadsafe_stack&) = delete;

  /** 
    锁住当前栈对象
  */
  void push(T new_value){
    std::lock_guard<std::mutex> lock(m);
    data.push(std::move(new_value));  // 1
  }


  std::shared_ptr<T> pop(){
    std::lock_guard<std::mutex> lock(m);

    if(data.empty()) 
        throw empty_stack();  // 2

    
    // 移动ctor, 将当前栈的top迭代器(元素的指针), 转移到share_ptr中
    ///// 后面接着删除top元素(pop())
    const std::shared_ptr<T> res(
      std::make_shared<T>(std::move(data.top())));  // 3

    data.pop();  // 4
    return res;
  }

  /// pop的另1个版本
  void pop(T& value){
    std::lock_guard<std::mutex> lock(m);
    if(data.empty()) throw empty_stack();
    value=std::move(data.top());  // 5
    data.pop();  // 6
  }


  /// 因为 获取元素是否为空的时候, 可能其他的线程正在push或pop
  ////// 所以这里也要同步
  bool empty() const{
    std::lock_guard<std::mutex> lock(m);
    return data.empty();
  }
};



/** 
    这里说一下, pop的设计不能通过返回值(元素)来给外界
        即 T& pop() 或 T pop() 或 T* pop()
   
    当返回是 T pop()时:
        T pop(){
            std::lock_guard<std::mutex> lock(m);

            if(data.empty()) 
                throw empty_stack();

            // 会调用T的拷贝构造或移动构造(如果定义)
            auto value = std::move(data.top());   //_code_a

            data.pop(); 

            //返回T, 所以这里会产生临时对象给外界, 会调用T的拷贝构造
            return value;
        }
     
   问题可能出现在:
        T的拷贝构造或移动构造不能保证是否异常(内存不够), 如果异常, 则pop是失败的


   当返回是 T& pop()
        T& pop(){
            std::lock_guard<std::mutex> lock(m);

            if(data.empty()) 
                throw empty_stack();

            auto& value = data.top();   //_code_a

            data.pop();                 //_code_b

            return value;
        }
    问题:
        data.top()返回的是元素的引用, 但code_b已经释放了


    当返回的 T* pop()时, 问题和T& pop()一样



    解决方案有2种:
        > 传递1个引用(如上面的void pop(T&)), 根据上面的语义, 想调用A的移动赋值(所以A要有这些成员函数)
          比 T pop() 版本好的是原因是不会调用拷贝构造或移动构造(所以不会产生新内存, 不会有内存不够的异常)
          
          
        > 使用智能计数指针, 这种好处是原始对象不会被破坏
*/


mutex的queue

template<typename T>
class threadsafe_queue
{
private:
  mutable std::mutex mut;
  std::queue<T> data_queue;
  std::condition_variable data_cond;

public:
  threadsafe_queue()
  {}

  void push(T data)
  {
    std::lock_guard<std::mutex> lk(mut);
    data_queue.push(std::move(data));
    data_cond.notify_one();  // 1
  }

  void wait_and_pop(T& value)  // 2
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});
    value=std::move(data_queue.front());
    data_queue.pop();
  }

  std::shared_ptr<T> wait_and_pop()  // 3
  {
    std::unique_lock<std::mutex> lk(mut);
    data_cond.wait(lk,[this]{return !data_queue.empty();});  // 4
    std::shared_ptr<T> res(
      std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    return res;
  }

  bool try_pop(T& value)
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return false;
    value=std::move(data_queue.front());
    data_queue.pop();
    return true;
  }

  std::shared_ptr<T> try_pop()
  {
    std::lock_guard<std::mutex> lk(mut);
    if(data_queue.empty())
      return std::shared_ptr<T>();  // 5
    std::shared_ptr<T> res(
      std::make_shared<T>(std::move(data_queue.front())));
    data_queue.pop();
    return res;
  }

  bool empty() const
  {
    std::lock_guard<std::mutex> lk(mut);
    return data_queue.empty();
  }
};

/** 
    这里设计了2套取值方案
        > wait
        > try

    之所以有wait是因为有的线程可能用当前类的对象(假如是_mm) pop获取值时, 另一个
    线程可能同时通过_mm正在push, 所以pop时会等push完毕, 但要注意的是
    如果一直没有push的操作, pop可能一直wait, 所以要看情况使用
*/


  • 利用上面的queue实现抢一等奖
#define _DEBUG_ 1
#define _T_ID(_v) (*(unsigned int*)&(_v))
threadsafe_queue<int> t_q;

// threadsafe_queue<int>
using _T_Q = remove_cv<remove_reference<decltype(t_q)>::type>::type;

//threadsafe_queue<int>*
using _T_Q_P = _Add_pointer<_T_Q>::type;

class production {
public:
    void fun(promise<_T_Q_P>& _promise) {

#ifdef _DEBUG_
        printf("s-thread:%u s-q: \t%p\n", _T_ID(this_thread::get_id()), &_promise);
#endif 
        _promise.set_value(&t_q);       //_code_a
        t_q.push(2);                    //_code_b
    }
};

class consumer {
public:
    consumer() {
        /// 每个consumer产生自己的号码
        srand(static_cast<unsigned int>(chrono::steady_clock::now().time_since_epoch().count()));
        a = rand() % 3;

#ifdef _DEBUG_
        printf("this:%p \t号码:%d\n", this, a);
#endif
    }

    /** 
        抽奖
    */
    void fun(shared_future<_T_Q_P>& _share_future) {
        auto _id = _T_ID(this_thread::get_id());
#ifdef _DEBUG_
        printf("s-thread:%u s-f: \t%p\n", _id, &_share_future);  
        printf("s-thread:%u s-q: \t%p\n", _id, _share_future.get());  
#endif 
        
        /// 获取 threadsafe_queue<int>*, 会等待 _code_a完成
        auto _queue = _share_future.get();

        /// 因为要调用 try_pop(int&), 所以这里traits出 queue中的元素类型
        using _tmp_vaule_type = typename remove_pointer<remove_cv<remove_reference<decltype(_queue)>::type>::type>::type;
        
        /// 结果值
        _tmp_vaule_type::_value_type result{};

#ifdef _DEBUG_
        printf("s-thread:%u this:%p \t号码:%d\n", _id, this, a);
#endif

        /// try_pop是上锁同步的, 内部会取出top()
        if (_queue->try_pop(result)) {
#ifdef _DEBUG_
            printf("s-thread:%u this:%p \t号码:%d\n", _id, this, a);
#endif
            /// 如果自己重奖了, 就结束了, 后续的consumer在try_pop时, 会false
            if (result == a) {
                printf("s-thread:%u this:%p 种奖了\n", _id, this);
                return;
            }

            /// 如果自己没有重奖, 则要将抽的号码放回去
            _queue->push(result);
        }
        printf("s-thread:%u 谢谢惠顾\n", _id);
    }
private:
    int a;
};


int main(int arg, char** args) {
    production p;
    consumer b, c, d, e;

    
    promise<_T_Q_P> m_p;
    shared_future<_T_Q_P> share(m_p.get_future());
    cout << "m-thread m-p: \t" << &m_p << endl;
    cout << "m-thread m-f: \t" << &share << endl;
    cout << "m-thread m-q: \t" << &t_q << endl;

    /** 
        这里传递的全部是 ref, 也就是说子线程中引用的就是main函数些局部变量

        其中 future必须用 share_future, 因为有多个线程会调用future.get()

    */
    thread t_a(&production::fun,std::ref(p), std::ref(m_p));

    /// 因为本demo没有用wait_pop, 所以, 这里为了测试, 睡眠一下, 保证 t_q已经push入值了
    this_thread::sleep_for(chrono::seconds(2));

    thread t_b(&consumer::fun, std::ref(b), std::ref(share));
    thread t_c(&consumer::fun, std::ref(c), std::ref(share));
    thread t_d(&consumer::fun, std::ref(d), std::ref(share));
    thread t_e(&consumer::fun, std::ref(e), std::ref(share));

    t_a.join();
    t_b.join();
    t_c.join();
    t_d.join();
    t_e.join();   
}
/** 
    这里的demo只是熟悉怎么用上面的 queue, 以及回顾之前的promise
*/


atomic的栈

设计

  • 栈的操作主要有 <font color=red>push,pop,top</font>, 综合起来有2个操作:
    • 添加
    • 删除


整个stack的结构

/** 
    用list实现
*/

namespace lb {
    template<typename T>
    struct node {
        T data;
        node* next;
        node(const T& data_) :data(data_) {}
    };

    template<typename T>
    class stack{
    public:
        using _Node = node<T>;
        using _Node_pointer = typename std::add_pointer<_Node>::type;

        // 注意head是一个对象, 不是指针, 它内部存储的值是node*
        std::atomic<_Node*> head;

        // 压栈
        void push(const T&);
    };
}

push的设计

/** 
    接上面
*/
namespace{
#define no_operation ;

template<typename T>
void stack<T>::push(const T& _value) {
    //1. 创建新节点
    _Node_pointer new_node = new _Node(_value);

    //2. 新节点指向原来的head, head值是node*, 直接将head当前node*
    //// 必须用 load去访问head的空间, 因为是多线程的, 所以head这块共享空间必须是独占式访问
    new_node->next = head.load(); 


    //3. 原来的头部指向新节点 
    while (!head.compare_exchange_weak(new_node->next, new_node))
        no_operation
}

}
/** 
第3步的目的是 将head中存储的值(node*), 替换成第2步的new_node

但因为是多线程的环境, 所以必须保证 head这块共享空间为独占访问
并且理论上 第2步和第3步这两步应该是合起来的atomic操作, 但
这里没有用锁将这2步保护起来

这里使用了 compare_exchange_weak 函数:
    第1个参数是 期望值 new_node, 其中next中存储的是当前线程读取到的
    当前线程认为的head
    
    第2个参数是 new_node本身的地址


    只有 当前的head和new_node中存储的head是同一个时, 才会将当前
    head中存储的值指向new_node, 原因就是保证当前线程的第2步和第3步
    全被当前线程独占

    否则就说明当前线程准备执行第3步时, 被其他线程抢到,并且执行了其他线程的第3步
    导致共享的head修改了(也可能pop,总之导致head指向和当前new_node的next不是同一个) 
        这种情况下应该将 第2步的new_node尝试指向到最新值的head, 然后在下次的循环中再尝试交换
        一直到交换成功

    举个例子:
        2个线程 _a, _b
        同1个head, 初始指向3
    
    流程:
        1. _a.push(1)                   _b.push(6)
        2. _a>new_node(1)               _b>new_node(6)
        3. _a>new_node.next = head(3)   _b>new_node.next = head(3) 
        4. _a被暂停                        _b>head(3).c_e_w(new_node.next(3), new_node(6))
                                            head(6)
        5. _a醒了执行: _a>head(6).c_e_w(new_node.next(3), new_node(1))
                            head(6), new_node.next(6)

        可以看到流程4时, 出现了_a被暂停的情景(当然也可能_b暂停), 因为
        c_e_w是atomic, 只能有1个独占, 所以当_a抢到c_e_w不应该直接更新head,
        而是更新 期望值(new_node.next), 一直到(while)真正的head, 这个时候才可以更新

        ps: 如果不判断, 直接换, 则会造成内存泄露(例如_b的new_node(6))
*/


pop

  • pop的代码流程是:
    1. 获取head
    2. 获取head->next的old_head
    3. 设置head->next = old_head->next
    4. 返回 old_head的data
    5. 删除old_head 删除掉这1步, 最后再来完善
/**
    多线程下的问题:
        由于第5步, 所以第4步获取的data等于是野指针, 所以目前为止先不要第5步
*/
namespace lb{
template<typename T>   //记得要先在stack中声明
void stack<T>::pop(T& result){
    node* old_head = head.load();
    while(!head.compare_exchange_weak(old_head, old_head->next))
        no_operation
    
    result = old_head->data;    ///拷贝赋值
}

}

/** 
    问题:
        1. head本身就是空的时候, old_head->next 野指针 

        2. 使用引用获取pop的返回值, 最好是 shared_ptr, 因要stack有一个功能是top
            当通过top()获取head, 它在逻辑上应该和pop中获取的head是同一个, 所以不能拷贝

    
    思路:
        基于第2点, 前面设计的stack应该改成智能计数指针
*/
namespace lb {
    template<typename T>
    struct node {
        shared_ptr<T> data; //结点的本身的数据就是 计数指针

        node* next;

        node(const T& data_) :data(make_shared<T>(data_)) {}
    };

    template<typename T>
    class stack{
    public:
        using _Node = node<T>;
        using _Node_pointer = typename std::add_pointer<_Node>::type;

        std::atomic<_Node_pointer> head;

        // 压栈
        void push(const T&);

        shared_ptr<_Node> pop();
    };
}

#define no_operation ;


//// push
template<typename T>
void stack<T>::push(const T& _value) {
    _Node_pointer new_node = new _Node(_value);

    new_node->next = head.load(); 

    while (!head.compare_exchange_weak(new_node->next, new_node))
        no_operation
}



//// pop, 这个函数的作用就不解释了
template<typename T>
shared_ptr<T> stack<T>::pop() {
    _Node_pointer old_head = head.load();
    while (old_head && !head.compare_exchange_weak(old_head, old_head->next))
        no_operation;

    return old_head ? old_head->data : shared_ptr<T>();
}


/** 
    最后的问题, old_head没有被释放(即pop最开始的第5个操作)
*/
template<typename T>
shared_ptr<T> stack<T>::pop() {
    _Node_pointer old_head = head.load();
    while (old_head && !head.compare_exchange_weak(old_head, old_head->next))
        no_operation;

    shared_ptr<T> result;

code_release:
    if (old_head) {
        ++_pop;
        result.swap(old_head->data);
        delete (old_head);
    }

    return result;
}
/** 
    while的语句可以保证 old_head是不可能多个线程同时获得到指向同一个head的情况
    但本人看的 c++并发的书籍, 作者说可能有 多个线程指向同一个head的情况
    但这里并没有top相关的方法, 所以本人很疑惑, 特意附加了一个文件 附录1, 里面是
    原作者的探讨


    这里再给一个测试
        注意内存排序, 最好是默认的
*/
namespace lb {
    template<typename T>
    struct node {
        shared_ptr<T> data;
        node* next;
        node(const T& data_) :data(make_shared<T>(data_)) {}
    };

    template<typename T>
    class stack{
    public:
        using _Node = node<T>;
        using _Node_pointer = typename std::add_pointer<_Node>::type;

        std::atomic<_Node*> head;

        void push(const T&);

        shared_ptr<T> pop();
    };

#define no_operation 

atomic<int> _push{ 0 };
atomic<int> _pop{ 0 };

    template<typename T>
    void stack<T>::push(const T& _value) {
        _Node_pointer new_node = new _Node(_value);

        new_node->next = head.load(); 

        while (!head.compare_exchange_weak(new_node->next, new_node),memory_order_release,memory_order_relaxed)
            no_operation;

        ++_push;
    }

    template<typename T>
    shared_ptr<T> stack<T>::pop() {
        _Node_pointer old_head = head.load();
        while (old_head && !head.compare_exchange_weak(old_head, old_head->next),memory_order_release,memory_order_relaxed)
            no_operation;

        shared_ptr<T> result;

        if (old_head) {
            ++_pop;
            result.swap(old_head->data);
            delete (old_head);
        }
    
        return result;
    }

}

void t_push(lb::stack<int>& st, int i) {
    for (int j = 100; ++j < 111;) {
        st.push(i * 100 + j);
    }
}
int main(int arg, char** args) {
    vector<thread> vec;
    vec.reserve(40);

    lb::stack<int> st;

    for (int i = -1; ++i < 20;) {
        vec.emplace_back(&t_push, std::ref(st), i);
    }



    for (auto& i : vec) {
        i.join();
    }

    cout << lb::_push << endl;

    vec.clear();
    vec.reserve(40);
    for (int i = -1; ++i < 20;) {
        vec.emplace_back([&st]() {
            for (int i = -1; ++i < 10;) {
                auto tmp = st.pop();
                printf("t_id:%u value: %d\n", this_thread::get_id(), *tmp);
            }

            cout << "************************\n";
        });
    }
    for (auto& i : vec) {
        i.join();
    }

    cout << lb::_pop << endl;
}


©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容