03 sharing data between threads

C++ Concurrency in Action 2nd Edition note

3.2 用互斥量保护共享数据

在访问共享数据之前对mutex加锁,访问完成后对mutex解锁。其他想对mutex加锁的线程必须等待,直到对mutex加锁了的线程释放锁。

std::mutexlock()unlock()成员函数。但是不推荐直接使用成员函数进行锁操作,因为需要确保所有代码路径都覆盖。推荐使用std::lock_guard,它是RAII的,自动对mutex进行加锁和解锁:


#include<list>

#include<mutex>

#include<algorithm>

std::list<int> some_list;

std::mutex some_mutex;

void add_to_list(int new_value){

 std::lock_guard<std::mutex> guard(some_mutex);

 some_list.push_back(new_value);

}

bool list_contains(int value_to_find){

 std::lock_guard<std::mutex> guard(some_mutex);

 return std::find(some_list.begin(),some_list.end(),value_to_find)!=some_list.end();

}

C++17的类模板参数推导特性可以将std::lock_guard实例的定义改为:

std::lock_guard guard(some_mutex);

C++17引入了std::scoped_lock

std::scoped_lock guard(some_mutex);

通常的做法是写一个类,add_to_listlist_contains等函数作为成员函数,数据、互斥量等称为类的私有数据。但是如果类的某个函数返回指向保护数据的指针或者引用,那么这样的类就不能很好地保护共享数据。

还要防止各种形式的把保护数据的指针或引用传到类外:


class some_data

{

 int a;

 std::string b;

public:

 void do_something()

 {}

};

class data_wrapper

{

private:

 some_data data;

 std::mutex m;

public:

 template<typename Function>

 void process_data(Function func)

 {

 std::lock_guard<std::mutex> l(m);

 func(data);

 }

};

some_data* unprotected;

void malicious_function(some_data& protected_data)

{

 unprotected=&protected_data;

}

data_wrapper x;

void foo()

{

 x.process_data(malicious_function);

 unprotected->do_something();

}

原则:保护数据的指针或引用,不要从函数返回它们,不要把它们存储在外部可见的内存,不要把它们作为参数传递给用户提供的函数。

考虑一个双向链表。为了能够安全地删除一个节点,你需要同时保护三个节点。更好的办法是使用一个mutex保护整个链表。

接口间的竞争条件。考虑std::stack:

template<typename T,typename Container=std::deque<T> >

class stack

{

public:

 explicit stack(const Container&);

 explicit stack(Container&& = Container());

 template <class Alloc> explicit stack(const Alloc&);

 template <class Alloc> stack(const Container&, const Alloc&);

 template <class Alloc> stack(Container&&, const Alloc&);

 template <class Alloc> stack(stack&&, const Alloc&);

 bool empty() const;

 size_t size() const;

 T& top();

 T const& top() const;

 void push(T const&);

 void push(T&&);

 void pop();

void swap(stack&&);

template<class … Args> void emplace(Args&& … args);

};

这里的问题是empty()size()的结果不可靠。它们被调用的时候是正确的,一旦它们返回,在线程使用它们返回的信息之前,pop()或者push()可能对stack进行了修改。

stack不共享时,下面的代码是正确的:


stack<int > s;

if(!s.empty()){

 int const value=s.top();

 s.pop();

 do_something(value);

}

这是单线程安全的,对一个空栈调用top()是未定义行为。对一个共享栈对象,上面的代码不再安全。在empty()top()之间,可能有其他线程调用pop()删除了最后一个元素。这是竞争条件,但是mutex不能阻止它,这是接口间的竞争。

解决方案是改变接口。

top()pop()之间也有竞争条件:

111.png

假设栈开始时有两个元素。某个元素被读出了两遍,然后两个元素都被删除了。有一个元素无声无息地消失了。

Tom Cargill指出,如果stack的拷贝构造函数能抛出异常,那么组合调用会出问题。Herb Sutter用异常安全解决了。但是潜在的竞争条件带来了新的问题。

考虑stack<vector<int>>vector是一个动态分配的容器,拷贝vector时,必须从栈中分配一些内存。如果内存分配失败,那么vector的拷贝构造函数抛出std::bad_alloc。如果pop()返回值的同时,同时将该值从栈上删除:值的返回是在栈中的值被删除后,但是返回值的拷贝可能抛出异常。std::stack的作者将这两个操作分成了top()pop()。如果不能安全地拷贝数据的话,数据还留在栈中。

