相关总结如下:
同步原语(Synchronization Primitive)
在同步原语中,最重要的就是互斥器和条件变量两者,对它们需要重点掌握。
互斥器(Mutex)
只使用非递归的互斥量
无论是递归(可重入)还是非递归(不可重入)的Mutex,当程序出现线程错误的时候,都是因为设计不合理不严谨所导致,与Mutex的类型无关。非递归的Mutex可以让我们的debug变得相对轻松一些。从inventory和request中得到的启发:
造成死锁,往往是因为不同函数需要加多个锁,而他们各自的加锁顺序又不一致。比如inventory执行print_all时,先对自己加锁,再对每个request加锁;而request在析构的时候,先对自己加锁,然后再对inventory加锁(因为要在inventory中删除自己对应的指针数据)。所以死锁就出现了。各类互斥量(锁)的区别和应用场景
博客[1]总结的相当到位。
条件变量(Condition Variable)
条件变量必须在mutex的保护之下使用;
signal和broadcast的区别:
signal更着重于表明资源可用;broadcast着重于表明状态变化。spurious wakeup
即虚假唤醒。针对虚假唤醒,接收方(等待方)一定要使用while而非if来判断布尔值。引用一段另外一篇博客看到的话如下。
其实说白了很简单,就是
pthread_cond_signal()也可能唤醒多个线程,而如果你同时只允许一个线程访问的话,就必须要使用while来进行条件判断,以保证临界区内只有一个线程在处理[2]。
// 示例代码,基于muduo网络库实现
// 一个简单的BlockingQueue
muduo::MutexLock mutex_;
muduo::condition cond(mutex_);
std::queue<int> Q; // BlockingQueue
int dequeue(){
// out of multiple threads waiting here,
// only one thread could be waked up;
muduo::MutexLockGuard lock_(mutex_); // RAII mode
while (Q.empty()){
cond.wait(); // unlock the mutex automatically;
}
assert(!Q.empty());
int ret = Q.front();
Q.pop();
return ret;
}
void enqueue(int num){
muduo::MutexLockGuard lock_(mutex_); // RAII mode
Q.push(num);
cond.notify(); // resoure is available now;
// 思考,如果改成只在Q.size()从0到1的时候去notify,会造成什么问题呢?
// 答:假设这样一种情况,Q现在是空的,且有多个dequeue线程在等待,直到其非空;
// 此时一个新的线程进行了enqueue,但因为只进行了1次notify,故只有一个dequeue线程被唤醒;
// 因此其他的线程则必须一直阻塞,直至下一次0->1的情况出现;
// 程序的效率大大折扣。
}
- CountDownLatch
条件变量是很底层的原语,真正使用的时候,应该对其进行封装,倒计时(CountDownLatch)就是一个很好的规范,其模拟了一种一对多的模式。使用wait进行阻塞等待(多),使用countDown进行更新(一),如若必要,通知其他所有线程。
class CountDownLatch{
public:
explicit CountDownLatch (const int &count); // constructor
void wait_();
void countDown();
private:
mutable MutexLock mutex_;
condition cond_;
int cnt;
};
explicit CountDownLatch::CountDownLatch(const int &count)
: mutex_(), cond_(mutex_), cnt(count){ // keep in order!
}
void CountDownLatch::wait_(){
MutexLockGuard lock_(mutex_);
while(cnt > 0){
cond_.wait();
}
}
void CountDownLatch::countDown(){
MutexLockGuard lock_(mutex_);
if(--cnt == 0){
cond_.notifyAll(); // why notifyAll() instead of notify() ?
}
}
- 思考:为什么在
BlockingQueue中使用notify()(也就是signal),而在CountDownLatch中使用notifyAll()(也就是broadcast)呢?
答:在BlockingQueue中即使使用了notifyAll(),也只能有一个线程进入临界区进行dequeue操作,其他的等待线程白白被唤醒而又得继续阻塞,浪费资源。CountDownLatch本身就是一对多的模式,当满足条件的时候,生产者必须需要通知多个消费者进行相应活动。因为是倒计时,所以一旦cnt变为0了,那么就必须通知所有线程。
其他
- 线程安全的单例模式(singleton)的实现
直接使用pthread_once()调用即可,保证了实例化最多被执行一次。引用文档如下[3]:
#include <pthread.h>
pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *once_control, void (*init_routine) (void));
The purpose of
pthread_onceis to ensure that a piece of initialization code is executed at most once.
The first timepthread_onceis called with a givenonce_controlargument, it callsinit_routinewith no argument and changes the value of theonce_controlvariable to record that initialization has been performed. Subsequent calls topthread_oncewith the same once_control argument do nothing.
// 一个简单的实现
template <typename T> //基于模板实现
class singleton{
public:
T &instantiate(){
// ensure only one instance is built;
pthread_once(&ponce_, &singleton::new_);
return *value_;
}
private:
singleton();
~singleton();
void new_(){
value = new T ();
}
static pthread_once_t ponce_;
static T *value_;
};
template <typename T>
pthread_once_t singleton<T>::ponce_ = PTHREAD_ONCE_INIT;
template <typename T>
T * singleton<T>::value_ = nullptr;
- 利用shared_ptr控制读写
并发读写的问题就在于,由于读写顺序的不确定性,使得有些新的数据被覆盖掉。使用shared_ptr来管理资源,即为需要访问的资源添加了一层计数器,线程可以很清晰地判断出资源在当前的使用情况,并选择最佳的读写手段:
- 在Read端:线程设置一个新的栈上局部shared_ptr,指向相同资源(即计数器加1);这样就告诉了其他线程,资源在当前正在被访问,不允许直接写入(但可以一起读);
- 在Write端:
a. 此时资源的计数器value为1,说明资源被自己独占,可以写入;
b. 此时资源的计数器value大于1,应该先将其复制(Copy-On-Write),再进行写入。
// 使用shared_ptr管理线程读写的示例
using Foo = int;
using FooList = std::vector<Foo>;
using FooListPtr = std::shared_ptr<FooList>;
muduo::MutexLock mutex_;
FooListPtr g_ptr;
void traverse(){ // 可能有多个线程同时执行travarse(),因为shared_ptr的机制,这没有问题
FooListPtr tmp;
{
MutexLockGuard lock_(mutex_);
tmp = g_ptr; // counter added;
}
for(auto itr = tmp->begin(); itr != tmp->end(); ++itr){
//应该使用tmp而不是g_ptr,因为g_ptr可能被其他线程reset;
// traverse it...
}
}
void post(Foo &item_){ // 同一时刻只能有一个线程执行post(),但可能会有其他线程在traverse()
MutexLockGuard lock_(mutex_); //注意,写操作的mutex临界区永远是整个函数内部
if(!g_ptr.unique()){
g_ptr.reset(new FooList (*g_ptr)); // 重新复制一遍数据,这样保证了不会影响其他正在traverse的线程;
assert(g_ptr.unique());
}
g_ptr->push_back(item_);
}
// 下面是关于post的错误写法以及原因
// 因为traverse的Mutex临界区变短,
// 所以会导致一边push_back(),一边有其他的线程在遍历数据,可能会使得迭代器失效,导致程序出错
void post_wrong_version1(Foo &item_){
MutexLockGuard lock_(mutex_);
g_ptr->push_back(item_); // what is some other threads are travers()ING ?
}
// 多个线程同时post,则会导致数据丢失,因为g_ptr最终只选择一个new_ptr
void post_wrong_version2(Foo &item_){
FooListPtr new_ptr(new FooList (*g_ptr));
new_ptr->push_back(item_); // what if there are multiple threads posting?
// That means there are multiple 'new_ptr' exists;
MutexLockGuard lock_(mutex_);
g_ptr = new_ptr;
}
Reference
-
https://blog.csdn.net/lyx_323/article/details/85048677?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.edu_weight&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.edu_weight ↩
-
https://sourceware.org/pthreads-win32/manual/pthread_once.html ↩