iOS Internals Study 23 — GCD (Part 1)

Preface

Day 23 of my iOS internals study. This time we look at GCD, the most widely used multithreading technology on iOS.

What is GCD ?

GCD stands for Grand Central Dispatch.
It is pure C and provides a large set of powerful functions.

  • Advantages of GCD
  • GCD is Apple's solution for parallel computation on multi-core hardware
  • GCD automatically takes advantage of additional CPU cores (dual-core, quad-core, and so on)
  • GCD automatically manages the thread life cycle (creating threads, scheduling tasks, destroying threads)

Summary: GCD is very convenient to use — you only tell GCD what task to execute, and never write any thread-management code.
Code example 👇

/**
 * Restoring the most basic form — this matters.
 */
- (void)syncTest{
    dispatch_block_t block = ^{
        NSLog(@"I am a GCD block task");
    };
    // Create a serial queue
    dispatch_queue_t queue = dispatch_queue_create("com.queue.cn", NULL);
    // Asynchronous function call
    dispatch_async(queue, block);
    // Synchronous function call
    // dispatch_sync(queue, block);
}

Let's analyze the code above 👆
dispatch_block_t block = ^{ NSLog(@"I am a GCD block task"); };
The task is wrapped in a block; the block takes no parameters and has no return value.
dispatch_queue_t queue creates a queue.
dispatch_(a)sync(queue, block); passes the queue and the block task into a synchronous or asynchronous dispatch call.


dispatch_async, the asynchronous function, can spawn a thread to execute the block's task; it does not wait for that task to finish before the next statement runs. Asynchrony is practically a synonym for multithreading.
dispatch_sync, the synchronous function, must wait for the current statement to finish before the next statement executes.

Summary — the GCD steps in code:
1. Create a block that wraps the task.
2. Create a queue.
3. Pass the block and the queue into a dispatch function such as dispatch_sync or dispatch_async.


Queues (Queue)

A queue, like a stack, is a linear storage structure with these characteristics:

  • Elements in a queue follow the "First In, First Out" principle, FIFO for short.
  • Elements are added at the tail and removed at the head.
Serial queue
  • Tasks 1..n execute one at a time, in order
Concurrent queue

A concurrent queue has a wide "exit": tasks need not leave in order — whichever task finishes first can dequeue first.


Queues vs functions
Sync function + serial queue
  • Does not spawn threads; tasks run on the current thread
  • Tasks run serially, one after another
  • Blocks the current thread
Sync function + concurrent queue
  • Does not spawn threads; tasks run on the current thread
  • Tasks run one after another
Async function + serial queue
  • Spawns one thread
  • Tasks run one after another
Async function + concurrent queue
  • Spawns threads, and tasks run on those threads
  • Tasks run asynchronously with no fixed order; the order depends on CPU scheduling

Next, a few demos to better understand serial/concurrent queues vs the sync/async functions.

Before analyzing, keep one fact in mind: scheduling a task and executing it both take time.
Code 👇

void testMethod(){
    sleep(3);
}
- (void) testTimeDemo {
    CFAbsoluteTime time = CFAbsoluteTimeGetCurrent();
    dispatch_queue_t queue = dispatch_queue_create("com.lgcooci.cn", DISPATCH_QUEUE_SERIAL);
    dispatch_sync(queue, ^{
        testMethod();
    });
    NSLog(@"%f",CFAbsoluteTimeGetCurrent()-time);
}

Output 👇

001---函数与队列[10780:635648] 3.001044
Demo 1
- (void)textDemo1{
    // Create a concurrent queue
    dispatch_queue_t queue = dispatch_queue_create("queue", DISPATCH_QUEUE_CONCURRENT);
    NSLog(@"1");
    dispatch_async(queue, ^{
        NSLog(@"2");
        dispatch_sync(queue, ^{
            NSLog(@"3");
        });
        NSLog(@"4");
    });
    NSLog(@"5");
}
  • Analysis: 1 and 5 print first, because scheduling the async task onto the concurrent queue takes time;
    then 2 prints; the synchronous dispatch_sync blocks until 3 has run, and only then does 4 print.
    Output: 1 5 2 3 4
    Run the program to verify 👇
Demo 2

Create a serial queue 👇

- (void)textDemo2{
    // Serial queue
    dispatch_queue_t queue = dispatch_queue_create("queue", DISPATCH_QUEUE_SERIAL);
    NSLog(@"1");
    // Asynchronous function
    dispatch_async(queue, ^{
        NSLog(@"2");
        // Synchronous
        dispatch_sync(queue, ^{
            NSLog(@"3");
        });
        NSLog(@"4");
    });
    NSLog(@"5");
}
  • Analysis: 1 and 5 print first, no question; then 2 prints; after that the program deadlocks — on the serial queue, the sync task and the rest of the enclosing block wait on each other, as shown below

Summary: each side waits for the other, so we deadlock — neither can ever leave the queue.

Run the program to verify 👇

_dispatch_sync_f_slow — deadlock

Demo 3

If we remove task 4, will it still deadlock? 👇

- (void)textDemo2{
    // Serial queue
    dispatch_queue_t queue = dispatch_queue_create("queue", DISPATCH_QUEUE_SERIAL);
    NSLog(@"1");
    // Asynchronous function
    dispatch_async(queue, ^{
        NSLog(@"2");
        // Synchronous
        dispatch_sync(queue, ^{
            NSLog(@"3");
        });
//        NSLog(@"4");
    });
    NSLog(@"5");
}

