前言
iOS-GCD原理分析(一)这篇文章我们分析了GCD的函数与队列,GCD的源码还未分析完,我们接着分析。
同步函数死锁
我们知道GCD中有同步和异步函数,那么它们有什么区别,我们来分析下。
我们可以从能否开辟线程,任务的回调是否具备异步性,同步性,是否阻塞,死锁,我们根据这些问题来分析下源码。
我们在源码中搜索dispatch_sync(dis关键字,找到如下源码:
/*
 * Synchronously submits the block `work` to the queue `dq`.
 *
 * A block that carries private data is routed to
 * _dispatch_sync_block_with_privdata(); any other block is passed to
 * _dispatch_sync_f() together with its invoke function. Both paths are
 * given DC_FLAG_BLOCK as the continuation flags.
 */
void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
	const uintptr_t block_flags = DC_FLAG_BLOCK;

	if (unlikely(_dispatch_block_has_private_data(work))) {
		_dispatch_sync_block_with_privdata(dq, work, block_flags);
		return;
	}

	_dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), block_flags);
}
调用_dispatch_sync_f时,通过_dispatch_Block_invoke对任务进行了封装,我们再搜索下_dispatch_sync_f(dis,如下:
// Out-of-line wrapper: forwards every argument unchanged to
// _dispatch_sync_f_inline().
static void
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
uintptr_t dc_flags)
{
_dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}
这里面调用了_dispatch_sync_f_inline函数,我们再看下它的代码,如下:
// Inline core of the function-pointer dispatch_sync path: runs func(ctxt)
// synchronously with respect to dq. The first matching path is taken:
//   - dq_width == 1                     -> _dispatch_barrier_sync_f()
//   - sync width cannot be reserved     -> _dispatch_sync_f_slow()
//   - dq's target has a target of its own -> _dispatch_sync_recurse()
//   - otherwise                         -> _dispatch_sync_invoke_and_complete()
// Crashes the client if dq is not a lane-type queue.
static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// A width of 1 is handled exactly like a barrier sync.
if (likely(dq->dq_width == 1)) {
return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
}
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
// The target queue itself targets another queue: take the recursive path.
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}
这里dq_width == 1 是串行队列,我们再看下_dispatch_barrier_sync_f这个函数的代码,如下:
// Out-of-line wrapper: forwards every argument unchanged to
// _dispatch_barrier_sync_f_inline().
static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
_dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}
再看_dispatch_barrier_sync_f_inline的源码,如下:
// Inline core of the barrier sync path: runs func(ctxt) on dq as a barrier.
// The first matching path is taken:
//   - the barrier sync lock cannot be acquired for this thread
//                                       -> _dispatch_sync_f_slow()
//   - dl's target has a target of its own -> _dispatch_sync_recurse()
//   - otherwise -> _dispatch_lane_barrier_sync_invoke_and_complete()
// DC_FLAG_BARRIER is OR-ed into dc_flags on every path.
// Crashes the client if dq is not a lane-type queue.
static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
dispatch_function_t func, uintptr_t dc_flags)
{
// Id of the calling thread, passed to the barrier lock acquisition below.
dispatch_tid tid = _dispatch_tid_self();
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// The more correct thing to do would be to merge the qos of the thread
// that just acquired the barrier lock into the queue state.
//
// However this is too expensive for the fast path, so skip doing it.
// The chosen tradeoff is that if an enqueue on a lower priority thread
// contends with this fast path, this thread may receive a useless override.
//
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
DC_FLAG_BARRIER | dc_flags);
}
// The target queue itself targets another queue: take the recursive path.
if (unlikely(dl->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func,
DC_FLAG_BARRIER | dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}
这里有调用_dispatch_sync_f_slow这个函数(死锁发生的时候,堆栈中可以看到有调用这个函数),如下图所示:
这里发生死锁的时候的堆栈。
真正报错的是下图:
我们看下_dispatch_sync_f_slow的源码,如下:
// Slow path shared by the sync and barrier sync entry points.
// Builds a sync waiter context on the stack, waits on dq through
// __DISPATCH_WAIT_FOR_QUEUE__(), then either completes (the work already ran
// elsewhere) or invokes func(ctxt) on the calling thread.
// top_dqu / top_dc_flags are used for tracing and for the completion calls;
// dqu is the queue that is actually waited on.
static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
dispatch_function_t func, uintptr_t top_dc_flags,
dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
dispatch_queue_t top_dq = top_dqu._dq;
dispatch_queue_t dq = dqu._dq;
// A queue without a target queue: invoke the function directly.
if (unlikely(!dq->do_targetq)) {
return _dispatch_sync_function_invoke(dq, ctxt, func);
}
pthread_priority_t pp = _dispatch_get_priority();
// Waiter context for this call. dsc_waiter records the calling thread's id;
// __DISPATCH_WAIT_FOR_QUEUE__() compares it with the queue's drain lock owner.
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
_dispatch_trace_item_push(top_dq, &dsc);
__DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);
if (dsc.dsc_func == NULL) {
// dsc_func being cleared means that the block ran on another thread ie.
// case (2) as listed in _dispatch_async_and_wait_f_slow.
dispatch_queue_t stop_dq = dsc.dc_other;
return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
}
// dsc_func is still set: run the function on the calling thread.
_dispatch_introspection_sync_begin(top_dq);
_dispatch_trace_item_pop(top_dq, &dsc);
_dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
DISPATCH_TRACE_ARG(&dsc));
}
上面的图说明是在__DISPATCH_WAIT_FOR_QUEUE__这个函数里报的错,我们来看下它的源码:
// Pushes the sync waiter dsc onto dq and blocks the calling thread until the
// wait completes.
// Crashes the client first if dq's drain lock is already held by the waiting
// thread (dsc->dsc_waiter), i.e. dispatch_sync was called on a queue that the
// current thread already owns.
static void
__DISPATCH_WAIT_FOR_QUEUE__(dispatch_sync_context_t dsc, dispatch_queue_t dq)
{
uint64_t dq_state = _dispatch_wait_prepare(dq);
// Deadlock check: the queue's drain lock owner is the waiting thread itself.
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
// Blocks submitted to the main thread MUST run on the main thread, and
// dispatch_async_and_wait also executes on the remote context rather than
// the current thread.
//
// For both these cases we need to save the frame linkage for the sake of
// _dispatch_async_and_wait_invoke
_dispatch_thread_frame_save_state(&dsc->dsc_dtf);
// Select the value stored in dc_data from the queue state; it decides
// which wait mechanism is used below.
if (_dq_state_is_suspended(dq_state) ||
_dq_state_is_base_anon(dq_state)) {
dsc->dc_data = DISPATCH_WLH_ANON;
} else if (_dq_state_is_base_wlh(dq_state)) {
dsc->dc_data = (dispatch_wlh_t)dq;
} else {
_dispatch_wait_compute_wlh(upcast(dq)._dl, dsc);
}
// The anonymous case waits on a per-waiter thread event: set it up first.
if (dsc->dc_data == DISPATCH_WLH_ANON) {
dsc->dsc_override_qos_floor = dsc->dsc_override_qos =
(uint8_t)_dispatch_get_basepri_override_qos_floor();
_dispatch_thread_event_init(&dsc->dsc_event);
}
// Enqueue the waiter, then block.
dx_push(dq, dsc, _dispatch_qos_from_pp(dsc->dc_priority));
_dispatch_trace_runtime_event(sync_wait, dq, 0);
if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_wait(&dsc->dsc_event); // acquire
} else if (!dsc->dsc_wlh_self_wakeup) {
_dispatch_event_loop_wait_for_ownership(dsc);
}
if (dsc->dc_data == DISPATCH_WLH_ANON) {
_dispatch_thread_event_destroy(&dsc->dsc_event);
// If _dispatch_sync_waiter_wake() gave this thread an override,
// ensure that the root queue sees it.
if (dsc->dsc_override_qos > dsc->dsc_override_qos_floor) {
_dispatch_set_basepri_override_qos(dsc->dsc_override_qos);
}
}
}
其中
// Excerpt from __DISPATCH_WAIT_FOR_QUEUE__: crash when the queue's drain lock
// is already held by the thread that is about to wait on it.
if (unlikely(_dq_state_drain_locked_by(dq_state, dsc->dsc_waiter))) {
DISPATCH_CLIENT_CRASH((uintptr_t)dq_state,
"dispatch_sync called on queue "
"already owned by current thread");
}
这段代码就是报错的代码。这是为什么会发生死锁?
我们来看下死锁的条件。
"dispatch_sync called on queue already owned by current thread"这段英文的意思是:dispatch_sync(同步函数)所调起的队列,已经被当前线程所持有。
dsc->dsc_waiter中的dsc是由_dispatch_sync_f_slow传过来的,
_dispatch_sync_f_slow有段这样的代码
// Excerpt from _dispatch_sync_f_slow: the stack-allocated waiter context.
// dc_ctxt points back at the context itself, and dsc_waiter is set to the
// calling thread's id.
struct dispatch_sync_context_s dsc = {
.dc_flags = DC_FLAG_SYNC_WAITER | dc_flags,
.dc_func = _dispatch_async_and_wait_invoke,
.dc_ctxt = &dsc,
.dc_other = top_dq,
.dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
.dc_voucher = _voucher_get(),
.dsc_func = func,
.dsc_ctxt = ctxt,
.dsc_waiter = _dispatch_tid_self(),
};
dsc_waiter = _dispatch_tid_self(),_dispatch_tid_self()得到的就是当前线程的线程id,如下:
// Id of the calling thread: the value of _dispatch_thread_port() cast to
// dispatch_tid.
#define _dispatch_tid_self() ((dispatch_tid)_dispatch_thread_port())
__DISPATCH_WAIT_FOR_QUEUE__函数中的
// Excerpt from __DISPATCH_WAIT_FOR_QUEUE__: dq_state is the 64-bit value
// returned by _dispatch_wait_prepare(dq) (presumably a snapshot of the
// queue's state word -- confirm against _dispatch_wait_prepare).
uint64_t dq_state = _dispatch_wait_prepare(dq);
代码dq就是当前的队列,dq_state是当前队列的状态,
// Returns whether the lock value held in dq_state is locked by thread `tid`.
// dq_state is cast down to dispatch_lock before the comparison.
static inline bool
_dq_state_drain_locked_by(uint64_t dq_state, dispatch_tid tid)
{
return _dispatch_lock_is_locked_by((dispatch_lock)dq_state, tid);
}
_dispatch_lock_is_locked_by这个函数的代码如下:
// Returns true when lock_value and tid agree on every bit selected by
// DLOCK_OWNER_MASK: the XOR clears all bits where the two values match, and
// the mask discards the bits that are not part of the owner field.
static inline bool
_dispatch_lock_is_locked_by(dispatch_lock lock_value, dispatch_tid tid)
{
// equivalent to _dispatch_lock_owner(lock_value) == tid
return ((lock_value ^ tid) & DLOCK_OWNER_MASK) == 0;
}
这里检查的是队列状态中记录的持有者与线程id是否匹配。
先将队列的状态与线程id做异或运算,再和DLOCK_OWNER_MASK做与运算(#define DLOCK_OWNER_MASK ((dispatch_lock)0xfffffffc));结果为0,说明队列状态中记录的持有线程就是传入的线程id。
**也就是说,持有该队列的线程和将要等待的线程是同一个:当前线程正持有着队列,又同步调起了同一个dq(队列)并等待它执行,而队列要等当前线程先执行完才能继续,两者互相等待,于是产生了死锁。**
同步函数任务同步
在并发队列中,同步函数的任务为什么还是按顺序同步执行的呢?我们分析下。
其中这段代码
// Excerpt from _dispatch_sync_f_inline: the part that runs when
// dq_width != 1.
if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
}
dispatch_lane_t dl = upcast(dq)._dl;
// Global concurrent queues and queues bound to non-dispatch threads
// always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
}
// The target queue itself targets another queue: take the recursive path.
if (unlikely(dq->do_targetq->do_targetq)) {
return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
}
_dispatch_introspection_sync_begin(dl);
_dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
_dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
这段代码是如何执行的,我们利用断点调试,如下图所示:
我们看下_dispatch_sync_invoke_and_complete这个函数,它的源码如下:
// Invokes func(ctxt) on the calling thread on behalf of dq, then marks the
// trace item complete and runs the lane's non-barrier completion.
// The trailing `dc` parameter exists only when DISPATCH_TRACE_ARG expands to
// an argument.
static void
_dispatch_sync_invoke_and_complete(dispatch_lane_t dq, void *ctxt,
dispatch_function_t func DISPATCH_TRACE_ARG(void *dc))
{
_dispatch_sync_function_invoke_inline(dq, ctxt, func);
_dispatch_trace_item_complete(dc);
_dispatch_lane_non_barrier_complete(dq, 0);
}
DISPATCH_TRACE_ARG(void *dc)这里的ARG就是arg:#define DISPATCH_TRACE_ARG(arg) , arg,宏展开后会在前面补上一个逗号,所以这里是一个可选参数。
经过分析之后,发现走到了_dispatch_sync_f_slow这里,接着调用了_dispatch_sync_function_invoke这个函数,经过分析,来到了这里:
// Runs func(ctxt) on the calling thread, bracketed by a thread frame for dq:
// the frame is pushed before the client callout and popped after it.
static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
dispatch_function_t func)
{
dispatch_thread_frame_s dtf;
_dispatch_thread_frame_push(&dtf, dq);
// The client's function is called here, synchronously.
_dispatch_client_callout(ctxt, func);
_dispatch_perfmon_workitem_inc();
_dispatch_thread_frame_pop(&dtf);
}
在这里调用_dispatch_client_callout执行回调,然后出来。
同步函数:任务的执行和函数放在一块,中间处理下状态,这就是为什么同步函数可以立马执行。
异步函数分析上
同步函数分析完,我们再来分析下异步函数。
对于异步函数,我们需要关注它的线程创建和任务回调的异步性。
我们在源码中搜下 dispatch_async(dis找到如下代码:
/*
 * Asynchronously submits the block `work` to the queue `dq`.
 *
 * A continuation is allocated and initialized from the block with
 * DC_FLAG_CONSUME; the initialization yields the QoS that is then used,
 * together with the continuation's own flags, to push it onto the queue via
 * _dispatch_continuation_async().
 */
void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
	const uintptr_t init_flags = DC_FLAG_CONSUME;
	dispatch_continuation_t dc = _dispatch_continuation_alloc();
	dispatch_qos_t qos = _dispatch_continuation_init(dc, dq, work, 0,
			init_flags);

	_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
我们再看下_dispatch_continuation_async这个函数的代码,如下 :
// Pushes the continuation dc onto the queue dqu with the given qos through
// dx_push(). With DISPATCH_INTROSPECTION enabled, the push is traced first
// unless dc_flags contains DC_FLAG_NO_INTROSPECTION; otherwise dc_flags is
// unused.
static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
_dispatch_trace_item_push(dqu, dc);
}
#else
// Silences the unused-parameter warning in non-introspection builds.
(void)dc_flags;
#endif
return dx_push(dqu._dq, dc, qos);
}
这里的dc就是包装好的任务,qos是任务的服务质量(优先级)。dx_push这里会调用dq_push,
而dq_push会因传入队列类型的不同而对应不同的实现,例如并发队列:
// vtable instance for queue_concurrent (declared as a subclass of lane).
// Its dq_push entry is _dispatch_lane_concurrent_push and its dq_wakeup entry
// is _dispatch_lane_wakeup.
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
.do_type = DISPATCH_QUEUE_CONCURRENT_TYPE,
.do_dispose = _dispatch_lane_dispose,
.do_debug = _dispatch_queue_debug,
.do_invoke = _dispatch_lane_invoke,
.dq_activate = _dispatch_lane_activate,
.dq_wakeup = _dispatch_lane_wakeup,
.dq_push = _dispatch_lane_concurrent_push,
);
这里是并发的。
搜索_dispatch_lane_concurrent_push找到以下代码:
// dq_push implementation for concurrent lanes.
// Fast path: when nothing is queued (dq_items_tail is NULL), the item is
// neither a waiter nor a barrier, and async width can be acquired, the item
// is redirected through _dispatch_continuation_redirect_push().
// Every other case falls back to the generic _dispatch_lane_push().
void
_dispatch_lane_concurrent_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
// <rdar://problem/24738102&24743140> reserving non barrier width
// doesn't fail if only the ENQUEUED bit is set (unlike its barrier
// width equivalent), so we have to check that this thread hasn't
// enqueued anything ahead of this call or we can break ordering
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
_dispatch_lane_push(dq, dou, qos);
}
_dispatch_lane_push这个函数,并发和串行的都会来到这里。
// Excerpt from _dispatch_lane_concurrent_push: the redirect fast path, taken
// only for an empty queue and an item that is neither a waiter nor a barrier.
if (dq->dq_items_tail == NULL &&
!_dispatch_object_is_waiter(dou) &&
!_dispatch_object_is_barrier(dou) &&
_dispatch_queue_try_acquire_async(dq)) {
return _dispatch_continuation_redirect_push(dq, dou, qos);
}
区别在于这里对栅栏(barrier)的处理。
_dispatch_lane_push的源码如下:
// Generic push of an item onto a lane.
// Waiters are handed to _dispatch_lane_push_waiter(). Other items are
// appended to the dq_items list; the queue is woken through dx_wakeup() when
// the list was empty before the push or when the queue needs a QoS override.
void
_dispatch_lane_push(dispatch_lane_t dq, dispatch_object_t dou,
dispatch_qos_t qos)
{
dispatch_wakeup_flags_t flags = 0;
struct dispatch_object_s *prev;
if (unlikely(_dispatch_object_is_waiter(dou))) {
return _dispatch_lane_push_waiter(dq, dou._dsc, qos);
}
dispatch_assert(!_dispatch_object_is_global(dq));
qos = _dispatch_queue_push_qos(dq, qos);
// If we are going to call dx_wakeup(), the queue must be retained before
// the item we're pushing can be dequeued, which means:
// - before we exchange the tail if we have to override
// - before we set the head if we made the queue non empty.
// Otherwise, if preempted between one of these and the call to dx_wakeup()
// the blocks submitted to the queue may release the last reference to the
// queue when invoked by _dispatch_lane_drain. <rdar://problem/6932776>
prev = os_mpsc_push_update_tail(os_mpsc(dq, dq_items), dou._do, do_next);
if (unlikely(os_mpsc_push_was_empty(prev))) {
_dispatch_retain_2_unsafe(dq);
flags = DISPATCH_WAKEUP_CONSUME_2 | DISPATCH_WAKEUP_MAKE_DIRTY;
} else if (unlikely(_dispatch_queue_need_override(dq, qos))) {
// There's a race here, _dispatch_queue_need_override may read a stale
// dq_state value.
//
// If it's a stale load from the same drain streak, given that
// the max qos is monotonic, too old a read can only cause an
// unnecessary attempt at overriding which is harmless.
//
// We'll assume here that a stale load from a previous drain streak
// never happens in practice.
_dispatch_retain_2_unsafe(dq);
flags = DISPATCH_WAKEUP_CONSUME_2;
}
os_mpsc_push_update_prev(os_mpsc(dq, dq_items), prev, dou._do, do_next);
// flags stays 0 when neither case above applied: no wakeup is needed.
if (flags) {
return dx_wakeup(dq, qos, flags);
}
}
我们对_dispatch_lane_push、_dispatch_lane_push_waiter下符号断点;dx_wakeup对应的是dq_wakeup,通过查找,它在这里对应_dispatch_lane_wakeup这个函数,我们也对它下符号断点,发现进入了_dispatch_lane_wakeup这个函数。
我们看下这个函数的代码:
/*
 * dq_wakeup implementation for lanes.
 *
 * A wakeup flagged DISPATCH_WAKEUP_BARRIER_COMPLETE is handed to
 * _dispatch_lane_barrier_complete(). Otherwise the queue is probed: when the
 * probe succeeds the wakeup targets DISPATCH_QUEUE_WAKEUP_TARGET, else
 * DISPATCH_QUEUE_WAKEUP_NONE, and the request is forwarded to
 * _dispatch_queue_wakeup().
 */
