C++ Concurrency in Action 2nd Edition note
3.2 用互斥量保护共享数据
在访问共享数据之前对mutex
加锁,访问完成后对mutex
解锁。其他想对mutex
加锁的线程必须等待,直到对mutex
加锁了的线程释放锁。
std::mutex
有lock()
和unlock()
成员函数。但是不推荐直接使用成员函数进行锁操作,因为需要确保所有代码路径都覆盖。推荐使用std::lock_guard
,它是RAII
的,自动对mutex
进行加锁和解锁:
#include<list>
#include<mutex>
#include<algorithm>
std::list<int> some_list;
std::mutex some_mutex;
void add_to_list(int new_value){
std::lock_guard<std::mutex> guard(some_mutex);
some_list.push_back(new_value);
}
bool list_contains(int value_to_find){
std::lock_guard<std::mutex> guard(some_mutex);
return std::find(some_list.begin(),some_list.end(),value_to_find)!=some_list.end();
}
C++17
的类模板参数推导特性可以将std::lock_guard
实例的定义改为:
std::lock_guard guard(some_mutex);
C++17
引入了std::scoped_lock
:
std::scoped_lock guard(some_mutex);
通常的做法是写一个类,add_to_list
、list_contains
等函数作为成员函数,数据、互斥量等称为类的私有数据。但是如果类的某个函数返回指向保护数据的指针或者引用,那么这样的类就不能很好地保护共享数据。
还要防止各种形式的把保护数据的指针或引用传到类外:
class some_data
{
int a;
std::string b;
public:
void do_something()
{}
};
class data_wrapper
{
private:
some_data data;
std::mutex m;
public:
template<typename Function>
void process_data(Function func)
{
std::lock_guard<std::mutex> l(m);
func(data);
}
};
some_data* unprotected;
void malicious_function(some_data& protected_data)
{
unprotected=&protected_data;
}
data_wrapper x;
void foo()
{
x.process_data(malicious_function);
unprotected->do_something();
}
原则:保护数据的指针或引用,不要从函数返回它们,不要把它们存储在外部可见的内存,不要把它们作为参数传递给用户提供的函数。
考虑一个双向链表。为了能够安全地删除一个节点,你需要同时保护三个节点。更好的办法是使用一个mutex
保护整个链表。
接口间的竞争条件。考虑std::stack
:
template<typename T,typename Container=std::deque<T> >
class stack
{
public:
explicit stack(const Container&);
explicit stack(Container&& = Container());
template <class Alloc> explicit stack(const Alloc&);
template <class Alloc> stack(const Container&, const Alloc&);
template <class Alloc> stack(Container&&, const Alloc&);
template <class Alloc> stack(stack&&, const Alloc&);
bool empty() const;
size_t size() const;
T& top();
T const& top() const;
void push(T const&);
void push(T&&);
void pop();
void swap(stack&&);
template<class … Args> void emplace(Args&& … args);
};
这里的问题是empty()
和size()
的结果不可靠。它们被调用的时候是正确的,一旦它们返回,在线程使用它们返回的信息之前,pop()
或者push()
可能对stack
进行了修改。
当stack
不共享时,下面的代码是正确的:
stack<int > s;
if(!s.empty()){
int const value=s.top();
s.pop();
do_something(value);
}
这是单线程安全的,对一个空栈调用top()
是未定义行为。对一个共享栈对象,上面的代码不再安全。在empty()
和top()
之间,可能有其他线程调用pop()
删除了最后一个元素。这是竞争条件,但是mutex不能阻止它,这是接口间的竞争。
解决方案是改变接口。
top()
和pop()
之间也有竞争条件:
假设栈开始时有两个元素。某个元素被读出了两遍,然后两个元素都被删除了。有一个元素无声无息地消失了。
Tom Cargill
指出,如果stack
的拷贝构造函数能抛出异常,那么组合调用会出问题。Herb Sutter
用异常安全解决了。但是潜在的竞争条件带来了新的问题。
考虑stack<vector<int>>
,vector
是一个动态分配的容器,拷贝vector
时,必须从栈中分配一些内存。如果内存分配失败,那么vector
的拷贝构造函数抛出std::bad_alloc
。如果pop()
返回值的同时,同时将该值从栈上删除:值的返回是在栈中的值被删除后,但是返回值的拷贝可能抛出异常。std::stack
的作者将这两个操作分成了top()
和pop()
。如果不能安全地拷贝数据的话,数据还留在栈中。
但是这样的划分造成了竞争条件。
选择1:传递引用
pop()
接收的参数是引用:
std::vector<int> result;
some_stack.pop(result);
缺点是,必须先构造栈元素的实例。对于一些类型来说,构造一个实例很昂贵,需要很多时间或者资源。有些类型的构造函数需要的参数此时可能不可用。它要求元素是可赋值的。有些用户自定义的类型不支持赋值。
选择2:需要一个不抛出异常的拷贝构造函数或者移动构造函数
可以通过std::is_nothrow_copy_constructible
和std::is_nothrow_move_constructible
检查移动或拷贝构造函数是否抛出异常。
选择3:返回一个指向出栈项的指针
优点是指针可以自由地拷贝而不会抛出异常。缺点是需要管理指针。std::shared_ptr
是一个好的选择。
选择4:选择1加上选择2或者3
如果选择了选择2或3,那么提供选择1也很容易。
选择了选择1和3,没有竞争条件的栈的实现:
#include <exception>
#include <memory>
struct empty_stack: std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
public:
threadsafe_stack();
threadsafe_stack(const threadsafe_stack&);
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value);
std::shared_ptr<T> pop();
void pop(T& value);
bool empty() const;
};
即使调用empty()
后栈被修改,程序依然正常。如果栈为空,pop()
函数抛出empty_stack
异常。
#include <exception>
#include <stack>
#include <mutex>
#include <memory>
struct empty_stack: std::exception
{
const char* what() const throw()
{
return "empty stack";
}
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack(){}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data=other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(new_value);
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty()) throw empty_stack();
value=data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
在拷贝构造函数的函数体而不是在初始化列表中进行拷贝,是为了确保在拷贝的过程中持有mutex
。
std::lock
函数可以一次锁定两个或多个mutex
:
#include <mutex>
class some_big_object
{};
void swap(some_big_object& lhs,some_big_object& rhs)
{}
class X
{
private:
some_big_object some_detail;
mutable std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::lock(lhs.m,rhs.m);
std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);
std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);
swap(lhs.some_detail,rhs.some_detail);
}
};
首先确保传入的是两个不同的实例,因为在std::mutex
上尝试获取已经被占有锁是未定义行为。(允许一个线程多次获取锁的是std::recursive_mutex
。)std::adopt_lock
参数表明mutex
·已经锁定,std::lock_guard
实例应该接收mutex
已经存在的锁的所有权,而不是在构造函数中锁定mutex
。
std::lock
要么两个mutex
都锁定。如果第一个mutex
锁定成功,锁定第二个mutex
抛出异常,那么会释放第一个mutex
的锁。
std::scoped_lock<>
的功能类似于std::lock_guard<>
,但它是一个可变模板,可以接收多个mutex
实例作为参数。它在构造函数中用std::lock
一样的算法加锁,在析构函数中释放锁。上面的swap
函数可以重写为:
void swap(X& lhs, X& rhs){
if(&lhs==&rhs)
return;
std::scoped_lock guard(lhs.m, rhs.m);
swap(lhs.some_detail, rhs.some_detail);
}
两个线程,互相等待对方也会发生死锁。
避免死锁的原则:如果线程A被线程B等待,那么线程A就不要等待任何线程。
避免死锁的原则:
1.避免嵌套锁
如果已经持有锁,那么不要请求锁。如果需要请求多个锁,使用std::lock
一次性获取。
2.持有锁时,避免调用用户提供的代码
因为用户提供的代码,你不知道它将干什么。
3.以固定的顺序请求锁
如果需要获取多个锁,并且不能像std::lock
那样一次获取,那么可以在每个线程按相同的顺序获取锁。
4.使用层次锁
#include <mutex>
class hierarchical_mutex
{
public:
explicit hierarchical_mutex(unsigned level)
{}
void lock()
{}
void unlock()
{}
};
hierarchical_mutex high_level_mutex(10000);
hierarchical_mutex low_level_mutex(5000);
int do_low_level_stuff()
{
return 42;
}
int low_level_func()
{
std::lock_guard<hierarchical_mutex> lk(low_level_mutex);
return do_low_level_stuff();
}
void high_level_stuff(int some_param)
{}
void high_level_func()
{
std::lock_guard<hierarchical_mutex> lk(high_level_mutex);
high_level_stuff(low_level_func());
}
void thread_a()
{
high_level_func();
}
hierarchical_mutex other_mutex(100);
void do_other_stuff()
{}
void other_stuff()
{
high_level_func();
do_other_stuff();
}
void thread_b()
{
std::lock_guard<hierarchical_mutex> lk(other_mutex);
other_stuff();
}
每次请求的锁的值必须比当前值要小。
#include <mutex>
#include <stdexcept>
#include <climits>
class hierarchical_mutex
{
std::mutex internal_mutex;
unsigned long const hierarchy_value;
unsigned long previous_hierarchy_value;
static thread_local unsigned long this_thread_hierarchy_value;
void check_for_hierarchy_violation()
{
if(this_thread_hierarchy_value <= hierarchy_value)
{
throw std::logic_error("mutex hierarchy violated");
}
}
void update_hierarchy_value()
{
previous_hierarchy_value=this_thread_hierarchy_value;
this_thread_hierarchy_value=hierarchy_value;
}
public:
explicit hierarchical_mutex(unsigned long value):
hierarchy_value(value),
previous_hierarchy_value(0)
{}
void lock()
{
check_for_hierarchy_violation();
internal_mutex.lock();
update_hierarchy_value();
}
void unlock()
{
this_thread_hierarchy_value=previous_hierarchy_value;
internal_mutex.unlock();
}
bool try_lock()
{
check_for_hierarchy_violation();
if(!internal_mutex.try_lock())
return false;
update_hierarchy_value();
return true;
}
};
thread_local unsigned long
hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);
死锁不单单是由锁引起的,任何导致等待循环的同步结构都会引起死锁。持有锁等待另一个线程是危险的,因为该线程可能需要获取你手上的锁。如果等待另一个线程的完成,那么识别线程层次是必要的。一个线程只等待低层次的线程。一种简单的办法是在启动线程的函数中连接线程。
std::unique_lock
不总是持有mutex
的所有权。可以传递std::adopt_lock
、std::defer_lock
等作为参数。std::defer_lock
表示构造时不加锁。锁可以后面通过调用std::unique_lock
对象的lock()
成员,或者将std::unique_lock
对象传给std::lock()
。std::unique_lock
比std::lock_guard
需要更多的空间,并且稍慢一点。std::unique_lock
需要记录所有权信息,需要更新所有权信息。
#include <mutex>
class some_big_object
{};
void swap(some_big_object& lhs,some_big_object& rhs)
{}
class X
{
private:
some_big_object some_detail;
mutable std::mutex m;
public:
X(some_big_object const& sd):some_detail(sd){}
friend void swap(X& lhs, X& rhs)
{
if(&lhs==&rhs)
return;
std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock);
std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock);
std::lock(lock_a,lock_b);
swap(lhs.some_detail,rhs.some_detail);
}
};
owns_lock()
获取所有权的标志。
所有权标志存储在std::unique_lock
,所以std::unique_lock
对象比std::lock_guard
对象要大。性能也稍差,因为要更新和检查标志。std::unique_lock
通常用于需要延迟加锁,或者需要转移所有权的地方。
一个应用是在函数中锁定一个mutex
然后将所有权转移给调用者,调用者在这个锁的保护下可以进行额外的操作:
std::unique_lock<std::mutex> get_lock(){
extern std::mutex some_mutex;
std::unique_lock<std::mutex> lk(some_mutex);
prepare_data();
return lk;
}
void process_data(){
std::unique_lock<std::mutex> lk(get_lock());
do_something();
}
std::unique_lock
中的锁可以在不需要的时候显式释放。
void get_and_process_data(){
std::unique_lock<std::mutex> my_lock(the_mutex);
some_class data_to_process=get_next_data_chunk();
my_lock.unlock();
result_type result=process(data_to_process);
my_lock.lock();
write_result(data_to_process, result);
}
持有锁时不要进行消耗时间长的操作。
#include <mutex>
class Y
{
private:
int some_detail;
mutable std::mutex m;
int get_detail() const
{
std::lock_guard<std::mutex> lock_a(m);
return some_detail;
}
public:
Y(int sd):some_detail(sd){}
friend bool operator==(Y const& lhs, Y const& rhs)
{
if(&lhs==&rhs)
return true;
int const lhs_value=lhs.get_detail();
int const rhs_value=rhs.get_detail();
return lhs_value==rhs_value;
}
};
比较操作获取比较值的时候加锁。因为都是拷贝,所以比较时不需要加锁。但是在比较前两个值的原始值可能发生了改变,在两次读原始值的时候,原始值可能发生了交换,所以改变了比较的含义。如果在一个操作的整个过程中没有持有所需的锁,那么会造成条件竞争。
3.3 保护共享数据的可选设施
有些资源构造很昂贵,所以延迟初始化时先检查是否初始化了:
std::shared_ptr<some_resource> resource_ptr;
void foo(){
if(!resource_ptr){
resource_ptr.reset(new some_resource);
}
resource_ptr->do_something();
}
延迟初始化部分需要保护:
std::shared_ptr<some_resource> resource_ptr;
std::mutex resource_mutex;
void foo(){
std::unique_lock<std::mutex> lk(resource_mutex);
if(!resource_ptr){
resource_ptr.reset(new some_resource);
}
lk.unlock();
resource_ptr->do_something();
}
上面的方法足以。但是很多人尝试臭名昭著的两次检测。先检测指针,然后请求锁,然后再检测指针,以防在第一次检测指针和请求锁之间,其他线程作了初始化:
void undefined_behaviour_with_double_checked_locking(){
if(!resource_ptr){
std::lock_gurad<std::mutex> lk(resource_mutex);
if(!resource_ptr){
resource_ptr.reset(new some_resource);
}
}
resource_ptr->do_something();
}
两次检测存在竞争条件,第一次检测读指针的值和第二次检测后写指针的值是不同步的。竞争条件不仅包括指针本身,还包括指针所指的对象。一个线程可能看见了另一个线程在写指针,但是它可能没有看见新创建的some_resource
实例,导致do_something()
在一个错误的值上操作(即有一个线程在创建some_resource
实例,但是还没有写到指针,所以另一个线程通过了两次指针检测,也创建了some_resource
实例)。这是数据竞争,会产生不确定的行为。
C++提供std::onece_flag
和std::call_once
来解决这种情况。每个线程都可以使用std::call_once
,指针会安全地被某个线程初始化。需要同步的数据保存在std::once_flag
,一个std::once_flag
对应一个初始化。使用std::call_once
的负担比使用mutex
小,特别是当初始化已经完成的情形。所以应该优先使用。下面是用std::call_once
重写上面的例子:
std::shared_ptr<some_resource> resource_ptr;
std::once_flag resource_flag;
void init_resource(){
resource_ptr.reset(new some_resource);
}
void foo(){
std::call_once(resource_flag, init_resource);
resource_ptr->do_something();
}
std::call_once()
很容易用于类成员的延迟初始化:
class X
{
private:
connection_info connection_details;
connection_handle connection;
std::once_flag connection_init_flag;
void open_connection()
{
connection=connection_manager.open(connection_details);
}
public:
X(connection_info const& connection_details_):
connection_details(connection_details_)
{}
void send_data(data_packet const& data)
{
std::call_once(connection_init_flag,&X::open_connection,this);
connection.send_data(data);
}
data_packet receive_data()
{
std::call_once(connection_init_flag,&X::open_connection,this);
return connection.receive_data();
}
};
std::once_flag
不可复制或移动,所以你必须自定义一些特殊成员函数。
静态局部变量存在竞争条件。在C++11
中,静态局部变量的初始化只在一个线程中进行,竞争条件是在哪个线程中进行。
class my_class;
my_class& get_my_class_instance(){
static my_class instance;
return instance;
}
多线程可以安全地调用get_my_class_instance()
,不用担心初始化的竞争条件。
对于偶尔进行更新的数据,c++提供了std::shared_mutex
和std::shared_timed_mutex
。std::shared_mutex
性能更好。
需要独占访问的使用std::lock_guard<std::shared_mutex>
或者std::unique_lock<std::shared_mutex>
,需要共享访问可以使用std::shared_lock<std::shared_mutex>
。如果线程持有一个共享锁,那么试图获得排他锁的线程被阻塞,直到所有线程释放了他们的锁;线程持有排他锁,那么没有其他线程可以获得共享锁或者排他锁,直到线程释放了它的锁:
#include <map>
#include <string>
#include <mutex>
#include <shared_mutex>
class dns_entry
{};
class dns_cache
{
std::map<std::string,dns_entry> entries;
std::shared_mutex entry_mutex;
public:
dns_entry find_entry(std::string const& domain)
{
std::shared_lock<std::shared_mutex> lk(entry_mutex);
std::map<std::string,dns_entry>::const_iterator const it=
entries.find(domain);
return (it==entries.end())?dns_entry():it->second;
}
void update_or_add_entry(std::string const& domain,
dns_entry const& dns_details)
{
std::lock_guard<std::shared_mutex> lk(entry_mutex);
entries[domain]=dns_details;
}
};
std::recursive_mutex
可以重复加锁。但是当你需要递归锁的时候,就需要考虑你的设计了。递归锁的普遍用法是,一个类设计为允许多个线程并发访问。每个公共成员函数加锁mutex
,干活,然后释放锁。有时一个公共成员还是会调用另一个公共成员函数。此时就需要递归锁。
但是这种设计不好。类的不变性被打破。更好的方法是提取一个新的有锁的私有成员函数,然后被无锁的公共成员函数调用。