Run the program to verify 👇

In fact I removed NSLog(@"3"); as well, and it still hits the _dispatch_sync_f_slow deadlock.

  • Why is that?

I thought about it for quite a while. My understanding: the dispatch_async block as a whole and the dispatch_sync inside it are still waiting on each other — the sync call cannot return until the enclosing block's turn on the serial queue is over, and the enclosing block cannot finish until the sync call returns.


GCD Internals

GCD source download link
Now for the most important part: exploring GCD's internals. Before diving in we need an entry point.
Run a program and you will often see 👇

This is the program's main queue, and note that it is a serial queue.
When we run dispatch_async and print the stack with bt 👇

we find ourselves in libdispatch.dylib.
So let's use the main queue, dispatch_get_main_queue(), as the entry point.
Open libdispatch.xcodeproj and search globally for dispatch_get_main_queue

dispatch_queue_main_t
dispatch_get_main_queue(void)
{
    return DISPATCH_GLOBAL_OBJECT(dispatch_queue_main_t, _dispatch_main_q);
}

Search globally for DISPATCH_GLOBAL_OBJECT

#define DISPATCH_GLOBAL_OBJECT(type, object) ((OS_OBJECT_BRIDGE type)&(object))

From the macro definition we learn: dispatch_queue_main_t is the type, and _dispatch_main_q is the object.
So we need to find where the _dispatch_main_q object gets assigned.
Search globally for _dispatch_main_q = 👇

struct dispatch_queue_static_s _dispatch_main_q = {
    DISPATCH_GLOBAL_OBJECT_HEADER(queue_main),
#if !DISPATCH_USE_RESOLVERS
    .do_targetq = _dispatch_get_default_queue(true),
#endif
    .dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(1) |
            DISPATCH_QUEUE_ROLE_BASE_ANON,
    .dq_label = "com.apple.main-thread",
    .dq_atomic_flags = DQF_THREAD_BOUND | DQF_WIDTH(1),
    .dq_serialnum = 1,
};

There is another route to 👆: print mainQueue, note its label, then search the source globally for com.apple.main-thread.

Continuing with the _dispatch_main_q definition,
note dq_atomic_flags = DQF_WIDTH(1) and dq_serialnum = 1.

Two questions arise: what is DQF_WIDTH(1)? And what does dq_serialnum = 1 mean? Suppose DQF_WIDTH(1) is what makes the main queue serial — how exactly is that decided at the bottom layer? Let's go verify.

Serial has concurrent as its counterpart, and queue creation must distinguish the two somewhere, so search the source globally for dispatch_queue_create

dispatch_queue_t
dispatch_queue_create(const char *label, dispatch_queue_attr_t attr)
{
    return _dispatch_lane_create_with_target(label, attr,
            DISPATCH_TARGET_QUEUE_DEFAULT, true);
}

Step into _dispatch_lane_create_with_target

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
    // ... some code omitted
    // jump straight to where dq is assigned
    // allocate memory for dq
    dispatch_lane_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_lane_s));
  // the function that initializes dq
    _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));

    dq->dq_label = label;
    dq->dq_priority = _dispatch_priority_make((dispatch_qos_t)dqai.dqai_qos,
            dqai.dqai_relpri);
    if (overcommit == _dispatch_queue_attr_overcommit_enabled) {
        dq->dq_priority |= DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
    }
    if (!dqai.dqai_inactive) {
        _dispatch_queue_priority_inherit_from_target(dq, tq);
        _dispatch_lane_inherit_wlh_from_target(dq, tq);
    }
    _dispatch_retain(tq);
    dq->do_targetq = tq;
    _dispatch_object_debug(dq, "%s", __func__);
    // the returned dq is the core
    return _dispatch_trace_queue_create(dq)._dq;
}

Look at the third argument of _dispatch_queue_init(): dqai.dqai_concurrent ? DISPATCH_QUEUE_WIDTH_MAX : 1 — a concurrent queue gets DISPATCH_QUEUE_WIDTH_MAX, otherwise the width is 1, meaning serial.
Step into _dispatch_queue_init

static inline dispatch_queue_class_t
_dispatch_queue_init(dispatch_queue_class_t dqu, dispatch_queue_flags_t dqf,
        uint16_t width, uint64_t initial_state_bits)
{
    uint64_t dq_state = DISPATCH_QUEUE_STATE_INIT_VALUE(width);
    dispatch_queue_t dq = dqu._dq;

    dispatch_assert((initial_state_bits & ~(DISPATCH_QUEUE_ROLE_MASK |
            DISPATCH_QUEUE_INACTIVE)) == 0);

    if (initial_state_bits & DISPATCH_QUEUE_INACTIVE) {
        dq->do_ref_cnt += 2; // rdar://8181908 see _dispatch_lane_resume
        if (dx_metatype(dq) == _DISPATCH_SOURCE_TYPE) {
            dq->do_ref_cnt++; // released when DSF_DELETED is set
        }
    }

    dq_state |= initial_state_bits;
    dq->do_next = DISPATCH_OBJECT_LISTLESS;
    // the third argument width = 1 => DQF_WIDTH(1)
    dqf |= DQF_WIDTH(width);
    os_atomic_store2o(dq, dq_atomic_flags, dqf, relaxed);
    dq->dq_state = dq_state;
    dq->dq_serialnum =
            os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);
    return dqu;
}

