GCD函数

在本篇文章中,我们主要来探索GCD的两个函数dispatch_once 和 dispatch_sync,我们主要来探索两个问题:

  • 1,dispatch_once是如何保证只执行一次的?
  • 2,dispatch_sync当前串行队列目标串行队列时,为什么会死锁?

dispatch_once

我们通常使用 dispatch_once来创建单例,该函数如下

    static dispatch_once_t onceToken;
    dispatch_once(&onceToken, ^{
        NSLog(@"dddd");
    });

我们来看下其实现,在这里直接贴码了

void
_dispatch_once_f(dispatch_once_t *predicate, void *_Nullable context,
        dispatch_function_t function)
{
    if (DISPATCH_EXPECT(*predicate, ~0l) != ~0l) {
        dispatch_once_f(predicate, context, function); // libdispatch
    } else {
        dispatch_compiler_barrier();
    }
    DISPATCH_COMPILER_CAN_ASSUME(*predicate == ~0l);
}

dispatch_once_f函数在 libdispatch库中,我们在libdispatch库中,我们来看下其实现:

DISPATCH_NOINLINE
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
    dispatch_once_gate_t l = (dispatch_once_gate_t)val;

#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
    if (likely(v == DLOCK_ONCE_DONE)) {
        return;
    }
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    if (likely(DISPATCH_ONCE_IS_GEN(v))) {
        return _dispatch_once_mark_done_if_quiesced(l, v);
    }
#endif
#endif
    if (_dispatch_once_gate_tryenter(l)) {
        return _dispatch_once_callout(l, ctxt, func);
    }
    return _dispatch_once_wait(l);
}

static inline void
_dispatch_once_mark_done_if_quiesced(dispatch_once_gate_t dgo, uintptr_t gen)
{
    if (_dispatch_once_generation() - gen >= DISPATCH_ONCE_GEN_SAFE_DELTA) {
        /*
         * See explanation above, when the quiescing counter approach is taken
         * then this store needs only to be relaxed as it is used as a witness
         * that the required barriers have happened.
         */
        os_atomic_store(&dgo->dgo_once, DLOCK_ONCE_DONE, relaxed);
    }
}
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
    return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED,
            (uintptr_t)_dispatch_lock_value_for_self(), relaxed);
}

_dispatch_once_gate_tryenter: 将 onceTokenDLOCK_ONCE_UNLOCKED ((uintptr_t)0)进行比较,如果onceToken!= DLOCK_ONCE_UNLOCKED,就加锁,并返回YES

DISPATCH_NOINLINE
static void
_dispatch_once_callout(dispatch_once_gate_t l, void *ctxt,
        dispatch_function_t func)
{
        
    _dispatch_client_callout(ctxt, func); // 1
    _dispatch_once_gate_broadcast(l); // 2
}
  • 1,_dispatch_client_callout:对传入的block进行调用。
  • 2,_dispatch_once_gate_broadcast:对外进行广播,已经调用了block。
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
    dispatch_lock value_self = _dispatch_lock_value_for_self();
    uintptr_t v;
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
    v = _dispatch_once_mark_quiescing(l);
#else
    v = _dispatch_once_mark_done(l);
#endif
    if (likely((dispatch_lock)v == value_self)) return;
    _dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)v);
}

DISPATCH_ALWAYS_INLINE
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
    return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}

_dispatch_once_mark_done:将 onceToken的值修改为 DLOCK_ONCE_DONE (~(uintptr_t)0)

当下一次调用的时候,v == DLOCK_ONCE_DONE,直接return,就不会在次执行block。

如果多个线程同时执行,首先,访问线程会通过_dispatch_lock_value_for_self方法进行加锁,另外的线程将会进行等待 _dispatch_once_wait

dispatch_sync

当我们调用dispatch_sync函数时,会调用_dispatch_sync_f_inline函数。

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    if (likely(dq->dq_width == 1)) {
        return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
    }

    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

对于串行队列而言,其dq_width == 1,则会调用 _dispatch_barrier_sync_f函数。

_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self(); \\ 1

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // The more correct thing to do would be to merge the qos of the thread
    // that just acquired the barrier lock into the queue state.
    //
    // However this is too expensive for the fast path, so skip doing it.
    // The chosen tradeoff is that if an enqueue on a lower priority thread
    // contends with this fast path, this thread may receive a useless override.
    //
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);
    }

    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
  • 1,先获取当前线程的pid
