Why do we need thread synchronization?
- When multiple threads access the same piece of data at the same time, the data can end up corrupted or inconsistent, so we need thread synchronization.
Common thread-synchronization techniques
- The usual technique is locking, which guarantees that only one thread accesses the data at any given moment.
Categories of locks
- Spinlock: a thread waiting for the lock busy-waits, occupying the CPU the whole time. It is usually appropriate when:
- the expected wait for the lock is very short
- the locked code (the critical section) runs often, but contention is rare
- CPU resources are not scarce
- the machine has a multi-core processor
- Mutex: a thread waiting for the lock is put to sleep. It is usually appropriate when (see the sketch after this list):
- the expected wait for the lock is long
- the critical section is complex or loops heavily
- the critical section performs I/O
- the machine has a single-core processor
- contention for the critical section is heavy
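As a concrete example of the sleeping-lock route on Apple platforms, here is a minimal Swift sketch, assuming an iOS 10+/macOS 10.12+ target; the wrapper class name UnfairLock is mine, not an API:

import os

// A minimal sketch: wrapping os_unfair_lock in a class so the lock
// struct is heap-allocated and never accidentally copied.
final class UnfairLock {
    private let _lock: UnsafeMutablePointer<os_unfair_lock>

    init() {
        _lock = UnsafeMutablePointer<os_unfair_lock>.allocate(capacity: 1)
        _lock.initialize(to: os_unfair_lock())
    }

    deinit {
        _lock.deinitialize(count: 1)
        _lock.deallocate()
    }

    func lock()   { os_unfair_lock_lock(_lock) }   // waiters sleep; no busy-wait
    func unlock() { os_unfair_lock_unlock(_lock) }
}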
- Commonly used locks
- OSSpinLock (a spinlock), now deprecated. It suffers from priority inversion: suppose thread 1 has a high priority and thread 2 a low priority, and at some moment thread 2 acquires the lock first. Thread 1 then busy-waits in a while-loop until the lock is released, and because it has the higher priority the scheduler keeps giving it CPU time, possibly leaving thread 2 no time to finish its work and unlock; the app is then effectively deadlocked. With a sleeping lock this cannot happen, because once the high-priority thread sleeps, the lower-priority thread gets scheduled.
- os_unfair_lock replaces OSSpinLock and is supported from iOS 10. A thread waiting for it sleeps (unlike OSSpinLock's busy-wait) and does not occupy the CPU, so os_unfair_lock avoids OSSpinLock's priority-inversion problem.
- pthread_mutex, the "mutex": a thread waiting for it is put to sleep. It also comes in a recursive variant, and can be paired with a condition variable (cond).
- NSLock and NSRecursiveLock: NSLock wraps a plain mutex; NSRecursiveLock wraps a recursive mutex, with an API almost identical to NSLock's.
- NSCondition wraps a mutex plus a cond; NSConditionLock wraps NSCondition further and lets you attach concrete condition values.
- dispatch_semaphore, the "semaphore": its initial value caps how many threads may access the resource concurrently; with an initial value of 1, only one thread at a time may access it, which gives plain thread synchronization (see the sketch after this list).
- @synchronized wraps a recursive mutex: @synchronized(obj) internally obtains the recursive lock associated with obj, then locks and unlocks around the block.
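A minimal Swift sketch of the semaphore behaviour described above (the task bodies and counts are made up for illustration):

import Foundation

// An initial value of 3 lets at most three threads run the critical
// section at once; an initial value of 1 would act like a mutex.
let semaphore = DispatchSemaphore(value: 3)

for i in 0..<10 {
    DispatchQueue.global().async {
        semaphore.wait()    // value - 1; blocks once the value would drop below 0
        print("task \(i) running")
        Thread.sleep(forTimeInterval: 1)
        semaphore.signal()  // value + 1; wakes one waiting thread
    }
}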
Performance comparison of the locks
@synchronized
- We can investigate @synchronized from two directions:
- inspecting the generated assembly
- rewriting the code with clang (e.g. clang -rewrite-objc)
- Either way we find that the underlying implementation of @synchronized is a pair of calls, objc_sync_enter and objc_sync_exit; setting a symbolic breakpoint shows that objc_sync_enter is implemented in libobjc.dylib. A small sketch of calling this pair directly follows.
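Since objc_sync_enter and objc_sync_exit are exported by libobjc, we can call them straight from Swift; a minimal sketch (the helper name synchronized(_:_:) is mine, not an API):

import Foundation

// Calls the same libobjc entry points that @synchronized(obj) lowers to.
func synchronized(_ obj: AnyObject, _ body: () -> Void) {
    objc_sync_enter(obj)          // lock the recursive mutex associated with obj
    defer { objc_sync_exit(obj) } // always unlock, even on an early return
    body()
}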
objc_sync_enter
- The implementation of objc_sync_enter:
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, ACQUIRE);
ASSERT(data);
data->mutex.lock();
} else {
// @synchronized(nil) does nothing
if (DebugNilSync) {
_objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
}
objc_sync_nil();
}
return result;
}
- The code first checks whether obj is nil; as the comment // @synchronized(nil) does nothing says, nothing happens for a nil object. Otherwise, id2data(obj, ACQUIRE) returns a SyncData and the lock method of its mutex member is called.
- The structure of SyncData:
typedef struct alignas(CacheLineSize) SyncData {
struct SyncData* nextData;
DisguisedPtr<objc_object> object;
int32_t threadCount; // number of THREADS using this block
recursive_mutex_t mutex;
} SyncData;
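To make the structure concrete, here is a deliberately simplified Swift model of the idea (hypothetical, not the runtime's code): each synchronized object maps to an entry holding a recursive mutex and a count of the threads using it, guarded by a table-level lock.

import Foundation

// Hypothetical model: SyncEntry plays the role of SyncData,
// SyncTable plays the role of the global list guarded by a spinlock.
final class SyncEntry {
    let mutex = NSRecursiveLock() // stands in for recursive_mutex_t
    var threadCount: Int32 = 0    // number of threads using this entry
}

final class SyncTable {
    private var entries: [ObjectIdentifier: SyncEntry] = [:]
    private let tableLock = NSLock() // stands in for LOCK_FOR_OBJ(object)

    func entry(for obj: AnyObject) -> SyncEntry {
        tableLock.lock()
        defer { tableLock.unlock() }
        let key = ObjectIdentifier(obj)
        if let existing = entries[key] {
            existing.threadCount += 1 // reuse the entry for this object
            return existing
        }
        let fresh = SyncEntry()
        fresh.threadCount = 1
        entries[key] = fresh
        return fresh
    }
}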
- The implementation of id2data:
static SyncData* id2data(id object, enum usage why)
{
spinlock_t *lockp = &LOCK_FOR_OBJ(object);
SyncData **listp = &LIST_FOR_OBJ(object);
SyncData* result = NULL;
#if SUPPORT_DIRECT_THREAD_KEYS
// Check per-thread single-entry fast cache for matching object
bool fastCacheOccupied = NO;
SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
if (data) {
fastCacheOccupied = YES;
if (data->object == object) {
// Found a match in fast cache.
uintptr_t lockCount;
result = data;
lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
if (result->threadCount <= 0 || lockCount <= 0) {
_objc_fatal("id2data fastcache is buggy");
}
switch(why) {
case ACQUIRE: {
lockCount++;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
break;
}
case RELEASE:
lockCount--;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
if (lockCount == 0) {
// remove from fast cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
#endif
// Check per-thread cache of already-owned locks for matching object
SyncCache *cache = fetch_cache(NO);
if (cache) {
unsigned int i;
for (i = 0; i < cache->used; i++) {
SyncCacheItem *item = &cache->list[i];
if (item->data->object != object) continue;
// Found a match.
result = item->data;
if (result->threadCount <= 0 || item->lockCount <= 0) {
_objc_fatal("id2data cache is buggy");
}
switch(why) {
case ACQUIRE:
item->lockCount++;
break;
case RELEASE:
item->lockCount--;
if (item->lockCount == 0) {
// remove from per-thread cache
cache->list[i] = cache->list[--cache->used];
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
// Thread cache didn't find anything.
// Walk in-use list looking for matching object
// Spinlock prevents multiple threads from creating multiple
// locks for the same new object.
// We could keep the nodes in some hash table if we find that there are
// more than 20 or so distinct locks active, but we don't do that now.
lockp->lock();
{
SyncData* p;
SyncData* firstUnused = NULL;
for (p = *listp; p != NULL; p = p->nextData) {
if ( p->object == object ) {
result = p;
// atomic because may collide with concurrent RELEASE
OSAtomicIncrement32Barrier(&result->threadCount);
goto done;
}
if ( (firstUnused == NULL) && (p->threadCount == 0) )
firstUnused = p;
}
// no SyncData currently associated with object
if ( (why == RELEASE) || (why == CHECK) )
goto done;
// an unused one was found, use it
if ( firstUnused != NULL ) {
result = firstUnused;
result->object = (objc_object *)object;
result->threadCount = 1;
goto done;
}
}
// Allocate a new SyncData and add to list.
// XXX allocating memory with a global lock held is bad practice,
// might be worth releasing the lock, allocating, and searching again.
// But since we never free these guys we won't be stuck in allocation very often.
posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
result->object = (objc_object *)object;
result->threadCount = 1;
new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
result->nextData = *listp;
*listp = result;
done:
lockp->unlock();
if (result) {
// Only new ACQUIRE should get here.
// All RELEASE and CHECK and recursive ACQUIRE are
// handled by the per-thread caches above.
if (why == RELEASE) {
// Probably some thread is incorrectly exiting
// while the object is held by another thread.
return nil;
}
if (why != ACQUIRE) _objc_fatal("id2data is buggy");
if (result->object != object) _objc_fatal("id2data is buggy");
#if SUPPORT_DIRECT_THREAD_KEYS
if (!fastCacheOccupied) {
// Save in fast thread cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
} else
#endif
{
// Save in thread cache
if (!cache) cache = fetch_cache(YES);
cache->list[cache->used].data = result;
cache->list[cache->used].lockCount = 1;
cache->used++;
}
}
return result;
}
- Stepping through with the debugger shows that the first acquisition for an object runs the following path:
posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
result->object = (objc_object *)object;
result->threadCount = 1;
new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
result->nextData = *listp;
*listp = result;
if (result) {
// Only new ACQUIRE should get here.
// All RELEASE and CHECK and recursive ACQUIRE are
// handled by the per-thread caches above.
if (why == RELEASE) {
// Probably some thread is incorrectly exiting
// while the object is held by another thread.
return nil;
}
if (why != ACQUIRE) _objc_fatal("id2data is buggy");
if (result->object != object) _objc_fatal("id2data is buggy");
#if SUPPORT_DIRECT_THREAD_KEYS
if (!fastCacheOccupied) {
// Save in fast thread cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
}
#endif
return result;
- On later acquisitions, when the per-thread caches don't already hold the entry, the list walk finds the existing SyncData instead:
{
    SyncData* p;
    SyncData* firstUnused = NULL;
    for (p = *listp; p != NULL; p = p->nextData) {
        if ( p->object == object ) {
            result = p;
            // atomic because may collide with concurrent RELEASE
            OSAtomicIncrement32Barrier(&result->threadCount);
            goto done; // jumps to done from here
        }
    }
}
// ... after done:, the entry is saved in this thread's fast cache:
if (!fastCacheOccupied) {
    // Save in fast thread cache
    tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
    tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
}
return result;
objc_sync_exit
- objc_sync_exit is much like objc_sync_enter; it just passes a different argument (RELEASE) to id2data, so the path taken inside id2data differs:
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, RELEASE);
if (!data) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
} else {
bool okay = data->mutex.tryUnlock();
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
} else {
// @synchronized(nil) does nothing
}
return result;
}
- The path objc_sync_exit takes inside id2data (the RELEASE branch of the fast cache):
bool fastCacheOccupied = NO;
SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
if (data) {
fastCacheOccupied = YES;
if (data->object == object) {
// Found a match in fast cache.
uintptr_t lockCount;
result = data;
lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
if (result->threadCount <= 0 || lockCount <= 0) {
_objc_fatal("id2data fastcache is buggy");
}
switch(why) {
case ACQUIRE: {
lockCount++;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
break;
}
case RELEASE:
lockCount--;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
if (lockCount == 0) {
// remove from fast cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
- My understanding: @synchronized keeps one SyncData per object in a global table (the SyncData **listp list); on later acquisitions it returns the matching SyncData and increments its threadCount. Because @synchronized implements a recursive lock, when the block finishes it reads that SyncData's lockCount from thread-local storage via tls_get_direct and decrements it; once the count reaches 0, tls_set_direct sets the key back to NULL so the slot can be reclaimed. A small check of the recursive behaviour follows.
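A minimal Swift sketch exercising that recursive behaviour directly through the runtime calls (the token object is made up for illustration):

import Foundation

let token = NSObject()

objc_sync_enter(token)  // lockCount in TLS becomes 1
objc_sync_enter(token)  // same thread re-enters: lockCount 2, no deadlock
print("re-entered without deadlocking")
objc_sync_exit(token)   // lockCount back to 1
objc_sync_exit(token)   // lockCount 0: the entry is released for reuse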
NSLock and NSRecursiveLock
- Under the hood, NSLock is a wrapper around pthread_mutex. Debugging with lldb shows that NSLock is implemented in Foundation, which is not open source, so we can look at the Swift (swift-corelibs-foundation) implementation instead:
// initializer
public override init() {
#if os(Windows)
InitializeSRWLock(mutex)
InitializeConditionVariable(timeoutCond)
InitializeSRWLock(timeoutMutex)
#else
pthread_mutex_init(mutex, nil)
#if os(macOS) || os(iOS)
pthread_cond_init(timeoutCond, nil)
pthread_mutex_init(timeoutMutex, nil)
#endif
#endif
}
open func lock() {
#if os(Windows)
AcquireSRWLockExclusive(mutex)
#else
pthread_mutex_lock(mutex)
#endif
}
open func unlock() {
#if os(Windows)
ReleaseSRWLockExclusive(mutex)
AcquireSRWLockExclusive(timeoutMutex)
WakeAllConditionVariable(timeoutCond)
ReleaseSRWLockExclusive(timeoutMutex)
#else
pthread_mutex_unlock(mutex)
#if os(macOS) || os(iOS)
// Wakeup any threads waiting in lock(before:)
pthread_mutex_lock(timeoutMutex)
pthread_cond_broadcast(timeoutCond)
pthread_mutex_unlock(timeoutMutex)
#endif
#endif
}
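A minimal usage sketch: lock()/unlock() map onto pthread_mutex_lock/pthread_mutex_unlock as shown above (the balance example is made up):

import Foundation

let lock = NSLock()
var balance = 100

func withdraw(_ amount: Int) {
    lock.lock()
    defer { lock.unlock() } // released on every path out of the function
    if balance >= amount {
        balance -= amount
    }
}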
- NSRecursiveLock is much like NSLock, except that the underlying mutex is created with the recursive attribute:
public override init() {
super.init()
#if os(Windows)
InitializeCriticalSection(mutex)
InitializeConditionVariable(timeoutCond)
InitializeSRWLock(timeoutMutex)
#else
#if CYGWIN
var attrib : pthread_mutexattr_t? = nil
#else
var attrib = pthread_mutexattr_t()
#endif
withUnsafeMutablePointer(to: &attrib) { attrs in
pthread_mutexattr_init(attrs)
pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
pthread_mutex_init(mutex, attrs)
}
#if os(macOS) || os(iOS)
pthread_cond_init(timeoutCond, nil)
pthread_mutex_init(timeoutMutex, nil)
#endif
#endif
}
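Because the mutex is created with PTHREAD_MUTEX_RECURSIVE, the same thread may re-lock it; a minimal sketch (the same code with a plain NSLock would deadlock on the recursive call):

import Foundation

let recursiveLock = NSRecursiveLock()

func countdown(_ n: Int) {
    recursiveLock.lock()
    defer { recursiveLock.unlock() }
    guard n > 0 else { return }
    countdown(n - 1) // re-enters lock() on the same thread without deadlocking
}

countdown(5)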
NSConditionLock and NSCondition
- NSCondition's initializer sets up a mutex plus a condition variable:
public override init() {
#if os(Windows)
InitializeSRWLock(mutex)
InitializeConditionVariable(cond)
#else
pthread_mutex_init(mutex, nil)
pthread_cond_init(cond, nil)
#endif
}
- NSConditionLock's initializers:
public convenience override init() {
self.init(condition: 0)
}
public init(condition: Int) {
_value = condition
}
- Unlike NSLock, NSConditionLock's lock methods also take the condition into account:
open func lock() {
let _ = lock(before: Date.distantFuture)
}
open func lock(whenCondition condition: Int) {
let _ = lock(whenCondition: condition, before: Date.distantFuture)
}
open func lock(whenCondition condition: Int, before limit: Date) -> Bool {
_cond.lock()
while _thread != nil || _value != condition {
if !_cond.wait(until: limit) {
_cond.unlock()
return false
}
}
#if os(Windows)
_thread = GetCurrentThread()
#else
_thread = pthread_self()
#endif
_cond.unlock()
return true
}
- When locking with a condition, NSConditionLock checks whether the stored condition value matches; if it doesn't, the call keeps waiting on the internal condition variable until either the condition is met or the limit date passes. A usage sketch follows.
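A minimal producer/consumer sketch of the condition-value mechanism (the empty/full values and the buffer are made up for illustration):

import Foundation

let empty = 0, full = 1
let condLock = NSConditionLock(condition: empty)
var buffer: Int?

Thread.detachNewThread {
    condLock.lock(whenCondition: empty)  // proceeds: the value starts at empty
    buffer = 42
    condLock.unlock(withCondition: full) // sets the value to full and wakes waiters
}

condLock.lock(whenCondition: full)       // blocks until the producer is done
print(buffer ?? -1)                      // 42
condLock.unlock(withCondition: empty)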