当一个线程使用特定互斥量锁住共享数据时,其他线程想要访问锁住的数据,都必须等到之前那个线程堆数据进行解锁后,才能进行访问。这就保证了所有线程能看到共享数据,而不破坏“不变量”。
C++中使用互斥量
通过实例化std::mutex创建互斥量,再通过成员函数lock()进行上锁,unlock()进行解锁。不过,这些步骤太过麻烦(需要再每个函数出口处调用unlock(),并且要考虑到异常的情况)。
因此,C++标准库为互斥量提供了一个RAII语法的模板类std::lock_guard(RAII:资源获取即初始化,拥有自动回收资源的功能)。它在构造的时候提供已锁的互斥量,并在析构的时候进行解锁。
#include <list>
#include <mutex>
#include <algorithm>
std::list<int> _list; // 1
std::mutex _mutex; //全局变量被一个全局的互斥量保护
// List_Find() 和 ADD都是互斥的,双方都不可能看到对方
void ADD(int new_value)
{
std::lock_guard<std::mutex> guard(_mutex);
_list.push_back(new_value);
}
bool List_Find(int value_to_find)
{
std::lock_guard<std::mutex> guard(_mutex);
return
std::find(_list.begin(), _list.end(), value_to_find) !=
some_list.end();
}
不过,互斥量通常会与保护的数据放在同一个类中。因可以让互斥量和被保护的变量联系在一起,同时也可对类的功能进行封装,并进行数据保护。
精心的保护共享数据
除了检查成员函数不将 指针或引用 传递给它们的调用方之外。还有一点很重要,就是要检查它们是否把指针或引用传递给它们调用的、不在你控制下的函数。因为这时,函数可能没在互斥量保护的区域内,它可能会存储着指针或者引用,这样就很危险。更危险的是:将保护数据作为一个运行时参数。
class s_data
{
int a;
std::string b;
public:
void do_something();
};
class wrapper
{
private:
s_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
func(data); // 2 malicious_function(data) 传递“保护”数据给用户函数
}
};
some_data* unprotected; //入侵保护数据的变量
void malicious_function(s_data& protected_data) //极度危险将保护数据作为一个运行时参数
{
unprotected = &protected_data;//得到保护数据
}
wrapper x;
void foo()//入口函数
{
/*
* foo能够绕过保护机制将函数 malicious_function 传递进去,
* 在没有锁定互斥量的情况下调用 do_something()。
*/
x.process_data(malicious_function); // 1 传递一个恶意函数
unprotected->do_something(); // 3 在无保护的情况下访问保护数据
}
怎么解决?
无论是函数返回值、存储在外部可见内存,还是以参数的形式传递到用户提供的函数中,都不要将受保护数据的 指针或引用 传递到互斥锁作用域之外。
接口内在的条件竞争
即使在一个很简单的接口中,依旧可能遇到条件竞争。
比如下面的例子,构建一个类似于 std::stack 结构的堆栈容器类,除了构造函数和swap()以外,需要对 std::stack 提供五个操作:push()一个新元素进栈,pop()一个元素出栈,top()查看栈顶元素,empty()判断栈是否是空栈,size()了解栈中有多少个元素
template<typename T,typename Container=std::deque<T> >
class stack
{
public:
explicit stack(const Container&);
explicit stack(Container&& = Container());
template <class Alloc> explicit stack(const Alloc&);
template <class Alloc> stack(const Container&, const Alloc&);
template <class Alloc> stack(Container&&, const Alloc&);
template <class Alloc> stack(stack&&, const Alloc&);
bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);//push()一个新元素进堆
void push(T&&);
void pop();
void swap(stack&&);
};
stack<int> s;
if (! s.empty()){
int const value = s.top();
s.pop();
do_something(value);
}
在多线程当中,这段代码是有条件竞争的。
(因为当栈实例时共享时,empty和size返回的结果,会因为其他线程访问push或者pop函数,导致增加或减少元素,从而使得empty和size的返回结果出问题。还有一个隐藏的条件竞争,在调用top()和pop()之间。假设两个线程运行着前面的代码,并且都引用同一个栈对象s,这时也会产生。)
另外,假设有一个 stack<vector<int>> ,vector是一个动态容器,当你拷贝一个vetcor,标准库会从堆上分配很多内存来完成这次拷贝。当这个系统处在重度负荷,或有严重的资源限制的情况下,这种内存分配就会失败,所以vector的拷贝构造函数可能会抛出一个 std::bad_alloc 异常。所以当pop() 函数返回移除的值时,会有一个潜在的问题,这个值要返回到调用pop() 的时候,堆才会被移除堆顶元素。由于中间会拷贝这个数据作为返回值,此时拷贝构造函数抛出了一个异常,那么拷贝将会失败,虽然它已经从堆中移除了。
因此std::stack 的设计人员将这个操作分为两部分:先获取顶部元素(top()),然后从堆中移除(pop())。不幸的是,这样的分割却制造了本想避免或消除的条件竞争。
难道我们就没有办法了吗?
幸运的是,我们还有其他做法:
1. 传入一个引用
将变量的引用作为参数,传入pop()函数中获取想要的“弹出值”:
std::vector<int> result;
some_stack.pop(result);
不过,他有明显的缺点:需要构造出一个堆中类型的实例,用于接收目标值。由于它会临时构造一个实例,这会让消耗的时间和资源会比较大,所以不值得这么做。
另外,即使支持移动构造,甚至是拷贝构造(从而允许返回一个值)。但对于可赋值的存储类型,很多用户自定义类型可能都不支持赋值操作。
2. 无异常抛出的拷贝构造函数或移动构造函数
对于有返回值的pop()函数来说,只有当返回值时抛出异常是担忧。不过,很多类型都有拷贝构造函数,它们不会抛出异常,并且新标准中对“右值引用”的支持,让很多类型都将会有一个移动构造函数,即使他们和拷贝构造函数做着相同的事情,它也不会抛出异常。
尽管能在编译时可使用 std::is_nothrow_copy_constructible
和 std::is_nothrow_move_constructible
类型特征,让拷贝或移动构造函数不抛出异常,但是这种方式的局限性太强。虽然在用户自定义的类型中,会有不抛出异常的拷贝构造函数或移动构造函数的类型。但是有抛出异常的拷贝构造函数,又没有移动构造函数的自定义类型往往更多。
3. 返回指向弹出值的指针
指针的优势是自由拷贝,并且不会产生异常,缺点就是返回一个指针需要给对象的内存分配进行管理,对于选择这个方案的接口,使用 std::shared_ptr 是个不错的选择:不仅能避免内存泄露(因为当对象中指针销毁时,对象也会被销毁),而且标准库能够完全控制内存分配方案,也就不需要new和delete操作。
4. “选项1 + 选项2” 或 “选项1 + 选项3”
以下例子实现了选项1和选项3:
关于粗粒度锁和细粒度锁的思考:
粗粒度锁:一个全局互斥量要去保护全部共享数据,在一个系统中存在有大量的共享数据时,因为线程可以强制运行,甚至可以访问不同位置的数据,从而抵消了并发带来的性能提升。
细粒度锁:使用多个互斥量保护所有的数据,细粒度锁也有问题。如前所述,当增大互斥量覆盖数据的粒度时,只需要锁住一个互斥量。但是,这种方案并非通用,比如:互斥量正在保护一个独立类的实例。这种情况下,锁的状态的下一个阶段,不是离开锁定区域,将锁定区域还给用户;就是有独立的互斥量去保护这个类的全部实例。
死锁
产生死锁的四个必要条件,只要其中任一条件不成立,死锁就不会发生:
(1) 互斥条件:一个资源每次只能被一个进程使用。
(2) 请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
(3) 不剥夺条件:线程已获得的资源,在末使用完之前,不能强行剥夺。
(4) 循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系。
避免死锁的一般建议,总是按照相同的顺序来锁定两个mutexes, 即总是先于mutex B之前lock mutex A,这样就可以避免死锁。而C++标准库可以解决这个问题:std::lock — 可以一次性锁住多个(两个以上)的互斥量,并且没有死锁风险。
// 这里的std::lock()需要包含<mutex>头文件
class some_big_object;
void swap(some_big_object& lhs,some_big_object& rhs);
class X
{
private:
some_big_object some_detail;
std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
/*
* 检查参数是否是不同的实例,因为操作试图获取 std::mutex 对象上的锁。
* 所以当其被获取时,结果很难预料。
*/
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m); // 调用 std::lock() 锁住两个互斥量
/*
* 两个 std:lock_guard 实例已经创建好,
* 并且提供 std::adopt_lock 参数,表示 Mutex 对象已被当前线程锁住,
* 交由 std::lock_guard 对象管理。
*/
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);
swap(lhs.some_detail,rhs.some_detail);
}
};
需要注意的是,当使用 std::lock
去锁 lhs.m
或 rhs.m
时,可能会抛出异常。这种情况下,当 std::lock
成功的获取一个互斥量上的锁,并且当其尝试从另一个互斥量上再获取锁时异常抛出,第一个锁也会随着异常的产生而自动释放,所以 std::lock
要么将两个锁都锁住,要不一个都不锁。