void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
		dispatch_wakeup_flags_t flags)
{
	if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
		_dispatch_lane_barrier_complete(dqu, qos, flags);
		return;
	}

	dispatch_queue_wakeup_target_t wakeup_target =
			_dispatch_queue_class_probe(dqu)
			? DISPATCH_QUEUE_WAKEUP_TARGET
			: DISPATCH_QUEUE_WAKEUP_NONE;

	_dispatch_queue_wakeup(dqu, qos, flags, wakeup_target);
}
// Excerpt from _dispatch_lane_wakeup: wakeups flagged
// DISPATCH_WAKEUP_BARRIER_COMPLETE go to _dispatch_lane_barrier_complete().
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
return _dispatch_lane_barrier_complete(dqu, qos, flags);
}
如果添加了barrier函数会走这个流程。
之后会调用_dispatch_queue_wakeup这个函数
// Generic queue wakeup.
// Depending on `target` and `qos`, atomically updates dq_state (merging the
// QoS and possibly setting the enqueued/dirty bits) and then, when the
// enqueued bit changed, pushes dq onto its target through
// _dispatch_queue_push_queue(). When only the max QoS changed, an override
// wakeup may be issued instead. On the remaining paths the reference taken
// for DISPATCH_WAKEUP_CONSUME_2 is released at `done`.
void
_dispatch_queue_wakeup(dispatch_queue_class_t dqu, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target)
{
dispatch_queue_t dq = dqu._dq;
uint64_t old_state, new_state, enqueue = DISPATCH_QUEUE_ENQUEUED;
dispatch_assert(target != DISPATCH_QUEUE_WAKEUP_WAIT_FOR_EVENT);
// A targeted wakeup always carries a reference: take one if the caller
// did not already pass DISPATCH_WAKEUP_CONSUME_2.
if (target && !(flags & DISPATCH_WAKEUP_CONSUME_2)) {
_dispatch_retain_2(dq);
flags |= DISPATCH_WAKEUP_CONSUME_2;
}
if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
//
// _dispatch_lane_class_barrier_complete() is about what both regular
// queues and sources need to evaluate, but the former can have sync
// handoffs to perform which _dispatch_lane_class_barrier_complete()
// doesn't handle, only _dispatch_lane_barrier_complete() does.
//
// _dispatch_lane_wakeup() is the one for plain queues that calls
// _dispatch_lane_barrier_complete(), and this is only taken for non
// queue types.
//
dispatch_assert(dx_metatype(dq) == _DISPATCH_SOURCE_TYPE);
qos = _dispatch_queue_wakeup_qos(dq, qos);
return _dispatch_lane_class_barrier_complete(upcast(dq)._dl, qos,
flags, target, DISPATCH_QUEUE_SERIAL_DRAIN_OWNED);
}
if (target) {
if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
}
qos = _dispatch_queue_wakeup_qos(dq, qos);
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
new_state = _dq_state_merge_qos(old_state, qos);
if (flags & DISPATCH_WAKEUP_CLEAR_ACTIVATING) {
// When an event is being delivered to a source because its
// unote was being registered before the ACTIVATING state
// had a chance to be cleared, we don't want to fail the wakeup
// which could lead to a priority inversion.
//
// Instead, these wakeups are allowed to finish the pending
// activation.
if (_dq_state_is_activating(old_state)) {
new_state &= ~DISPATCH_QUEUE_ACTIVATING;
}
}
if (likely(!_dq_state_is_suspended(new_state) &&
!_dq_state_is_enqueued(old_state) &&
(!_dq_state_drain_locked(old_state) ||
enqueue != DISPATCH_QUEUE_ENQUEUED_ON_MGR))) {
// Always set the enqueued bit for async enqueues on all queues
// in the hierarchy
new_state |= enqueue;
}
if (flags & DISPATCH_WAKEUP_MAKE_DIRTY) {
new_state |= DISPATCH_QUEUE_DIRTY;
} else if (new_state == old_state) {
os_atomic_rmw_loop_give_up(goto done);
}
});
#if HAVE_PTHREAD_WORKQUEUE_QOS
} else if (qos) {
//
// Someone is trying to override the last work item of the queue.
//
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
// Avoid spurious override if the item was drained before we could
// apply an override
if (!_dq_state_drain_locked(old_state) &&
!_dq_state_is_enqueued(old_state)) {
os_atomic_rmw_loop_give_up(goto done);
}
new_state = _dq_state_merge_qos(old_state, qos);
if (_dq_state_is_base_wlh(old_state) &&
!_dq_state_is_suspended(old_state) &&
/* <rdar://problem/63179930> */
!_dq_state_is_enqueued_on_manager(old_state)) {
// Always set the enqueued bit for async enqueues on all queues
// in the hierarchy (rdar://62447289)
//
// Scenario:
// - mach channel DM
// - targeting TQ
//
// Thread 1:
// - has the lock on (TQ), uncontended sync
// - causes a wakeup at a low QoS on DM, causing it to have:
// max_qos = UT, enqueued = 1
// - the enqueue of DM onto TQ hasn't happened yet.
//
// Thread 2:
// - an incoming IN IPC is being merged on the servicer
// - DM having qos=UT, enqueued=1, no further enqueue happens,
// but we need an extra override and go through this code for
// TQ.
// - this causes TQ to be "stashed", which requires the enqueued
// bit set, else try_lock_wlh() will complain and the
// wakeup refcounting will be off.
new_state |= enqueue;
}
if (new_state == old_state) {
os_atomic_rmw_loop_give_up(goto done);
}
});
target = DISPATCH_QUEUE_WAKEUP_TARGET;
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
} else {
goto done;
}
// The enqueued bit changed in this update: this thread is responsible for
// pushing dq onto its target queue.
if (likely((old_state ^ new_state) & enqueue)) {
dispatch_queue_t tq;
if (target == DISPATCH_QUEUE_WAKEUP_TARGET) {
// the rmw_loop above has no acquire barrier, as the last block
// of a queue asyncing to that queue is not an uncommon pattern
// and in that case the acquire would be completely useless
//
// so instead use dependency ordering to read
// the targetq pointer.
os_atomic_thread_fence(dependency);
tq = os_atomic_load_with_dependency_on2o(dq, do_targetq,
(long)new_state);
} else {
tq = target;
}
dispatch_assert(_dq_state_is_enqueued(new_state));
return _dispatch_queue_push_queue(tq, dq, new_state);
}
#if HAVE_PTHREAD_WORKQUEUE_QOS
if (unlikely((old_state ^ new_state) & DISPATCH_QUEUE_MAX_QOS_MASK)) {
if (_dq_state_should_override(new_state)) {
return _dispatch_queue_wakeup_with_override(dq, new_state,
flags);
}
}
#endif // HAVE_PTHREAD_WORKQUEUE_QOS
done:
// Nothing was pushed: drop the reference that came with CONSUME_2.
if (likely(flags & DISPATCH_WAKEUP_CONSUME_2)) {
return _dispatch_release_2_tailcall(dq);
}
}
我们再通过符号断点,发现有时会不断地调用_dispatch_lane_push和_dispatch_lane_wakeup,这是因为后台在不断地提交任务。
当我们走到_dispatch_lane_class_barrier_complete,我们搜下_dispatch_lane_class_barrier_complete这个函数,代码如下:
// Completes a barrier on dq: atomically subtracts `owned` from dq_state,
// merges `qos`, clears the drain-unlock bits, and, depending on `target`,
// sets the matching enqueued bit. If the enqueued bit changed, dq is pushed
// onto the chosen target queue; otherwise an override wakeup may be issued or
// the reference carried by DISPATCH_WAKEUP_CONSUME_2 is released.
static void
_dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
uint64_t owned)
{
uint64_t old_state, new_state, enqueue;
dispatch_queue_t tq;
// Resolve the queue to push onto (tq) and the enqueued bit that goes with it.
if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
tq = _dispatch_mgr_q._as_dq;
enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
} else if (target) {
tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
enqueue = DISPATCH_QUEUE_ENQUEUED;
} else {
tq = NULL;
enqueue = 0;
}
again:
os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, release, {
if (unlikely(_dq_state_needs_ensure_ownership(old_state))) {
_dispatch_event_loop_ensure_ownership((dispatch_wlh_t)dq);
_dispatch_queue_move_to_contended_sync(dq->_as_dq);
os_atomic_rmw_loop_give_up(goto again);
}
new_state = _dq_state_merge_qos(old_state - owned, qos);
new_state &= ~DISPATCH_QUEUE_DRAIN_UNLOCK_MASK;
if (unlikely(_dq_state_is_suspended(old_state))) {
if (likely(_dq_state_is_base_wlh(old_state))) {
new_state &= ~DISPATCH_QUEUE_ENQUEUED;
}
} else if (enqueue) {
if (!_dq_state_is_enqueued(old_state)) {
new_state |= enqueue;
}
} else if (unlikely(_dq_state_is_dirty(old_state))) {
os_atomic_rmw_loop_give_up({
// just renew the drain lock with an acquire barrier, to see
// what the enqueuer that set DIRTY has done.
// the xor generates better assembly as DISPATCH_QUEUE_DIRTY
// is already in a register
os_atomic_xor2o(dq, dq_state, DISPATCH_QUEUE_DIRTY, acquire);
flags |= DISPATCH_WAKEUP_BARRIER_COMPLETE;
return dx_wakeup(dq, qos, flags);
});
} else {
new_state &= ~DISPATCH_QUEUE_MAX_QOS_MASK;
}
});
old_state -= owned;
dispatch_assert(_dq_state_drain_locked_by_self(old_state));
dispatch_assert(!_dq_state_is_enqueued_on_manager(old_state));
if (_dq_state_is_enqueued(new_state)) {
_dispatch_trace_runtime_event(sync_async_handoff, dq, 0);
}
#if DISPATCH_USE_KEVENT_WORKLOOP
if (_dq_state_is_base_wlh(old_state)) {
// - Only non-"du_is_direct" sources & mach channels can be enqueued
// on the manager.
//
// - Only dispatch_source_cancel_and_wait() and
// dispatch_source_set_*_handler() use the barrier complete codepath,
// none of which are used by mach channels.
//
// Hence no source-ish object can both be a workloop and need to use the
// manager at the same time.
dispatch_assert(!_dq_state_is_enqueued_on_manager(new_state));
if (_dq_state_is_enqueued_on_target(old_state) ||
_dq_state_is_enqueued_on_target(new_state) ||
!_dq_state_in_uncontended_sync(old_state)) {
return _dispatch_event_loop_end_ownership((dispatch_wlh_t)dq,
old_state, new_state, flags);
}
_dispatch_event_loop_assert_not_owned((dispatch_wlh_t)dq);
if (flags & DISPATCH_WAKEUP_CONSUME_2) {
return _dispatch_release_2_tailcall(dq);
}
return;
}
#endif
if (_dq_state_received_override(old_state)) {
// Ensure that the root queue sees that this thread was overridden.
_dispatch_set_basepri_override_qos(_dq_state_max_qos(old_state));
}
if (tq) {
// The enqueued bit changed in this update: push dq onto tq.
if (likely((old_state ^ new_state) & enqueue)) {
dispatch_assert(_dq_state_is_enqueued(new_state));
dispatch_assert(flags & DISPATCH_WAKEUP_CONSUME_2);
return _dispatch_queue_push_queue(tq, dq, new_state);
}
#if HAVE_PTHREAD_WORKQUEUE_QOS
// <rdar://problem/27694093> when doing sync to async handoff
// if the queue received an override we have to forcefully redrive
// the same override so that a new stealer is enqueued because
// the previous one may be gone already
if (_dq_state_should_override(new_state)) {
return _dispatch_queue_wakeup_with_override(dq, new_state, flags);
}
#endif
}
if (flags & DISPATCH_WAKEUP_CONSUME_2) {
return _dispatch_release_2_tailcall(dq);
}
}
os_atomic_rmw_loop2o这里是一个原子的"读-改-写"循环:如果dq_state在此期间被其他线程修改,就会不断地重试(并不是递归)。
这里最终会调用_dispatch_root_queue_push这个函数,代码如下:
// Pushes the item dou onto the root queue rq.
// With DISPATCH_USE_KEVENT_WORKQUEUE, when the thread's deferred items allow
// stashing, the item may be stashed instead of pushed; a previously stashed
// item, if any, is then the one that gets pushed.
// With HAVE_PTHREAD_WORKQUEUE_QOS, a push that needs an override goes through
// _dispatch_root_queue_push_override(); otherwise the item is pushed with
// _dispatch_root_queue_push_inline().
void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
dispatch_qos_t qos)
{
#if DISPATCH_USE_KEVENT_WORKQUEUE
dispatch_deferred_items_t ddi = _dispatch_deferred_items_get();
if (unlikely(ddi && ddi->ddi_can_stash)) {
dispatch_object_t old_dou = ddi->ddi_stashed_dou;
dispatch_priority_t rq_overcommit;
rq_overcommit = rq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
// Stash the new item when the slot is empty or rq is overcommit.
if (likely(!old_dou._do || rq_overcommit)) {
dispatch_queue_global_t old_rq = ddi->ddi_stashed_rq;
dispatch_qos_t old_qos = ddi->ddi_stashed_qos;
ddi->ddi_stashed_rq = rq;
ddi->ddi_stashed_dou = dou;
ddi->ddi_stashed_qos = qos;
_dispatch_debug("deferring item %p, rq %p, qos %d",
dou._do, rq, qos);
if (rq_overcommit) {
ddi->ddi_can_stash = false;
}
// The slot was empty: nothing else to push.
if (likely(!old_dou._do)) {
return;
}
// push the previously stashed item
qos = old_qos;
rq = old_rq;
dou = old_dou;
}
}
#endif
#if HAVE_PTHREAD_WORKQUEUE_QOS
if (_dispatch_root_queue_push_needs_override(rq, qos)) {
return _dispatch_root_queue_push_override(rq, dou, qos);
}
#else
(void)qos;
#endif
_dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
这里调用了_dispatch_root_queue_push_inline这个函数,代码如下:
/*
 * Appends the item list [_head .. _tail] (linked through do_next) to the
 * dq_items list of the root queue `dq`.
 *
 * When os_mpsc_push_list() reports a non-zero result, the root queue is poked
 * with `n` and a floor of 0.
 */
static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
		dispatch_object_t _head, dispatch_object_t _tail, int n)
{
	struct dispatch_object_s *first = _head._do;
	struct dispatch_object_s *last = _tail._do;

	if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), first, last,
			do_next))) {
		_dispatch_root_queue_poke(dq, n, 0);
	}
}
这里又调用了_dispatch_root_queue_poke这个函数,代码如下:
// Requests up to n workers for the root queue dq.
// Returns early when the queue probe fails, or (when the internal workqueue
// is not in use) when dgq_pending could not be switched from 0 to n, meaning
// a worker request is still pending. Otherwise defers to
// _dispatch_root_queue_poke_slow().
void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
if (!_dispatch_queue_class_probe(dq)) {
return;
}
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
// With a pthread pool configured, the pending check below only applies to
// global root queues.
if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
{
if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
_dispatch_root_queue_debug("worker thread request still pending "
"for global queue: %p", dq);
return;
}
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
return _dispatch_root_queue_poke_slow(dq, n, floor);
}
这里调用了_dispatch_root_queue_poke_slow,它的代码如下:
// Slow path of _dispatch_root_queue_poke(): makes sure the root queues are
// initialized, then obtains up to n worker threads for dq.
//   - Without the internal workqueue, global root queues ask the pthread
//     workqueue for threads via _pthread_workqueue_addthreads() and return.
//   - With DISPATCH_USE_PTHREAD_POOL, sleeping workers are signaled first;
//     any remaining demand is capped by the pool size (minus `floor`) and
//     satisfied by creating threads (pthread_create, or _beginthreadex on
//     Windows).
static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
int remaining = n;
#if !defined(_WIN32)
int r = ENOSYS;
#endif
// One-time root queue initialization (guarded by dispatch_once_f).
_dispatch_root_queues_init();
_dispatch_debug_root_queue(dq, __func__);
_dispatch_trace_runtime_event(worker_request, dq, (uint64_t)n);
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
(void)dispatch_assume_zero(r);
return;
}
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
dispatch_pthread_root_queue_context_t pqc = dq->do_ctxt;
// Wake sleeping pool workers first; each successful signal covers one
// requested worker.
if (likely(pqc->dpq_thread_mediator.do_vtable)) {
while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
_dispatch_root_queue_debug("signaled sleeping worker for "
"global queue: %p", dq);
if (!--remaining) {
return;
}
}
}
bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
if (overcommit) {
os_atomic_add2o(dq, dgq_pending, remaining, relaxed);
} else {
if (!os_atomic_cmpxchg2o(dq, dgq_pending, 0, remaining, relaxed)) {
_dispatch_root_queue_debug("worker thread request still pending for "
"global queue: %p", dq);
return;
}
}
int can_request, t_count;
// seq_cst with atomic store to tail <rdar://problem/16932833>
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
// Reserve `remaining` slots out of dgq_thread_pool_size, never going below
// `floor`; the request is reduced when the pool cannot cover it.
do {
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
#if !defined(_WIN32)
pthread_attr_t *attr = &pqc->dpq_thread_attr;
pthread_t tid, *pthr = &tid;
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (unlikely(dq == &_dispatch_mgr_root_queue)) {
pthr = _dispatch_mgr_root_queue_init();
}
#endif
// Create one worker thread per reserved slot, retrying each creation until
// it succeeds.
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
if (r != EAGAIN) {
(void)dispatch_assume_zero(r);
}
_dispatch_temporary_resource_shortage();
}
} while (--remaining);
#else // defined(_WIN32)
#if DISPATCH_USE_MGR_THREAD && DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (unlikely(dq == &_dispatch_mgr_root_queue)) {
_dispatch_mgr_root_queue_init();
}
#endif
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
#if DISPATCH_DEBUG
unsigned dwStackSize = 0;
#else
unsigned dwStackSize = 64 * 1024;
#endif
uintptr_t hThread = 0;
while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
if (errno != EAGAIN) {
(void)dispatch_assume(hThread);
}
_dispatch_temporary_resource_shortage();
}
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
// NOTE(review): dispatch_assume_zero() is applied to `... == TRUE`, which is
// non-zero when SetThreadPriority succeeds -- confirm the intended polarity.
(void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
}
#endif
CloseHandle((HANDLE)hThread);
} while (--remaining);
#endif // defined(_WIN32)
#else
(void)floor;
#endif // DISPATCH_USE_PTHREAD_POOL
}
_dispatch_root_queues_init这里的函数比较重要,分析它之前,我们先要讲下GCD单例的原理。
单例底层原理
我们看下_dispatch_root_queues_init这个函数的代码,如下所示:
// Runs _dispatch_root_queues_init_once through dispatch_once_f(), keyed on
// _dispatch_root_queues_pred and with a NULL context.
static inline void
_dispatch_root_queues_init(void)
{
dispatch_once_f(&_dispatch_root_queues_pred, NULL,
_dispatch_root_queues_init_once);
}
这里其实是单例,我们来分析下。
单例的代码:
// Typical dispatch_once usage: the token is a static variable, so the same
// storage is handed to dispatch_once on every pass through this code.
static dispatch_once_t onceToken;
dispatch_once(&onceToken, ^{
    // one-time initialization goes here
});
我们看下dispatch_once的源码,如下:
// Block flavour of dispatch_once: passes the block as the context and its
// invoke function as the function to dispatch_once_f().
void
dispatch_once(dispatch_once_t *val, dispatch_block_t block)
{
dispatch_once_f(val, block, _dispatch_Block_invoke(block));
}
这里调用的dispatch_once_f这个函数,它的源码如下:
// Calls func(ctxt) once for the token `val`.
//   - When the token already reads DLOCK_ONCE_DONE (checked here unless the
//     inline fast path already covers it), returns immediately.
//   - The thread that wins _dispatch_once_gate_tryenter() runs the callout.
//   - Every other thread waits in _dispatch_once_wait().
void
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)
{
// The token is reinterpreted as a once-gate.
dispatch_once_gate_t l = (dispatch_once_gate_t)val;
#if !DISPATCH_ONCE_INLINE_FASTPATH || DISPATCH_ONCE_USE_QUIESCENT_COUNTER
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
if (likely(DISPATCH_ONCE_IS_GEN(v))) {
return _dispatch_once_mark_done_if_quiesced(l, v);
}
#endif
#endif
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
return _dispatch_once_wait(l);
}
这里的val就是onceToken,就是全局静态变量。
dispatch_once_gate_t l是由val强制转换得到的,可以把它理解成一道门(gate),有关和开两种状态。
// Excerpt from dispatch_once_f: return at once when the token already reads
// DLOCK_ONCE_DONE.
uintptr_t v = os_atomic_load(&l->dgo_once, acquire);
if (likely(v == DLOCK_ONCE_DONE)) {
return;
}
os_atomic_load判断如果已经执行过一次,就不再进来,直接return;
如果没有,就进入了
// Excerpt from dispatch_once_f: the thread that enters the gate runs the
// callout.
if (_dispatch_once_gate_tryenter(l)) {
return _dispatch_once_callout(l, ctxt, func);
}
这个流程,这里判断是否进去_dispatch_once_gate_tryenter,
_dispatch_once_callout又调用了_dispatch_client_callout执行回调。
/*
 * Tries to take the once-gate `l` for the calling thread.
 *
 * Atomically replaces dgo_once with the calling thread's lock value, but only
 * if it currently holds DLOCK_ONCE_UNLOCKED (relaxed ordering). Returns the
 * result of that compare-and-exchange.
 */
