同步
多线程操作访问修改统一资源会产生混乱的情况,使用同步工具来了防止不同的线程意外更改数据。
同步基本使用下面几种方式
- 锁
- 原子操作
- 内存壁垒和易失性变量
原子操作,是一种适用于简单数据类型的同步方法,不会阻塞竞争线程。比锁的性能更高
内存壁垒
编辑器为了优化性能,会重新排序汇编指令。内存屏障是一种非阻塞同步工具,用于确保内存操作以正确的顺序发生。内存屏障的作用类似于围栏,迫使处理器在允许执行位于屏障之后的加载和存储操作之前,完成位于屏障前面的所有加载和存储操作。内存屏障通常用于确保一个线程(但另一线程可见)的内存操作始终按预期顺序发生。在这种情况下缺少内存屏障可能会使其他线程看到看似不可能的结果。(有关示例,请参阅Wikipedia条目中的内存障碍。)要使用内存障碍,只需OSMemoryBarrier在代码中的适当位置调用该函数。
锁
锁的类型
互斥锁
互斥锁是一种信号量,它一次只能授予对一个线程的访问权限。如果正在使用互斥锁,而另一个线程试图获取该互斥锁,则该线程将阻塞,直到该互斥锁被其原始持有者释放为止。如果多个线程竞争同一个互斥锁,则一次只能访问一个。
互斥锁只有两种状态,即上锁( lock )和解锁( unlock ).
pthread_mutex_t 是linux 下的互斥锁
特点
- 原子性:把一个互斥量锁定为一个原子操作,这意味着操作系统(或pthread函数库)保证了如果一个线程锁定了一个互斥量,没有其他线程在同一时间可以成功锁定这个互斥量;
- 唯一性:如果一个线程锁定了一个互斥量,在它解除锁定之前,没有其他线程可以锁定这个互斥量;
- 非繁忙等待:如果一个线程已经锁定了一个互斥量,第二个线程又试图去锁定这个互斥量,则第二个线程将被挂起(不占用任何cpu资源),直到第一个线程解除对这个互斥量的锁定为止,第二个线程则被唤醒并继续执行,同时锁定这个互斥量。
递归锁 Recursive lock NSRecursiveLock
递归锁 是 互斥锁 的一种变体, 递归锁允许单个线程再释放它之前多次获取锁,并将其他线程保持阻塞状态。再pthread_mutex 中可以使用 PTHREAD_MUTEX_RECURSIVE 属性来设置位递归锁。 递归锁主要在递归迭代的情况下使用
自旋锁 Spin lock OSSpinLock os_unfair_lock
自旋锁反复轮询其锁定条件,直到该条件变为true。自旋锁最常用于多处理器系统,其中锁的预期等待时间很小。在这些情况下,轮询通常比阻塞线程更有效,这涉及上下文切换和线程数据结构的更新。自旋锁有一定的隐患,比如当优先级高的线程不断的轮询, 低优先级占有的锁却不能得到cpu执行不能释放锁造成死锁。
读写锁 Read-write lock
读写锁也称为共享独占锁。在正常操作期间,多个读取器可以同时访问数据结构。但是,当线程要写入结构时,它将阻塞,直到所有读取器都释放锁为止,此时,它获取了锁并可以更新结构。当写入线程正在等待锁定时,新的读取器线程将阻塞,直到写入线程完成。
这种类型的锁通常用于较大规模的操作,如果经常读取受保护的数据结构并且仅偶尔进行修改,则可以显着提高性能。系统仅支持使用POSIX线程的读写锁。
使用锁
互斥锁 pthread_mutex_t <pthread/pthread.h> 头文件下
- pthread_mutex_init初始化,
- 使用 pthread_mutex_lock / pthread_mutex_unlock 进行加锁和解锁
#import <pthread/pthread.h>
// 初始化一个互斥锁。
int pthread_mutex_init(pthread_mutex_t *mutex,
const pthread_mutexattr_t *attr);
// 对互斥锁上锁,若互斥锁已经上锁,则调用者一直阻塞,
// 直到互斥锁解锁后再上锁。
int pthread_mutex_lock(pthread_mutex_t *mutex);
// 调用该函数时,若互斥锁未加锁,则上锁,返回 0;
// 若互斥锁已加锁,则函数直接返回失败,即 EBUSY。
int pthread_mutex_trylock(pthread_mutex_t *mutex);
/**
pthread_mutex_timedlock函数与pthread_mutex_lock函数是基本等价的,
但是在达到超时时间时,pthread_mutex_timedlock不会对互斥量进行加锁,而是返回错误码ETIMEOUT
超时指定愿意等待的绝对时间(与相对时间对比而言,指定在时间X之前可以阻塞等待,而不是说愿意阻塞Y秒)。
这个超时时间是用timespec结构来表示,它用秒和纳秒来描述时间。
*/
int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,
const struct timespec *restrict abs_timeout);
// 对指定的互斥锁解锁。
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// 销毁指定的一个互斥锁。互斥锁在使用完毕后,
// 必须要对互斥锁进行销毁,以释放资源。
int pthread_mutex_destroy(pthread_mutex_t *mutex);
// 一个例子 正确的递增
self.queue = dispatch_queue_create("ceshi", DISPATCH_QUEUE_CONCURRENT);
int s = pthread_mutex_init(&_mutex, NULL);
for( int j = 0; j < 100; j++){
dispatch_async(self.queue, ^{
for (int i = 1; i < 1000; i ++) {
dispatch_async(self.queue, ^{
pthread_mutex_lock(&self->_mutex);
self.num = self.num + 1;
printf("\n 现在 %d", self.num);
pthread_mutex_unlock(&self->_mutex);
usleep(10);
});
}
});
}
NSLock
NSLock是对 mutex 互斥锁的封装。 所以不适合递归加锁。 同一个线程多次访问也会堵塞
NSLock再swift Foundation 的init源码
public override init() {
// 初始化一个 mutex 锁
pthread_mutex_init(mutex, nil)
// 初始条件变量
pthread_cond_init(timeoutCond, nil)
// 初始一个 timeout 相关的锁
pthread_mutex_init(timeoutMutex, nil)
}
NSLock的lock函数就是直接执行 pthread_mutex_lock
open func lock() {
pthread_mutex_lock(mutex)
}
NSLock 的unLock 方法 会执行 pthread_mutex_unlock(mutex)。另外 根据date方法,还回去 锁上超时相关的锁,然后广播超时条件,这样再 lock(before limit: Date) 方法中的 pthread_cond_timedwait 方法可以获取到变化通知,再次去加锁 mutex
open func unlock() {
pthread_mutex_unlock(mutex)
// Wakeup any threads waiting in lock(before:)
pthread_mutex_lock(timeoutMutex)
pthread_cond_broadcast(timeoutCond)
pthread_mutex_unlock(timeoutMutex)
}
NSLock的 lock(before limit: Date) 函数 再时间之前 阻塞等待的访问锁。
- 先执行 pthread_mutex_trylock,可以访问就返回true,
- 已经被加锁,就执行 timedLock。
timeout的方法 - 对 timeoutMutex 加锁
- pthread_cond_timedwait 方法对timeoutMutex 解锁,然后等待 timeoutCond 的通知,以及设置超时时间, 进行等待
- pthread_cond_timedwait 收到消息/超时 之后, 对timeoutMutex加锁,执行操作,
- pthread_mutex_unlock 方法对timeoutMutex 加锁,然后去尝试 对 mutex 加锁。成功返回true,
- 加锁失败,继续判断是否超时,没超时就再次执行上面的所及,超时返回false
// try 方法
open func lock(before limit: Date) -> Bool {
if pthread_mutex_trylock(mutex) == 0 {
return true
}
return timedLock(mutex: mutex, endTime: limit, using: timeoutCond, with: timeoutMutex)
}
//
private func timedLock(mutex: _MutexPointer, endTime: Date,
using timeoutCond: _ConditionVariablePointer,
with timeoutMutex: _MutexPointer) -> Bool {
// 计算秒数
var timeSpec = timeSpecFrom(date: endTime)
while var ts = timeSpec { // 当timespec 不为0 循环
// 对timeoutMutex 加锁,对 进行同步操作
let lockval = pthread_mutex_lock(timeoutMutex) // 加锁
precondition(lockval == 0)
// 解锁 -> 等待ts时间 -> (等到消息/时间结束) 加锁 ->
let waitval = pthread_cond_timedwait(timeoutCond, timeoutMutex, &ts)
precondition(waitval == 0 || waitval == ETIMEDOUT)
let unlockval = pthread_mutex_unlock(timeoutMutex) // 解锁
precondition(unlockval == 0)
if waitval == ETIMEDOUT { // 超时了就失败返回false
return false
}
// 尝试加锁 -> 多线程 可能不能反问
let tryval = pthread_mutex_trylock(mutex)
precondition(tryval == 0 || tryval == EBUSY)
if tryval == 0 { // The lock was obtained. 加锁成功 返回false
return true
}
// pthread_cond_timedwait didn't timeout so wait some more.
// 仍然没有超时,就重新计算秒数,等待
timeSpec = timeSpecFrom(date: endTime)
}
return false // 超时返回false
}
cond_wait 可以使用 pthread_cond_signal / pthread_cond_broadcast 唤醒。区别
1。pthread_cond_signal 一次只能唤醒一个线程
- pthread_cond_broadcast 一次唤醒所有睡眠的线程
pthread_cond_wait 的使用示例
// 创建 mutex 和 cond
pthread_cond_init(&_cond, NULL);
pthread_mutex_init(&_mutex, NULL);
pthread_mutex_init(&_waitMutex, NULL);
dispatch_async(queue, ^{
pthread_mutex_lock(&_waitMutex);
pthread_cond_wait(&_cond, &_waitMutex);
printf("\n 在 num == 10 之后执行 %d", self.num);
pthread_mutex_unlock(&_waitMutex);
});
for (int i = 0; i < 100; i ++) {
dispatch_async(queue, ^{
for (int j =0; j< 100; j++){
dispatch_async(queue, ^{
pthread_mutex_lock(&_mutex);
self.num = self.num + 1;
printf("\n %d", self.num);
if(self.num == 10){
self.q = 1;
// 对cond的操作进行加锁
pthread_mutex_lock(&_waitMutex);
pthread_cond_signal(&_cond);
pthread_mutex_unlock(&_waitMutex);
}
usleep(200000);
pthread_mutex_unlock(&_mutex);
});
}
});
}
NSRecursiveLock
实际上定义的是一个递归锁,NSRecursiveLock 这个锁可以被同一线程多次请求,而不会引起死锁。这主要是用在循环或递归操作中。
在swift Fundatuon 的源码中,定义和NSLock 很像,添加了 PTHREAD_MUTEX_RECURSIVE 属性。Cygwin是一个在windows平台上运行的类UNIX模拟环境
public override init() {
super.init()
// 和NSLock 相比,创建mutex 时, 添加了attr PTHREAD_MUTEX_RECURSIVE
var attrib = pthread_mutexattr_t()
withUnsafeMutablePointer(to: &attrib) { attrs in
pthread_mutexattr_init(attrs)
pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
pthread_mutex_init(mutex, attrs)
}
pthread_cond_init(timeoutCond, nil)
pthread_mutex_init(timeoutMutex, nil)
}
PTHREAD_MUTEX_RECURSIVE 互斥锁的一个attr。表示在同一个线程中可以多次获取同一把锁。并且不会死锁。
int ret;
if(( ret = pthread_mutexattr_init(&attr)) != 0){
fprintf(stderr, "create mutex attribute error. msg:%s", strerror(ret));
exit(1);
}
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(&mutex, &attr);
NSCondition
NSCondition 就是对 pthread_mutex和 pthread_cond_t 的封装。
- init 对 mutex 和cond 的 init
- lock/unlock 对mutex 的lock 和unlock
- wait 对 cond 和mutex 的 pthread_cond_wait / pthread_cond_timedwait
- signal / broadcast pthread_cond_signal / pthread_cond_broadcast
NSConditionLock 对 NSCondition 的封装
使用NSConditionLock对象,可以确保线程只有在满足特定条件时才能获取锁。一旦获得了锁并执行了代码的关键部分,线程就可以放弃锁并将相关的条件设置为新的。条件本身是任意的:您可以根据应用程序的需要定义它们。
lock 方法
- 进入先对cond 加锁, 成功继续执行,不成功阻塞等待(有其他的线程已经加锁)
- 判断 _thread 是否为nil。如果是nil(初始时nil), 说明没有线程在操作 NSConditionLock 的lock 和unlock之间的步骤。去设置 _thread, 并且对 cond unlock
- 如果 NSConditionLock 执行完成 然后调用 unlock, 就会再次设置 _thread 为nil
- 如果 _thread 不为nil,去 wait 等待(解锁,等待) 其他线程的 [NSConditionLock unlock]中的 broadcast
- 如果收到通知唤醒,加锁 -> 再去判断 _thread 然后循环,直到可以执行
- 超时的话,再解锁,退出
open func lock(whenCondition condition: Int) {
let _ = lock(whenCondition: condition, before: Date.distantFuture)
}
open func lock(before limit: Date) -> Bool {
_cond.lock()
// _thread 会在unlock 时设置为nil。这里是加锁的
while _thread != nil {
if !_cond.wait(until: limit) { // unlock -》 等待condition -》 lock
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}
// 区别时是否这是 condition
open func unlock(withCondition condition: Int) {
_cond.lock()
_thread = nil
_value = condition
_cond.broadcast()
_cond.unlock()
}
总结:
- _thread 操作都死在cond的lock 中,线程安全,在 NSConditionLock 的lock 时被赋值,在unlock 时设为nil,
执行流程 - NSConditionLock 在lock 执行完成之后都会解锁,所以在执行 器 lock 和unlock 直接时加锁的。
- wait 时 解锁 -> 挂起等待 , 有通知之后在加锁。(通知激活多线程的话就是 cond.lock 阻塞等待别的 线程 cond.unlock的完成,然后再执行,这是就需要判断 _thread d的情况了)
对于condition的lock
open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
_cond.lock()
while _thread != nil || _value != condition {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
_thread = pthread_self()
_cond.unlock()
return true
}
类似于lock没有condition的方法,只是在判断 condition 之余,多判断了一个条件 condition。
总结:NSConditionLock lock的基本的实现思路: 内部定义一个_thread,然后加锁去操作这个thread,保证线程安全,然后根据 _thread 的状态,判断是否执行中,然后等待,再在unlock 中通知所有人激活,然后再次检查 _thread,然后是否等待循环,直到 超时。