前言
- 今天来研究一下Dispatch_source源 & @Synchronized
准备
一、 dispatch_source源
- dispatch_source源是一个偏底层的函数集合,使用时CPU负荷非常小,尽量不占资源,开发过程中大多配合定时器使用。
创建dispatch_source:
dispatch_source_t source = dispatch_source_create(dispatch_source_type_t type, uintptr_t handle, unsigned long mask, dispatch_queue_t queue)
参数 | 说明 |
---|---|
type | dispatch源可处理的事件 |
handle | 可以理解为句柄、索引或id,假如要监听进程,需要传入进程的ID |
mask | 可以理解为描述,提供更详细的描述,让它知道具体要监听什么 |
queue | 自定义源需要的一个队列,用来处理所有的响应句柄 |
- 任何线程调用它的函数dispatch_source_merge_data后,会执行DispatchSource事先定义好的句柄(可以把句柄简单理解为一个block),这个过程叫custom event,用户事件。是dispatch_source支持处理的一种事件。
句柄是一种指向指针的指针。它指向的是一个类或结构,它和系统有很密切的关系。
HINSTANCE实例句柄、HBITMAP位图句柄、HDC设备表述句柄、HICON图标句柄 等。其中还有一个通用句柄,就是HANDLE。
常用方法:
dispatch_source_create
:创建源dispatch_source_set_event_handler
: 设置源事件回调dispatch_source_merge_data
:置源事件设置数据dispatch_source_get_data
:获取源事件数据dispatch_resume
: 继续dispatch_suspend
: 挂起dispatch_cancel
: 取消
举个例子:
- (void)dispatch_source_TSET
{
__block NSInteger totalComplete = 0;
// 创建串行队列
dispatch_queue_t queue = dispatch_queue_create("HJ", NULL);
// 创建主队列源,源类型为 DISPATCH_SOURCE_TYPE_DATA_ADD
dispatch_source_t source = dispatch_source_create(DISPATCH_SOURCE_TYPE_DATA_ADD, 0, 0, dispatch_get_main_queue());
// 设置源事件回调
dispatch_source_set_event_handler(source, ^{
NSLog(@"%@",[NSThread currentThread]);
NSUInteger value = dispatch_source_get_data(source);
totalComplete += value;
NSLog(@"进度: %.2f", totalComplete/100.0);
});
// 开启源事件
dispatch_resume(source);
// 发送数据源
for (int i= 0; i<100; i++) {
dispatch_async(queue, ^{
sleep(1);
// 发送源数据
dispatch_source_merge_data(source, 1);
});
}
2020-11-12 14:19:04.969558+0800 007---Dispatch_source[29658:494902] <NSThread: 0x600003048080>{number = 1, name = main}
2020-11-12 14:19:04.969941+0800 007---Dispatch_source[29658:494902] 进度: 0.01
2020-11-12 14:19:05.970992+0800 007---Dispatch_source[29658:494902] <NSThread: 0x600003048080>{number = 1, name = main}
2020-11-12 14:19:05.971493+0800 007---Dispatch_source[29658:494902] 进度: 0.02
2020-11-12 14:19:06.975953+0800 007---Dispatch_source[29658:494902] <NSThread: 0x600003048080>{number = 1, name = main}
2020-11-12 14:19:06.976476+0800 007---Dispatch_source[29658:494902] 进度: 0.03
2020-11-12 14:19:07.981136+0800 007---Dispatch_source[29658:494902] <NSThread: 0x600003048080>{number = 1, name = main}
2020-11-12 14:19:07.981392+0800 007---Dispatch_source[29658:494902] 进度: 0.04
2020-11-12 14:19:08.986199+0800 007---Dispatch_source[29658:494902] <NSThread: 0x600003048080>{number = 1, name = main}
2020-11-12 14:19:08.986579+0800 007---Dispatch_source[29658:494902] 进度: 0.05
----------------------------------------------------------------------------------------------------
平时用于定时器的简单使用案例:
- (void)use033{
//倒计时时间
__block int timeout = 3;
//创建队列
dispatch_queue_t globalQueue = dispatch_get_global_queue(0, 0);
//创建timer
dispatch_source_t timer = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, globalQueue);
//设置1s触发一次,0s的误差
/*
- source 分派源
- start 数控制计时器第一次触发的时刻。参数类型是 dispatch_time_t,这是一个opaque类型,我们不能直接操作它。我们得需要 dispatch_time 和 dispatch_walltime 函数来创建它们。另外,常量 DISPATCH_TIME_NOW 和 DISPATCH_TIME_FOREVER 通常很有用。
- interval 间隔时间
- leeway 计时器触发的精准程度
*/
dispatch_source_set_timer(timer,dispatch_walltime(NULL, 0),1.0*NSEC_PER_SEC, 0);
//触发的事件
dispatch_source_set_event_handler(timer, ^{
//倒计时结束,关闭
if (timeout <= 0) {
//取消dispatch源
dispatch_source_cancel(timer);
}else{
timeout--;
dispatch_async(dispatch_get_main_queue(), ^{
//更新主界面的操作
NSLog(@"倒计时 - %d", timeout);
});
}
});
//开始执行dispatch源
dispatch_resume(timer);
}
- 因为dispatch_source不依赖于Runloop,而是直接和底层内核交互,准确性更高。
- 时间准确,可以使用子线程
二、 synchronized锁
锁的理解:
所谓锁,就是在操作数据的时候,为了防止多个操作同时操作一个数据导致数据的错乱或者非即时而采用的一种规避手段。尤其在使用多线程进行开发时,由于多个线程的操作可能会同时对某个数据进行操作,可能是读也可能是写,如果不加以处理,那么可能在一个线程读的时候另一个线程去写,读的线程得到的数据就可能不是最新的数据,或者两个线程同时对数据进行修改,导致一些不可预知的错乱。
锁是线程编程同步工具的基础
加锁
:在A线程操作数据的时候,将数据给锁住,锁住的意思也就是不允许其他线程来操作这个数据,想操作的都得进行等待,直到 A操作完了,才将锁给打开,这时才允许其他线程排队进行操作。 之前分析的信号量
也是用来控制加锁解锁
的
iOS 中加锁常用的方式:
NSLock
dispatch_semaphore_wait
@synchronized
咋们主要分析@synchronized
互斥递归锁
@synchronized 的基本用法:
@synchronized(这里添加一个OC对象,一般使用self) {
这里写要加锁的代码
}
注意点
1.加锁的代码尽量少
2.添加的OC对象必须在多个线程中都是同一对象
3.优点是不需要显式的创建锁对象,便可以实现锁的机制。
4. @synchronized块会隐式的添加一个异常处理例程来保护代码,该处理例程会在异常抛出的时候自动的释放互斥锁。所以如果不想让隐式的异常处理例程带来额外的开销,你可以考虑使用锁对象。
- @synchronized() 小括号内需要一个参数,这个参数就表示信号量。这个参数可以是任何对象,包括 self,或者是自定义的信号量。针对不同的操作应该定义不同的信号量。
- @synchronized() {…} 大括号中就是要加锁执行的代码,代码会操作一些数据。当开始执行代码时,意味着当前线程对其加锁了,当代码执行完后,自动解锁,其他线程才允许执行此段代码。
最常见的买票案例如图:
//设置票的数量为5
_tickets = 5;
//线程1
dispatch_async(self.concurrentQueue, ^{
[self saleTickets];
});
//线程2
dispatch_async(self.concurrentQueue, ^{
[self saleTickets];
});
- (void)saleTickets
{
while (1) {
@synchronized(self) {
[NSThread sleepForTimeInterval:1];
if (_tickets > 0) {
_tickets--;
NSLog(@"剩余票数= %ld, Thread:%@",_tickets,[NSThread currentThread]);
} else {
NSLog(@"票卖完了 Thread:%@",[NSThread currentThread]);
break;
}
}
}
}
探索synchronized的底层
研究陌生函数底层实现的几个方法:
- 断点调试查看汇编
- Clang查看底层编译代码
- Product->Perform Action->Assemble "main.m" 直接查看汇编
首先开启汇编调试,发现@synchronized
在执行过程中,会走底层的objc_sync_enter
和 objc_sync_exit
方法如图:
通过clang,查看底层编译代码如下:
// 终端 cd 到main.m文件 所在文件夹然后 输入:(注意别留多余空格)
xcrun -sdk iphonesimulator clang -arch x86_64 -rewrite-objc main.m
得到 main.cpp 相关代码如下:
int main(int argc, char * argv[]) {
NSString * appDelegateClassName;
/* @autoreleasepool */ { __AtAutoreleasePool __autoreleasepool;
appDelegateClassName = NSStringFromClass(((Class (*)(id, SEL))(void *)objc_msgSend)((id)objc_getClass("AppDelegate"), sel_registerName("class")));
{ id _rethrow = 0; id _sync_obj = (id)appDelegateClassName; objc_sync_enter(_sync_obj);
try {
struct _SYNC_EXIT { _SYNC_EXIT(id arg) : sync_exit(arg) {}
~_SYNC_EXIT() {
objc_sync_exit(sync_exit);
}
id sync_exit;
} _sync_exit(_sync_obj);
} catch (id e) {_rethrow = e;}
{ struct _FIN { _FIN(id reth) : rethrow(reth) {}
~_FIN() { if (rethrow) objc_exception_throw(rethrow); }
id rethrow;
} _fin_force_rethow(_rethrow);}
}
}
return UIApplicationMain(argc, argv, __null, appDelegateClassName);
}
static struct IMAGE_INFO { unsigned version; unsigned flag; } _OBJC_IMAGE_INFO = { 0, 2 };
可以很清晰的看到objc_sync_enter
&objc_sync_exit
查看这个代码结构是不是与信号量的代码结构很像,一个上锁一个解锁呢?所以我们猜想@ synchronized 是不是对于某种上锁解锁的封装呢??
通过objc_sync_enter添加符号断点,查找源码库如下:
来到objc_sync_enter
源码如下:
// Begin synchronizing on 'obj'.
// Allocates recursive mutex associated with 'obj' if needed.
// Returns OBJC_SYNC_SUCCESS once lock is acquired.
翻译:
//在obj上开始同步。
//如果需要,分配与'obj'关联的递归互斥量。
//获得锁后返回OBJC_SYNC_SUCCESS。
int objc_sync_enter(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {//传入不为nil
SyncData* data = id2data(obj, ACQUIRE);//重点
ASSERT(data);
data->mutex.lock();//加锁
} else {//传入nil
// @synchronized(nil) does nothing
if (DebugNilSync) {
_objc_inform("NIL SYNC DEBUG: @synchronized(nil); set a breakpoint on objc_sync_nil to debug");
}
objc_sync_nil();
}
return result;
}
- 如果obj存在,则通过id2data方法获取相应的SyncData,对threadCount、lockCount进行递增操作;
- 如果obj不存在,则调用objc_sync_nil ,什么也没做。
进入objc_sync_exit
源码如下:
// End synchronizing on 'obj'. 结束对“ obj”的同步
// Returns OBJC_SYNC_SUCCESS or OBJC_SYNC_NOT_OWNING_THREAD_ERROR
int objc_sync_exit(id obj)
{
int result = OBJC_SYNC_SUCCESS;
if (obj) {//obj不为nil
SyncData* data = id2data(obj, RELEASE);
if (!data) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
} else {
bool okay = data->mutex.tryUnlock();//解锁
if (!okay) {
result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
}
}
} else {//obj为nil时,什么也不做
// @synchronized(nil) does nothing
}
return result;
}
- 如果
obj
存在,则调用id2data
方法获取对应的SyncData
,对threadCount
、lockCount
进行递减操作 - 如果
obj
为nil
,什么也不做
通过上面两个实现逻辑的对比,发现它们有一个共同点,在obj存在时,都会通过id2data
方法,获取SyncData
类型的对象,我们来查看一下SyncData
是啥?
typedef struct alignas(CacheLineSize) SyncData {
struct SyncData* nextData;//类似链表结构
DisguisedPtr<objc_object> object;
int32_t threadCount; // number of THREADS using this block
recursive_mutex_t mutex;//递归锁
} SyncData;
进入SyncData
的定义,是一个结构体,主要用来表示一个线程data
,类似于链表结构
,有next指向,且封装了recursive_mutex_t
属性,可以确认@synchronized
确实是一个递归互斥锁
。(recursive :递归的 ,mutex:互斥锁)
id2data 分析
进入id2data
源码 ,从objc_sync_enter
和objc_sync_exit
的调用 可以看出,这个方法是加锁和解锁都复用的方法
-
id2data
中有SyncCache
,我们先看下SyncCache
是什么?
进入SyncCache
的定义,也是一个结构体,用于存储线程
,其中list[0]
表示当前线程的链表data
,主要用于存储SyncData
和lockCount
typedef struct SyncCache {
unsigned int allocated;
unsigned int used;
SyncCacheItem list[0];
} SyncCache;
typedef struct {
SyncData *data;
unsigned int lockCount; // number of times THIS THREAD locked this block 此线程锁定此块的次数
} SyncCacheItem;
进入id2data
源码如下:
static SyncData* id2data(id object, enum usage why)
{
spinlock_t *lockp = &LOCK_FOR_OBJ(object);
SyncData **listp = &LIST_FOR_OBJ(object);
SyncData* result = NULL;
// ------------ 第一阶段 :快速从缓存中查找
#if SUPPORT_DIRECT_THREAD_KEYS //tls(Thread Local Storage,本地局部的线程缓存)
// Check per-thread single-entry fast cache for matching object
bool fastCacheOccupied = NO;
//通过KVC方式对线程进行获取 线程绑定的data
SyncData *data = (SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY);
//如果线程缓存中有data,执行if流程
if (data) {
fastCacheOccupied = YES;
//如果在线程空间找到了data
if (data->object == object) {
// Found a match in fast cache.
uintptr_t lockCount;
result = data;
//通过KVC获取lockCount,lockCount用来记录 被锁了几次,即 该锁可嵌套
lockCount = (uintptr_t)tls_get_direct(SYNC_COUNT_DIRECT_KEY);
if (result->threadCount <= 0 || lockCount <= 0) {
_objc_fatal("id2data fastcache is buggy");
}
switch(why) {
case ACQUIRE: {
//objc_sync_enter走这里,传入的是ACQUIRE -- 获取
lockCount++;//通过lockCount判断被锁了几次,即表示 可重入(递归锁如果可重入,会死锁)
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);//设置
break;
}
case RELEASE:
//objc_sync_exit走这里,传入的why是RELEASE -- 释放
lockCount--;
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)lockCount);
if (lockCount == 0) {
// remove from fast cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
#endif
// ------------ 第二阶段 :线程缓存中查找
// Check per-thread cache of already-owned locks for matching object
SyncCache *cache = fetch_cache(NO);//判断缓存中是否有该线程
//如果cache中有,方式与线程缓存一致
if (cache) {
unsigned int I;
for (i = 0; i < cache->used; i++) {//遍历总表
SyncCacheItem *item = &cache->list[I];
if (item->data->object != object) continue;
// Found a match.
result = item->data;
if (result->threadCount <= 0 || item->lockCount <= 0) {
_objc_fatal("id2data cache is buggy");
}
switch(why) {
case ACQUIRE://加锁
item->lockCount++;
break;
case RELEASE://解锁
item->lockCount--;
if (item->lockCount == 0) {
// remove from per-thread cache 从cache中清除使用标记
cache->list[i] = cache->list[--cache->used];
// atomic because may collide with concurrent ACQUIRE
OSAtomicDecrement32Barrier(&result->threadCount);
}
break;
case CHECK:
// do nothing
break;
}
return result;
}
}
// Thread cache didn't find anything.
// Walk in-use list looking for matching object
// Spinlock prevents multiple threads from creating multiple
// locks for the same new object.
// We could keep the nodes in some hash table if we find that there are
// more than 20 or so distinct locks active, but we don't do that now.
//第一次进来,所有缓存都找不到
lockp->lock();
// ------------ 第三阶段 :遍历所有线程查找
{
SyncData* p;
SyncData* firstUnused = NULL;
for (p = *listp; p != NULL; p = p->nextData) {//cache中已经找到
if ( p->object == object ) {//如果不等于空,且与object相似
result = p;//赋值
// atomic because may collide with concurrent RELEASE
OSAtomicIncrement32Barrier(&result->threadCount);//对threadCount进行++
goto done;
}
if ( (firstUnused == NULL) && (p->threadCount == 0) )
firstUnused = p;
}
// no SyncData currently associated with object 没有与当前对象关联的SyncData
if ( (why == RELEASE) || (why == CHECK) )
goto done;
// an unused one was found, use it 第一次进来,没有找到
if ( firstUnused != NULL ) {
result = firstUnused;
result->object = (objc_object *)object;
result->threadCount = 1;
goto done;
}
}
// Allocate a new SyncData and add to list.
// XXX allocating memory with a global lock held is bad practice,
// might be worth releasing the lock, allocating, and searching again.
// But since we never free these guys we won't be stuck in allocation very often.
posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));//创建赋值
result->object = (objc_object *)object;
result->threadCount = 1;
new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
result->nextData = *listp;
*listp = result;
// ------------ 第四阶段 :错误异常报错 崩溃处理,正确存入缓存
done:
lockp->unlock();
if (result) {
// Only new ACQUIRE should get here.
// All RELEASE and CHECK and recursive ACQUIRE are
// handled by the per-thread caches above.
if (why == RELEASE) {
// Probably some thread is incorrectly exiting
// while the object is held by another thread.
return nil;
}
if (why != ACQUIRE) _objc_fatal("id2data is buggy");
if (result->object != object) _objc_fatal("id2data is buggy");
#if SUPPORT_DIRECT_THREAD_KEYS
if (!fastCacheOccupied) { //判断是否支持栈存缓存,支持则通过KVC形式赋值 存入tls
// Save in fast thread cache
tls_set_direct(SYNC_DATA_DIRECT_KEY, result);
tls_set_direct(SYNC_COUNT_DIRECT_KEY, (void*)1);//lockCount = 1
} else
#endif
{
// Save in thread cache 缓存中存一份
if (!cache) cache = fetch_cache(YES);//第一次存储时,对线程进行了绑定
cache->list[cache->used].data = result;
cache->list[cache->used].lockCount = 1;
cache->used++;
}
}
return result;
}
一共分为三步进行查找和处理:
【第一步】如果支持
快速缓存
,就从快速缓存
中读取线程和任务
,进行相应操作并返回。
【第二步】快速缓存
没找到,就从线程缓存
中读取线程和任务
,进行相应操作并返回。
【第三步】线程缓存
也没找到,就循环遍历一个个线程和任务
,进行相应操作并跳到done。
【Done】 如果错误
: 异常报错。如果正确
,就存入快速缓存和线程缓存中,便于下次查找。其中【相应操作】包括三种状态
1 .ACQUIRE
进行中: 当前线程内任务数加1,更新相应数据
2 .RELEASE
释放中: 当前线程内任务数减1,更新相应数据
3 .CHECK
检查: 啥也不干
补充: 每个被锁的object对象
可拥有一个或多个线程。
(我们寻找线程前,都需先判断当前线程的持有对象object
是否与锁对象objec
一致)
其中fetch_cache
函数,是进行缓存查询
和开辟存储
的:
static SyncCache *fetch_cache(bool create)
{
_objc_pthread_data *data;
//获取数据
data = _objc_fetch_pthread_data(create);
if (!data) return NULL;
//如果没有缓存
if (!data->syncCache) {
//不创建
if (!create) {
return NULL;
} else {
int count = 4;
//创建线程初始化存储空间
data->syncCache = (SyncCache *)
calloc(1, sizeof(SyncCache) + count*sizeof(SyncCacheItem));
data->syncCache->allocated = count;
}
}
// Make sure there's at least one open slot in the list.
//扩容操作
if (data->syncCache->allocated == data->syncCache->used) {
data->syncCache->allocated *= 2;
data->syncCache = (SyncCache *)
realloc(data->syncCache, sizeof(SyncCache)
+ data->syncCache->allocated * sizeof(SyncCacheItem));
}
// 返回找到的缓存
return data->syncCache;
}
注
create
为NO
: 仅查询
create
为YES
: 查询并开辟/扩容内存
总结 :锁对象内部关联逻辑图 ,递归互斥原理一目了然
疑问解答:
1 . @synchronized为什么锁定对象写self?
因为被锁对象不能提前释放,会触发解锁操作,锁内代码不安全。
【补充】
当对象被释放时,调用objc_sync_enter和objc_sync_exit,底层代码显示:啥也不会做。这把锁已经完全失去作用了。
- 为什么@synchronized耗时严重?
因为对象被锁后(比如self),该对象的所有操作,都变成了加锁操作,为了确保锁内代码安全,我们锁了对象(比如self)的所有操作。
最直接的影响是,被锁线程变多,执行操作时,查找线程和查找任务都变得很耗时,而且每个被锁线程内的任务还是递归持有,更耗时。
【补充】
我们查询任务时,可能经历3次查询(快速缓存查询->线程缓存查询->遍历所有线程查询),需要寻找线程、匹配被锁对象,nextData递归寻找任务。这些,就是耗时的点。
(self需要处理的事务越多,占有的线程数threadCount和每个线程内的锁数量lockCount都会越多,查询也更耗时。)
慎用@synchronized(self)
举个例子:
- (void)cjl_testSync{
_testArray = [NSMutableArray array];
for (int i = 0; i < 200000; i++) {
dispatch_async(dispatch_get_global_queue(0, 0), ^{
@synchronized (self.testArray) {
self.testArray = [NSMutableArray array];
}
});
}
}
运行结果发现,运行就崩溃
崩溃的主要原因是testArray在某一瞬间变成了nil,从@synchronized底层流程知道,如果加锁的对象成了nil,是锁不住的,相当于下面这种情况,block内部不停的retain、release,会在某一瞬间上一个还未release,下一个已经准备release,这样会导致野指针的产生
我们一般使用@synchronized (self),主要是因为_testArray的持有者是self
在书写@synchronized内部代码的时候,要十分小心内部隐蔽的函数调用。
结论:
- @synchronized在底层封装的是一把递归锁,所以这个锁是递归互斥锁;
- synchronized中传入的object的内存地址,被用作key,通过hash map对应的一个系统维护的递归锁;
- @synchronized的可重入,即可嵌套,主要是由于lockCount 和 threadCount的搭配;
- @synchronized使用链表的原因是链表方便下一个data的插入;
- 但是由于底层中链表查询、缓存的查找以及递归,是非常耗内存以及性能的,导致性能低;
- 但是目前该锁的使用频率仍然很高,主要是因为方便简单,且不用解锁;
- 不能使用非OC对象作为加锁对象,因为其object的参数为id;
- @synchronized (self)这种适用于嵌套次数较少的场景。这里锁住的对象也并不永远是self,这里需要读者注意;
- 如果锁嵌套次数较多,即锁self过多,会导致底层的查找非常麻烦,因为其底层是链表进行查找,所以会相对比较麻烦,所以此时可以考虑使用信号量,线程同步函数等等;
三、 总结
Dispatch_source & @Synchronized 其实开发中调用其实比较简单,但是需要注意的点还是有很多,知其所以然才能运用得当,今天暂时就研究到这里,接下来再去研究其他几种锁。