但是这样的划分造成了竞争条件。

选择1:传递引用

pop()接收的参数是引用:


std::vector<int> result;

some_stack.pop(result);

缺点是,必须先构造栈元素的实例。对于一些类型来说,构造一个实例很昂贵,需要很多时间或者资源。有些类型的构造函数需要的参数此时可能不可用。它要求元素是可赋值的。有些用户自定义的类型不支持赋值。

选择2:需要一个不抛出异常的拷贝构造函数或者移动构造函数

可以通过std::is_nothrow_copy_constructiblestd::is_nothrow_move_constructible检查移动或拷贝构造函数是否抛出异常。

选择3:返回一个指向出栈项的指针

优点是指针可以自由地拷贝而不会抛出异常。缺点是需要管理指针。std::shared_ptr是一个好的选择。

选择4:选择1加上选择2或者3

如果选择了选择2或3,那么提供选择1也很容易。

选择了选择1和3,没有竞争条件的栈的实现:


#include <exception>

#include <memory>

struct empty_stack: std::exception

{

 const char* what() const throw();

};

template<typename T>

class threadsafe_stack

{

public:

 threadsafe_stack();

 threadsafe_stack(const threadsafe_stack&);

 threadsafe_stack& operator=(const threadsafe_stack&) = delete;

 void push(T new_value);

 std::shared_ptr<T> pop();

 void pop(T& value);

 bool empty() const;

};

即使调用empty()后栈被修改,程序依然正常。如果栈为空,pop()函数抛出empty_stack异常。


#include <exception>

#include <stack>

#include <mutex>

#include <memory>

struct empty_stack: std::exception

{

 const char* what() const throw()

 {

 return "empty stack";

 }

};

template<typename T>

class threadsafe_stack

{

private:

 std::stack<T> data;

 mutable std::mutex m;

public:

 threadsafe_stack(){}

 threadsafe_stack(const threadsafe_stack& other)

 {

 std::lock_guard<std::mutex> lock(other.m);

 data=other.data;

 }

 threadsafe_stack& operator=(const threadsafe_stack&) = delete;

 void push(T new_value)

 {

 std::lock_guard<std::mutex> lock(m);

 data.push(new_value);

 }

 std::shared_ptr<T> pop()

 {

 std::lock_guard<std::mutex> lock(m);

 if(data.empty()) throw empty_stack();

 std::shared_ptr<T> const res(std::make_shared<T>(data.top()));

 data.pop();

 return res;

 }

 void pop(T& value)

 {

 std::lock_guard<std::mutex> lock(m);

 if(data.empty()) throw empty_stack();

 value=data.top();

 data.pop();

 }

 bool empty() const

 {

 std::lock_guard<std::mutex> lock(m);

 return data.empty();

 }

};

在拷贝构造函数的函数体而不是在初始化列表中进行拷贝,是为了确保在拷贝的过程中持有mutex

std::lock函数可以一次锁定两个或多个mutex


#include <mutex>

class some_big_object

{};

void swap(some_big_object& lhs,some_big_object& rhs)

{}

class X

{

private:

 some_big_object some_detail;

 mutable std::mutex m;

public:

 X(some_big_object const& sd):some_detail(sd){}

 friend void swap(X& lhs, X& rhs)

 {

 if(&lhs==&rhs)

 return;

 std::lock(lhs.m,rhs.m);

 std::lock_guard<std::mutex> lock_a(lhs.m,std::adopt_lock);

 std::lock_guard<std::mutex> lock_b(rhs.m,std::adopt_lock);

 swap(lhs.some_detail,rhs.some_detail);

 }

};

首先确保传入的是两个不同的实例,因为在std::mutex上尝试获取已经被占有锁是未定义行为。(允许一个线程多次获取锁的是std::recursive_mutex。)std::adopt_lock参数表明mutex·已经锁定,std::lock_guard实例应该接收mutex已经存在的锁的所有权,而不是在构造函数中锁定mutex

std::lock要么两个mutex都锁定。如果第一个mutex锁定成功,锁定第二个mutex抛出异常,那么会释放第一个mutex的锁。