From the third parameter, uint16_t width: when width = 1 we get DQF_WIDTH(1), which marks a serial queue.
Next look at dq_serialnum = os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed); — dq_serialnum comes from _dispatch_queue_serial_numbers.
Search globally for _dispatch_queue_serial_numbers

// skip zero
// 1 - main_q
// 2 - mgr_q
// 3 - mgr_root_q
// 4,5,6,7,8,9,10,11,12,13,14,15 - global queues
// 17 - workloop_fallback_q
// we use 'xadd' on Intel, so the initial value == next assigned
#define DISPATCH_QUEUE_SERIAL_NUMBER_INIT 17
extern unsigned long volatile _dispatch_queue_serial_numbers;

From the comments we learn that dq_serialnum is merely an identifier: dq_serialnum = 1 stands for main_q.
Summary:
DQF_WIDTH(1) => serial queue;
dq_serialnum is only an identifier, and dq_serialnum = 1 denotes the main queue.


With those questions settled, let's return to the _dispatch_lane_create_with_target function

DISPATCH_NOINLINE
static dispatch_queue_t
_dispatch_lane_create_with_target(const char *label, dispatch_queue_attr_t dqa,
        dispatch_queue_t tq, bool legacy)
{
   // wrap the attributes into dqai
    dispatch_queue_attr_info_t dqai = _dispatch_queue_attr_to_info(dqa);

    //
    // Step 1: Normalize arguments (qos, overcommit, tq)

    // { ... } some priority handling

    // Step 2: Initialize the queue
    const void *vtable;
    dispatch_queue_flags_t dqf = legacy ? DQF_MUTABLE : 0;
    if (dqai.dqai_concurrent) {
        vtable = DISPATCH_VTABLE(queue_concurrent);
    } else {
        vtable = DISPATCH_VTABLE(queue_serial);
    }
     // dispatch_lane_t? allocated via _dispatch_object_alloc
    dispatch_lane_t dq = _dispatch_object_alloc(vtable,
            sizeof(struct dispatch_lane_s));
    _dispatch_queue_init(dq, dqf, dqai.dqai_concurrent ?
            DISPATCH_QUEUE_WIDTH_MAX : 1, DISPATCH_QUEUE_ROLE_INNER |
            (dqai.dqai_inactive ? DISPATCH_QUEUE_INACTIVE : 0));
      //  { ... }
     return _dispatch_trace_queue_create(dq)._dq;
}

Looking at the code above, what we set out to study is dispatch_queue — so what is this dispatch_lane_t ❓ We'll leave that as a teaser and continue from a different angle.

The dispatch_queue class hierarchy

dispatch_queue_t
    dispatch_queue_t mainQueue = dispatch_get_main_queue();
    dispatch_queue_t globQueue = dispatch_get_global_queue(0, 0);

Both the global queue and the main queue are assigned to a dispatch_queue_t, so let's use dispatch_queue_t as the entry point for analysis.
Step into dispatch_queue_t 👇

DISPATCH_DECL(dispatch_queue);

Step into DISPATCH_DECL()

#define DISPATCH_DECL(name) OS_OBJECT_DECL_SUBCLASS(name, dispatch_object)

When we cannot step any further here, switch to the GCD source and keep searching for OS_OBJECT_DECL_SUBCLASS

#define OS_OBJECT_DECL_SUBCLASS(name, super) \
        OS_OBJECT_DECL_IMPL(name, NSObject, <OS_OBJECT_CLASS(super)>)

OS_OBJECT_DECL_SUBCLASS
arguments: name = dispatch_queue, super = dispatch_object
Step into OS_OBJECT_DECL_IMPL
with: (name = dispatch_queue, NSObject, <OS_OBJECT_CLASS(dispatch_object)>), i.e. <OS_dispatch_object>

#define OS_OBJECT_DECL_IMPL(name, adhere, ...) \
        OS_OBJECT_DECL_PROTOCOL(name, __VA_ARGS__) \
        typedef adhere<OS_OBJECT_CLASS(name)> \
                * OS_OBJC_INDEPENDENT_CLASS name##_t

OS_OBJECT_DECL_IMPL arguments

  • name = dispatch_queue
  • adhere = NSObject

OS_OBJECT_DECL_PROTOCOL arguments
  • name = dispatch_queue
  • __VA_ARGS__ = <OS_dispatch_object>

Search globally for OS_OBJECT_DECL_PROTOCOL

#define OS_OBJECT_CLASS(name) OS_##name
#define OS_OBJECT_DECL_PROTOCOL(name, ...) \
        @protocol OS_OBJECT_CLASS(name) __VA_ARGS__ \
        @end

Putting the macros together:
OS_OBJECT_DECL_IMPL(dispatch_queue, NSObject, <OS_dispatch_object>)
=> OS_OBJECT_DECL_PROTOCOL(dispatch_queue, <OS_dispatch_object>) = @protocol OS_dispatch_queue <OS_dispatch_object> @end
=> typedef NSObject<OS_dispatch_queue> * OS_OBJC_INDEPENDENT_CLASS dispatch_queue_t
Summary: this declares an OS_dispatch_queue protocol, and types dispatch_queue_t as a pointer to an NSObject conforming to it.


