iOS 中的各种锁

前言

多线程开发是性能优化常用的技术,在多线程开发中,线程安全是绕不开的一个话题。线程安全的定义,在之前的文章中也有介绍。简单来讲,线程安全就是在同一时刻,对同一数据操作的线程只有一个,可以保证程序每次运行,都能得到预期的结果。

1 锁

为保证线程安全,多线程开发中经常会用的技术就是锁。锁是一种非强制机制,每个线程在操作数据或者资源前,先获取(Acquire)锁,并在操作结束后释放(Release)锁。如果锁已经被占用,那么其他线程获取锁时会等待,直到锁被释放。

1.1 锁的分类

在iOS中,锁只有两种:自旋锁和互斥锁。其他锁比如条件所、递归锁、信号量等都是基于这两种锁的封装。

互斥锁

当线程试图获取锁而锁被占用时,线程会进入休眠状态,等待锁释放时被唤醒。互斥锁又可分为可重入锁不可重入锁

  • 可重入锁:又叫递归锁,同一线程在锁被释放前可以再次获取锁,即可以递归调用。
  • 不可重入锁:非递归锁,线程必须要在锁被释放后才能获取锁,不可递归调用。

自旋锁

锁被占用时,线程会反复检查锁是否可用,线程不会休眠,而是处于一种忙等待状态。一旦获取到锁,线程会一直持有,知道显式释放。自旋锁减少的线程上下文的调度,适用于线程阻塞时间很短的场景。

自旋锁和互斥锁的区别在于,在锁被占用时,互斥锁会进入休眠,等待锁释放时被唤醒,而自旋锁则处于忙等状态,因此自选锁的效率更高,但可能会造成安全问题。

2 自旋锁

2.1 OSSpinLock

OSSpinLock是系统提供的一种自旋锁(iOS 10之前)。在iOS 10之前,OSSpinLock是不安全的,可能会造成优先级反转,导致无法执行。例如:优先级较高的A线程在获取自旋锁时,由于锁被占用而处于忙等状态,一直占用CPU时间片,而正在占用锁、优先级较低的B线程由于无法获取到CPU时间片,从而无法执行,不能释放锁。这样,A和B相互等待,形成死锁。
iOS 10之前,OSSpinLock的源码如下:

class spinlock_t {
    os_lock_handoff_s mLock;
 public:
    spinlock_t() : mLock(OS_LOCK_HANDOFF_INIT) { }
    
    void lock() { os_lock_lock(&mLock); }
    void unlock() { os_lock_unlock(&mLock); }
    bool trylock() { return os_lock_trylock(&mLock); }
    ...
};

可以看到,在iOS 10之前OSSpinLock的底层实现用的是os_lock_handoff_s。在iOS 10之后,苹果为了解决OSSpinLock的不安全问题,将底层实现换成了os_unfair_lock。源码如下:

typedef int32_t OSSpinLock UNAVAILABLE_ATTRIBUTE;
typedef struct os_unfair_lock_s os_unfair_lock UNAVAILABLE_ATTRIBUTE;
...
typedef mutex_t spinlock_t;
#define spinlock_lock(l) mutex_lock(l)
#define spinlock_unlock(l) mutex_unlock(l)
#define SPINLOCK_INITIALIZER MUTEX_INITIALIZER
...
using spinlock_t = mutex_tt<LOCKDEBUG>;
using mutex_t = mutex_tt<LOCKDEBUG>;
using monitor_t = monitor_tt<LOCKDEBUG>;
using recursive_mutex_t = recursive_mutex_tt<LOCKDEBUG>;
...
class mutex_tt : nocopy_t {
    os_unfair_lock mLock;
 public:
    ...
};

我们看一下简化后的代码,可以看出,spinlock_t就是mutex_tt, 而mutex_tt是基于os_unfair_lock实现的,从名称就可以看出来,替换底层实现后,虽然还叫OSSpinLock,但其实它已经是互斥锁了。关于os_unfair_lock,我们在讲互斥锁时,再详细探讨。

objc源码传送门

2.2 atomic原理

OC中,用atomic修饰的属性,在生成setter方法时,最终都会调用reallySetProperty方法,reallySetProperty的实现源码如下:

static inline void reallySetProperty(id self, SEL _cmd, id newValue, ptrdiff_t offset, bool atomic, bool copy, bool mutableCopy)
{
    if (offset == 0) {
        object_setClass(self, newValue);
        return;
    }

    id oldValue;
    id *slot = (id*) ((char*)self + offset);

    if (copy) {
        newValue = [newValue copyWithZone:nil];
    } else if (mutableCopy) {
        newValue = [newValue mutableCopyWithZone:nil];
    } else {
        if (*slot == newValue) return;
        newValue = objc_retain(newValue);
    }

    if (!atomic) {
        oldValue = *slot;
        *slot = newValue;
    } else {
        spinlock_t& slotlock = PropertyLocks[slot];
        slotlock.lock();
        oldValue = *slot;
        *slot = newValue;        
        slotlock.unlock();
    }

    objc_release(oldValue);
}

从代码中可以看出,atomic修饰的属性在赋值时做了spinlock处理,而非原子的属性,除了不加锁之外,其他逻辑与atomic一致。getter方法的处理方式与setter一致:

id objc_getProperty(id self, SEL _cmd, ptrdiff_t offset, BOOL atomic) {
    if (offset == 0) {
        return object_getClass(self);
    }

    // Retain release world
    id *slot = (id*) ((char*)self + offset);
    if (!atomic) return *slot;
        
    // Atomic retain release world
    spinlock_t& slotlock = PropertyLocks[slot];
    slotlock.lock();
    id value = objc_retain(*slot);
    slotlock.unlock();
    
    // for performance, we (safely) issue the autorelease OUTSIDE of the spinlock.
    return objc_autoreleaseReturnValue(value);
}

这里setter和getter中使用的spinlock就是替换为os_unfair_lock之后的,此时它本质上已经是一个互斥锁。

atomic只能保证setter、getter的方法安全,并不能保证数据安全。

2.3 读写锁

读写锁是一种特殊的自旋锁,它把资源访问者分为读者和写者。读者只能对共享资源进行读操作,写者能对共享资源进行写操作。写者具有排他性,一个读写锁只能同时有一个写者或多个读者。当没有写者时,多个读者可同时访问共享资源,提高性能。

当写者想要获取读写锁时,如果读写锁没有任何写者或者读者,那么该写者可以立刻或得读写锁,否则必须自旋在那里,直到没有任何读者或写者。

当读者想要获取读写锁时,如果当前没有写者,那么它可以立即或得读写锁,否则必须自旋在那里,直到写者释放读写锁。

// 初始化读写锁
let rwlock = UnsafeMutablePointer<pthread_rwlock_t>.allocate(capacity: 1)
pthread_rwlock_init(rwlock, nil)
// 写操作,加锁
pthread_rwlock_wrlock(rwlock)
// 写操作,尝试加锁
pthread_rwlock_trywrlock(rwlock)
// 读操作,加锁
pthread_rwlock_rdlock(rwlock)
// 读操作,尝试加锁
pthread_rwlock_tryrdlock(rwlock)
// 释放锁
pthread_rwlock_unlock(rwlock)
// 销毁锁
pthread_rwlock_destroy(rwlock)
rwlock.deinitialize(count: 1)
rwlock.deallocate()

既然读写锁是自旋锁,那么就会存在自旋锁优先级反转的安全问题。

我们考虑这样一种场景,假设某个火车站有四个窗口A、B、C、D,A和B只负责卖票,C和D只负责查票,为保证数据安全,卖票的时候不能查票,查票的时候也不能卖票。卖票和买票均在票数为0时停止。

很明显,这个场景非常适合读写锁:

private func startSaleTicket() {
    ticketCount = 50
    let queue1 = DispatchQueue(label: "dispatchSemaphore.queue1", attributes: .concurrent)
    let queue2 = DispatchQueue(label: "dispatchSemaphore.queue2", attributes: .concurrent)
    queue1.async { [weak self] in
        self?.saleTicket()
    }
    queue2.async { [weak self] in
        self?.saleTicket()
    }
}

private func saleTicket() {
    while true {
        // 对票数加锁
        pthread_rwlock_wrlock(rwlock)
        if (ticketCount > 0) {
            print("窗口 --- \(Thread.current), 剩余票数 ---- \(ticketCount)")
            ticketCount -= 1
            // 模拟耗时操作
            Thread.sleep(forTimeInterval: 1)
            // 售票完成释放锁
            pthread_rwlock_unlock(rwlock)
        } else {
            print("--- 所有票已售完 --- \(Thread.current)")
            break
        }
    }
    pthread_rwlock_unlock(rwlock)
}
    
    
private func selectTicket() {
    DispatchQueue.global().async {
        self.readTicket()
    }
    DispatchQueue.global().async {
        self.readTicket()
    }
    DispatchQueue.global().async {
        self.readTicket()
    }
}

