前言
iOS 底层第24
天的学习。 今天开始继续分析 GCD
底层源码。
首先先把上一次留的一个疑问点 dispatch_lane_t
❓ 先给解决掉
dispatch_lane_t
全局搜索 dispatch_lane_t
// A "lane" is the concrete backing type for serial/concurrent queues.
// Its first (zero-length) member aliases dispatch_queue_s, so a lane is
// effectively a dispatch_queue_s plus extra bookkeeping fields.
typedef struct dispatch_lane_s {
DISPATCH_LANE_CLASS_HEADER(lane);
/* 32bit hole on LP64 */
} DISPATCH_ATOMIC64_ALIGN *dispatch_lane_t;
dispatch_lane_t
来自于 dispatch_lane_s
进入 DISPATCH_LANE_CLASS_HEADER
// Common layout for every lane struct. The zero-length array _as_dq[0]
// lets a dispatch_lane_s pointer be reinterpreted as dispatch_queue_s,
// i.e. C-style "inheritance" from dispatch_queue_s.
#define DISPATCH_LANE_CLASS_HEADER(x) \
struct dispatch_queue_s _as_dq[0]; \
DISPATCH_QUEUE_CLASS_HEADER(x, \
struct dispatch_object_s *volatile dq_items_tail); \
dispatch_unfair_lock_s dq_sidelock; \
struct dispatch_object_s *volatile dq_items_head; \
uint32_t dq_side_suspend_cnt
发现 dispatch_lane_s
继承 dispatch_queue_s
,这下明白了在底层还是会来到 dispatch_queue_s
,只是对其做了一层封装而已
dispatch_sync & dispatch_async
在上一次学习我们已经探索了 dispatch_async 流程 , dispatch_sync 流程 。但有些细节还未知。今天我们来开始具体分析。
dispatch_sync + 串行队列流程
-> dispatch_sync
-> _dispatch_sync_f
-> _dispatch_sync_f_inline
-> _dispatch_sync_f_slow
-> _dispatch_sync_function_invoke
-> _dispatch_client_callout
-> return f(ctxt);
dispatch_sync 死锁原理
回到 _dispatch_sync_f_inline
开始探索 死锁
原理,
// dispatch_sync dispatcher: serial queues (dq_width == 1) are routed to
// the barrier path; concurrent queues try to reserve sync width and fall
// back to the slow path when that fails (global/root queues always fail
// the reservation — see the comment below).
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// Width 1 means a serial queue: every sync acts as a barrier.
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
// A non-root target queue means we must recurse through the hierarchy.
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
把会引发死锁的程序运行起来,添加符号断点 _dispatch_barrier_sync_f
进入 _dispatch_barrier_sync_f
// Thin out-of-line wrapper around the inline barrier-sync implementation
// (exists so a symbolic breakpoint can be set on it).
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
进入 _dispatch_barrier_sync_f_inline
// Barrier sync on a lane. Records the calling thread id (tid) and tries to
// take the queue's barrier lock; if that fails the waiter is parked in
// _dispatch_sync_f_slow, which is where the deadlock check happens.
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// Current thread's port-based id; later compared against the queue owner.
dispatch_tid tid = _dispatch_tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Could not acquire the barrier (queue busy): go wait on the slow path.
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
把程序继续往下走触发死锁异常👇
进入 _dispatch_sync_f_slow
,找到 __DISPATCH_WAIT_FOR_QUEUE__
// Slow path for dispatch_sync: builds a sync-waiter context (dsc) that
// records the waiting thread (dsc_waiter) and blocks in
// __DISPATCH_WAIT_FOR_QUEUE__ until the queue can run it.
// (Snippet truncated by the article after the wait call.)
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
// No target queue: invoke the function immediately, nothing to wait on.
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
// The current thread's id — this is what the deadlock check compares.
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_trace_item_push(top_dq, &dsc);
// Core of the deadlock crash: waits here, and crashes if the queue is
// already drained by the waiting thread itself.
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
// ....
}
进入 __DISPATCH_WAIT_FOR_QUEUE__
// Waits for the queue to become available. First checks whether the queue's
// drain lock is already owned by the waiting thread itself — that is the
// classic "sync onto the current serial queue" deadlock, and it crashes
// deliberately instead of hanging. (Snippet truncated by the article.)
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
// Current queue state word (encodes the drain-lock owner, among others).
uint64_t dq_state = _dispatch_wait_prepare(dq);
// Owner of the queue == the thread about to wait => guaranteed deadlock.
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
}
终于找到了触发死锁异常的原因,分析下底层是如何进行判断的
分析1: dq_state = _dispatch_wait_prepare(dq) ❓
=> dq_state
当前队列的一个等待状态
分析2: dsc->dsc_waiter ❓
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_tid_self() ❓
#define _dispatch_tid_self() ((dispatch_tid)_dispatch_thread_port())
dsc->dsc_waiter
=> 线程 id
分析3: _dq_state_drain_locked_by() ❓
进入 _dq_state_drain_locked_by
// True when the low 32 bits of the queue state (the drain lock) are held
// by thread `tid`.
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
}
进入 _dispatch_lock_is_locked_by
// Owner check: XOR cancels identical bits, so masking with DLOCK_OWNER_MASK
// yields 0 exactly when lock_value's owner bits equal tid's.
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// equivalent to _dispatch_lock_owner(lock_value) == tid
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
开始分析:
假设 ((lock_value ^ tid) & DLOCK_OWNER_MASK)
= x
当 x == 0 时 返回 true
这时就会死锁报错
分析 x
已知 DLOCK_OWNER_MASK
是一个很大的值
#define DLOCK_OWNER_MASK ((dispatch_lock)0xfffffffc)
((lock_value ^ tid)
& 一个很大的值
== 0
=>((lock_value ^ tid)
= 0
=>lock_value
==tid
=>lock_value
与 tid
相同,就会死锁报异常
小结:
-
lock_value
= 当前队列的状态 -
tid
= 当前线程 id
当前队列里已经有一条线程在等待了(lock_value
),又去调用这条线程(tid
) ,这就引发了冲突,从而导致死锁
dispatch_sync + 全局并发队列
再次回到 _dispatch_sync_f_inline
// (Same function quoted again for the concurrent-queue walkthrough.)
// For a global concurrent queue, dq_width != 1 and the sync-width
// reservation below fails, so execution goes to _dispatch_sync_f_slow.
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// Serial queues only; a global concurrent queue does not take this branch.
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
调试代码👇
// Demo: dispatch_sync onto the global concurrent queue.
dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
dispatch_sync(queue, ^{
    NSLog(@" 函数分析");
}); // fixed: the statement was missing its terminating semicolon
把程序运行起来 ,添加符号断点👇
继续下一步来到 _dispatch_sync_f_slow
进入 _dispatch_sync_f_slow
// Full slow path for dispatch_sync. For a global concurrent queue the
// waiter context is parked in __DISPATCH_WAIT_FOR_QUEUE__ and, once the
// queue is available, the function is invoked via
// _dispatch_sync_invoke_and_complete_recurse.
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
// Queues with no target run the function straight away.
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
// Sync-waiter descriptor: identifies the blocked thread (dsc_waiter)
// and the work to run (dsc_func/dsc_ctxt).
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_trace_item_push(top_dq, &dsc);
// Blocks until the queue can run us (or crashes on self-deadlock).
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
// dsc_func being cleared means that the block ran on another thread ie.
// case (2) as listed in _dispatch_async_and_wait_f_slow.
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
继续添加符号断点👇
继续下一步来到 👇
进入_dispatch_sync_function_invoke
-> _dispatch_client_callout
-> return f(ctxt);
小结:
dispatch_sync + 全局并发队列流程
-> dispatch_sync
-> _dispatch_sync_f
-> _dispatch_sync_f_inline
-> _dispatch_sync_f_slow
-> _dispatch_sync_function_invoke
-> _dispatch_client_callout
-> return f(ctxt);
dispatch_sync + 自定义并行队列
继续往下走👇
进入 _dispatch_sync_invoke_and_complete
// Runs a sync work item on a concurrent lane, then releases the sync
// width that was reserved for it (non-barrier completion).
static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
_dispatch_lane_non_barrier_complete(dq, 0);
}
进入 _dispatch_sync_function_invoke_inline
// Invokes the client function with a thread frame pushed, so code running
// inside the block sees `dq` as the current queue for its duration.
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
// This is where the user's block/function body actually executes.
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
小结:
dispatch_sync + 自定义并发队列流程
-> dispatch_sync
-> _dispatch_sync_f
-> _dispatch_sync_f_inline
-> _dispatch_sync_invoke_and_complete
-> _dispatch_client_callout
-> return f(ctxt);
dispatch_async + 全局并行队列
上一次我们已经分析过了 dispatch_async 流程
-> dispatch_async
-> _dispatch_continuation_async
-> dq_push
-> if 全局并发队列 -> dq_push == _dispatch_root_queue_push
-> _dispatch_root_queue_push
-> _dispatch_root_queue_push_inline
-> _dispatch_root_queue_poke
-> _dispatch_root_queues_init
-> _dispatch_root_queues_init_once
-> _dispatch_worker_thread2
-> _dispatch_root_queue_drain
-> _dispatch_continuation_invoke_inline
-> _dispatch_client_callout
-> return f(ctxt);
-> Else if 并发队列 -> dq_push == _dispatch_lane_concurrent_push
-> _dispatch_lane_concurrent_push
-> ❓
dispatch_async + 自定义并行队列
开始探索_dispatch_lane_concurrent_push ->
❓
// Async push onto a custom concurrent lane. If the queue is empty and the
// item is neither a waiter nor a barrier, it is redirected straight to the
// lane's target (root) queue; otherwise it is queued on the lane itself.
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserve the head slot only when needed
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
// Fast path: hand the item to the target queue (ends in dx_push again).
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
添加符号断点👇
继续下一步 👇
进入 _dispatch_continuation_redirect_push
if (likely(!_dispatch_object_is_redirection(dou))) {
dou._dc = _dispatch_async_redirect_wrap(dl, dou);
} else if (!dou._dc->dc_ctxt) {
// find first queue in descending target queue order that has
// an autorelease frequency set, and use that as the frequency for
// this continuation.
dou._dc->dc_ctxt = (void *)
(uintptr_t)_dispatch_queue_autorelease_frequency(dl);
}
dispatch_queue_t dq = dl->do_targetq;
if (!qos) qos = _dispatch_priority_qos(dq->dq_priority);
// 又回来了
dx_push(dq, dou, qos);
这时有点晕,又回到了 dq_push
之后往哪跳全然不知了,用了最傻的办法
就是把 所有 dq_push
的跳转都加上了符号断点
继续下一步发现来到了 _dispatch_root_queue_push
下面的流程就跟全局并发队列是一样。
小结: dispatch_async + 自定义并行流程
-> dispatch_async
-> _dispatch_continuation_async
-> dq_push
-> _dispatch_lane_concurrent_push
-> _dispatch_continuation_redirect_push
这里会有个跳转
-> dq_push
再一次来到 dq_push
-> _dispatch_root_queue_push
-> _dispatch_root_queue_push_inline
-> _dispatch_root_queue_poke
-> _dispatch_root_queues_init
-> _dispatch_root_queues_init_once
-> _dispatch_worker_thread2
-> _dispatch_root_queue_drain
-> _dispatch_continuation_invoke_inline
-> _dispatch_client_callout
-> return f(ctxt);
dispatch_once_f
回到 _dispatch_root_queues_init
来分析下 dispatch_once_f
// One-time initialization of the root queues, itself guarded by
// dispatch_once_f — which is why the article pivots to dispatch_once here.
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
一般定义单例代码👇
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
<#code to be executed once#>
});
看一下 dispatch_once
底层源码
// Public dispatch_once: simply adapts the block to a (ctxt, function)
// pair and delegates to dispatch_once_f.
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
dispatch_once
单例底层的实现就是 dispatch_once_f
// Core of dispatch_once. Four outcomes: already done (return), done via
// quiescent-counter generation (mark and return), first caller (enter the
// gate and run the function), or concurrent caller (wait on the gate).
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
// Reinterpret the user's once token as the internal gate type.
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
// Fast path: the function has already run once — nothing to do.
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
// First caller wins the gate and runs the function exactly once.
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
// Another thread is running the function right now: block until done.
return _dispatch_once_wait(l);
}
分析代码 dispatch_once_f
里有四个流程,我们一个个开始分析
流程1:
// 如果已经执行一次了,就 return
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
流程2:
当前已经执行过一次,标记为 done ,进行返回
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
流程3:
// 一次就没有执行,进行处理
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
进入 _dispatch_once_gate_tryenter
// Attempts to claim the once gate: atomically swings dgo_once from
// UNLOCKED to a value identifying the current thread. The compare-exchange
// makes the "first caller" decision thread-safe.
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
// Atomic CAS; _dispatch_lock_value_for_self() encodes this thread as owner.
return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
(uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}
进入 _dispatch_once_callout
// Runs the user's once-function, then publishes completion so that current
// and future waiters see the token as done.
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
dispatch_function_t func)
{
// Execute the user's initialization work.
_dispatch_client_callout(ctxt, func);
// Mark the gate done and wake any threads blocked in _dispatch_once_wait.
_dispatch_once_gate_broadcast(l);
}
进入 _dispatch_once_gate_broadcast
// Publishes the "done" state. If no other thread ever contended (the gate
// value is still just our own lock value), there is nobody to wake;
// otherwise the slow broadcast unblocks the waiters.
static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
dispatch_lock value_self = _dispatch_lock_value_for_self();
uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
v = _dispatch_once_mark_quiescing(l);
#else
// Mark as done so later callers return immediately from the fast path.
v = _dispatch_once_mark_done(l);
#endif
// Uncontended case: previous value was only our own lock — no waiters.
if (likely((dispatch_lock)v == value_self)) return;
_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}
流程4:
// 一直等待,等待开锁
return _dispatch_once_wait(l);
小结:单例底层原理
在你任务一次都没有执行过的时候,加锁执行任务,执行完后设置一个 done
标识。
当你第二次进来的时候 根据 done
标识进行判断,如果已经执行过了 就 return
不执行了。
还有种情况:在你执行第一次任务还没完成,又有一个任务进来就会锁在门外,一直等待。所以单例是线程安全的
总结
今天主要分享了👇
dispatch_sync 死锁在底层是如何触发的?
队列里已经有一条线程在等待了,又去调用这条线程,这就引发了冲突,从而导致死锁。dispatch_async + 全局并行和自定义并行流程探索
探索结果就是它们最终都会来到
->_dispatch_root_queues_init
->_dispatch_root_queues_init_once
->_dispatch_worker_thread2
而_dispatch_worker_thread2
调用pthread
去执行任务。dispatch_once 单例在底层是如何实现的?
简单理解就是加一个done
标识进行了判断,做过了就不会再做了。并且还了解到了单例的执行是线程安全的,在底层做了加锁的处理
_dispatch_sync_invoke_and_complete 细节补充
发现
func
后没有,
❓
进入 DISPATCH_TRACE_ARG
#define DISPATCH_TRACE_ARG(arg) , arg
发现 ,
定义了在了宏里面
进入 _dispatch_sync_invoke_and_complete
发现在接收参数的时候也定义了一个宏 DISPATCH_TRACE_ARG
,传参和入参进行了统一的处理。