static inline bool
_dispatch_once_gate_tryenter(dispatch_once_gate_t l)
{
	uintptr_t owner_value = (uintptr_t)_dispatch_lock_value_for_self();

	return os_atomic_cmpxchg(&l->dgo_once, DLOCK_ONCE_UNLOCKED, owner_value,
			relaxed);
}
这里又调用了os_atomic_cmpxchg,这是一个原子的"比较并交换"操作:只有dgo_once仍是DLOCK_ONCE_UNLOCKED时,才会把它换成当前线程的锁值,所以同一时刻只有一个线程能进去。
_dispatch_lock_value_for_self它的代码如下:
// Lock value for the calling thread: the calling thread's id converted by
// _dispatch_lock_value_from_tid().
static inline dispatch_lock
_dispatch_lock_value_for_self(void)
{
return _dispatch_lock_value_from_tid(_dispatch_tid_self());
}
所以GCD的单例是多线程安全的。
_dispatch_once_callout又调起了_dispatch_once_gate_broadcast,这个函数的代码如下:
/*
 * Publishes completion of the once-gate `l`.
 *
 * The gate is marked quiescing (DISPATCH_ONCE_USE_QUIESCENT_COUNTER) or done,
 * which yields the gate's previous value. If that previous value is exactly
 * the calling thread's own lock value, nothing more is done; otherwise the
 * slow broadcast path is taken with the previous value.
 */