private func readTicket() {
    while (ticketCount > 0) {  
        // 加读锁      
        pthread_rwlock_tryrdlock(rwlock)
        print("readTicket --- \(Thread.current), 剩余票数 ---- \(ticketCount)")
        // 模拟耗时操作
        Thread.sleep(forTimeInterval: 0.5)
        // 释放锁
        pthread_rwlock_unlock(rwlock)
    }   
}

运行起来就会发现,查询窗口不停的在查询,而售票窗口则无法售票。这是因为查询线程有多个,占用了读者锁,当售票线程获取写者锁时,因为有读者锁存在,所以售票线程处于自旋等待状态。而查询线程一直在查询,导致读者锁一直被占用,售票线程就一直无法执行。

虽然上述场景在实际应用中比较少,但一旦用到读写锁,还是要考虑安全问题。可以使用dispatch_semaphore来替代读写锁。

3 互斥锁

互斥锁在锁被占用时,线程会进入休眠,直到锁释放时被唤醒。互斥锁又可以分为可重入锁和不可重入锁,最基础的互斥锁是pthread_mutex,其他互斥锁如NSLock、NSRecursiveLock、NSConditionLock等都是基于pthread_mutex的面向对象封装。我们可以在以开源的swift-corelibs-foundation中查看相关的swift源码。

3.1 pthread_mutex

pthread_mutex就是互斥锁本身,所有的其他互斥锁都是基于它实现的,pthread_mutex的定义如下:

typedef __darwin_pthread_mutex_t pthread_mutex_t;

pthread_mutex使用:

// 声明全局mutex锁
let mutex = UnsafeMutablePointer<pthread_mutex_t>.allocate(capacity: 1)
// 初始化锁
pthread_mutex_init(mutex, nil)
// 加锁
pthread_mutex_lock(mutex)
// 需要做线程安全的操作
//...
// 释放锁
pthread_mutex_unlock(mutex)
// 销毁锁
pthread_mutex_destroy(mutex)
mutex.deinitialize(count: 1)
mutex.deallocate()

那么以上面这种方式初始化的mutex lock能否递归调用,即是否是可重入锁呢?下面我们递归调用试一下:

private func recursiveLockTest(value: Int) {
    print("testRecursive ----- 加锁前")
    pthread_mutex_lock(mutex)
    print("testRecursive ----- 加锁后")
    if (value > 0) {
        print("testRecursive ----- value ---- \(value)")
        recursiveLockTest(value: value - 1)
    }
    pthread_mutex_unlock(mutex)
}

private func testRecursive() {
    DispatchQueue.global().async {
        self.recursiveLockTest(value: 10)
    }
}

输出结果如下:

testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 10
testRecursive ----- 加锁前

可以看到,只打了一次value值,注意,这时候线程不是死锁了,而是阻塞了。第一次加锁时,成功获取到了锁,并打印了value值,当第二次加锁时,由于第一加的锁并未释放,所以线程阻塞。

这说明我们以上面的方式初始化的锁是不可重入锁,那么怎么获取到可重入锁呢?pthread_mutex是通过属性设置来标记是否是可重入锁的,当属性传nil时,默认得到的就是不可重入锁。设置可重入锁的方式如下:

var attrb = pthread_mutexattr_t()
withUnsafeMutablePointer(to: &attrb) { (attrs) in
    pthread_mutexattr_init(attrs)
    pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
    pthread_mutex_init(mutex, attrs)
}

当初始化为可重入锁后,我们再运行递归代码输出如下:

testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 10
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 9
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 8
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 7
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 6
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 5
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 4
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 3
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 2
testRecursive ----- 加锁前
testRecursive ----- 加锁后
testRecursive ----- value ---- 1
testRecursive ----- 加锁前
testRecursive ----- 加锁后

程序正确运行,同一线程可以对可重入锁多次加锁。

3.2 os_unfair_lock

之前在介绍OSSpinLock的时候已经提到过os_unfair_lock,它是OSSpinLock的底层实现,下面我们看一下os_unfair_lock的源码,在swift-corelibs-foundation中搜索os_unfair_lock

typedef pthread_mutex_t os_unfair_lock;
typedef pthread_mutex_t * os_unfair_lock_t;
CF_INLINE void os_unfair_lock_lock(os_unfair_lock_t lock) { pthread_mutex_lock(lock); }
CF_INLINE void os_unfair_lock_unlock(os_unfair_lock_t lock) { pthread_mutex_unlock(lock); }

发现了吧,其实os_unfair_lockpthread_mutex_t是一个东西,它就是pthread_mutex。。

3.3 NSLock

NSLock是对pthread_mutex的简单封装,更加易用且面向对象,下面是简化后的NSLock源码:

open class NSLock: NSObject, NSLocking {
    internal var mutex = _MutexPointer.allocate(capacity: 1)
    private var timeoutCond = _ConditionVariablePointer.allocate(capacity: 1)
    private var timeoutMutex = _MutexPointer.allocate(capacity: 1)

    public override init() {
        pthread_mutex_init(mutex, nil)
        pthread_cond_init(timeoutCond, nil)
        pthread_mutex_init(timeoutMutex, nil)
    }
    
    deinit {
        pthread_mutex_destroy(mutex)
        mutex.deinitialize(count: 1)
        mutex.deallocate()
        deallocateTimedLockData(cond: timeoutCond, mutex: timeoutMutex)
    }
    
    open func lock() {
        pthread_mutex_lock(mutex)
    }

    open func unlock() {
        pthread_mutex_unlock(mutex)
        // Wakeup any threads waiting in lock(before:)
        pthread_mutex_lock(timeoutMutex)
        pthread_cond_broadcast(timeoutCond)
        pthread_mutex_unlock(timeoutMutex)
    }

    open func `try`() -> Bool {
        return pthread_mutex_trylock(mutex) == 0
    }
    
    open func lock(before limit: Date) -> Bool {
        if pthread_mutex_trylock(mutex) == 0 {
            return true
        }
        return timedLock(mutex: mutex, endTime: limit, using: timeoutCond, with: timeoutMutex)
        guard var endTime = timeSpecFrom(date: limit) else {
            return false
        }
        return pthread_mutex_timedlock(mutex, &endTime) == 0
    }

    open var name: String?
}

看一下init方法,这里初始化pthread_mutex_lock的方法与我们上面介绍的一致,并且传入的属性为nil,这说明NSLock为不可重入锁,不能在递归中使用。在递归中使用锁时,可以使用NSRecursiveLock。

3.4 NSRecursiveLock

从名称就可以看出来,NSRecursiveLock是可重入锁,事实也确实如此,我们看一下NSRecursiveLock的源码:

open class NSRecursiveLock: NSObject, NSLocking {
    internal var mutex = _RecursiveMutexPointer.allocate(capacity: 1)
    private var timeoutCond = _ConditionVariablePointer.allocate(capacity: 1)
    private var timeoutMutex = _MutexPointer.allocate(capacity: 1)

    public override init() {
        super.init()
        withUnsafeMutablePointer(to: &attrib) { attrs in
            pthread_mutexattr_init(attrs)
            pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
            pthread_mutex_init(mutex, attrs)
        }
        pthread_cond_init(timeoutCond, nil)
        pthread_mutex_init(timeoutMutex, nil)
    }
    
    deinit {
        pthread_mutex_destroy(mutex)
        mutex.deinitialize(count: 1)
        mutex.deallocate()
        deallocateTimedLockData(cond: timeoutCond, mutex: timeoutMutex)
    }
    
    open func lock() {
        pthread_mutex_lock(mutex)
    }
    
    open func unlock() {
        pthread_mutex_unlock(mutex)
        // Wakeup any threads waiting in lock(before:)
        pthread_mutex_lock(timeoutMutex)
        pthread_cond_broadcast(timeoutCond)
        pthread_mutex_unlock(timeoutMutex)
    }
    
    open func `try`() -> Bool {
        return pthread_mutex_trylock(mutex) == 0
    }
    
    open func lock(before limit: Date) -> Bool {
        if pthread_mutex_trylock(mutex) == 0 {
            return true
        }

        return timedLock(mutex: mutex, endTime: limit, using: timeoutCond, with: timeoutMutex)
        guard var endTime = timeSpecFrom(date: limit) else {
            return false
        }
        return pthread_mutex_timedlock(mutex, &endTime) == 0
    }

    open var name: String?
}

NSRecursiveLock和NSLock的大部分源码是一致的,我们仍然重点关注init方法,相比NSLock,NSRecursiveLock的init方法多了以下代码:

 withUnsafeMutablePointer(to: &attrib) { attrs in
    pthread_mutexattr_init(attrs)
    pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
    pthread_mutex_init(mutex, attrs)
}

这就是是否是可重入锁的关键。

3.5 NSCondition和NSConditionLock

NSCondition是一个条件锁,可能平时用的不多,但与信号量相似:线程1需要等到条件1满足才会往下走,否则就会堵塞等待,直至条件满足。

与NSCondition相比,NSConditionLock是NSCondition加线程数的封装,NSConditionLock可以设置锁条件,而NSCondition只是无脑的通知信号。

NSCondition和NSConditionLock在日常开发中用的不多,这里就不做过多介绍了。相关源码可以在swift-corelibs-foundationNSLock.swift文件查看。