Next, search the GCD source directly for DISPATCH_DECL(name) again — there is another variant

#define DISPATCH_DECL(name) \
        typedef struct name##_s : public dispatch_object_s {} *name##_t

With name = dispatch_queue, this expands to
typedef struct dispatch_queue_s : public dispatch_object_s {} *dispatch_queue_t
which can be read as: dispatch_queue_t is defined as a pointer whose underlying type is dispatch_queue_s, inheriting from dispatch_object_s —
similar to the class -> objc_class -> objc_object chain.

dispatch_object_s

Search globally for struct dispatch_object_s 👇

typedef struct dispatch_object_s {
private:
    dispatch_object_s();
    ~dispatch_object_s();
    dispatch_object_s(const dispatch_object_s &);
    void operator=(const dispatch_object_s &);
} *dispatch_object_t;
  • dispatch_object_t is simply the pointer typedef for struct dispatch_object_s
    Let's continue with dispatch_queue_s
dispatch_queue_s

Continuing down, search globally for struct dispatch_queue_s

struct dispatch_queue_s {
    DISPATCH_QUEUE_CLASS_HEADER(queue, void *__dq_opaque1);
    /* 32bit hole on LP64 */
} DISPATCH_ATOMIC64_ALIGN;
  • Step into _DISPATCH_QUEUE_CLASS_HEADER
#define _DISPATCH_QUEUE_CLASS_HEADER(x, __pointer_sized_field__) \
    DISPATCH_OBJECT_HEADER(x); \
    __pointer_sized_field__; \
    DISPATCH_UNION_LE(uint64_t volatile dq_state, \
            dispatch_lock dq_state_lock, \
            uint32_t dq_state_bits \
    )
#endif

Then step into DISPATCH_OBJECT_HEADER

#define DISPATCH_OBJECT_HEADER(x) \
    struct dispatch_object_s _as_do[0]; \
    _DISPATCH_OBJECT_HEADER(x)

Step into _DISPATCH_OBJECT_HEADER

#define _DISPATCH_OBJECT_HEADER(x) \
    struct _os_object_s _as_os_obj[0]; \
    OS_OBJECT_STRUCT_HEADER(dispatch_##x); \
    struct dispatch_##x##_s *volatile do_next; \
    struct dispatch_queue_s *do_targetq; \
    void *do_ctxt; \
    union { \
        dispatch_function_t DISPATCH_FUNCTION_POINTER do_finalizer; \
        void *do_introspection_ctxt; \
    }

Here we find that dispatch_queue_s also builds on _os_object_s.
Continue into OS_OBJECT_STRUCT_HEADER

#define OS_OBJECT_STRUCT_HEADER(x) \
    _OS_OBJECT_HEADER(\
    const struct x##_vtable_s *__ptrauth_objc_isa_pointer do_vtable, \
    do_ref_cnt, \
    do_xref_cnt)
#endif

At this point we can see that OS_OBJECT_STRUCT_HEADER comes from _OS_OBJECT_HEADER.
Step into _OS_OBJECT_HEADER

#define _OS_OBJECT_HEADER(isa, ref_cnt, xref_cnt) \
        isa; /* must be pointer-sized and use __ptrauth_objc_isa_pointer */ \
        int volatile ref_cnt; \
        int volatile xref_cnt

We've reached the _OS_OBJECT_HEADER definition, which contains isa, ref_cnt, and xref_cnt.

dispatch hierarchy summary

The dispatch hierarchy: starting from dispatch_queue_t and peeling back layer by layer, we get dispatch_queue_t --> dispatch_queue_s --> dispatch_object_s --> _os_object_s (where --> means "is backed by / builds on").
Along the way we saw that dispatch_object_t can stand for many structures; _os_object_s is the most basic one.
Exploring dispatch_queue_s meant unwinding layer after layer of nested macros until we reached _OS_OBJECT_HEADER, which holds isa, ref_cnt, and xref_cnt.


The dispatch_(a)sync task-execution flow

dispatch_sync
    dispatch_sync(dispatch_get_global_queue(0, 0), ^{
        NSLog(@"function analysis");
    });

Let's find out when the block above 👆 gets executed.
In the source, search globally for dispatch_sync

void
dispatch_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

The key here is anything that touches the parameter dispatch_block_t work.
Step into _dispatch_Block_invoke

#define _dispatch_Block_invoke(bb) \
        ((dispatch_function_t)((struct Block_layout *)bb)->invoke)

_dispatch_Block_invoke extracts the block's invoke pointer as a dispatch_function_t.
Back in dispatch_sync, step into _dispatch_sync_f

static void
// ctxt = the block (work), func = _dispatch_Block_invoke(work)
_dispatch_sync_f(dispatch_queue_t dq, void *ctxt, dispatch_function_t func,
        uintptr_t dc_flags)
{
    _dispatch_sync_f_inline(dq, ctxt, func, dc_flags);
}

Step into _dispatch_sync_f_inline

static inline void
_dispatch_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    if (likely(dq->dq_width == 1)) {
        return _dispatch_barrier_sync_f(dq, ctxt, func, dc_flags);
    }

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }

    dispatch_lane_t dl = upcast(dq)._dl;
    // Global concurrent queues and queues bound to non-dispatch threads
    // always fall into the slow case, see DISPATCH_ROOT_QUEUE_STATE_INIT_VALUE
    if (unlikely(!_dispatch_queue_try_reserve_sync_width(dl))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, 0, dl, dc_flags);
    }

    if (unlikely(dq->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func, dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_sync_invoke_and_complete(dl, ctxt, func DISPATCH_TRACE_ARG(
            _dispatch_trace_item_sync_push_pop(dq, ctxt, func, dc_flags)));
}

There are many branches handling ctxt & func, and statically we can't tell which one runs next.
Time for dynamic debugging: while calling dispatch_sync, set symbolic breakpoints on the candidate functions.
Start with _dispatch_barrier_sync_f.

Stepping on, we find _dispatch_barrier_sync_f is never hit.
Try the next candidate, _dispatch_sync_f_slow.

We land in libdispatch.dylib`_dispatch_sync_f_slow. Let's continue and step into _dispatch_sync_f_slow

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
    dispatch_queue_t top_dq = top_dqu._dq;
    dispatch_queue_t dq = dqu._dq;
    if (unlikely(!dq->do_targetq)) {
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }

    pthread_priority_t pp = _dispatch_get_priority();
    struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };

    _dispatch_trace_item_push(top_dq, &dsc);
    __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

    if (dsc.dsc_func == NULL) {
        // dsc_func being cleared means that the block ran on another thread ie.
        // case (2) as listed in _dispatch_async_and_wait_f_slow.
        dispatch_queue_t stop_dq = dsc.dc_other;
        return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    }

    _dispatch_introspection_sync_begin(top_dq);
    _dispatch_trace_item_pop(top_dq, &dsc);
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
            DISPATCH_TRACE_ARG(&dsc));
}
  • Now look for where ctxt and func are used inside this function:
    first: _dispatch_sync_function_invoke(dq, ctxt, func);
    second: _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func, top_dc_flags DISPATCH_TRACE_ARG(&dsc));
    Set symbolic breakpoints on both and keep debugging 👇

Next step 👇


We land in libdispatch.dylib`_dispatch_sync_function_invoke. Let's continue and step into _dispatch_sync_function_invoke

static void
_dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
}

