前言
多线程开发是性能优化常用的技术,在多线程开发中,线程安全是绕不开的一个话题。线程安全的定义,在之前的文章中也有介绍。简单来讲,线程安全就是在同一时刻,对同一数据操作的线程只有一个,可以保证程序每次运行,都能得到预期的结果。
1 锁
为保证线程安全,多线程开发中经常会用的技术就是锁。锁是一种非强制机制,每个线程在操作数据或者资源前,先获取(Acquire)锁,并在操作结束后释放(Release)锁。如果锁已经被占用,那么其他线程获取锁时会等待,直到锁被释放。
1.1 锁的分类
在iOS中,锁只有两种:自旋锁和互斥锁。其他锁比如条件所、递归锁、信号量等都是基于这两种锁的封装。
互斥锁
当线程试图获取锁而锁被占用时,线程会进入休眠状态,等待锁释放时被唤醒。互斥锁又可分为可重入锁和不可重入锁。
- 可重入锁:又叫递归锁,同一线程在锁被释放前可以再次获取锁,即可以递归调用。
- 不可重入锁:非递归锁,线程必须要在锁被释放后才能获取锁,不可递归调用。
自旋锁
锁被占用时,线程会反复检查锁是否可用,线程不会休眠,而是处于一种忙等待状态。一旦获取到锁,线程会一直持有,知道显式释放。自旋锁减少的线程上下文的调度,适用于线程阻塞时间很短的场景。
自旋锁和互斥锁的区别在于,在锁被占用时,互斥锁会进入休眠,等待锁释放时被唤醒,而自旋锁则处于忙等状态,因此自选锁的效率更高,但可能会造成安全问题。
2 自旋锁
2.1 OSSpinLock
OSSpinLock是系统提供的一种自旋锁(iOS 10之前)。在iOS 10之前,OSSpinLock是不安全的,可能会造成优先级反转,导致无法执行。例如:优先级较高的A线程在获取自旋锁时,由于锁被占用而处于忙等状态,一直占用CPU时间片,而正在占用锁、优先级较低的B线程由于无法获取到CPU时间片,从而无法执行,不能释放锁。这样,A和B相互等待,形成死锁。
iOS 10之前,OSSpinLock的源码如下:
class spinlock_t {
os_lock_handoff_s mLock;
public:
spinlock_t() : mLock(OS_LOCK_HANDOFF_INIT) { }
void lock() { os_lock_lock(&mLock); }
void unlock() { os_lock_unlock(&mLock); }
bool trylock() { return os_lock_trylock(&mLock); }
...
};
可以看到,在iOS 10之前OSSpinLock的底层实现用的是os_lock_handoff_s
。在iOS 10之后,苹果为了解决OSSpinLock的不安全问题,将底层实现换成了os_unfair_lock
。源码如下:
typedef int32_t OSSpinLock UNAVAILABLE_ATTRIBUTE;
typedef struct os_unfair_lock_s os_unfair_lock UNAVAILABLE_ATTRIBUTE;
...
typedef mutex_t spinlock_t;
#define spinlock_lock(l) mutex_lock(l)
#define spinlock_unlock(l) mutex_unlock(l)
#define SPINLOCK_INITIALIZER MUTEX_INITIALIZER
...
using spinlock_t = mutex_tt<LOCKDEBUG>;
using mutex_t = mutex_tt<LOCKDEBUG>;
using monitor_t = monitor_tt<LOCKDEBUG>;
using recursive_mutex_t = recursive_mutex_tt<LOCKDEBUG>;
...
class mutex_tt : nocopy_t {
os_unfair_lock mLock;
public:
...
};
我们看一下简化后的代码,可以看出,spinlock_t
就是mutex_tt
, 而mutex_tt
是基于os_unfair_lock
实现的,从名称就可以看出来,替换底层实现后,虽然还叫OSSpinLock
,但其实它已经是互斥锁了。关于os_unfair_lock
,我们在讲互斥锁时,再详细探讨。
2.2 atomic原理
OC中,用atomic修饰的属性,在生成setter方法时,最终都会调用reallySetProperty
方法,reallySetProperty
的实现源码如下:
static inline void reallySetProperty(id self, SEL _cmd, id newValue, ptrdiff_t offset, bool atomic, bool copy, bool mutableCopy)
{
if (offset == 0) {
object_setClass(self, newValue);
return;
}
id oldValue;
id *slot = (id*) ((char*)self + offset);
if (copy) {
newValue = [newValue copyWithZone:nil];
} else if (mutableCopy) {
newValue = [newValue mutableCopyWithZone:nil];
} else {
if (*slot == newValue) return;
newValue = objc_retain(newValue);
}
if (!atomic) {
oldValue = *slot;
*slot = newValue;
} else {
spinlock_t& slotlock = PropertyLocks[slot];
slotlock.lock();
oldValue = *slot;
*slot = newValue;
slotlock.unlock();
}
objc_release(oldValue);
}
从代码中可以看出,atomic修饰的属性在赋值时做了spinlock处理,而非原子的属性,除了不加锁之外,其他逻辑与atomic一致。getter方法的处理方式与setter一致:
id objc_getProperty(id self, SEL _cmd, ptrdiff_t offset, BOOL atomic) {
if (offset == 0) {
return object_getClass(self);
}
// Retain release world
id *slot = (id*) ((char*)self + offset);
if (!atomic) return *slot;
// Atomic retain release world
spinlock_t& slotlock = PropertyLocks[slot];
slotlock.lock();
id value = objc_retain(*slot);
slotlock.unlock();
// for performance, we (safely) issue the autorelease OUTSIDE of the spinlock.
return objc_autoreleaseReturnValue(value);
}
这里setter和getter中使用的spinlock就是替换为os_unfair_lock之后的,此时它本质上已经是一个互斥锁。
atomic只能保证setter、getter的方法安全,并不能保证数据安全。
2.3 读写锁
读写锁是一种特殊的自旋锁,它把资源访问者分为读者和写者。读者只能对共享资源进行读操作,写者能对共享资源进行写操作。写者具有排他性,一个读写锁只能同时有一个写者或多个读者。当没有写者时,多个读者可同时访问共享资源,提高性能。
当写者想要获取读写锁时,如果读写锁没有任何写者或者读者,那么该写者可以立刻或得读写锁,否则必须自旋在那里,直到没有任何读者或写者。
当读者想要获取读写锁时,如果当前没有写者,那么它可以立即或得读写锁,否则必须自旋在那里,直到写者释放读写锁。
// 初始化读写锁
let rwlock = UnsafeMutablePointer<pthread_rwlock_t>.allocate(capacity: 1)
pthread_rwlock_init(rwlock, nil)
// 写操作,加锁
pthread_rwlock_wrlock(rwlock)
// 写操作,尝试加锁
pthread_rwlock_trywrlock(rwlock)
// 读操作,加锁
pthread_rwlock_rdlock(rwlock)
// 读操作,尝试加锁
pthread_rwlock_tryrdlock(rwlock)
// 释放锁
pthread_rwlock_unlock(rwlock)
// 销毁锁
pthread_rwlock_destroy(rwlock)
rwlock.deinitialize(count: 1)
rwlock.deallocate()
既然读写锁是自旋锁,那么就会存在自旋锁优先级反转的安全问题。
我们考虑这样一种场景,假设某个火车站有四个窗口A、B、C、D,A和B只负责卖票,C和D只负责查票,为保证数据安全,卖票的时候不能查票,查票的时候也不能卖票。卖票和买票均在票数为0时停止。
很明显,这个场景非常适合读写锁:
private func startSaleTicket() {
ticketCount = 50
let queue1 = DispatchQueue(label: "dispatchSemaphore.queue1", attributes: .concurrent)
let queue2 = DispatchQueue(label: "dispatchSemaphore.queue2", attributes: .concurrent)
queue1.async { [weak self] in
self?.saleTicket()
}
queue2.async { [weak self] in
self?.saleTicket()
}
}
private func saleTicket() {
while true {
// 对票数加锁
pthread_rwlock_wrlock(rwlock)
if (ticketCount > 0) {
print("窗口 --- \(Thread.current), 剩余票数 ---- \(ticketCount)")
ticketCount -= 1
// 模拟耗时操作
Thread.sleep(forTimeInterval: 1)
// 售票完成释放锁
pthread_rwlock_unlock(rwlock)
} else {
print("--- 所有票已售完 --- \(Thread.current)")
break
}
}
pthread_rwlock_unlock(rwlock)
}
private func selectTicket() {
DispatchQueue.global().async {
self.readTicket()
}
DispatchQueue.global().async {
self.readTicket()
}
DispatchQueue.global().async {
self.readTicket()
}
}
private func readTicket() {
while (ticketCount > 0) {
// 加读锁
pthread_rwlock_tryrdlock(rwlock)
print("readTicket --- \(Thread.current), 剩余票数 ---- \(ticketCount)")
// 模拟耗时操作
Thread.sleep(forTimeInterval: 0.5)
// 释放锁
pthread_rwlock_unlock(rwlock)
}
}
运行起来就会发现,查询窗口不停的在查询,而售票窗口则无法售票。这是因为查询线程有多个,占用了读者锁,当售票线程获取写者锁时,因为有读者锁存在,所以售票线程处于自旋等待状态。而查询线程一直在查询,导致读者锁一直被占用,售票线程就一直无法执行。
虽然上述场景在实际应用中比较少,但一旦用到读写锁,还是要考虑安全问题。可以使用dispatch_semaphore
来替代读写锁。
3 互斥锁
互斥锁在锁被占用时,线程会进入休眠,直到锁释放时被唤醒。互斥锁又可以分为可重入锁和不可重入锁,最基础的互斥锁是pthread_mutex
,其他互斥锁如NSLock、NSRecursiveLock、NSConditionLock
等都是基于pthread_mutex
的面向对象封装。我们可以在以开源的swift-corelibs-foundation中查看相关的swift源码。
3.1 pthread_mutex
pthread_mutex
就是互斥锁本身,所有的其他互斥锁都是基于它实现的,pthread_mutex
的定义如下:
typedef __darwin_pthread_mutex_t pthread_mutex_t;
pthread_mutex
使用:
// 声明全局mutex锁
let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
// 初始化锁
pthread_mutex_init(mutex, nil)
// 加锁
pthread_mutex_lock(mutex)
// 需要做线程安全的操作
//...
// 释放锁
pthread_mutex_unlock(mutex)
// 销毁锁
pthread_mutex_destroy(mutex)
mutex.deinitialize(count: 1)
mutex.deallocate()
那么以上面这种方式初始化的mutex lock能否递归调用,即是否是可重入锁呢?下面我们递归调用试一下:
private func recursiveLockTest(value: Int) {
print("testRecursive ----- 加锁前")
pthread_mutex_lock(mutex)
print("testRecursive ----- 加锁后")
if (value > 0) {
print("testRecursive ----- value ---- \(value)")
recursiveLockTest(value: value - 1)
}
pthread_mutex_unlock(mutex)
}
private func testRecursive() {
DispatchQueue.global().async {
self.recursiveLockTest(value: 10)
}
}
输出结果如下:
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 10
testRecursive ----- 加锁前
可以看到,只打了一次value值,注意,这时候线程不是死锁了,而是阻塞了。第一次加锁时,成功获取到了锁,并打印了value值,当第二次加锁时,由于第一加的锁并未释放,所以线程阻塞。
这说明我们以上面的方式初始化的锁是不可重入锁,那么怎么获取到可重入锁呢?pthread_mutex
是通过属性设置来标记是否是可重入锁的,当属性传nil
时,默认得到的就是不可重入锁。设置可重入锁的方式如下:
var attrb = pthread_mutexattr_t()
withUnsafeMutablePointer(to: &attrb) { (attrs) in
pthread_mutexattr_init(attrs)
pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
pthread_mutex_init(mutex, attrs)
}
当初始化为可重入锁后,我们再运行递归代码输出如下:
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 10
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 9
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 8
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 7
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 6
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 5
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 4
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 3
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 2
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 1
testRecursive ----- 加锁前
testRecursive ----- 加锁后
程序正确运行,同一线程可以对可重入锁多次加锁。
3.2 os_unfair_lock
之前在介绍OSSpinLock
的时候已经提到过os_unfair_lock
,它是OSSpinLock
的底层实现,下面我们看一下os_unfair_lock
的源码,在swift-corelibs-foundation中搜索os_unfair_lock
:
typedef pthread_mutex_t os_unfair_lock;
typedef pthread_mutex_t * os_unfair_lock_t;
CF_INLINE void os_unfair_lock_lock(os_unfair_lock_t lock) { pthread_mutex_lock(lock); }
CF_INLINE void os_unfair_lock_unlock(os_unfair_lock_t lock) { pthread_mutex_unlock(lock); }
发现了吧,其实os_unfair_lock
和pthread_mutex_t
是一个东西,它就是pthread_mutex。。
3.3 NSLock
NSLock是对pthread_mutex的简单封装,更加易用且面向对象,下面是简化后的NSLock源码:
open class NSLock: NSObject, NSLocking {
internal var mutex = _MutexPointer.allocate(capacity: 1)
private var timeoutCond = _ConditionVariablePointer.allocate(capacity: 1)
private var timeoutMutex = _MutexPointer.allocate(capacity: 1)
public override init() {
pthread_mutex_init(mutex, nil)
pthread_cond_init(timeoutCond, nil)
pthread_mutex_init(timeoutMutex, nil)
}
deinit {
pthread_mutex_destroy(mutex)
mutex.deinitialize(count: 1)
mutex.deallocate()
deallocateTimedLockData(cond: timeoutCond, mutex: timeoutMutex)
}
open func lock() {
pthread_mutex_lock(mutex)
}
open func unlock() {
pthread_mutex_unlock(mutex)
// Wakeup any threads waiting in lock(before:)
pthread_mutex_lock(timeoutMutex)
pthread_cond_broadcast(timeoutCond)
pthread_mutex_unlock(timeoutMutex)
}
open func `try`() -> Bool {
return pthread_mutex_trylock(mutex) == 0
}
open func lock(before limit: Date) -> Bool {
if pthread_mutex_trylock(mutex) == 0 {
return true
}
return timedLock(mutex: mutex, endTime: limit, using: timeoutCond, with: timeoutMutex)
guard var endTime = timeSpecFrom(date: limit) else {
return false
}
return pthread_mutex_timedlock(mutex, &endTime) == 0
}
open var name: String?
}
看一下init
方法,这里初始化pthread_mutex_lock
的方法与我们上面介绍的一致,并且传入的属性为nil,这说明NSLock为不可重入锁,不能在递归中使用。在递归中使用锁时,可以使用NSRecursiveLock。
3.4 NSRecursiveLock
从名称就可以看出来,NSRecursiveLock是可重入锁,事实也确实如此,我们看一下NSRecursiveLock的源码:
open class NSRecursiveLock: NSObject, NSLocking {
internal var mutex = _RecursiveMutexPointer.allocate(capacity: 1)
private var timeoutCond = _ConditionVariablePointer.allocate(capacity: 1)
private var timeoutMutex = _MutexPointer.allocate(capacity: 1)
public override init() {
super.init()
withUnsafeMutablePointer(to: &attrib) { attrs in
pthread_mutexattr_init(attrs)
pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
pthread_mutex_init(mutex, attrs)
}
pthread_cond_init(timeoutCond, nil)
pthread_mutex_init(timeoutMutex, nil)
}
deinit {
pthread_mutex_destroy(mutex)
mutex.deinitialize(count: 1)
mutex.deallocate()
deallocateTimedLockData(cond: timeoutCond, mutex: timeoutMutex)
}
open func lock() {
pthread_mutex_lock(mutex)
}
open func unlock() {
pthread_mutex_unlock(mutex)
// Wakeup any threads waiting in lock(before:)
pthread_mutex_lock(timeoutMutex)
pthread_cond_broadcast(timeoutCond)
pthread_mutex_unlock(timeoutMutex)
}
open func `try`() -> Bool {
return pthread_mutex_trylock(mutex) == 0
}
open func lock(before limit: Date) -> Bool {
if pthread_mutex_trylock(mutex) == 0 {
return true
}
return timedLock(mutex: mutex, endTime: limit, using: timeoutCond, with: timeoutMutex)
guard var endTime = timeSpecFrom(date: limit) else {
return false
}
return pthread_mutex_timedlock(mutex, &endTime) == 0
}
open var name: String?
}
NSRecursiveLock和NSLock的大部分源码是一致的,我们仍然重点关注init
方法,相比NSLock,NSRecursiveLock的init
方法多了以下代码:
withUnsafeMutablePointer(to: &attrib) { attrs in
pthread_mutexattr_init(attrs)
pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
pthread_mutex_init(mutex, attrs)
}
这就是是否是可重入锁的关键。
3.5 NSCondition和NSConditionLock
NSCondition是一个条件锁,可能平时用的不多,但与信号量相似:线程1需要等到条件1满足才会往下走,否则就会堵塞等待,直至条件满足。
与NSCondition相比,NSConditionLock是NSCondition加线程数的封装,NSConditionLock可以设置锁条件,而NSCondition只是无脑的通知信号。
NSCondition和NSConditionLock在日常开发中用的不多,这里就不做过多介绍了。相关源码可以在swift-corelibs-foundation中NSLock.swift
文件查看。
3.6 @synchronized
最后我们重点介绍一下@synchronized,@synchronized可能是日常开发中用的比较多的一种互斥锁,因为它的使用比较简单,但并不是在任意场景下都能使用@synchronized,且它的性能较低。先看一下基本用法。
在OC中使用:
@synchronized (obj) {
// 需要线程安全的代码
...
}
在swift中使用:
// 加锁
objc_sync_enter(obj)
// 需要线程安全的代码
...
// 解锁
objc_sync_exit(obj)
对比OC和swift的用法可知,@synchronized (obj)
与objc_sync_enter(obj)
、objc_sync_exit(obj)
是等价的,通过clang
OC代码,也可以得到一些信息:
int main(int argc, char * argv[]) {
NSString * appDelegateClassName;
/* @autoreleasepool */ { __AtAutoreleasePool __autoreleasepool;
appDelegateClassName = NSStringFromClass(((Class (*)(id, SEL))(void *)objc_msgSend)((id)objc_getClass("AppDelegate"), sel_registerName("class")));
{
id _rethrow = 0;
id _sync_obj = (id)appDelegateClassName;
objc_sync_enter(_sync_obj);
try {
struct _SYNC_EXIT {
_SYNC_EXIT(id arg) : sync_exit(arg) {}
~_SYNC_EXIT() {
objc_sync_exit(sync_exit);
}
id sync_exit;
}
_sync_exit(_sync_obj);
}
catch (id e) {_rethrow = e;}
{
struct _FIN { _FIN(id reth) : rethrow(reth) {}
~_FIN() { if (rethrow) objc_exception_throw(rethrow); }
id rethrow;
}_fin_force_rethow(_rethrow);
}
}
}
return UIApplicationMain(argc, argv, __null, appDelegateClassName);
}
可以看出,@synchronized (obj)
底层就是由objc_sync_enter(obj)
、objc_sync_exit(obj)
实现的,那我们就看一下objc_sync_enter(obj)
、objc_sync_exit(obj)
的源码(objc源码传送门)。
// Begin synchronizing on 'obj'.
// Allocates recursive mutex associated with 'obj' if needed.
// Returns OBJC_SYNC_SUCCESS once lock is acquired.
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, ACQUIRE);
ASSERT(data);
data->mutex.lock();
} else {
// @synchronized(nil) does nothing
if (DebugNilSync) {
_objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
}
objc_sync_nil();
}
return result;
}
// End synchronizing on 'obj'.
// Returns OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, RELEASE);
if (!data) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
} else {
bool okay = data->mutex.tryUnlock();
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
} else {
// @synchronized(nil) does nothing
}
return result;
}
在深入探究objc_sync_enter(obj)
、objc_sync_exit(obj)
源码之前,我们先看一下它们有关的数据结构。
SyncData
typedef struct alignas(CacheLineSize) SyncData {
// 下一个SyncData的指针
struct SyncData* nextData;
// 加锁的对象
DisguisedPtr<objc_object> object;
// 对object加锁的线程数
int32_t threadCount; // number of THREADS using this block
// 加锁使用的递归锁
recursive_mutex_t mutex;
} SyncData;
SyncData 是@synchronized
实现线程安全最基本的数据结构,它记录了线程同步的一下数据,包括当前的加锁对象、对当前对象加锁的线程数、加锁使用的递归锁以及指向下一个SyncData的指针。
SyncCacheItem
typedef struct {
// 该缓存条目对应的SyncData
SyncData *data;
// 该对像在当前线程被加锁的次数
unsigned int lockCount; // number of times THIS THREAD locked this block
} SyncCacheItem;
SyncCacheItem记录的是SyncData在某个线程中被加锁的记录,一个SyncData可对应多个SyncCacheItem.
SyncCache
typedef struct SyncCache {
// 分配的缓存大小
unsigned int allocated;
// 已使用的缓存大小
unsigned int used;
// SyncCacheItem数组
SyncCacheItem list[0];
} SyncCache;
SyncCache记录了某个线程中所有的SyncCacheItem,同时记录了分配的缓存大小和已使用的缓存大小。
StripedMap
struct SyncList {
// SyncData链表
SyncData *data;
// 自旋锁
spinlock_t lock;
constexpr SyncList() : data(nil), lock(fork_unsafe_lock) { }
};
// Use multiple parallel lists to decrease contention among unrelated objects.
#define LOCK_FOR_OBJ(obj) sDataLists[obj].lock
#define LIST_FOR_OBJ(obj) sDataLists[obj].data
static StripedMap<SyncList> sDataLists;
class StripedMap {
#if TARGET_OS_IPHONE && !TARGET_OS_SIMULATOR
enum { StripeCount = 8 };
#else
enum { StripeCount = 64 };
#endif
...
}
其中,sDataLists
是StripedMap<SyncList>
类型的静态变量,StripedMap<SyncList>
是一个最大可以存储64个变量的字典。LOCK_FOR_OBJ(obj)
和LIST_FOR_OBJ(obj)
两个宏定义,可以通过obj的内存地址,从sDataLists中获取对应的lock和SyncData链表。
_objc_pthread_data
在iOS中,每个线程都会维护一个_objc_pthread_data
结构体,该结构体会维护一个SyncCache
, 初始大小为4个SyncCacheItem
大小,当SyncCache
缓存填满时,会以当前缓存两倍的大小进行扩充。
static SyncCache *fetch_cache(bool create)
{
_objc_pthread_data *data;
data = _objc_fetch_pthread_data(create);
if (!data) return NULL;
if (!data->syncCache) {
if (!create) {
return NULL;
} else {
int count = 4;
data->syncCache = (SyncCache *)
calloc(1, sizeof(SyncCache) + count*sizeof(SyncCacheItem));
data->syncCache->allocated = count;
}
}
// Make sure there's at least one open slot in the list.
if (data->syncCache->allocated == data->syncCache->used) {
data->syncCache->allocated *= 2;
data->syncCache = (SyncCache *)
realloc(data->syncCache, sizeof(SyncCache)
+ data->syncCache->allocated * sizeof(SyncCacheItem));
}
return data->syncCache;
}
TLS
TLS全称是Thread Local Storage,在iOS中,每个线程都会维护一个TLS,用来存储本线程的一些变量,由于TLS只能在本线程访问,所以无需加锁保护。
通过tls_get_direct/tls_set_direct
可以从tls(Thread Local Storage)线程局部存储中快速获取和设置变量。
在@synchronized
的内部实现中,iOS定义了两个宏SYNC_DATA_DIRECT_KEY/SYNC_COUNT_DIRECT_KEY
,通过tls_get_direct/tls_set_direct
,可以从线程局部缓存中快速取得和设置SyncCacheItem.data和SyncCacheItem.lockCount。
SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
其实_objc_pthread_data
也是存储在TLS中的,它对应的读取关键字为 _objc_pthread_key
。
基本上,实现@synchronized
用到的数据结构我们已经分析完了,看起来挺复杂,但是我们用一张图来表示一下,就比较清晰了(只列出了与@synchronized
有关的部分)。
分析完了数据结构,我们看一下objc_sync_enter
和objc_sync_exit
的具体实现。相关代码都在runtime源码中。
objc_sync_enter
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, ACQUIRE);
ASSERT(data);
data->mutex.lock();
} else {
// @synchronized(nil) does nothing
if (DebugNilSync) {
_objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
}
objc_sync_nil();
}
return result;
}
objc_sync_exit
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {
SyncData* data = id2data(obj, RELEASE);
if (!data) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
} else {
bool okay = data->mutex.tryUnlock();
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
} else {
// @synchronized(nil) does nothing
}
return result;
}
objc_sync_enter
和objc_sync_exit
的代码都比较简单,都是先调用id2data
方法获取到SyncData
,然后对SyncData
进行加锁解锁操作。
那么我们就看一下id2data的具体实现。
id2data
static SyncData* id2data(id object, enum usage why)
{
// 获取需加锁对象在sDataList中对应的自旋锁和SyncList
spinlock_t *lockp = &LOCK_FOR_OBJ(object);
SyncData **listp = &LIST_FOR_OBJ(object);
SyncData* result = NULL;
// SUPPORT_DIRECT_THREAD_KEYS表示可以使用快速缓存
// tls_get_direct/tls_set_direct是从tls(Thread Local Storage)线程局部存储中获取变量
// 快速缓存的含义为:定义两个变量SYNC_DATA_DIRECT_KEY/SYNC_COUNT_DIRECT_KEY,
// 可以从线程局部缓存中快速取得SyncCacheItem.data和SyncCacheItem.lockCount
// 该缓存策略可以避免线程只对一个对象进行加锁时创建SyncCache的多余消耗
#if SUPPORT_DIRECT_THREAD_KEYS
// Check per-thread single-entry fast cache for matching object
bool fastCacheOccupied = NO;
// 从快速缓存中获取SyncData
SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
if (data) {
// 将标记设置为yes
fastCacheOccupied = YES;
// 判断从快速缓存中获取的SyncData是否是需要加锁对象对应的SyncData
if (data->object == object) {
// Found a match in fast cache.
uintptr_t lockCount;
result = data;
lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
// 判断SyncData的合法性
if (result->threadCount <= 0 || lockCount <= 0) {
_objc_fatal("id2data fastcache is buggy");
}
// 根据操作做相应处理
switch(why) {
case ACQUIRE: {
// 加锁 lockCount加1,并更新快速缓存中的数据
lockCount++;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
break;
}
case RELEASE:
// 解锁,lockCount减1,并更新快速缓存中的数据
lockCount--;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
// 如果lockCount为0,从快速缓存中移除,并减少SyncData中被线程使用的个数
if (lockCount == 0) {
// remove from fast cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
#endif
// 没有从快速缓存中找到对应的SyncData,则获取该线程对应的SyncCache,获取不到不需要新建
// Check per-thread cache of already-owned locks for matching object
SyncCache *cache = fetch_cache(NO);
if (cache) {
unsigned int i;
// 尝试从SyncCache中寻找需加锁object对应的SyncData,若找到,则做相应处理
for (i = 0; i < cache->used; i++) {
SyncCacheItem *item = &cache->list[i];
if (item->data->object != object) continue;
// Found a match.
result = item->data;
if (result->threadCount <= 0 || item->lockCount <= 0) {
_objc_fatal("id2data cache is buggy");
}
switch(why) {
case ACQUIRE:
item->lockCount++;
break;
case RELEASE:
item->lockCount--;
// 解锁后,如果lockCount为0,则从SyncCache中将该cacheItem移除
// 同时更新SyncData中threadCount的数目
if (item->lockCount == 0) {
// remove from per-thread cache
cache->list[i] = cache->list[--cache->used];
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
// Thread cache didn't find anything.
// Walk in-use list looking for matching object
// Spinlock prevents multiple threads from creating multiple
// locks for the same new object.
// We could keep the nodes in some hash table if we find that there are
// more than 20 or so distinct locks active, but we don't do that now.
// 线程对应的SyncCache不存在,或者没有从SyncCache中找到需加锁对象对应的SyncData
// 尝试从全局存储中对应的SyncList里搜索SyncData
// 加锁,防止多个线程同时操作同一个Object
lockp->lock();
{
SyncData* p;
SyncData* firstUnused = NULL;
for (p = *listp; p != NULL; p = p->nextData) {
if ( p->object == object ) {
// 从SyncList中找到SyncData, 由于之前没有在SyncCache中找到,说明是该线程第一次对Object加锁
// 需要将SyncData的threadCount增加,并进入最后处理
result = p;
// atomic because may collide with concurrent RELEASE
OSAtomicIncrement32Barrier(&result->threadCount);
goto done;
}
// 寻找SyncList中未使用的SyncData,并赋值给firstUnused
if ( (firstUnused == NULL) && (p->threadCount == 0) )
firstUnused = p;
}
// no SyncData currently associated with object
// 未找到SyncData,且为RELEASE或CHECK,直接进入最后处理
if ( (why == RELEASE) || (why == CHECK) )
goto done;
// an unused one was found, use it
// 将firstUnused置位当前object,并赋值给result,进入最后处理
if ( firstUnused != NULL ) {
result = firstUnused;
result->object = (objc_object *)object;
result->threadCount = 1;
goto done;
}
}
// Allocate a new SyncData and add to list.
// XXX allocating memory with a global lock held is bad practice,
// might be worth releasing the lock, allocating, and searching again.
// But since we never free these guys we won't be stuck in allocation very often.
// 快速缓存、SyncCache、全局存储对应的SyncList都未找到相应的SyncData,则为改需加锁对象创建新的SyncData
posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
result->object = (objc_object *)object;
result->threadCount = 1;
new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
result->nextData = *listp;
*listp = result;
done:
lockp->unlock();
if (result) {
// Only new ACQUIRE should get here.
// All RELEASE and CHECK and recursive ACQUIRE are
// handled by the per-thread caches above.
// 异常校验,保证安全,只有新加锁对象才会走这个逻辑分支
if (why == RELEASE) {
// Probably some thread is incorrectly exiting
// while the object is held by another thread.
return nil;
}
if (why != ACQUIRE) _objc_fatal("id2data is buggy");
if (result->object != object) _objc_fatal("id2data is buggy");
// 支持快速缓存且快速缓存未被占用,则存入快速缓存,否则存入SyncCache
#if SUPPORT_DIRECT_THREAD_KEYS
if (!fastCacheOccupied) {
// Save in fast thread cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
} else
#endif
{
// Save in thread cache
if (!cache) cache = fetch_cache(YES);
cache->list[cache->used].data = result;
cache->list[cache->used].lockCount = 1;
cache->used++;
}
}
return result;
}
代码很多,但其实逻辑并不复杂,通过id2data
方法获取SyncData
的流程如下:
最后,总结一下@synchronize
:
-
@synchronize
是递归锁。 - 不能对非OC对象加锁,因为id2data接受的是id类型
- 不能对nil条件加锁,临界区代码可正常执行,但无法保证线程安全
-
@synchronize
性能低的原因是因为底层做了大量的增删盖查,但如果一个线程只对某一个对象反复加锁,性能较高(因为会存入线程的快速缓存中)。
3.7 互斥锁性能对比
4 总结
- 锁可分为互斥锁和自旋锁,互斥锁又可分为可重入锁和不可重入锁,可重入锁可以递归调用,重复加锁。
- OSSpinLock原本为自旋锁,因为安全问题,底层实现在iOS 10之后改为os_unfair_lock,替换完之后实际是互斥锁。
- atomic只能保证setter、getter的线程安全,并不能保证数据操作的安全。
- 读写锁是特殊的自旋锁,也会有安全问题,使用时需小心,可以使用信号量替代。
- pthread_mutex是最基本的互斥锁
- os_unfair_lock与pthread_mutex是同一个的东西
- NSLock、NSRecursiveLock、NSCondition、NSConditionLock都是基于pthread_mutex的封装。
- 普通场景下的线程安全可以使用NSLock、递归场景下可以使用NSRecursiveLock。
- @synchronize的性能低是因为底层做了大量的增删改查工作。
- @synchronize底层维护了一个哈希链表来存储SyncData,并且通过快速缓存来提升线程仅对某个对象反复加锁时的效率。