3.6 @synchronized

最后我们重点介绍一下@synchronized,@synchronized可能是日常开发中用的比较多的一种互斥锁,因为它的使用比较简单,但并不是在任意场景下都能使用@synchronized,且它的性能较低。先看一下基本用法。
在OC中使用:

@synchronized (obj) {
    // 需要线程安全的代码
    ...
}

在swift中使用:

// 加锁
objc_sync_enter(obj)
// 需要线程安全的代码
...
// 解锁
objc_sync_exit(obj)

对比OC和swift的用法可知,@synchronized (obj)objc_sync_enter(obj)objc_sync_exit(obj)是等价的,通过clang OC代码,也可以得到一些信息:

int main(int argc, char * argv[]) {
    NSString * appDelegateClassName;
    /* @autoreleasepool */ { __AtAutoreleasePool __autoreleasepool; 

        appDelegateClassName = NSStringFromClass(((Class (*)(id, SEL))(void *)objc_msgSend)((id)objc_getClass("AppDelegate"), sel_registerName("class")));
        {
            id _rethrow = 0;
            id _sync_obj = (id)appDelegateClassName;
            objc_sync_enter(_sync_obj);
            try {
                struct _SYNC_EXIT {
                    _SYNC_EXIT(id arg) : sync_exit(arg) {}
                    ~_SYNC_EXIT() {
                        objc_sync_exit(sync_exit);
                    }
                    id sync_exit;
                }
                _sync_exit(_sync_obj);
            }
            catch (id e) {_rethrow = e;}
            {
                struct _FIN { _FIN(id reth) : rethrow(reth) {}
                    ~_FIN() { if (rethrow) objc_exception_throw(rethrow); }
                    id rethrow;
                }_fin_force_rethow(_rethrow);
            }
        }
    }
    return UIApplicationMain(argc, argv, __null, appDelegateClassName);
}

可以看出,@synchronized (obj)底层就是由objc_sync_enter(obj)objc_sync_exit(obj)实现的,那我们就看一下objc_sync_enter(obj)objc_sync_exit(obj)的源码(objc源码传送门)。

// Begin synchronizing on 'obj'. 
// Allocates recursive mutex associated with 'obj' if needed.
// Returns OBJC_SYNC_SUCCESS once lock is acquired.  
int objc_sync_enter(id obj)
{
    int result = OBJC_SYNC_SUCCESS;

    if (obj) {
        SyncData* data = id2data(obj, ACQUIRE);
        ASSERT(data);
        data->mutex.lock();
    } else {
        // @synchronized(nil) does nothing
        if (DebugNilSync) {
            _objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
        }
        objc_sync_nil();
    }

    return result;
}
// End synchronizing on 'obj'. 
// Returns OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
int objc_sync_exit(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    
    if (obj) {
        SyncData* data = id2data(obj, RELEASE); 
        if (!data) {
            result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
        } else {
            bool okay = data->mutex.tryUnlock();
            if (!okay) {
                result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
            }
        }
    } else {
        // @synchronized(nil) does nothing
    }
    return result;
}

在深入探究objc_sync_enter(obj)objc_sync_exit(obj)源码之前,我们先看一下它们有关的数据结构。

SyncData

typedef struct alignas(CacheLineSize) SyncData {
    // 下一个SyncData的指针
    struct SyncData* nextData;
    // 加锁的对象
    DisguisedPtr<objc_object> object;
    // 对object加锁的线程数
    int32_t threadCount;  // number of THREADS using this block
    // 加锁使用的递归锁
    recursive_mutex_t mutex;
} SyncData;

SyncData 是@synchronized实现线程安全最基本的数据结构,它记录了线程同步的一下数据,包括当前的加锁对象、对当前对象加锁的线程数、加锁使用的递归锁以及指向下一个SyncData的指针。

SyncCacheItem

typedef struct {
    // 该缓存条目对应的SyncData
    SyncData *data;
    // 该对像在当前线程被加锁的次数
    unsigned int lockCount;  // number of times THIS THREAD locked this block
} SyncCacheItem;

SyncCacheItem记录的是SyncData在某个线程中被加锁的记录,一个SyncData可对应多个SyncCacheItem.

SyncCache

typedef struct SyncCache {
    // 分配的缓存大小
    unsigned int allocated;
    // 已使用的缓存大小
    unsigned int used;
    // SyncCacheItem数组
    SyncCacheItem list[0];
} SyncCache;

SyncCache记录了某个线程中所有的SyncCacheItem,同时记录了分配的缓存大小和已使用的缓存大小。

StripedMap

