Preface
Day 25 of studying iOS internals. Today brings the final chapter of the GCD internals exploration. The main topics are the source code behind the dispatch_barrier_async barrier function, semaphores, and dispatch groups.
dispatch_barrier_(a)sync
You may have used dispatch_barrier_(a)sync during development; let's start with what it does.
A barrier function controls the order in which tasks on a concurrent queue execute, acting as a synchronization point:
dispatch_barrier_sync: runs its task only after all previously submitted tasks have finished, and it also blocks the current thread until the barrier task itself completes.
dispatch_barrier_async: runs its task only after all previously submitted tasks have finished, but returns immediately and does not block the current thread.
Applying dispatch_barrier_(a)sync
dispatch_queue_t concurrentQueue = dispatch_queue_create("xkQueue", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(concurrentQueue, ^{
    sleep(1);
    NSLog(@">>>>>>> Task 1");
});
dispatch_async(concurrentQueue, ^{
    NSLog(@">>>>>>> Task 2");
});
/* 2. Barrier function */ // - dispatch_barrier_sync
dispatch_barrier_async(concurrentQueue, ^{
    NSLog(@">>>>>>>> Barrier task ----%@-----", [NSThread currentThread]);
});
dispatch_async(concurrentQueue, ^{
    sleep(2);
    NSLog(@">>>>>>> Task 3");
});
NSLog(@">>>>>>> Task 4");
Output 👇
This verifies that the barrier task runs only after Task 1 and Task 2 have finished.
Change dispatch_barrier_async to dispatch_barrier_sync.
Output 👇
With dispatch_barrier_sync, the barrier task still runs after Task 1 and Task 2, but it now also blocks Task 3 and Task 4 until it has finished.
This raises a question: how does the barrier function make Task 1 and Task 2 finish before running its own task? ❓
Now change the queue to a global queue:
dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
dispatch_async(concurrentQueue, ^{
    sleep(1);
    NSLog(@">>>>>>> Task 1");
});
dispatch_async(concurrentQueue, ^{
    NSLog(@">>>>>>> Task 2");
});
/* 2. Barrier function */ // - dispatch_barrier_sync
dispatch_barrier_sync(concurrentQueue, ^{
    NSLog(@">>>>>>>> Barrier task ----%@-----", [NSThread currentThread]);
});
dispatch_async(concurrentQueue, ^{
    sleep(2);
    NSLog(@">>>>>>> Task 3");
});
NSLog(@">>>>>>> Task 4");
Output 👇
On the global concurrent queue, the barrier function had no effect.
Why is dispatch_barrier_sync ineffective on a global concurrent queue? ❓
Modify the code once more 👇 to check whether a barrier works across different queues:
dispatch_queue_t concurrentQueue = dispatch_queue_create("xkQueue", DISPATCH_QUEUE_CONCURRENT);
// Add concurrentQueue2
dispatch_queue_t concurrentQueue2 = dispatch_queue_create("xkQueue2", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(concurrentQueue, ^{
    sleep(1);
    NSLog(@">>>>>>> Task 1");
});
dispatch_async(concurrentQueue, ^{
    NSLog(@">>>>>>> Task 2");
});
/* 2. Barrier function */ // - dispatch_barrier_sync
dispatch_barrier_sync(concurrentQueue2, ^{
    NSLog(@">>>>>>>> Barrier task ----%@-----", [NSThread currentThread]);
});
dispatch_async(concurrentQueue, ^{
    sleep(2);
    NSLog(@">>>>>>> Task 3");
});
NSLog(@">>>>>>> Task 4");
Output 👇
Result: a barrier submitted to a different queue than the tasks has no effect.
Summary
For a barrier function to work, the barrier and the tasks must be on the same queue, and that queue must not be a global one.
Two questions remain:
- Why is a barrier ineffective on the global concurrent queue? ❓
- How can a barrier function let the earlier tasks finish before running its own task? ❓
Both will be answered in the source-level exploration below.
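Before diving into the source, the observed behavior can be modeled as a toy reader-writer scheme: normal tasks take shared access, while a barrier waits for all in-flight tasks to drain and blocks new tasks until it finishes. This is only an illustrative sketch of the semantics (all names here are invented), not how libdispatch actually implements barriers:

```c
#include <pthread.h>

// Toy model of barrier semantics on a concurrent queue (illustration only).
typedef struct {
    pthread_mutex_t mu;
    pthread_cond_t  cv;
    int active;   // normal tasks currently running
    int barrier;  // nonzero while a barrier holds the queue exclusively
} toy_queue;

void task_begin(toy_queue *q) {           // a normal task starts
    pthread_mutex_lock(&q->mu);
    while (q->barrier)                    // a pending barrier blocks later tasks
        pthread_cond_wait(&q->cv, &q->mu);
    q->active++;
    pthread_mutex_unlock(&q->mu);
}

void task_end(toy_queue *q) {             // a normal task finishes
    pthread_mutex_lock(&q->mu);
    q->active--;
    pthread_cond_broadcast(&q->cv);
    pthread_mutex_unlock(&q->mu);
}

void barrier_begin(toy_queue *q) {        // barrier: drain all earlier tasks
    pthread_mutex_lock(&q->mu);
    q->barrier = 1;
    while (q->active > 0)
        pthread_cond_wait(&q->cv, &q->mu);
    pthread_mutex_unlock(&q->mu);
}

void barrier_end(toy_queue *q) {          // "remove the barrier"
    pthread_mutex_lock(&q->mu);
    q->barrier = 0;
    pthread_cond_broadcast(&q->cv);
    pthread_mutex_unlock(&q->mu);
}
```

In this analogy, a global queue would simply never take the barrier_begin/barrier_end path, which matches the observation above that barriers have no effect there.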
Exploring dispatch_barrier_(a)sync internals
Go into the libdispatch source and search globally for dispatch_barrier_sync:
void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}
Step into _dispatch_barrier_sync_f:
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
Step into _dispatch_barrier_sync_f_inline:
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();
    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }
    dispatch_lane_t dl = upcast(dq)._dl;
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);
    }
    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
Add symbolic breakpoints on _dispatch_sync_f_slow and _dispatch_sync_recurse.
Step into _dispatch_sync_f_slow:
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
    dispatch_queue_t top_dq = top_dqu._dq;
    dispatch_queue_t dq = dqu._dq;
    if (unlikely(!dq->do_targetq)) {
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }
    pthread_priority_t pp = _dispatch_get_priority();
    struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };
    _dispatch_trace_item_push(top_dq, &dsc);
    __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
    if (dsc.dsc_func == NULL) {
        // dsc_func being cleared means that the block ran on another thread ie.
        // case (2) as listed in _dispatch_async_and_wait_f_slow.
        dispatch_queue_t stop_dq = dsc.dc_other;
        return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    }
    _dispatch_introspection_sync_begin(top_dq);
    _dispatch_trace_item_pop(top_dq, &dsc);
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags
            DISPATCH_TRACE_ARG(&dsc));
}
Both dispatch_barrier_sync and dispatch_sync end up in _dispatch_sync_f_slow, and both can reach __DISPATCH_WAIT_FOR_QUEUE__, where the familiar deadlock error is raised.
Continue by adding more symbolic breakpoints 👇
Execution jumped straight to _dispatch_sync_invoke_and_complete_recurse. That seemed strange: a step was skipped, so how did it complete directly? Something was off: a symbolic breakpoint failed to hit. With that question in mind, go back to _dispatch_sync_complete_recurse, the function called before _dispatch_sync_invoke_and_complete_recurse.
Step into _dispatch_sync_complete_recurse:
static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
        uintptr_t dc_flags)
{
    bool barrier = (dc_flags & DC_FLAG_BARRIER);
    do {
        if (dq == stop_dq) return;
        // If this is a barrier, call dx_wakeup
        if (barrier) {
            dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
        } else {
            _dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
        }
        dq = dq->do_targetq;
        barrier = (dq->dq_width == 1);
    } while (unlikely(dq->do_targetq));
}
From the code: a do-while loop walks up the target-queue chain. If there is a barrier, dx_wakeup is called to wake the queue; otherwise _dispatch_lane_non_barrier_complete is called.
We already know that dx_wakeup resolves to the dq_wakeup vtable slot. Search globally for dq_wakeup:
// Custom concurrent queue
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
    // ....
    .dq_wakeup = _dispatch_lane_wakeup,
);
// Global concurrent queue
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
    // ....
    .dq_wakeup = _dispatch_root_queue_wakeup,
);
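The two dq_wakeup entries above are just function pointers stored in per-class vtables, which is why the same dx_wakeup call takes completely different paths on a custom concurrent queue versus a global one. A minimal sketch of that dispatch mechanism, with hypothetical toy types and strings standing in for the real functions:

```c
#include <string.h>

// Each queue class carries its own wakeup routine in a vtable slot.
typedef struct {
    const char *(*dq_wakeup)(void);
} toy_vtable;

static const char *lane_wakeup(void) { return "_dispatch_lane_wakeup"; }
static const char *root_wakeup(void) { return "_dispatch_root_queue_wakeup"; }

// Analogues of the DISPATCH_VTABLE_SUBCLASS_INSTANCE entries above.
static const toy_vtable queue_concurrent = { lane_wakeup };
static const toy_vtable queue_global     = { root_wakeup };

// dx_wakeup analogue: dynamic dispatch through the vtable slot.
static const char *toy_dx_wakeup(const toy_vtable *q) {
    return q->dq_wakeup();
}
```

Calling toy_dx_wakeup(&queue_concurrent) selects the lane routine, while toy_dx_wakeup(&queue_global) selects the root-queue routine, mirroring why barrier completion logic only ever runs for the lane class.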
To verify, add a symbolic breakpoint on _dispatch_lane_wakeup. This resolves the earlier confusion: before _dispatch_sync_invoke_and_complete_recurse runs, execution first passes through _dispatch_sync_complete_recurse -> _dispatch_lane_wakeup.
Step into _dispatch_lane_wakeup:
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
        return _dispatch_lane_barrier_complete(dqu, qos, flags);
    }
    if (_dispatch_queue_class_probe(dqu)) {
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }
    return _dispatch_queue_wakeup(dqu, qos, flags, target);
}
The symbolic breakpoints then lead to _dispatch_queue_wakeup, which schedules and wakes the queue's tasks. After the queued tasks have been woken, execution also reaches _dispatch_lane_non_barrier_complete; add a symbolic breakpoint to verify, then step into _dispatch_lane_non_barrier_complete:
static void
_dispatch_lane_non_barrier_complete(dispatch_lane_t dq,
        dispatch_wakeup_flags_t flags)
{
    uint64_t old_state, new_state, owner_self = _dispatch_lock_value_for_self();
    // see _dispatch_lane_resume()
    os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
        new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
        if (unlikely(_dq_state_drain_locked(old_state))) {
            // make drain_try_unlock() fail and reconsider whether there's
            // enough width now for a new item
            new_state |= DISPATCH_QUEUE_DIRTY;
        } else if (likely(_dq_state_is_runnable(new_state))) {
            new_state = _dispatch_lane_non_barrier_complete_try_lock(dq,
                    old_state, new_state, owner_self);
        }
    });
    _dispatch_lane_non_barrier_complete_finish(dq, flags, old_state, new_state);
}
Step into _dispatch_lane_non_barrier_complete_finish:
static void
_dispatch_lane_non_barrier_complete_finish(dispatch_lane_t dq,
        dispatch_wakeup_flags_t flags, uint64_t old_state, uint64_t new_state)
{
    // ...
    if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
        if (_dq_state_is_dirty(old_state)) {
            // <rdar://problem/14637483>
            // dependency ordering for dq state changes that were flushed
            // and not acted upon
            os_atomic_thread_fence(dependency);
            dq = os_atomic_inject_dependency(dq, (unsigned long)old_state);
        }
        return _dispatch_lane_barrier_complete(dq, 0, flags);
    }
    // ...
}
Add a symbolic breakpoint on _dispatch_lane_barrier_complete and step into it:
static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    dispatch_lane_t dq = dqu._dl;
    if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
        struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
        if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
            if (_dispatch_object_is_waiter(dc)) {
                return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
            }
        } else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
            return _dispatch_lane_drain_non_barriers(dq, dc, flags);
        }
        if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            _dispatch_retain_2(dq);
            flags |= DISPATCH_WAKEUP_CONSUME_2;
        }
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }
    uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
            dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
    return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}
Finally, step into _dispatch_lane_class_barrier_complete:
static void
_dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
        uint64_t owned)
{
    uint64_t old_state, new_state, enqueue;
    dispatch_queue_t tq;
    if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
        tq = _dispatch_mgr_q._as_dq;
        enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
    } else if (target) {
        tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
        enqueue = DISPATCH_QUEUE_ENQUEUED;
    } else {
        tq = NULL;
        enqueue = 0;
    }
    // ....
}
_dispatch_lane_class_barrier_complete mainly clears the barrier state so that the tasks behind it in the queue are not affected.
Add the _dispatch_sync_invoke_and_complete_recurse symbolic breakpoint again to verify, then step into _dispatch_sync_invoke_and_complete_recurse:
static void
_dispatch_sync_invoke_and_complete_recurse(dispatch_queue_class_t dq,
        void *ctxt, dispatch_function_t func, uintptr_t dc_flags
        DISPATCH_TRACE_ARG(void *dc))
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    _dispatch_trace_item_complete(dc);
    _dispatch_sync_complete_recurse(dq._dq, NULL, dc_flags);
}
Step into _dispatch_sync_function_invoke_inline:
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_client_callout(ctxt, func); // callout
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}
Summary
From this dive into dispatch_barrier_sync 👇
The flow I arrived at is the one below; I can't be fully certain it is the whole story, and it may cover only one of dispatch_barrier_sync's code paths.
Back to the open question: how does the barrier function let all earlier tasks finish before running its own task?
A dispatch_barrier_sync call enters a loop. Once every task ahead of the barrier has been woken and completed, execution reaches _dispatch_lane_non_barrier_complete, whose job is to remove the barrier. With the barrier removed, the loop exits and the callout fires, running the barrier's own task. Removing the barrier is what keeps the tasks behind it from being affected.
The code used for this exploration 👇
dispatch_queue_t concurrentQueue = dispatch_queue_create("xkQueue", DISPATCH_QUEUE_CONCURRENT);
dispatch_async(concurrentQueue, ^{
    sleep(10);
    NSLog(@">>>>>>> Task 1");
});
dispatch_async(concurrentQueue, ^{
    sleep(10);
    NSLog(@">>>>>>> Task 2");
});
/* 2. Barrier function */ // - dispatch_barrier_sync
dispatch_barrier_sync(concurrentQueue, ^{
    NSLog(@">>>>>>>> Barrier task ----%@-----", [NSThread currentThread]);
});
Return to _dispatch_sync_f_slow to settle the other question: why is a barrier function ineffective on the global concurrent queue?
Code 👇
dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
dispatch_async(concurrentQueue, ^{
    sleep(10);
    NSLog(@">>>>>>> Task 1");
});
dispatch_async(concurrentQueue, ^{
    sleep(10);
    NSLog(@">>>>>>> Task 2");
});
/* 2. Barrier function */ // - dispatch_barrier_sync
dispatch_barrier_sync(concurrentQueue, ^{
    NSLog(@">>>>>>>> Barrier task ----%@-----", [NSThread currentThread]);
});
Add symbolic breakpoints. On the global concurrent queue, execution goes straight to _dispatch_sync_function_invoke.
Step into _dispatch_sync_function_invoke:
DISPATCH_NOINLINE
static void
_dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
}
Step into _dispatch_sync_function_invoke_inline:
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_client_callout(ctxt, func);
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}
Summary: the global concurrent queue does no barrier handling at all; it goes straight to _dispatch_sync_function_invoke_inline -> _dispatch_client_callout, which is why the barrier function has no effect there. This makes sense: global queues are shared with the system's own work, so user code is not allowed to fence them.
dispatch_semaphore_t
dispatch_semaphore_t is a counting semaphore: created with a value of N, it allows up to N threads to access a resource concurrently.
Applying dispatch_semaphore_t
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
// Create a semaphore with a maximum concurrency of 1
dispatch_semaphore_t sem = dispatch_semaphore_create(1);
// Task 1
dispatch_async(queue, ^{
    // Wait
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
    NSLog(@"Running task 1");
    NSLog(@"Task 1 done");
    // Signal
    dispatch_semaphore_signal(sem);
});
// Task 2
dispatch_async(queue, ^{
    // Wait
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
    sleep(2);
    NSLog(@"Running task 2");
    NSLog(@"Task 2 done");
    // Signal
    dispatch_semaphore_signal(sem);
});
Output 👇
dispatch_semaphore_wait as a synchronization primitive
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_semaphore_t sem = dispatch_semaphore_create(0);
// Task 1
dispatch_async(queue, ^{
    dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // wait
    NSLog(@"Running task 1");
    NSLog(@"Task 1 done");
});
// Task 2
dispatch_async(queue, ^{
    sleep(2);
    NSLog(@"Running task 2");
    NSLog(@"Task 2 done");
    dispatch_semaphore_signal(sem); // signal
});
输出👇
dispatch_semaphore_wait
是如何控制同步的呢?当任务2执行后,为何当发送一个signal
就能执行任务1 呢?接下来我们带着疑问开始对dispatch_semaphore_t
底层原理进行探索。
dispatch_semaphore_t internals
Go into the source and search globally for dispatch_semaphore_wait:
intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    // Decrement dsema_value by 1
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    if (likely(value >= 0)) {
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}
Case 1: after decrementing dsema_value, value >= 0, so it returns 0 immediately.
Case 2: dsema_value was 0 or less, so after decrementing, value < 0, and _dispatch_semaphore_wait_slow runs.
Step into _dispatch_semaphore_wait_slow:
static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
        dispatch_time_t timeout)
{
    long orig;
    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    switch (timeout) {
    default:
        if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
            break;
        }
        // Fall through and try to undo what the fast path did to
        // dsema->dsema_value
    case DISPATCH_TIME_NOW:
        orig = dsema->dsema_value;
        while (orig < 0) {
            if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
                    &orig, relaxed)) {
                return _DSEMA4_TIMEOUT();
            }
        }
        // Another thread called semaphore_signal().
        // Fall through and drain the wakeup.
    case DISPATCH_TIME_FOREVER:
        _dispatch_sema4_wait(&dsema->dsema_sema);
        break;
    }
    return 0;
}
timeout = DISPATCH_TIME_NOW: undo the decrement and report a timeout.
timeout = DISPATCH_TIME_FOREVER: enter _dispatch_sema4_wait:
void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
    int ret = 0;
    do {
        ret = sem_wait(sema);
    } while (ret == -1 && errno == EINTR);
    DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}
So at the bottom, dispatch_semaphore_wait is a do-while loop around sem_wait: when the semaphore's value goes below 0, the calling thread blocks here until it is signaled.
Now search globally for the other half, dispatch_semaphore_signal:
intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    // os_atomic_inc2o increments dsema_value by 1
    long value = os_atomic_inc2o(dsema, dsema_value, release);
    if (likely(value > 0)) {
        return 0;
    }
    if (unlikely(value == LONG_MIN)) {
        DISPATCH_CLIENT_CRASH(value,
                "Unbalanced call to dispatch_semaphore_signal()");
    }
    return _dispatch_semaphore_signal_slow(dsema);
}
Case 1: after incrementing dsema_value, value > 0, so it returns 0 immediately.
Case 2: after incrementing, value == LONG_MIN, which crashes with an unbalanced-signal error.
Case 3: after incrementing, value <= 0 and value != LONG_MIN: enter _dispatch_semaphore_signal_slow.
DISPATCH_NOINLINE
intptr_t
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
    // Create the underlying semaphore if it doesn't exist yet
    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    // Post it, waking one waiting thread
    _dispatch_sema4_signal(&dsema->dsema_sema, 1);
    return 1;
}
_dispatch_semaphore_signal_slow creates the underlying semaphore if needed and posts it, waking one waiting thread.
Summary
- dispatch_semaphore_wait decrements the semaphore's value; if the result is below 0, the calling thread blocks, otherwise it continues.
- dispatch_semaphore_signal increments the semaphore's value; if the result is above 0, no one was waiting and it returns immediately; otherwise it posts the underlying semaphore to wake one waiting thread.
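The wait/signal pair can be condensed into a small C model: an atomic counter handles the uncontended fast path, and a kernel semaphore is touched only when the counter goes negative. This is a sketch of the idea assuming a POSIX environment, not libdispatch's actual code:

```c
#include <stdatomic.h>
#include <semaphore.h>

typedef struct {
    atomic_long value;  // analogue of dsema_value
    sem_t sema;         // analogue of dsema_sema, used only on the slow path
} toy_dsema;

void toy_sema_init(toy_dsema *d, long value) {
    atomic_init(&d->value, value);
    sem_init(&d->sema, 0, 0);
}

long toy_sema_wait(toy_dsema *d) {
    long v = atomic_fetch_sub(&d->value, 1) - 1;  // os_atomic_dec2o analogue
    if (v >= 0) return 0;                         // fast path: no kernel call
    sem_wait(&d->sema);                           // slow path: block until signaled
    return 0;
}

long toy_sema_signal(toy_dsema *d) {
    long v = atomic_fetch_add(&d->value, 1) + 1;  // os_atomic_inc2o analogue
    if (v > 0) return 0;                          // nobody was waiting
    sem_post(&d->sema);                           // wake one blocked waiter
    return 1;
}
```

(sem_init is deprecated on macOS; on Apple platforms the real implementation uses Mach semaphores instead, which is what _dispatch_sema4_create abstracts over.)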
dispatch_group_t
Applying dispatch_group_t
// Create a dispatch group
dispatch_group_t group = dispatch_group_create();
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
// Group task
dispatch_group_async(group, queue, ^{
    NSLog(@"Group task 1");
});
// // Option 1
// dispatch_group_async(group, queue, ^{
//     NSLog(@"Group task 2");
// });
// Option 2:
// Enter the group
dispatch_group_enter(group);
dispatch_async(queue, ^{
    NSLog(@"Group task 2");
    // Leave the group once the task is done (the leave belongs inside the
    // block; otherwise it can run before the async task finishes)
    dispatch_group_leave(group);
});
// Notification once the group's tasks have completed
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
    NSLog(@"Group tasks 1 and 2 completed");
});
Output 👇
Exploring dispatch_group_t internals
Start with group creation; search globally for dispatch_group_create:
dispatch_group_t
dispatch_group_create(void)
{
    return _dispatch_group_create_with_count(0);
}

// Passes 0 into _dispatch_group_create_with_count
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
    dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
            sizeof(struct dispatch_group_s));
    dg->do_next = DISPATCH_OBJECT_LISTLESS;
    dg->do_targetq = _dispatch_get_default_queue(false);
    if (n) {
        // Store n into dg_bits
        os_atomic_store2o(dg, dg_bits,
                (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
        os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
    }
    return dg;
}
So dispatch_group_create simply creates a group whose internal counter starts at 0.
Next, search globally for dispatch_group_enter:
void
dispatch_group_enter(dispatch_group_t dg)
{
    // The value is decremented on a 32bits wide atomic so that the carry
    // for the 0 -> -1 transition is not propagated to the upper 32bits.
    uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
            DISPATCH_GROUP_VALUE_INTERVAL, acquire);
    uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    if (unlikely(old_value == 0)) {
        _dispatch_retain(dg); // <rdar://problem/22318411>
    }
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
        DISPATCH_CLIENT_CRASH(old_bits,
                "Too many nested calls to dispatch_group_enter()");
    }
}
Per the comment, dispatch_group_enter moves the value from 0 -> -1.
Search globally for dispatch_group_leave:
void
dispatch_group_leave(dispatch_group_t dg)
{
    // The value is incremented on a 64bits wide atomic so that the carry for
    // the -1 -> 0 transition increments the generation atomically.
    uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
            DISPATCH_GROUP_VALUE_INTERVAL, release);
    uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
        old_state += DISPATCH_GROUP_VALUE_INTERVAL;
        do {
            new_state = old_state;
            if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            } else {
                // If the group was entered again since the atomic_add above,
                // we can't clear the waiters bit anymore as we don't know for
                // which generation the waiters are for
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            }
            if (old_state == new_state) break;
        } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                old_state, new_state, &old_state, relaxed)));
        return _dispatch_group_wake(dg, old_state, true);
    }
    if (unlikely(old_value == 0)) {
        DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                "Unbalanced call to dispatch_group_leave()");
    }
}
Per the comment, dispatch_group_leave moves the value from -1 -> 0. Tracing the branch: with old_state = -1, old_value = (-1 & DISPATCH_GROUP_VALUE_MASK) equals DISPATCH_GROUP_VALUE_1, so old_state is bumped by DISPATCH_GROUP_VALUE_INTERVAL back to 0, the waiter/notify bits are cleared, the loop exits once old_state == new_state, and _dispatch_group_wake is called.
Now look at _dispatch_group_notify:
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dsn)
{
    uint64_t old_state, new_state;
    dispatch_continuation_t prev;
    dsn->dc_data = dq;
    _dispatch_retain(dq);
    prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
    os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) {
        os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
            new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
            if ((uint32_t)old_state == 0) {
                os_atomic_rmw_loop_give_up({
                    return _dispatch_group_wake(dg, new_state, false);
                });
            }
        });
    }
}
From the source: if old_state == 0, _dispatch_group_wake is called, i.e. whenever the state is 0 the group's notify tasks are woken, and dispatch_group_leave wakes them too.
Step into _dispatch_group_wake; it should contain the callout handling:
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
        dispatch_continuation_t dc, next_dc, tail;
        // Snapshot before anything is notified/woken <rdar://problem/8554546>
        dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
        do {
            dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
            next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
            _dispatch_continuation_async(dsn_queue, dc,
                    _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
            _dispatch_release(dsn_queue);
        } while ((dc = next_dc));
        refs++;
    }
    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
        _dispatch_wake_by_address(&dg->dg_gen);
    }
    if (refs) _dispatch_release_n(dg, refs);
}
Step into _dispatch_continuation_async:
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}
Arriving at dx_push makes it all clear: from here the stored block is called out and executed:
dx_push -> dq_push -> _dispatch_root_queue_push -> _dispatch_client_callout -> block()
Summary:
- dispatch_group_create => creates a group whose counter starts at 0
- dispatch_group_enter => moves the counter from 0 -> -1
- dispatch_group_leave => moves the counter from -1 -> 0; when it returns to 0, _dispatch_group_wake is called
- _dispatch_group_notify => if the counter is already 0, _dispatch_group_wake is called immediately
The elegant part is that both dispatch_group_leave and _dispatch_group_notify can call _dispatch_group_wake. In the simple case, the wake from _dispatch_group_notify alone would suffice. But because the work is asynchronous and may take a long time, the notify is often registered while tasks are still in flight; in that case the final dispatch_group_leave performs the wake and the callout instead.
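The enter/leave/notify bookkeeping can be sketched as a counter plus a stashed callback. For readability this toy version counts upward (the real dg_bits goes 0 -> -1 on enter), and it is not a thread-safe implementation of the notify slot itself; all names are invented for illustration:

```c
#include <stdatomic.h>
#include <stddef.h>

typedef struct {
    atomic_int count;          // outstanding enters; 0 means the group is balanced
    void (*notify_fn)(void *); // pending notify callback, if registered
    void *notify_ctxt;
} toy_group;

// Example callback used below: increments the int it is given.
static void toy_bump(void *p) { (*(int *)p)++; }

void toy_group_enter(toy_group *g) {
    atomic_fetch_add(&g->count, 1);
}

void toy_group_leave(toy_group *g) {
    // The last leave (count returning to 0) fires the pending notify,
    // mirroring dispatch_group_leave calling _dispatch_group_wake.
    if (atomic_fetch_sub(&g->count, 1) == 1 && g->notify_fn) {
        void (*fn)(void *) = g->notify_fn;
        g->notify_fn = NULL;
        fn(g->notify_ctxt);
    }
}

void toy_group_notify(toy_group *g, void *ctxt, void (*fn)(void *)) {
    if (atomic_load(&g->count) == 0) {
        fn(ctxt);              // already balanced: fire immediately
    } else {
        g->notify_ctxt = ctxt; // otherwise stash it; the final leave fires it
        g->notify_fn = fn;
    }
}
```

This captures the "both sides can wake" idea: notify fires immediately when the group is already balanced, and otherwise the last leave fires it.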
dispatch_group_async
From the usage section we already suspect that dispatch_group_async wraps an enter and a leave; let's verify that in the source.
Search globally for dispatch_group_async:
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;
    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}
Step into _dispatch_continuation_group_async:
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dc, dispatch_qos_t qos)
{
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
Here it performs the dispatch_group_enter. Keep following the chain to find the matching dispatch_group_leave; step into _dispatch_continuation_async:
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}
dx_push again; following the push eventually leads to _dispatch_continuation_with_group_invoke:
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
        _dispatch_trace_item_complete(dc);
        dispatch_group_leave((dispatch_group_t)dou);
    } else {
        DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}
From the source: if (type == DISPATCH_GROUP_TYPE) -> callout -> dispatch_group_leave. So dispatch_group_async does wrap enter, then the callout, then leave.
Applying dispatch_source_t
Here dispatch_source with DISPATCH_SOURCE_TYPE_TIMER is used to implement a timer. Core code:
- (void)createTimer {
    dispatch_queue_t queue = dispatch_queue_create("timeQueue", DISPATCH_QUEUE_CONCURRENT);
    // Create the timer source
    self.t_source = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, queue);
    dispatch_time_t start = DISPATCH_TIME_NOW; // dispatch_walltime(NULL, 0);
    dispatch_source_set_timer(self.t_source, start, 1.0 * NSEC_PER_SEC, 0);
    __weak typeof(self) weakSelf = self;
    // Set the event handler
    dispatch_source_set_event_handler(self.t_source, ^{
        __strong typeof(weakSelf) strongSelf = weakSelf;
        strongSelf.seconds++;
        dispatch_async(dispatch_get_main_queue(), ^{
            // Update the UI
            strongSelf.secondLabel.text = [NSString stringWithFormat:@"%.f", strongSelf.seconds];
        });
    });
    // Sources are created suspended; resume to start the timer
    dispatch_resume(self.t_source);
}
// Pause
- (void)stop {
    if (self.t_source) {
        if (_isSuspend) { // already suspended
            NSLog(@">>>>>> already suspended; cannot suspend again");
            return;
        }
        dispatch_suspend(self.t_source); // suspend
        _isSuspend = true;
        [_sbutton setEnabled:YES];
        [_sbutton setTitle:@"Resume" forState:UIControlStateNormal];
    }
}
Using dispatch_source_t as a timer keeps time accurately, can run on a background thread (avoiding the UI stutter of a timer bound to the main run loop), and puts very little load on the CPU.
Points to note:
- Avoid retain cycles inside dispatch_source_set_event_handler.
- suspend and resume on a dispatch_source_t must alternate in balanced pairs, or the app will crash; the button-state check above guards against this.
- cancel may only be called while the source is resumed; otherwise it also crashes.
Finally, the sample code is attached at the source link.
Summary
Today's topics:
- The principle behind the barrier function. Simply put: the tasks ahead of the barrier in the queue are woken in a loop; once they have all completed, the barrier's own task runs, and the barrier is then removed so that later tasks are unaffected.
- dispatch_semaphore_t internals:
  - dispatch_semaphore_wait decrements the semaphore's value; if the value drops below 0, the thread blocks, otherwise it continues.
  - dispatch_semaphore_signal increments the semaphore's value; if waiters exist, one waiting thread is woken.
- dispatch_group_t internals:
  - dispatch_group_create => state = 0
  - dispatch_group_enter => state: 0 -> -1
  - dispatch_group_leave => state: -1 -> 0; when the state returns to 0, _dispatch_group_wake fires => callout
  - _dispatch_group_notify => if the state is already 0, _dispatch_group_wake fires => callout
- The application of dispatch_source_t and its pitfalls.
Bonus topic
Are mutable arrays thread-safe?
Q: Is NSMutableArray thread-safe?
A: No, it isn't.
Why not? Run the following 👇
- (void)demo4 {
    // Is a mutable array thread-safe?
    dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
    // Many threads mutate mArray concurrently
    for (int i = 0; i < 1000; i++) {
        dispatch_async(concurrentQueue, ^{
            NSString *name = [NSString stringWithFormat:@"%d", i];
            [self.mArray addObject:name];
        });
    }
}
Multiple threads mutate self.mArray at the same time, and the app eventually crashes. Why? Let's analyze.
Suppose mArray already holds two elements, 1 and 2, and a new element is about to be added.
Concurrent reads:
Thread 1 -> reads mArray(1,2)
Thread 2 -> reads mArray(1,2)
Concurrent writes:
Thread 1 -> writes 3 => succeeds => mArray(1,2,3)
Thread 2 -> writes 4 against the mArray(1,2) it read earlier
Even if thread 1 has already finished, thread 2 doesn't know; it writes 4 based on its stale snapshot => mArray(1,2,4). The data is now corrupted, because the array had already become mArray(1,2,3) with three elements.
Summary: mArray occupies a single region of memory; multiple threads reading and writing that same memory concurrently is unsafe.
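The same lost-update problem is easy to reproduce with a plain counter, and the textbook fix is to serialize the read-modify-write. A minimal pthread sketch (in GCD the equivalent fix is a serial queue, or dispatch_barrier_async writes on a custom concurrent queue):

```c
#include <pthread.h>

enum { N_ITERS = 100000 };

typedef struct {
    pthread_mutex_t mu;
    long count;
} guarded_counter;

// Each thread increments the shared counter N_ITERS times.
// Without the lock, increments from the two threads would interleave
// and some updates would be lost -- exactly the mArray scenario above.
static void *add_many(void *arg) {
    guarded_counter *c = arg;
    for (int i = 0; i < N_ITERS; i++) {
        pthread_mutex_lock(&c->mu);   // serialize the read-modify-write
        c->count++;
        pthread_mutex_unlock(&c->mu);
    }
    return NULL;
}

long run_two_writers(guarded_counter *c) {
    pthread_t a, b;
    pthread_create(&a, NULL, add_many, c);
    pthread_create(&b, NULL, add_many, c);
    pthread_join(a, NULL);
    pthread_join(b, NULL);
    return c->count;                  // with the lock, always 2 * N_ITERS
}
```

Removing the lock/unlock pair makes the final count nondeterministic and usually short of 2 * N_ITERS, which is the counter version of the corrupted mArray.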