std::scoped_lock<>的功能类似于std::lock_guard<>,但它是一个可变模板,可以接收多个mutex实例作为参数。它在构造函数中用std::lock一样的算法加锁,在析构函数中释放锁。上面的swap函数可以重写为:


void swap(X& lhs, X& rhs){

 if(&lhs==&rhs)

 return;

 std::scoped_lock guard(lhs.m, rhs.m);

 swap(lhs.some_detail, rhs.some_detail);

}

两个线程,互相等待对方也会发生死锁。

避免死锁的原则:如果线程A被线程B等待,那么线程A就不要等待任何线程。

避免死锁的原则:

1.避免嵌套锁

如果已经持有锁,那么不要请求锁。如果需要请求多个锁,使用std::lock一次性获取。

2.持有锁时,避免调用用户提供的代码

因为用户提供的代码,你不知道它将干什么。

3.以固定的顺序请求锁

如果需要获取多个锁,并且不能像std::lock那样一次获取,那么可以在每个线程按相同的顺序获取锁。

4.使用层次锁


#include <mutex>

class hierarchical_mutex

{

public:

 explicit hierarchical_mutex(unsigned level)

 {}

 void lock()

 {}

 void unlock()

 {}

};

hierarchical_mutex high_level_mutex(10000);

hierarchical_mutex low_level_mutex(5000);

int do_low_level_stuff()

{

 return 42;

}

int low_level_func()

{

 std::lock_guard<hierarchical_mutex> lk(low_level_mutex);

 return do_low_level_stuff();

}

void high_level_stuff(int some_param)

{}

void high_level_func()

{

 std::lock_guard<hierarchical_mutex> lk(high_level_mutex);

 high_level_stuff(low_level_func());

}

void thread_a()

{

 high_level_func();

}

hierarchical_mutex other_mutex(100);

void do_other_stuff()

{}

void other_stuff()

{

 high_level_func();

 do_other_stuff();

}

void thread_b()

{

 std::lock_guard<hierarchical_mutex> lk(other_mutex);

 other_stuff();

}

每次请求的锁的值必须比当前值要小。


#include <mutex>

#include <stdexcept>

#include <climits>

class hierarchical_mutex

{

 std::mutex internal_mutex;

 unsigned long const hierarchy_value;

 unsigned long previous_hierarchy_value;

 static thread_local unsigned long this_thread_hierarchy_value;

 void check_for_hierarchy_violation()

 {

 if(this_thread_hierarchy_value <= hierarchy_value)

 {

 throw std::logic_error("mutex hierarchy violated");

 }

 }

 void update_hierarchy_value()

 {

 previous_hierarchy_value=this_thread_hierarchy_value;

 this_thread_hierarchy_value=hierarchy_value;

 }

public:

 explicit hierarchical_mutex(unsigned long value):

 hierarchy_value(value),

 previous_hierarchy_value(0)

 {}

 void lock()

 {

 check_for_hierarchy_violation();

 internal_mutex.lock();

 update_hierarchy_value();

 }

 void unlock()

 {

 this_thread_hierarchy_value=previous_hierarchy_value;

 internal_mutex.unlock();

 }

 bool try_lock()

 {

 check_for_hierarchy_violation();

 if(!internal_mutex.try_lock())

 return false;

 update_hierarchy_value();

 return true;

 }

};

thread_local unsigned long

hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);

死锁不单单是由锁引起的,任何导致等待循环的同步结构都会引起死锁。持有锁等待另一个线程是危险的,因为该线程可能需要获取你手上的锁。如果等待另一个线程的完成,那么识别线程层次是必要的。一个线程只等待低层次的线程。一种简单的办法是在启动线程的函数中连接线程。

std::unique_lock不总是持有mutex的所有权。可以传递std::adopt_lockstd::defer_lock等作为参数。std::defer_lock表示构造时不加锁。锁可以后面通过调用std::unique_lock对象的lock()成员,或者将std::unique_lock对象传给std::lock()std::unique_lockstd::lock_guard需要更多的空间,并且稍慢一点。std::unique_lock需要记录所有权信息,需要更新所有权信息。


#include <mutex>

class some_big_object

{};

void swap(some_big_object& lhs,some_big_object& rhs)

{}

class X

{

private:

 some_big_object some_detail;

 mutable std::mutex m;

public:

 X(some_big_object const& sd):some_detail(sd){}

 friend void swap(X& lhs, X& rhs)

 {

 if(&lhs==&rhs)

 return;

 std::unique_lock<std::mutex> lock_a(lhs.m,std::defer_lock);

 std::unique_lock<std::mutex> lock_b(rhs.m,std::defer_lock);

 std::lock(lock_a,lock_b);

 swap(lhs.some_detail,rhs.some_detail);

 }

};

owns_lock()获取所有权的标志。

所有权标志存储在std::unique_lock,所以std::unique_lock对象比std::lock_guard对象要大。性能也稍差,因为要更新和检查标志。std::unique_lock通常用于需要延迟加锁,或者需要转移所有权的地方。

一个应用是在函数中锁定一个mutex然后将所有权转移给调用者,调用者在这个锁的保护下可以进行额外的操作:


std::unique_lock<std::mutex> get_lock(){

 extern std::mutex some_mutex;

 std::unique_lock<std::mutex> lk(some_mutex);

 prepare_data();

 return lk;

}

void process_data(){

 std::unique_lock<std::mutex> lk(get_lock());

 do_something();

}

std::unique_lock中的锁可以在不需要的时候显式释放。


void get_and_process_data(){

 std::unique_lock<std::mutex> my_lock(the_mutex);

 some_class data_to_process=get_next_data_chunk();

 my_lock.unlock();

 result_type result=process(data_to_process);

 my_lock.lock();

 write_result(data_to_process, result);

}

持有锁时不要进行消耗时间长的操作。


#include <mutex>

class Y

{

private:

 int some_detail;

 mutable std::mutex m;

 int get_detail() const

 {

 std::lock_guard<std::mutex> lock_a(m);

 return some_detail;

 }

public:

 Y(int sd):some_detail(sd){}

 friend bool operator==(Y const& lhs, Y const& rhs)

 {

 if(&lhs==&rhs)

 return true;

 int const lhs_value=lhs.get_detail();

 int const rhs_value=rhs.get_detail();

 return lhs_value==rhs_value;

 }

};

比较操作获取比较值的时候加锁。因为都是拷贝,所以比较时不需要加锁。但是在比较前两个值的原始值可能发生了改变,在两次读原始值的时候,原始值可能发生了交换,所以改变了比较的含义。如果在一个操作的整个过程中没有持有所需的锁,那么会造成条件竞争。

3.3 保护共享数据的可选设施

有些资源构造很昂贵,所以延迟初始化时先检查是否初始化了:


std::shared_ptr<some_resource> resource_ptr;

void foo(){

 if(!resource_ptr){

 resource_ptr.reset(new some_resource);

}

resource_ptr->do_something();

}

延迟初始化部分需要保护:


std::shared_ptr<some_resource> resource_ptr;

std::mutex resource_mutex;

void foo(){

 std::unique_lock<std::mutex> lk(resource_mutex);

if(!resource_ptr){

 resource_ptr.reset(new some_resource);

}

lk.unlock();

resource_ptr->do_something();

}

上面的方法足以。但是很多人尝试臭名昭著的两次检测。先检测指针,然后请求锁,然后再检测指针,以防在第一次检测指针和请求锁之间,其他线程作了初始化:


void undefined_behaviour_with_double_checked_locking(){

 if(!resource_ptr){

 std::lock_gurad<std::mutex> lk(resource_mutex);

 if(!resource_ptr){

 resource_ptr.reset(new some_resource);

}

}

resource_ptr->do_something();

}

两次检测存在竞争条件,第一次检测读指针的值和第二次检测后写指针的值是不同步的。竞争条件不仅包括指针本身,还包括指针所指的对象。一个线程可能看见了另一个线程在写指针,但是它可能没有看见新创建的some_resource实例,导致do_something()在一个错误的值上操作(即有一个线程在创建some_resource实例,但是还没有写到指针,所以另一个线程通过了两次指针检测,也创建了some_resource实例)。这是数据竞争,会产生不确定的行为。

C++提供std::onece_flagstd::call_once来解决这种情况。每个线程都可以使用std::call_once,指针会安全地被某个线程初始化。需要同步的数据保存在std::once_flag,一个std::once_flag对应一个初始化。使用std::call_once的负担比使用mutex小,特别是当初始化已经完成的情形。所以应该优先使用。下面是用std::call_once重写上面的例子:


std::shared_ptr<some_resource> resource_ptr;

std::once_flag resource_flag;

void init_resource(){

 resource_ptr.reset(new some_resource);

}

void foo(){

 std::call_once(resource_flag, init_resource);

 resource_ptr->do_something();

}

std::call_once()很容易用于类成员的延迟初始化:


class X

{

private:

 connection_info connection_details;

 connection_handle connection;

 std::once_flag connection_init_flag;

 void open_connection()

 {

 connection=connection_manager.open(connection_details);

 }

public:

 X(connection_info const& connection_details_):

 connection_details(connection_details_)

 {}

 void send_data(data_packet const& data)

 {

 std::call_once(connection_init_flag,&X::open_connection,this);

 connection.send_data(data);

 }

 data_packet receive_data()

 {

 std::call_once(connection_init_flag,&X::open_connection,this);

 return connection.receive_data();

 }

};

std::once_flag不可复制或移动,所以你必须自定义一些特殊成员函数。

静态局部变量存在竞争条件。在C++11中,静态局部变量的初始化只在一个线程中进行,竞争条件是在哪个线程中进行。


class my_class;

my_class& get_my_class_instance(){

 static my_class instance;

 return instance;

}

多线程可以安全地调用get_my_class_instance(),不用担心初始化的竞争条件。

对于偶尔进行更新的数据,c++提供了std::shared_mutexstd::shared_timed_mutexstd::shared_mutex性能更好。

需要独占访问的使用std::lock_guard<std::shared_mutex>或者std::unique_lock<std::shared_mutex>,需要共享访问可以使用std::shared_lock<std::shared_mutex>。如果线程持有一个共享锁,那么试图获得排他锁的线程被阻塞,直到所有线程释放了他们的锁;线程持有排他锁,那么没有其他线程可以获得共享锁或者排他锁,直到线程释放了它的锁:


#include <map>

#include <string>

#include <mutex>

#include <shared_mutex>

class dns_entry

{};

class dns_cache

{

 std::map<std::string,dns_entry> entries;

 std::shared_mutex entry_mutex;

public:

 dns_entry find_entry(std::string const& domain)

 {

 std::shared_lock<std::shared_mutex> lk(entry_mutex);

 std::map<std::string,dns_entry>::const_iterator const it=

 entries.find(domain);

 return (it==entries.end())?dns_entry():it->second;

 }

 void update_or_add_entry(std::string const& domain,

 dns_entry const& dns_details)

 {

 std::lock_guard<std::shared_mutex> lk(entry_mutex);

 entries[domain]=dns_details;

 }

};

std::recursive_mutex可以重复加锁。但是当你需要递归锁的时候,就需要考虑你的设计了。递归锁的普遍用法是,一个类设计为允许多个线程并发访问。每个公共成员函数加锁mutex,干活,然后释放锁。有时一个公共成员还是会调用另一个公共成员函数。此时就需要递归锁。

但是这种设计不好。类的不变性被打破。更好的方法是提取一个新的有锁的私有成员函数,然后被无锁的公共成员函数调用。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,635评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,628评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,971评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,986评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,006评论 6 394
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,784评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,475评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,364评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,860评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,008评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,152评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,829评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,490评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,035评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,156评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,428评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,127评论 2 356

推荐阅读更多精彩内容

  • 参考cplusplus参考cppreference 1.mutex 用于保护临界区(critical sectio...
    王侦阅读 4,276评论 0 0
  • 最近是恰好写了一些c++11多线程有关的东西,就写一下笔记留着以后自己忘记回来看吧,也不是专门写给读者看的,我就想...
    编程小世界阅读 2,508评论 1 2
  • 不讲语言特性,只从工程角度出发,个人觉得C++标准委员会在C++11中对多线程库的引入是有史以来做得最人道的一件事...
    stidio阅读 13,303评论 0 11
  • <condition_variable > 头文件主要包含了与条件变量相关的类和函数。相关的类包括 std::co...
    张霸天阅读 3,775评论 1 0
  • 从2018年3月16日开始到现在为止,我画画的状态,我觉得走过了三个阶段。第一个阶段就是乱画,然后慢慢有了一些形状...
    田馨兰阅读 569评论 0 0