Step into _dispatch_sync_function_invoke_inline

static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_client_callout(ctxt, func);
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

Step into _dispatch_client_callout

static inline void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    return f(ctxt);
}

So _dispatch_client_callout is where the block gets invoked.

dispatch_async

In the source, search globally for dispatch_async

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    dispatch_qos_t qos;
        // configuration
    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

Step into _dispatch_continuation_init to see what it configures

static inline dispatch_qos_t
_dispatch_continuation_init(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, dispatch_block_t work,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
      // copy/wrap the work block
    void *ctxt = _dispatch_Block_copy(work);

    dc_flags |= DC_FLAG_BLOCK | DC_FLAG_ALLOCATED;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        dc->dc_flags = dc_flags;
        dc->dc_ctxt = ctxt;
        // will initialize all fields but requires dc_flags & dc_ctxt to be set
        return _dispatch_continuation_init_slow(dc, dqu, flags);
    }
      // wrap as a dispatch_function_t
    dispatch_function_t func = _dispatch_Block_invoke(work);
    if (dc_flags & DC_FLAG_CONSUME) {
        func = _dispatch_call_block_and_release;
    }
    return _dispatch_continuation_init_f(dc, dqu, ctxt, func, flags, dc_flags);
}

Step into _dispatch_continuation_init_f

static inline dispatch_qos_t
_dispatch_continuation_init_f(dispatch_continuation_t dc,
        dispatch_queue_class_t dqu, void *ctxt, dispatch_function_t f,
        dispatch_block_flags_t flags, uintptr_t dc_flags)
{
    pthread_priority_t pp = 0;
    dc->dc_flags = dc_flags | DC_FLAG_ALLOCATED;
    dc->dc_func = f;
    dc->dc_ctxt = ctxt;
    // in this context DISPATCH_BLOCK_HAS_PRIORITY means that the priority
    // should not be propagated, only taken from the handler if it has one
    if (!(flags & DISPATCH_BLOCK_HAS_PRIORITY)) {
        pp = _dispatch_priority_propagate();
    }
    _dispatch_continuation_voucher_set(dc, flags);
        // priority handling
    return _dispatch_continuation_priority_set(dc, dqu, pp, flags);
}

dispatch_continuation_t wraps both func and ctxt,
and return _dispatch_continuation_priority_set() handles the priority.
Back to dispatch_async() 👇

void
dispatch_async(dispatch_queue_t dq, dispatch_block_t work)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME;
    dispatch_qos_t qos;
     // wraps the task and its priority
    qos = _dispatch_continuation_init(dc, dq, work, 0, dc_flags);
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
  • From _dispatch_continuation_init we learned that it wraps both the task and its priority.

Why wrap the task and its priority before calling _dispatch_continuation_async?

  • Because dispatch_async is asynchronous and asynchronous calls are unordered, the priority becomes the yardstick for scheduling. Likewise, the task itself executes asynchronously, and when it executes is decided by CPU scheduling.

With _dispatch_continuation_init covered, continue and step into _dispatch_continuation_async

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}

Step into dx_push

#define dx_push(x, y, z) dx_vtable(x)->dq_push(x, y, z)

At dx_push, the key thing to follow is the pushed task and its qos — the z argument here.
Search globally for dq_push

DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
    .do_type        = DISPATCH_QUEUE_GLOBAL_ROOT_TYPE,
    .do_dispose     = _dispatch_object_no_dispose,
    .do_debug       = _dispatch_queue_debug,
    .do_invoke      = _dispatch_object_no_invoke,

    .dq_activate    = _dispatch_queue_no_activate,
    .dq_wakeup      = _dispatch_root_queue_wakeup,
    .dq_push        = _dispatch_root_queue_push,
);

For a global concurrent queue, dq_push = _dispatch_root_queue_push.
Search globally for _dispatch_root_queue_push

void
_dispatch_root_queue_push(dispatch_queue_global_t rq, dispatch_object_t dou,
        dispatch_qos_t qos)
{
    // { ... }
#if HAVE_PTHREAD_WORKQUEUE_QOS
    if (_dispatch_root_queue_push_needs_override(rq, qos)) {
        return _dispatch_root_queue_push_override(rq, dou, qos);
    }
#else
    (void)qos;
#endif
    _dispatch_root_queue_push_inline(rq, dou, dou, 1);
}
There are two candidates here: _dispatch_root_queue_push_override and _dispatch_root_queue_push_inline.
_dispatch_root_queue_push_override ultimately also ends in _dispatch_root_queue_push_inline,
so step straight into _dispatch_root_queue_push_inline

static inline void
_dispatch_root_queue_push_inline(dispatch_queue_global_t dq,
        dispatch_object_t _head, dispatch_object_t _tail, int n)
{
    struct dispatch_object_s *hd = _head._do, *tl = _tail._do;
    if (unlikely(os_mpsc_push_list(os_mpsc(dq, dq_items), hd, tl, do_next))) {
        return _dispatch_root_queue_poke(dq, n, 0);
    }
}

Step into _dispatch_root_queue_poke

void
_dispatch_root_queue_poke(dispatch_queue_global_t dq, int n, int floor)
{
    if (!_dispatch_queue_class_probe(dq)) {
        return;
    }
#if !DISPATCH_USE_INTERNAL_WORKQUEUE
#if DISPATCH_USE_PTHREAD_POOL
    if (likely(dx_type(dq) == DISPATCH_QUEUE_GLOBAL_ROOT_TYPE))
#endif
    {
        if (unlikely(!os_atomic_cmpxchg2o(dq, dgq_pending, 0, n, relaxed))) {
            _dispatch_root_queue_debug("worker thread request still pending "
                    "for global queue: %p", dq);
            return;
        }
    }
#endif // !DISPATCH_USE_INTERNAL_WORKQUEUE
    return _dispatch_root_queue_poke_slow(dq, n, floor);
}

Step into _dispatch_root_queue_poke_slow

static void
_dispatch_root_queue_poke_slow(dispatch_queue_global_t dq, int n, int floor)
{
    int remaining = n;
#if !defined(_WIN32)
    int r = ENOSYS;
#endif
    _dispatch_root_queues_init();
      // { ... 200+ more lines }
}

We've arrived at _dispatch_root_queue_poke_slow and found 200+ lines of code — stepping through line by line is hopeless, so we work backwards instead.
Run dispatch_async and print the stack with bt 👇