struct SyncList {
    // SyncData链表
    SyncData *data;
    // 自旋锁
    spinlock_t lock;

    constexpr SyncList() : data(nil), lock(fork_unsafe_lock) { }
};

// Use multiple parallel lists to decrease contention among unrelated objects.
#define LOCK_FOR_OBJ(obj) sDataLists[obj].lock
#define LIST_FOR_OBJ(obj) sDataLists[obj].data
static StripedMap<SyncList> sDataLists;

class StripedMap {
#if TARGET_OS_IPHONE && !TARGET_OS_SIMULATOR
    enum { StripeCount = 8 };
#else
    enum { StripeCount = 64 };
#endif
    ...
}

其中,sDataListsStripedMap<SyncList>类型的静态变量,StripedMap<SyncList>是一个最大可以存储64个变量的字典。LOCK_FOR_OBJ(obj)LIST_FOR_OBJ(obj)两个宏定义,可以通过obj的内存地址,从sDataLists中获取对应的lock和SyncData链表。

_objc_pthread_data

在iOS中,每个线程都会维护一个_objc_pthread_data结构体,该结构体会维护一个SyncCache, 初始大小为4个SyncCacheItem大小,当SyncCache缓存填满时,会以当前缓存两倍的大小进行扩充。

static SyncCache *fetch_cache(bool create)
{
    _objc_pthread_data *data;
    
    data = _objc_fetch_pthread_data(create);
    if (!data) return NULL;

    if (!data->syncCache) {
        if (!create) {
            return NULL;
        } else {
            int count = 4;
            data->syncCache = (SyncCache *)
                calloc(1, sizeof(SyncCache) + count*sizeof(SyncCacheItem));
            data->syncCache->allocated = count;
        }
    }

    // Make sure there's at least one open slot in the list.
    if (data->syncCache->allocated == data->syncCache->used) {
        data->syncCache->allocated *= 2;
        data->syncCache = (SyncCache *)
            realloc(data->syncCache, sizeof(SyncCache) 
                    + data->syncCache->allocated * sizeof(SyncCacheItem));
    }

    return data->syncCache;
}

TLS

TLS全称是Thread Local Storage,在iOS中,每个线程都会维护一个TLS,用来存储本线程的一些变量,由于TLS只能在本线程访问,所以无需加锁保护。

通过tls_get_direct/tls_set_direct可以从tls(Thread Local Storage)线程局部存储中快速获取和设置变量。

@synchronized的内部实现中,iOS定义了两个宏SYNC_DATA_DIRECT_KEY/SYNC_COUNT_DIRECT_KEY,通过tls_get_direct/tls_set_direct,可以从线程局部缓存中快速取得和设置SyncCacheItem.data和SyncCacheItem.lockCount。

SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);

其实_objc_pthread_data也是存储在TLS中的,它对应的读取关键字为 _objc_pthread_key

基本上,实现@synchronized用到的数据结构我们已经分析完了,看起来挺复杂,但是我们用一张图来表示一下,就比较清晰了(只列出了与@synchronized有关的部分)。

image

分析完了数据结构,我们看一下objc_sync_enterobjc_sync_exit的具体实现。相关代码都在runtime源码中

objc_sync_enter

int objc_sync_enter(id obj)
{
    int result = OBJC_SYNC_SUCCESS;

    if (obj) {
        SyncData* data = id2data(obj, ACQUIRE);
        ASSERT(data);
        data->mutex.lock();
    } else {
        // @synchronized(nil) does nothing
        if (DebugNilSync) {
            _objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
        }
        objc_sync_nil();
    }

    return result;
}

objc_sync_exit

int objc_sync_exit(id obj)
{
    int result = OBJC_SYNC_SUCCESS;
    
    if (obj) {
        SyncData* data = id2data(obj, RELEASE); 
        if (!data) {
            result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
        } else {
            bool okay = data->mutex.tryUnlock();
            if (!okay) {
                result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
            }
        }
    } else {
        // @synchronized(nil) does nothing
    }
    

    return result;
}

objc_sync_enterobjc_sync_exit的代码都比较简单,都是先调用id2data方法获取到SyncData,然后对SyncData进行加锁解锁操作。

那么我们就看一下id2data的具体实现。

id2data