static inline void
_dispatch_once_gate_broadcast(dispatch_once_gate_t l)
{
	dispatch_lock self_value = _dispatch_lock_value_for_self();
#if DISPATCH_ONCE_USE_QUIESCENT_COUNTER
	uintptr_t previous = _dispatch_once_mark_quiescing(l);
#else
	uintptr_t previous = _dispatch_once_mark_done(l);
#endif

	if (likely((dispatch_lock)previous == self_value)) {
		return;
	}
	_dispatch_gate_broadcast_slow(&l->dgo_gate, (dispatch_lock)previous);
}
这里又调了_dispatch_once_mark_done这个函数,代码如下:
// Atomically stores DLOCK_ONCE_DONE into the gate (release ordering) and
// returns the value the gate held before the exchange.
static inline uintptr_t
_dispatch_once_mark_done(dispatch_once_gate_t dgo)
{
return os_atomic_xchg(&dgo->dgo_once, DLOCK_ONCE_DONE, release);
}
这里会把DLOCK_ONCE_DONE存进去,下次就不会再进来了。
在dispatch_once_f里面,如果还没有标记DLOCK_ONCE_DONE,_dispatch_once_gate_tryenter也进不去(说明已有其他线程正在执行),那么就会调用_dispatch_once_wait一直等待,直到开锁。
通过对单例底层的分析可知,_dispatch_root_queues_init会让初始化只执行一次。
异步函数分析下
_dispatch_root_queues_init这个函数调用了_dispatch_root_queues_init_once,我们看下它的源码
// One-time initialization of the root queues (the function passed to
// dispatch_once_f by _dispatch_root_queues_init).
//   - With the internal workqueue: initializes a pthread pool for every root
//     queue.
//   - Otherwise: registers the worker callbacks with the pthread workqueue,
//     choosing among plain workqueue, workloop and kevent setups based on
//     build configuration and the features reported by
//     _pthread_workqueue_supported(). Crashes if QoS maintenance support is
//     missing, if no suitable kevent support exists, or if registration fails.
static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
_dispatch_fork_becomes_unsafe();
#if DISPATCH_USE_INTERNAL_WORKQUEUE
size_t i;
for (i = 0; i < DISPATCH_ROOT_QUEUE_COUNT; i++) {
_dispatch_root_queue_init_pthread_pool(&_dispatch_root_queues[i], 0,
_dispatch_root_queues[i].dq_priority);
}
#else
int wq_supported = _pthread_workqueue_supported();
int r = ENOTSUP;
if (!(wq_supported & WORKQ_FEATURE_MAINTENANCE)) {
DISPATCH_INTERNAL_CRASH(wq_supported,
"QoS Maintenance support required");
}
#if DISPATCH_USE_KEVENT_SETUP
// Callbacks start out as 0 and are filled in by the branch taken below.
struct pthread_workqueue_config cfg = {
.version = PTHREAD_WORKQUEUE_CONFIG_VERSION,
.flags = 0,
.workq_cb = 0,
.kevent_cb = 0,
.workloop_cb = 0,
.queue_serialno_offs = dispatch_queue_offsets.dqo_serialnum,
#if PTHREAD_WORKQUEUE_CONFIG_VERSION >= 2
.queue_label_offs = dispatch_queue_offsets.dqo_label,
#endif
};
#endif
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunreachable-code"
// Case 1: kevent workqueue disabled -- only the plain worker callback.
if (unlikely(!_dispatch_kevent_workqueue_enabled)) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init(_dispatch_worker_thread2,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#if DISPATCH_USE_KEVENT_WORKLOOP
// Case 2: workloop supported -- worker, kevent and workloop callbacks.
} else if (wq_supported & WORKQ_FEATURE_WORKLOOP) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
cfg.workloop_cb = (pthread_workqueue_function_workloop_t) _dispatch_workloop_worker_thread;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
(pthread_workqueue_function_workloop_t)
_dispatch_workloop_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#endif // DISPATCH_USE_KEVENT_WORKLOOP
#if DISPATCH_USE_KEVENT_WORKQUEUE
// Case 3: kevent supported -- worker and kevent callbacks.
} else if (wq_supported & WORKQ_FEATURE_KEVENT) {
#if DISPATCH_USE_KEVENT_SETUP
cfg.workq_cb = _dispatch_worker_thread2;
cfg.kevent_cb = (pthread_workqueue_function_kevent_t) _dispatch_kevent_worker_thread;
r = pthread_workqueue_setup(&cfg, sizeof(cfg));
#else
r = _pthread_workqueue_init_with_kevent(_dispatch_worker_thread2,
(pthread_workqueue_function_kevent_t)
_dispatch_kevent_worker_thread,
offsetof(struct dispatch_queue_s, dq_serialnum), 0);
#endif // DISPATCH_USE_KEVENT_SETUP
#endif
} else {
DISPATCH_INTERNAL_CRASH(wq_supported, "Missing Kevent WORKQ support");
}
#pragma clang diagnostic pop
// r keeps its ENOTSUP initial value unless one of the setups above ran.
if (r != 0) {
DISPATCH_INTERNAL_CRASH((r << 16) | wq_supported,
"Root queue initialization failed");
}
#endif // DISPATCH_USE_INTERNAL_WORKQUEUE
}
接着我们再看下dispatch_async的调用堆栈,如图所示:
frame1到frame5就是任务的执行,那么它到底是在哪调用的,我们来看下。
这个任务是包装在_dispatch_worker_thread2这里,其实是包装在pthread的API中,GCD是对pthread的封装。
这里由_pthread_workqueue_init_with_workloop工作循环调起,不是立马调用的,受我们当前的OS管控。
异步线程的回调是异步的
我们再来看下_dispatch_root_queue_poke_slow这个函数,
// Excerpt from _dispatch_root_queue_poke_slow: when dq is of type
// DISPATCH_QUEUE_GLOBAL_ROOT_TYPE, ask the pthread workqueue for `remaining`
// more threads at the priority derived from dq->dq_priority, then return.
// (The #if matching the #endif below is outside this excerpt.)
if (dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE)
#endif
{
_dispatch_root_queue_debug("requesting new worker thread for global "
"queue: %p", dq);
r = _pthread_workqueue_addthreads(remaining,
_dispatch_priority_to_pp_prefer_fallback(dq->dq_priority));
// A non-zero result is only reported through dispatch_assume_zero.
(void)dispatch_assume_zero(r);
return;
}
这里如果是全局并发类型,会调用_pthread_workqueue_addthreads创建线程并执行。
如果是普通的,
// Excerpt from _dispatch_root_queue_poke_slow: reserve `remaining` threads
// out of the queue's dgq_thread_pool_size budget.
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
// Threads that may still be requested: whatever the pool has above `floor`.
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
// Asking for more than is available: give the excess back to dgq_pending
// and clamp the request.
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
// Nothing can be requested: log and give up.
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
// Retry until dgq_thread_pool_size is swapped from t_count to
// t_count - remaining. &t_count is passed in, presumably so a failed exchange
// refreshes t_count with the current value -- confirm against the macro.
} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
这里是do while循环。
while的条件os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire)
dgq_thread_pool_size 标记为1。
如下:
// Statically initialized root queue labelled "com.apple.root.workloop-custom".
// Its width is DISPATCH_QUEUE_WIDTH_POOL, its priority is a fallback of
// DISPATCH_QOS_DEFAULT combined with DISPATCH_PRIORITY_SATURATED_OVERRIDE,
// and its dgq_thread_pool_size starts at 1.
const struct dispatch_queue_global_s _dispatch_custom_workloop_root_queue = {
DISPATCH_GLOBAL_OBJECT_HEADER(queue_global),
.dq_state = DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE,
.do_ctxt = NULL,
.dq_label = "com.apple.root.workloop-custom",
.dq_atomic_flags = DQF_WIDTH(DISPATCH_QUEUE_WIDTH_POOL),
.dq_priority = _dispatch_priority_make_fallback(DISPATCH_QOS_DEFAULT) |
DISPATCH_PRIORITY_SATURATED_OVERRIDE,
.dq_serialnum = DISPATCH_QUEUE_SERIAL_NUMBER_WLF,
.dgq_thread_pool_size = 1,
};
全局并发队列要比并发队列大1
#define DISPATCH_QUEUE_WIDTH_POOL (DISPATCH_QUEUE_WIDTH_FULL - 1) // global concurrent queue
#define DISPATCH_QUEUE_WIDTH_MAX (DISPATCH_QUEUE_WIDTH_FULL - 2) // concurrent queue
全局并发队列的pool_size是从1开始的。
dgq_thread_pool_size会不断的赋值,++操作。
_dispatch_root_queue_poke_slow的
// Excerpt from _dispatch_root_queue_poke_slow: reserve `remaining` threads
// out of the queue's dgq_thread_pool_size budget.
t_count = os_atomic_load2o(dq, dgq_thread_pool_size, ordered);
do {
// Threads that may still be requested: whatever the pool has above `floor`.
can_request = t_count < floor ? 0 : t_count - floor;
if (remaining > can_request) {
// Asking for more than is available: give the excess back to dgq_pending
// and clamp the request.
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
if (remaining == 0) {
// Nothing can be requested: log and give up.
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
// Retry until dgq_thread_pool_size is swapped from t_count to
// t_count - remaining. &t_count is passed in, presumably so a failed exchange
// refreshes t_count with the current value -- confirm against the macro.
} while (!os_atomic_cmpxchgv2o(dq, dgq_thread_pool_size, t_count,
t_count - remaining, &t_count, acquire));
这段代码中,remaining = can_request;表示空余的数量等于我能够请求的数量。can_request来自于can_request = t_count < floor ? 0 : t_count - floor;这里,其中floor是参数传过来的,t_count是加载过来的。
remaining来自于参数n,经过分析传的是1,异步线程进来,创建一个线程,所以是1。
所以这里
// Clamp the request to can_request and subtract the excess from dgq_pending.
if (remaining > can_request) {
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
remaining, can_request);
os_atomic_sub2o(dq, dgq_pending, remaining - can_request, relaxed);
remaining = can_request;
}
不大于能够创建的,如果大于就说明有异常了,如果
// No thread can be requested: log that the pool is full and return.
if (remaining == 0) {
_dispatch_root_queue_debug("pthread pool is full for root queue: "
"%p", dq);
return;
}
也会报异常,说明线程池队列满了,就返回,不执行。
那么最多可以开多少线程呢,我们分析下。
// Excerpt: start `remaining` worker threads with _beginthreadex, one per
// iteration. Each thread is given its own retained reference to dq.
do {
_dispatch_retain(dq); // released in _dispatch_worker_thread
// Requested stack size: 0 in DISPATCH_DEBUG builds, otherwise 64 * 1024
// bytes, passed with STACK_SIZE_PARAM_IS_A_RESERVATION.
#if DISPATCH_DEBUG
unsigned dwStackSize = 0;
#else
unsigned dwStackSize = 64 * 1024;
#endif
uintptr_t hThread = 0;
// Keep retrying until a non-zero handle comes back. Any errno other than
// EAGAIN is reported through dispatch_assume before the back-off call.
while (!(hThread = _beginthreadex(NULL, dwStackSize, _dispatch_worker_thread_thunk, dq, STACK_SIZE_PARAM_IS_A_RESERVATION, NULL))) {
if (errno != EAGAIN) {
(void)dispatch_assume(hThread);
}
_dispatch_temporary_resource_shortage();
}
#if DISPATCH_USE_PTHREAD_ROOT_QUEUES
// Apply the manager priority to the new thread when it is above the default.
if (_dispatch_mgr_sched.prio > _dispatch_mgr_sched.default_prio) {
(void)dispatch_assume_zero(SetThreadPriority((HANDLE)hThread, _dispatch_mgr_sched.prio) == TRUE);
}
#endif
// The handle is not needed once the thread has been started.
CloseHandle((HANDLE)hThread);
} while (--remaining);
这段代码中,我们来看下,
dwStackSize = 64 * 1024这是目前看到的栈的大小,是不是这样,我们还需要验证下。
os_atomic_cmpxchgv2o判断是否已经满了,我们看下dgq_thread_pool_size大小
// Default value of DISPATCH_WORKQ_MAX_PTHREAD_COUNT; a definition supplied
// earlier (for example by the build) takes precedence.
#ifndef DISPATCH_WORKQ_MAX_PTHREAD_COUNT
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
#endif
这里定义的255,线程池最大数,但是不代表能开辟这么多。
我们看下官方文档的说明,如图:
这里写的是512k,至少是16k,如果栈越大,开辟线程的内存占用就越大,那么开辟的线程数量就越小。
如果4gb内存,系统内核态1gb
如果是16k,1024*1024/16这么多,线程默认的大小一般是512k,可以开到2048,但是显然开不了这么多。
总结
这篇文章主要介绍了同步函数死锁、同步函数任务同步、异步函数、单例底层原理分析,后续还会再介绍GCD的其它相关知识。文章中如有错误的地方,请大家指正。