{ 4 }CPP_使用互斥量保护共享数据

当一个线程使用特定互斥量锁住共享数据时,其他线程想要访问锁住的数据,都必须等到之前那个线程堆数据进行解锁后,才能进行访问。这就保证了所有线程能看到共享数据,而不破坏“不变量”。

C++中使用互斥量

通过实例化std::mutex创建互斥量,再通过成员函数lock()进行上锁,unlock()进行解锁。不过,这些步骤太过麻烦(需要再每个函数出口处调用unlock(),并且要考虑到异常的情况)。

因此,C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard(RAII:资源获取即初始化,拥有自动回收资源的功能)。它在构造的时候提供已锁的互斥量,并在析构的时候进行解锁。

#include <list>
#include <mutex>
#include <algorithm>

std::list<int> _list; // 1
std::mutex _mutex; //全局变量被一个全局的互斥量保护

// List_Find() 和 ADD都是互斥的,双方都不可能看到对方
void ADD(int new_value)
{
  std::lock_guard<std::mutex> guard(_mutex); 
  _list.push_back(new_value);
} 
bool List_Find(int value_to_find)
{
  std::lock_guard<std::mutex> guard(_mutex); 

  return
    std::find(_list.begin(), _list.end(), value_to_find) !=
        some_list.end();
}

不过,互斥量通常会与保护的数据放在同一个类中。因可以让互斥量和被保护的变量联系在一起,同时也可对类的功能进行封装,并进行数据保护。


精心的保护共享数据

除了检查成员函数不将 指针或引用 传递给它们的调用方之外。还有一点很重要,就是要检查它们是否把指针或引用传递给它们调用的、不在你控制下的函数。因为这时,函数可能没在互斥量保护的区域内,它可能会存储着指针或者引用,这样就很危险。更危险的是:将保护数据作为一个运行时参数。

class s_data
{
  int a;
  std::string b;
public:
  void do_something();
};

class wrapper
{ 
private:
  s_data data; 
  std::mutex m;
public:
  template<typename Function>
  void process_data(Function func)
  {
    std::lock_guard<std::mutex> l(m);
    func(data); // 2 malicious_function(data) 传递“保护”数据给用户函数
  }
};

some_data* unprotected; //入侵保护数据的变量
void malicious_function(s_data& protected_data) //极度危险将保护数据作为一个运行时参数
{
  unprotected = &protected_data;//得到保护数据
} 

wrapper x; 
void foo()//入口函数
{
  /*
  * foo能够绕过保护机制将函数 malicious_function 传递进去,
  * 在没有锁定互斥量的情况下调用 do_something()。
  */
  x.process_data(malicious_function); // 1 传递一个恶意函数
  unprotected->do_something(); // 3 在无保护的情况下访问保护数据
}

怎么解决?
无论是函数返回值、存储在外部可见内存,还是以参数的形式传递到用户提供的函数中,都不要将受保护数据的 指针或引用 传递到互斥锁作用域之外


接口内在的条件竞争

即使在一个很简单的接口中,依旧可能遇到条件竞争。
比如下面的例子,构建一个类似于 std::stack 结构的堆栈容器类,除了构造函数和swap()以外,需要对 std::stack 提供五个操作:push()一个新元素进栈,pop()一个元素出栈,top()查看栈顶元素,empty()判断栈是否是空栈,size()了解栈中有多少个元素

template<typename T,typename Container=std::deque<T> >
class stack
{ 
public:
  explicit stack(const Container&);
  explicit stack(Container&& = Container());
  template <class Alloc> explicit stack(const Alloc&);
  template <class Alloc> stack(const Container&, const Alloc&);
  template <class Alloc> stack(Container&&, const Alloc&);
  template <class Alloc> stack(stack&&, const Alloc&);

  bool empty() const;
  size_t size() const;
  T& top();
  T const& top() const;
  void push(T const&);//push()一个新元素进堆
  void push(T&&);
  void pop();
  void swap(stack&&);
};

stack<int> s;
if (! s.empty()){ 
  int const value = s.top();
  s.pop();
  do_something(value);
}

在多线程当中,这段代码是有条件竞争的。
(因为当栈实例时共享时,empty和size返回的结果,会因为其他线程访问push或者pop函数,导致增加或减少元素,从而使得empty和size的返回结果出问题。还有一个隐藏的条件竞争,在调用top()和pop()之间。假设两个线程运行着前面的代码,并且都引用同一个栈对象s,这时也会产生。)

另外,假设有一个 stack<vector<int>> ,vector是一个动态容器,当你拷贝一个vetcor,标准库会从堆上分配很多内存来完成这次拷贝。当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以vector的拷贝构造函数可能会抛出一个 std::bad_alloc 异常。所以当pop() 函数返回移除的值时,会有一个潜在的问题,这个值要返回到调用pop() 的时候,堆才会被移除堆顶元素。由于中间会拷贝这个数据作为返回值,此时拷贝构造函数抛出了一个异常,那么拷贝将会失败,虽然它已经从堆中移除了。