static SyncData* id2data(id object, enum usage why)
{
    // 获取需加锁对象在sDataList中对应的自旋锁和SyncList
    spinlock_t *lockp = &LOCK_FOR_OBJ(object);
    SyncData **listp = &LIST_FOR_OBJ(object);
    SyncData* result = NULL;

    // SUPPORT_DIRECT_THREAD_KEYS表示可以使用快速缓存
    // tls_get_direct/tls_set_direct是从tls(Thread Local Storage)线程局部存储中获取变量
    // 快速缓存的含义为:定义两个变量SYNC_DATA_DIRECT_KEY/SYNC_COUNT_DIRECT_KEY,
    // 可以从线程局部缓存中快速取得SyncCacheItem.data和SyncCacheItem.lockCount
    // 该缓存策略可以避免线程只对一个对象进行加锁时创建SyncCache的多余消耗
#if SUPPORT_DIRECT_THREAD_KEYS
    // Check per-thread single-entry fast cache for matching object
    bool fastCacheOccupied = NO;
    // 从快速缓存中获取SyncData
    SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
    if (data) {
        // 将标记设置为yes
        fastCacheOccupied = YES;
        // 判断从快速缓存中获取的SyncData是否是需要加锁对象对应的SyncData
        if (data->object == object) {
            // Found a match in fast cache.
            uintptr_t lockCount;

            result = data;
            lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
            // 判断SyncData的合法性
            if (result->threadCount <= 0  ||  lockCount <= 0) {
                _objc_fatal("id2data fastcache is buggy");
            }
            // 根据操作做相应处理
            switch(why) {
            case ACQUIRE: {
                // 加锁 lockCount加1,并更新快速缓存中的数据
                lockCount++;
                tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
                break;
            }
            case RELEASE:
                // 解锁,lockCount减1,并更新快速缓存中的数据
                lockCount--;
                tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
                // 如果lockCount为0,从快速缓存中移除,并减少SyncData中被线程使用的个数
                if (lockCount == 0) {
                    // remove from fast cache
                    tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
                    // atomic because may collide with concurrent ACQUIRE
                    OSAtomicDecrement32Barrier(&result->threadCount);
                }
                break;
            case CHECK:
                // do nothing
                break;
            }

            return result;
        }
    }
#endif
    // 没有从快速缓存中找到对应的SyncData,则获取该线程对应的SyncCache,获取不到不需要新建
    // Check per-thread cache of already-owned locks for matching object
    SyncCache *cache = fetch_cache(NO);
    if (cache) {
        unsigned int i;
        // 尝试从SyncCache中寻找需加锁object对应的SyncData,若找到,则做相应处理
        for (i = 0; i < cache->used; i++) {
            SyncCacheItem *item = &cache->list[i];
            if (item->data->object != object) continue;

            // Found a match.
            result = item->data;
            if (result->threadCount <= 0  ||  item->lockCount <= 0) {
                _objc_fatal("id2data cache is buggy");
            }
                
            switch(why) {
            case ACQUIRE:
                item->lockCount++;
                break;
            case RELEASE:
                item->lockCount--;
                // 解锁后,如果lockCount为0,则从SyncCache中将该cacheItem移除
                // 同时更新SyncData中threadCount的数目
                if (item->lockCount == 0) {
                    // remove from per-thread cache
                    cache->list[i] = cache->list[--cache->used];
                    // atomic because may collide with concurrent ACQUIRE
                    OSAtomicDecrement32Barrier(&result->threadCount);
                }
                break;
            case CHECK:
                // do nothing
                break;
            }

            return result;
        }
    }

    // Thread cache didn't find anything.
    // Walk in-use list looking for matching object
    // Spinlock prevents multiple threads from creating multiple 
    // locks for the same new object.
    // We could keep the nodes in some hash table if we find that there are
    // more than 20 or so distinct locks active, but we don't do that now.
    
    // 线程对应的SyncCache不存在,或者没有从SyncCache中找到需加锁对象对应的SyncData
    // 尝试从全局存储中对应的SyncList里搜索SyncData
    
    // 加锁,防止多个线程同时操作同一个Object
    lockp->lock();

    {
        SyncData* p;
        SyncData* firstUnused = NULL;
        for (p = *listp; p != NULL; p = p->nextData) {
            if ( p->object == object ) {
                // 从SyncList中找到SyncData, 由于之前没有在SyncCache中找到,说明是该线程第一次对Object加锁
                // 需要将SyncData的threadCount增加,并进入最后处理
                result = p;
                // atomic because may collide with concurrent RELEASE
                OSAtomicIncrement32Barrier(&result->threadCount);
                goto done;
            }
            // 寻找SyncList中未使用的SyncData,并赋值给firstUnused
            if ( (firstUnused == NULL) && (p->threadCount == 0) )
                firstUnused = p;
        }
    
        // no SyncData currently associated with object
        // 未找到SyncData,且为RELEASE或CHECK,直接进入最后处理
        if ( (why == RELEASE) || (why == CHECK) )
            goto done;
    
        // an unused one was found, use it
        // 将firstUnused置位当前object,并赋值给result,进入最后处理
        if ( firstUnused != NULL ) {
            result = firstUnused;
            result->object = (objc_object *)object;
            result->threadCount = 1;
            goto done;
        }
    }

    // Allocate a new SyncData and add to list.
    // XXX allocating memory with a global lock held is bad practice,
    // might be worth releasing the lock, allocating, and searching again.
    // But since we never free these guys we won't be stuck in allocation very often.
    
    // 快速缓存、SyncCache、全局存储对应的SyncList都未找到相应的SyncData,则为改需加锁对象创建新的SyncData
    posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
    result->object = (objc_object *)object;
    result->threadCount = 1;
    new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
    result->nextData = *listp;
    *listp = result;
    
 done:
    lockp->unlock();
    if (result) {
        // Only new ACQUIRE should get here.
        // All RELEASE and CHECK and recursive ACQUIRE are 
        // handled by the per-thread caches above.
        
        // 异常校验,保证安全,只有新加锁对象才会走这个逻辑分支
        if (why == RELEASE) {
            // Probably some thread is incorrectly exiting 
            // while the object is held by another thread.
            return nil;
        }
        if (why != ACQUIRE) _objc_fatal("id2data is buggy");
        if (result->object != object) _objc_fatal("id2data is buggy");

        // 支持快速缓存且快速缓存未被占用,则存入快速缓存,否则存入SyncCache
#if SUPPORT_DIRECT_THREAD_KEYS
        if (!fastCacheOccupied) {
            // Save in fast thread cache
            tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
            tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);
        } else 
#endif
        {
            // Save in thread cache
            if (!cache) cache = fetch_cache(YES);
            cache->list[cache->used].data = result;
            cache->list[cache->used].lockCount = 1;
            cache->used++;
        }
    }

    return result;
}