static inline void
_dispatch_introspection_sync_begin(dispatch_queue_class_t dq)
{
    if (!_dispatch_introspection.debug_queue_inversions) return;
    _dispatch_introspection_order_record(dq._dq);
}

_dispatch_introspection_order_record对当前的线程的一些信息进行处理和记录。
_dispatch_sync_function_invoke_inline

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_client_callout(ctxt, func);
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

因为同步不会开辟新线程,所以会根据当前线程id,让该线程的block执行完毕后,才会执行后面的代码,让该线程挂起。

如何判断当前线程id的线程已经是挂起状态?

我们看下_dispatch_queue_try_acquire_barrier_sync_and_suspend该函数

DISPATCH_ALWAYS_INLINE DISPATCH_WARN_RESULT
static inline bool
_dispatch_queue_try_acquire_barrier_sync_and_suspend(dispatch_lane_t dq,
        uint32_t tid, uint64_t suspend_count)
{
    uint64_t init  = DISPATCH_QUEUE_STATE_INIT_VALUE(dq->dq_width);
    uint64_t value = DISPATCH_QUEUE_WIDTH_FULL_BIT | DISPATCH_QUEUE_IN_BARRIER |
            _dispatch_lock_value_from_tid(tid) |
            (suspend_count * DISPATCH_QUEUE_SUSPEND_INTERVAL);
    uint64_t old_state, new_state;

    return os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, acquire, {
        uint64_t role = old_state & DISPATCH_QUEUE_ROLE_MASK;
        if (old_state != (init | role)) {
            os_atomic_rmw_loop_give_up(break);
        }
        new_state = value | role;
    });
}

我们通过查看函数调用栈,可以看出,当发生同步死锁时,调用了_dispatch_sync_f_slow函数:

DISPATCH_NOINLINE
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
    dispatch_queue_t top_dq = top_dqu._dq;
    dispatch_queue_t dq = dqu._dq;
    if (unlikely(!dq->do_targetq)) {
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }

    pthread_priority_t pp = _dispatch_get_priority();
    struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };

    _dispatch_trace_item_push(top_dq, &dsc);
    __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

    if (dsc.dsc_func == NULL) {
        // dsc_func being cleared means that the block ran on another thread ie.
        // case (2) as listed in _dispatch_async_and_wait_f_slow.
        dispatch_queue_t stop_dq = dsc.dc_other;
        return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    }

    _dispatch_introspection_sync_begin(top_dq);
    _dispatch_trace_item_pop(top_dq, &dsc);
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
            DISPATCH_TRACE_ARG(&dsc));
}

当执行到这一步时,当前线程的状态是suspend同步函数会往当前线程加入同步任务_dispatch_trace_item_push函数就起到添加任务的作用。紧接着调用__DISPATCH_WAIT_FOR_QUEUE__函数

__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
    uint64_t dq_state = _dispatch_wait_prepare(dq);// 判断当前队列是否是要等待的队列
    if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
        DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
                "dispatch_sync called on queue "
                "already owned by current thread");
    }
}

DISPATCH_ALWAYS_INLINE
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
        // 将传入的tid与挂起线程的tid进行异或运算,相同为0 
    // equivalent to _dispatch_lock_owner(lock_value) == tid
    return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}

_dispatch_lock_is_locked_by函数将 当前线程的tid传入线程的tid进行异或运算。然后判断是否等于0,在这里,如果两个 tid值相同,该函数则会返回YES,这时,系统也不知道该怎么做,就抛出一个异常crash,就是死锁

总结:

  • 1,dispatch_once:在运行的时候会改变onceToken的值,在传入的block执行过后,onceToken的值会变为 DLOCK_ONCE_DONE (~(uintptr_t)0)。当其他线程在调用该函数时,会直接return

  • 2,dispatch_sync:当dq_width == 1时,为串行队列,当执行到 _dispatch_sync_f_slow函数时,会通过_dispatch_lock_is_locked_by函数比对传入线程当前线程tid。如果两个tid值一样,则会抛出异常,因为此时系统也不知道该先调度哪个任务。这样就造成了死锁

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。