因此std::stack 的设计人员将这个操作分为两部分:先获取顶部元素(top()),然后从堆中移除(pop())。不幸的是,这样的分割却制造了本想避免或消除的条件竞争。

难道我们就没有办法了吗?

幸运的是,我们还有其他做法:

1. 传入一个引用

将变量的引用作为参数,传入pop()函数中获取想要的“弹出值”:

std::vector<int> result;
some_stack.pop(result);

不过,他有明显的缺点:需要构造出一个堆中类型的实例,用于接收目标值。由于它会临时构造一个实例,这会让消耗的时间和资源会比较大,所以不值得这么做。

另外,即使支持移动构造,甚至是拷贝构造(从而允许返回一个值)。但对于可赋值的存储类型,很多用户自定义类型可能都不支持赋值操作。

2. 无异常抛出的拷贝构造函数或移动构造函数

对于有返回值的pop()函数来说,只有当返回值时抛出异常是担忧。不过,很多类型都有拷贝构造函数,它们不会抛出异常,并且新标准中对“右值引用”的支持,让很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,它也不会抛出异常。

尽管能在编译时可使用 std::is_nothrow_copy_constructiblestd::is_nothrow_move_constructible 类型特征,让拷贝或移动构造函数不抛出异常,但是这种方式的局限性太强。虽然在用户自定义的类型中,会有不抛出异常的拷贝构造函数或移动构造函数的类型。但是有抛出异常的拷贝构造函数,又没有移动构造函数的自定义类型往往更多。

3. 返回指向弹出值的指针

指针的优势是自由拷贝,并且不会产生异常,缺点就是返回一个指针需要给对象的内存分配进行管理,对于选择这个方案的接口,使用 std::shared_ptr 是个不错的选择:不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,也就不需要new和delete操作。

4. “选项1 + 选项2”“选项1 + 选项3”

以下例子实现了选项1和选项3:


一份线程安全的封装.png

关于粗粒度锁和细粒度锁的思考:
粗粒度锁:一个全局互斥量要去保护全部共享数据,在一个系统中存在有大量的共享数据时,因为线程可以强制运行,甚至可以访问不同位置的数据,从而抵消了并发带来的性能提升。

细粒度锁:使用多个互斥量保护所有的数据,细粒度锁也有问题。如前所述,当增大互斥量覆盖数据的粒度时,只需要锁住一个互斥量。但是,这种方案并非通用,比如:互斥量正在保护一个独立类的实例。这种情况下,锁的状态的下一个阶段,不是离开锁定区域,将锁定区域还给用户;就是有独立的互斥量去保护这个类的全部实例。


死锁

产生死锁的四个必要条件,只要其中任一条件不成立,死锁就不会发生:
(1) 互斥条件:一个资源每次只能被一个进程使用。
(2) 请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
(3) 不剥夺条件:线程已获得的资源,在末使用完之前,不能强行剥夺。
(4) 循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系。

避免死锁的一般建议,总是按照相同的顺序来锁定两个mutexes, 即总是先于mutex B之前lock mutex A,这样就可以避免死锁。而C++标准库可以解决这个问题:std::lock — 可以一次性锁住多个(两个以上)的互斥量,并且没有死锁风险。

// 这里的std::lock()需要包含<mutex>头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{ 

private:
    some_big_object some_detail;
    std::mutex m;

public:
    X(some_big_object const& sd):some_detail(sd){}
    
    friend void swap(X& lhs, X& rhs)
    {
        /*
        *  检查参数是否是不同的实例,因为操作试图获取 std::mutex 对象上的锁。
        *  所以当其被获取时,结果很难预料。
        */
        if(&lhs==&rhs)
            return;
        
        std::lock(lhs.m,rhs.m); // 调用 std::lock() 锁住两个互斥量

        /*
        * 两个 std:lock_guard 实例已经创建好,
        * 并且提供 std::adopt_lock 参数,表示 Mutex 对象已被当前线程锁住,
        * 交由 std::lock_guard 对象管理。
        */
        std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);
        std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);

        swap(lhs.some_detail,rhs.some_detail);
    }
};

需要注意的是,当使用 std::lock 去锁 lhs.mrhs.m 时,可能会抛出异常。这种情况下,当 std::lock 成功的获取一个互斥量上的锁,并且当其尝试从另一个互斥量上再获取锁时异常抛出,第一个锁也会随着异常的产生而自动释放,所以 std::lock 要么将两个锁都锁住,要不一个都不锁。

未完待续...

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。