Taking libdispatch.dylib`_dispatch_worker_thread2 as the entry, search globally for _dispatch_worker_thread2 to find where it is called.

static void
_dispatch_root_queues_init_once(void *context DISPATCH_UNUSED)
{
    // {...}
    // thread-pool configuration
    struct pthread_workqueue_config cfg = { ... };
    r = _pthread_workqueue_init_with_workloop(_dispatch_worker_thread2,
                (pthread_workqueue_function_kevent_t)
                _dispatch_kevent_worker_thread,
                (pthread_workqueue_function_workloop_t)
                _dispatch_workloop_worker_thread,
                offsetof(struct dispatch_queue_s, dq_serialnum), 0);
      // {...}
}

We find that _dispatch_root_queues_init_once registers _dispatch_worker_thread2.
Search globally for _dispatch_root_queues_init_once

DISPATCH_STATIC_GLOBAL(dispatch_once_t _dispatch_root_queues_pred);
DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_root_queues_init(void)
{
   dispatch_once_f(&_dispatch_root_queues_pred, NULL,
           _dispatch_root_queues_init_once);
}

_dispatch_root_queues_init calls _dispatch_root_queues_init_once,
and _dispatch_root_queues_init is itself called from _dispatch_root_queue_poke_slow — so the chain connects.
Back to the definition of _dispatch_worker_thread2

static void
_dispatch_worker_thread2(pthread_priority_t pp)
{
    bool overcommit = pp & _PTHREAD_PRIORITY_OVERCOMMIT_FLAG;
    dispatch_queue_global_t dq;

    pp &= _PTHREAD_PRIORITY_OVERCOMMIT_FLAG | ~_PTHREAD_PRIORITY_FLAGS_MASK;
    _dispatch_thread_setspecific(dispatch_priority_key, (void *)(uintptr_t)pp);
    dq = _dispatch_get_root_queue(_dispatch_qos_from_pp(pp), overcommit);

    _dispatch_introspection_thread_add();
    _dispatch_trace_runtime_event(worker_unpark, dq, 0);

    int pending = os_atomic_dec2o(dq, dgq_pending, relaxed);
    dispatch_assert(pending >= 0);
    _dispatch_root_queue_drain(dq, dq->dq_priority,
            DISPATCH_INVOKE_WORKER_DRAIN | DISPATCH_INVOKE_REDIRECTING_DRAIN);
    _dispatch_voucher_debug("root queue clear", NULL);
    _dispatch_reset_voucher(NULL, DISPATCH_THREAD_PARK);
    _dispatch_trace_runtime_event(worker_park, NULL, 0);
}

Find where dq is used and step into _dispatch_root_queue_drain

static void
_dispatch_root_queue_drain(dispatch_queue_global_t dq,
        dispatch_priority_t pri, dispatch_invoke_flags_t flags)
{
#if DISPATCH_DEBUG
    dispatch_queue_t cq;
    if (unlikely(cq = _dispatch_queue_get_current())) {
        DISPATCH_INTERNAL_CRASH(cq, "Premature thread recycling");
    }
#endif
    _dispatch_queue_set_current(dq);
    _dispatch_init_basepri(pri);
    _dispatch_adopt_wlh_anon();

    struct dispatch_object_s *item;
    bool reset = false;
    dispatch_invoke_context_s dic = { };
#if DISPATCH_COCOA_COMPAT
    _dispatch_last_resort_autorelease_pool_push(&dic);
#endif // DISPATCH_COCOA_COMPAT
    _dispatch_queue_drain_init_narrowing_check_deadline(&dic, pri);
    _dispatch_perfmon_start();
    while (likely(item = _dispatch_root_queue_drain_one(dq))) {
        if (reset) _dispatch_wqthread_override_reset();
        _dispatch_continuation_pop_inline(item, &dic, flags, dq);
        reset = _dispatch_reset_basepri_override();
        if (unlikely(_dispatch_queue_drain_should_narrow(&dic))) {
            break;
        }
    }

    // overcommit or not. worker thread
    // { ... }
}

Find where the drained item is used and step into _dispatch_continuation_pop_inline

static inline void
_dispatch_continuation_pop_inline(dispatch_object_t dou,
        dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags,
        dispatch_queue_class_t dqu)
{
    dispatch_pthread_root_queue_observer_hooks_t observer_hooks =
            _dispatch_get_pthread_root_queue_observer_hooks();
    if (observer_hooks) observer_hooks->queue_will_execute(dqu._dq);
    flags &= _DISPATCH_INVOKE_PROPAGATE_MASK;
    if (_dispatch_object_has_vtable(dou)) {
        dx_invoke(dou._dq, dic, flags);
    } else {
        _dispatch_continuation_invoke_inline(dou, flags, dqu);
    }
    if (observer_hooks) observer_hooks->queue_did_execute(dqu._dq);
}

Step into _dispatch_continuation_invoke_inline

static inline void
_dispatch_continuation_invoke_inline(dispatch_object_t dou,
        dispatch_invoke_flags_t flags, dispatch_queue_class_t dqu)
{
    dispatch_continuation_t dc = dou._dc, dc1;
    dispatch_invoke_with_autoreleasepool(flags, {
        uintptr_t dc_flags = dc->dc_flags;
        // Add the item back to the cache before calling the function. This
        // allows the 'hot' continuation to be used for a quick callback.
        //
        // The ccache version is per-thread.
        // Therefore, the object has not been reused yet.
        // This generates better assembly.
        _dispatch_continuation_voucher_adopt(dc, dc_flags);
        if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
            _dispatch_trace_item_pop(dqu, dou);
        }
        if (dc_flags & DC_FLAG_CONSUME) {
            dc1 = _dispatch_continuation_free_cacheonly(dc);
        } else {
            dc1 = NULL;
        }
        if (unlikely(dc_flags & DC_FLAG_GROUP_ASYNC)) {
            _dispatch_continuation_with_group_invoke(dc);
        } else {
            _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
            _dispatch_trace_item_complete(dc);
        }
        if (unlikely(dc1)) {
            _dispatch_continuation_free_to_cache_limit(dc1);
        }
    });
    _dispatch_perfmon_workitem_inc();
}

Step into _dispatch_client_callout

void
_dispatch_client_callout(void *ctxt, dispatch_function_t f)
{
    _dispatch_get_tsd_base();
    void *u = _dispatch_get_unwind_tsd();
    if (likely(!u)) return f(ctxt);
    _dispatch_set_unwind_tsd(NULL);
    f(ctxt);
    _dispatch_free_unwind_tsd();
    _dispatch_set_unwind_tsd(u);
}

The core is return f(ctxt); — we've found where the block() is invoked.

dispatch_(a)sync flow summary

dispatch_sync flow: dispatch_sync -> _dispatch_sync_f_slow -> _dispatch_sync_function_invoke -> _dispatch_client_callout -> return f(ctxt);
dispatch_async flow: dispatch_async -> _dispatch_continuation_async -> dx_push -> _dispatch_root_queue_push -> _dispatch_root_queue_push_inline
-> _dispatch_root_queue_poke -> _dispatch_root_queues_init ->
_dispatch_root_queues_init_once -> _dispatch_worker_thread2 ->
_dispatch_root_queue_drain -> _dispatch_continuation_invoke_inline -> _dispatch_client_callout -> return f(ctxt);
As the flows show, dispatch_async is far more involved than dispatch_sync.


Summary

This article covered roughly four parts.
Part 1: GCD basics — Grand Central Dispatch, a pure C API that provides many powerful functions.
Part 2: the definitions of serial and concurrent queues and how to tell them apart.
Part 3: GCD source exploration — the dispatch_queue hierarchy. The result: dispatch_queue_t points to dispatch_queue_s, which builds on dispatch_object_s and, at the bottom, _os_object_s.
While exploring dispatch_queue_s, its layers of nested macros led to _OS_OBJECT_HEADER, which holds isa, ref_cnt, and xref_cnt.
Part 4: GCD source exploration — the underlying flow of dispatch_(a)sync.
One open question remains: what is dispatch_lane_t ❓ The next article will dig into it.


Additional notes

os_atomic_inc_orig

While analyzing dispatch_queue_create we arrive at os_atomic_inc_orig 👇

dq_serialnum =
            os_atomic_inc_orig(&_dispatch_queue_serial_numbers, relaxed);

Step into os_atomic_inc_orig; suppose the counter pointed to by p currently holds 17

#define os_atomic_inc_orig(p, m) \
        os_atomic_add_orig((p), 1, m)

Substituting (with *p == 17): os_atomic_inc_orig(p, relaxed)
=> os_atomic_add_orig(p, 1, relaxed)

Step into os_atomic_add_orig

#define os_atomic_add_orig(p, v, m) \
        _os_atomic_c11_op_orig((p), (v), m, add, +)

Substituting: os_atomic_add_orig(p, 1, relaxed)
=> _os_atomic_c11_op_orig(p, 1, relaxed, add, +)

Step into _os_atomic_c11_op_orig

#define _os_atomic_c11_op_orig(p, v, m, o, op) \
        atomic_fetch_##o##_explicit(_os_atomic_c11_atomic(p), v, \
        memory_order_##m)

Substituting: _os_atomic_c11_op_orig(p, v = 1, m = relaxed, o = add, op = +)
=> atomic_fetch_add_explicit(_os_atomic_c11_atomic(p), 1, memory_order_relaxed)
atomic_fetch_add_explicit() is a C11 atomic operation: it atomically adds 1 to the value at p (17 becomes 18) and returns the previous value.
Summary: why all these layers? The macros hide the C11 atomics so that higher-level code stays portable and unaffected.

Multithreading quiz 1

- (void)wbinterDemo{
    dispatch_queue_t queue = dispatch_queue_create("com.lg.cooci.cn", DISPATCH_QUEUE_SERIAL);

    dispatch_async(queue, ^{
        NSLog(@"1");
    });
    
    dispatch_async(queue, ^{
        NSLog(@"2");
    });

    dispatch_sync(queue, ^{ NSLog(@"3"); });
    
    NSLog(@"0");

    dispatch_async(queue, ^{
        NSLog(@"7");
    });
    dispatch_async(queue, ^{
        NSLog(@"8");
    });
    dispatch_async(queue, ^{
        NSLog(@"9");
    });
}
  • Output options
    A: 1230789
    B: 1237890
    C: 3120798
    D: 2137890

Set the options aside and analyze directly.
Step 1: DISPATCH_QUEUE_SERIAL — a serial queue: its exit is narrow, so tasks leave one at a time.
Step 2: dispatch_async submits tasks 1 and 2, and dispatch_sync submits task 3. Executing tasks takes time, but on a serial queue 1, 2, 3 are guaranteed to run in order.
Step 3: dispatch_sync { task 3 } is synchronous and blocks ahead of NSLog(@"0");, so only after dispatch_sync returns does the code below it run.
Step 4: dispatch_async submits tasks 7, 8, 9, still on the serial queue,
so 7, 8, 9 also run in order, after 0.

  • Conclusion: 1 2 3 in order -> 0 -> 7 8 9 in order.
    So the answer is A: 1230789

Multithreading quiz 2

- (void)MTDemo{
    while (self.num < 5) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            self.num++;
        });
    }
    NSLog(@"end : %d",self.num);
}

This is an MT interview question that teacher kc set for us — all fundamentals, and with my weak basics I still couldn't solve it. Some things we only know how to use; dig one level deeper and we're lost.
Take while {}: everyone knows it's a loop, but think a little deeper — while keeps looping as long as its condition holds, which here means it effectively spins until something breaks the condition.

  • Applying that to this question: while (self.num < 5) {} exits only once self.num >= 5, so by the time the loop ends self.num cannot be less than 5 👇
  • Conclusion: num >= 5
    Run it to verify the analysis 👇

Multithreading quiz 3

- (void)KSDemo{
    for (int i= 0; i<10000; i++) {
        dispatch_async(dispatch_get_global_queue(0, 0), ^{
            self.num++;
        });
    }
    NSLog(@"end : %d",self.num);
}

This one is a KS multithreading interview question — the while is swapped for a for.
Analysis: a for loop cannot loop forever, so the NSLog is guaranteed to run.
Case 1: none of the dispatched tasks have completed when NSLog fires => num = 0.
Case 2: the loop dispatches 10000 tasks onto the global queue and only some finish before NSLog => self.num < 10000.
Case 3: all 10000 dispatched tasks finish before NSLog => self.num = 10000.

  • Conclusion: 0 <= num <= 10000
    Summary: these two questions highlight one crucial point — unsynchronized multithreaded reads and writes are unsafe and can corrupt data. To prevent this we introduce synchronization (such as locks), which we will cover next time.