代码很多,但其实逻辑并不复杂,通过id2data方法获取SyncData的流程如下:

image

最后,总结一下@synchronize:

  • @synchronize是递归锁。
  • 不能对非OC对象加锁,因为id2data接受的是id类型
  • 不能对nil条件加锁,临界区代码可正常执行,但无法保证线程安全
  • @synchronize性能低的原因是因为底层做了大量的增删盖查,但如果一个线程只对某一个对象反复加锁,性能较高(因为会存入线程的快速缓存中)。

3.7 互斥锁性能对比

image

4 总结

  • 锁可分为互斥锁和自旋锁,互斥锁又可分为可重入锁和不可重入锁,可重入锁可以递归调用,重复加锁。
  • OSSpinLock原本为自旋锁,因为安全问题,底层实现在iOS 10之后改为os_unfair_lock,替换完之后实际是互斥锁。
  • atomic只能保证setter、getter的线程安全,并不能保证数据操作的安全。
  • 读写锁是特殊的自旋锁,也会有安全问题,使用时需小心,可以使用信号量替代。
  • pthread_mutex是最基本的互斥锁
  • os_unfair_lock与pthread_mutex是同一个的东西
  • NSLock、NSRecursiveLock、NSCondition、NSConditionLock都是基于pthread_mutex的封装。
  • 普通场景下的线程安全可以使用NSLock、递归场景下可以使用NSRecursiveLock。
  • @synchronize的性能低是因为底层做了大量的增删改查工作。
  • @synchronize底层维护了一个哈希链表来存储SyncData,并且通过快速缓存来提升线程仅对某个对象反复加锁时的效率。
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容

  • 在日常开发过程中,为了提升程序运行效率,以及用户体验,我们经常使用多线程。在使用多线程的过程中,难免会遇到资源竞争...
    星捷阅读 4,262评论 0 0
  • 在日常开发过程中,为了提升程序运行效率,以及用户体验,我们经常使用多线程。在使用多线程的过程中,难免会遇到资源竞争...
    xiao333ma阅读 5,889评论 1 2
  • 在日常开发过程中,为了提升程序运行效率,以及用户体验,我们经常使用多线程。在使用多线程的过程中,难免会遇到资源竞争...
    知识小集阅读 11,067评论 8 61
  • 欢迎阅读iOS探索系列(按序阅读食用效果更加)iOS探索 alloc流程iOS探索 内存对齐&malloc源码iO...
    吕子乔_eabd阅读 4,816评论 0 2
  • 写在前面 多线程在日常开发中能起到性能优化的作用,但是一旦没用好就会造成线程不安全,本文就来讲讲如何保证线程安全 ...
    M_慕宸阅读 3,490评论 0 5