iOS 底层学习24 — GCD(三)

前言

iOS 底层第25天的学习。今天将迎来GCD 底层原理探索的最后一个篇章,将要学习的内容主要有 dispatch_barrier_async 栅栏函数底层源码的探索以及信号量等。

dispatch_barrier_(a)sync

不知你是否在开发过程中会用到 dispatch_barrier_(a)sync,我们先来看一下它的定义
栅栏函数:主要是控制任务的执行顺序,同步。

dispatch_barrier_sync:前面的任务执行完毕,才会执行 dispatch_barrier_sync,但会堵塞线程,影响当前线程的执行。

dispatch_barrier_async: 前面的任务执行完毕,才会执行 dispatch_barrier_async 里面的任务


dispatch_barrier_(a)sync 应用

   dispatch_queue_t concurrentQueue = dispatch_queue_create("xkQueue", DISPATCH_QUEUE_CONCURRENT);
    
    dispatch_async(concurrentQueue, ^{
        sleep(1);
        NSLog(@">>>>>>> 执行任务1");
    });

    dispatch_async(concurrentQueue, ^{
        NSLog(@">>>>>>> 执行任务2");
    });
    
    /* 2. 栅栏函数 */ // - dispatch_barrier_sync
    dispatch_barrier_async(concurrentQueue, ^{
        NSLog(@">>>>>>>> 栅栏函数任务 ----%@-----",[NSThread currentThread]);
    });
    
    dispatch_async(concurrentQueue, ^{
        sleep(2);
        NSLog(@">>>>>>> 执行任务3");
    });
    NSLog(@">>>>>>> 执行任务4");
    

打印输出👇

验证了一下:栅栏函数任务 会在 任务1任务2 之后进行执行
dispatch_barrier_async 改成 dispatch_barrier_sync 打印输出👇

改成 dispatch_barrier_sync 后,栅栏函数任务 会在 任务1任务2 之后进行执行,但会堵塞之后的 任务3任务4.

这里留下一个疑问点:栅栏函数是如何让任务1任务2执行完之后,再去执行其内部的任务的呢 ❓

我们把 queue 改成 global_queue

  dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
    
    dispatch_async(concurrentQueue, ^{
        sleep(1);
        NSLog(@">>>>>>> 执行任务1");
    });

    dispatch_async(concurrentQueue, ^{
        NSLog(@">>>>>>> 执行任务2");
    });
    
    /* 2. 栅栏函数 */ // - dispatch_barrier_sync
    dispatch_barrier_sync(concurrentQueue, ^{
        NSLog(@">>>>>>>> 栅栏函数任务 ----%@-----",[NSThread currentThread]);
    });
    
    dispatch_async(concurrentQueue, ^{
        sleep(2);
        NSLog(@">>>>>>> 执行任务3");
    });
    NSLog(@">>>>>>> 执行任务4");

输出 👇

发现了在全局并行队列里,栅栏函数没起作用

为何 dispatch_barrier_sync 在全局并行队列无效呢❓

在修改一下代码👇 , 验证一下在不同的列队,栅栏函数是否有效

 dispatch_queue_t concurrentQueue = dispatch_queue_create("xkQueue", DISPATCH_QUEUE_CONCURRENT);
    // 新增 concurrentQueue2
  dispatch_queue_t concurrentQueue2 = dispatch_queue_create("xkQueue2", DISPATCH_QUEUE_CONCURRENT);
    
    dispatch_async(concurrentQueue, ^{
        sleep(1);
        NSLog(@">>>>>>> 执行任务1");
    });

    dispatch_async(concurrentQueue, ^{
        NSLog(@">>>>>>> 执行任务2");
    });
    
    /* 2. 栅栏函数 */ // - dispatch_barrier_sync
    dispatch_barrier_sync(concurrentQueue2, ^{
        NSLog(@">>>>>>>> 栅栏函数任务 ----%@-----",[NSThread currentThread]);
    });
    
    dispatch_async(concurrentQueue, ^{
        sleep(2);
        NSLog(@">>>>>>> 执行任务3");
    });
    NSLog(@">>>>>>> 执行任务4");

打印输出👇

验证结果:栅栏函数在不同的队列里是无效的。

小结
栅栏函数要起到作用,必须在同一个队列并且当前队列不是全局的。
还留下了2个问题

  • 为何全局并发无效❓
  • 为何栅栏函数能让前面的任务执行,再执行自己的任务❓
    会在下面的底层探索里进行解答

dispatch_barrier_(a)sync 底层探索

进入源码,全局搜索 dispatch_barrier_sync

void
dispatch_barrier_sync(dispatch_queue_t dq, dispatch_block_t work)
{
    uintptr_t dc_flags = DC_FLAG_BARRIER | DC_FLAG_BLOCK;
    if (unlikely(_dispatch_block_has_private_data(work))) {
        return _dispatch_sync_block_with_privdata(dq, work, dc_flags);
    }
    _dispatch_barrier_sync_f(dq, work, _dispatch_Block_invoke(work), dc_flags);
}

进入 _dispatch_barrier_sync_f

static void
_dispatch_barrier_sync_f(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    _dispatch_barrier_sync_f_inline(dq, ctxt, func, dc_flags);
}

进入 _dispatch_barrier_sync_f_inline

static inline void
_dispatch_barrier_sync_f_inline(dispatch_queue_t dq, void *ctxt,
        dispatch_function_t func, uintptr_t dc_flags)
{
    dispatch_tid tid = _dispatch_tid_self();

    if (unlikely(dx_metatype(dq) != _DISPATCH_LANE_TYPE)) {
        DISPATCH_CLIENT_CRASH(0, "Queue type doesn't support dispatch_sync");
    }
    dispatch_lane_t dl = upcast(dq)._dl;
    if (unlikely(!_dispatch_queue_try_acquire_barrier_sync(dl, tid))) {
        return _dispatch_sync_f_slow(dl, ctxt, func, DC_FLAG_BARRIER, dl,
                DC_FLAG_BARRIER | dc_flags);
    }

    if (unlikely(dl->do_targetq->do_targetq)) {
        return _dispatch_sync_recurse(dl, ctxt, func,
                DC_FLAG_BARRIER | dc_flags);
    }
    _dispatch_introspection_sync_begin(dl);
    _dispatch_lane_barrier_sync_invoke_and_complete(dl, ctxt, func
            DISPATCH_TRACE_ARG(_dispatch_trace_item_sync_push_pop(
                    dq, ctxt, func, dc_flags | DC_FLAG_BARRIER)));
}

添加符号断点 _dispatch_sync_f_slow & _dispatch_sync_recurse

进入 _dispatch_sync_f_slow

static void
_dispatch_sync_f_slow(dispatch_queue_class_t top_dqu, void *ctxt,
        dispatch_function_t func, uintptr_t top_dc_flags,
        dispatch_queue_class_t dqu, uintptr_t dc_flags)
{
    dispatch_queue_t top_dq = top_dqu._dq;
    dispatch_queue_t dq = dqu._dq;
    if (unlikely(!dq->do_targetq)) {
        return _dispatch_sync_function_invoke(dq, ctxt, func);
    }

    pthread_priority_t pp = _dispatch_get_priority();
    struct dispatch_sync_context_s dsc = {
        .dc_flags    = DC_FLAG_SYNC_WAITER | dc_flags,
        .dc_func     = _dispatch_async_and_wait_invoke,
        .dc_ctxt     = &dsc,
        .dc_other    = top_dq,
        .dc_priority = pp | _PTHREAD_PRIORITY_ENFORCE_FLAG,
        .dc_voucher  = _voucher_get(),
        .dsc_func    = func,
        .dsc_ctxt    = ctxt,
        .dsc_waiter  = _dispatch_tid_self(),
    };

    _dispatch_trace_item_push(top_dq, &dsc);
    __DISPATCH_WAIT_FOR_QUEUE__(&dsc, dq);

    if (dsc.dsc_func == NULL) {
        // dsc_func being cleared means that the block ran on another thread ie.
        // case (2) as listed in _dispatch_async_and_wait_f_slow.
        dispatch_queue_t stop_dq = dsc.dc_other;
        return _dispatch_sync_complete_recurse(top_dq, stop_dq, top_dc_flags);
    }

    _dispatch_introspection_sync_begin(top_dq);
    _dispatch_trace_item_pop(top_dq, &dsc);
    _dispatch_sync_invoke_and_complete_recurse(top_dq, ctxt, func,top_dc_flags
            DISPATCH_TRACE_ARG(&dsc));
}

发现 dispatch_barrier_syncdispatch_sync 都会来到 _dispatch_sync_f_slow ,也会来到 __DISPATCH_WAIT_FOR_QUEUE__ 死锁的报错
继续添加符号断点👇


怎么直接来到了 _dispatch_sync_invoke_and_complete_recurse

这时感觉很奇怪 :跳了步骤了,怎么就直接完成了呢?
肯定是哪里出了问题 符号断点没断住

我们带着疑问回到 _dispatch_sync_invoke_and_complete_recurse 之前的调用的函数 _dispatch_sync_complete_recurse
直接进入 _dispatch_sync_complete_recurse

static void
_dispatch_sync_complete_recurse(dispatch_queue_t dq, dispatch_queue_t stop_dq,
        uintptr_t dc_flags)
{
    bool barrier = (dc_flags & DC_FLAG_BARRIER);
    do {
        if (dq == stop_dq) return;
              // 如果是栅栏就调用 dx_wakeup
        if (barrier) {
            dx_wakeup(dq, 0, DISPATCH_WAKEUP_BARRIER_COMPLETE);
        } else {
            _dispatch_lane_non_barrier_complete(upcast(dq)._dl, 0);
        }
        dq = dq->do_targetq;
        barrier = (dq->dq_width == 1);
    } while (unlikely(dq->do_targetq));
}

看代码可知 do while 死循环 if (barrier) 有栅栏 调用 dx_wake 唤醒
else 没有栅栏,调用 _dispatch_lane_non_barrier_complete

我们在之前已经知道了dx_wakeup -> dq_wakeup
全局 搜索 dq_wakeup

// 并发
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_concurrent, lane,
      // ....
    .dq_wakeup      = _dispatch_lane_wakeup,
);
//  全局并发
DISPATCH_VTABLE_SUBCLASS_INSTANCE(queue_global, lane,
      // ....
    .dq_wakeup      = _dispatch_root_queue_wakeup,
);

进行验证添加符号断点 _dispatch_lane_wakeup

解决了疑问点: 发现在执行 _dispatch_sync_invoke_and_complete_recurse 之前会先来到 _dispatch_sync_complete_recurse ->_dispatch_lane_wakeup
进入 _dispatch_lane_wakeup

void
_dispatch_lane_wakeup(dispatch_lane_class_t dqu, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;

    if (unlikely(flags & DISPATCH_WAKEUP_BARRIER_COMPLETE)) {
        return _dispatch_lane_barrier_complete(dqu, qos, flags);
    }
    if (_dispatch_queue_class_probe(dqu)) {
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }
    return _dispatch_queue_wakeup(dqu, qos, flags, target);
}

符号断点会来到 _dispatch_queue_wakeup 调度队列唤醒任务
当队列任务唤醒完以后 还会来到 _dispatch_lane_non_barrier_complete添加符号断点进行验证


进入 _dispatch_lane_non_barrier_complete

static void
_dispatch_lane_non_barrier_complete(dispatch_lane_t dq,
        dispatch_wakeup_flags_t flags)
{
    uint64_t old_state, new_state, owner_self = _dispatch_lock_value_for_self();

    // see _dispatch_lane_resume()
    os_atomic_rmw_loop2o(dq, dq_state, old_state, new_state, relaxed, {
        new_state = old_state - DISPATCH_QUEUE_WIDTH_INTERVAL;
        if (unlikely(_dq_state_drain_locked(old_state))) {
            // make drain_try_unlock() fail and reconsider whether there's
            // enough width now for a new item
            new_state |= DISPATCH_QUEUE_DIRTY;
        } else if (likely(_dq_state_is_runnable(new_state))) {
            new_state = _dispatch_lane_non_barrier_complete_try_lock(dq,
                    old_state, new_state, owner_self);
        }
    });

    _dispatch_lane_non_barrier_complete_finish(dq, flags, old_state, new_state);
}

进入 _dispatch_lane_non_barrier_complete_finish

static void
_dispatch_lane_non_barrier_complete_finish(dispatch_lane_t dq,
        dispatch_wakeup_flags_t flags, uint64_t old_state, uint64_t new_state)
{. // ...
    if ((old_state ^ new_state) & DISPATCH_QUEUE_IN_BARRIER) {
        if (_dq_state_is_dirty(old_state)) {
            // <rdar://problem/14637483>
            // dependency ordering for dq state changes that were flushed
            // and not acted upon
            os_atomic_thread_fence(dependency);
            dq = os_atomic_inject_dependency(dq, (unsigned long)old_state);
        }
        return _dispatch_lane_barrier_complete(dq, 0, flags);
    }
  // ...
}

添加符号断点 _dispatch_lane_barrier_complete

进入 _dispatch_lane_barrier_complete

static void
_dispatch_lane_barrier_complete(dispatch_lane_class_t dqu, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags)
{
    dispatch_queue_wakeup_target_t target = DISPATCH_QUEUE_WAKEUP_NONE;
    dispatch_lane_t dq = dqu._dl;

    if (dq->dq_items_tail && !DISPATCH_QUEUE_IS_SUSPENDED(dq)) {
        struct dispatch_object_s *dc = _dispatch_queue_get_head(dq);
        if (likely(dq->dq_width == 1 || _dispatch_object_is_barrier(dc))) {
            if (_dispatch_object_is_waiter(dc)) {
                return _dispatch_lane_drain_barrier_waiter(dq, dc, flags, 0);
            }
        } else if (dq->dq_width > 1 && !_dispatch_object_is_barrier(dc)) {
            return _dispatch_lane_drain_non_barriers(dq, dc, flags);
        }

        if (!(flags & DISPATCH_WAKEUP_CONSUME_2)) {
            _dispatch_retain_2(dq);
            flags |= DISPATCH_WAKEUP_CONSUME_2;
        }
        target = DISPATCH_QUEUE_WAKEUP_TARGET;
    }

    uint64_t owned = DISPATCH_QUEUE_IN_BARRIER +
            dq->dq_width * DISPATCH_QUEUE_WIDTH_INTERVAL;
    return _dispatch_lane_class_barrier_complete(dq, qos, flags, target, owned);
}

最后进入 _dispatch_lane_class_barrier_complete

static void
_dispatch_lane_class_barrier_complete(dispatch_lane_t dq, dispatch_qos_t qos,
        dispatch_wakeup_flags_t flags, dispatch_queue_wakeup_target_t target,
        uint64_t owned)
{
    uint64_t old_state, new_state, enqueue;
    dispatch_queue_t tq;

    if (target == DISPATCH_QUEUE_WAKEUP_MGR) {
        tq = _dispatch_mgr_q._as_dq;
        enqueue = DISPATCH_QUEUE_ENQUEUED_ON_MGR;
    } else if (target) {
        tq = (target == DISPATCH_QUEUE_WAKEUP_TARGET) ? dq->do_targetq : target;
        enqueue = DISPATCH_QUEUE_ENQUEUED;
    } else {
        tq = NULL;
        enqueue = 0;
    }
      // ....
}

_dispatch_lane_class_barrier_complete里面主要做了对栅栏函数的清空处理,让队列中下面的任务不受影响

再次添加 _dispatch_sync_invoke_and_complete_recurse 符号断点,进行验证

进入 _dispatch_sync_invoke_and_complete_recurse

static void
_dispatch_sync_invoke_and_complete_recurse(dispatch_queue_class_t dq,
        void *ctxt, dispatch_function_t func, uintptr_t dc_flags
        DISPATCH_TRACE_ARG(void *dc))
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
    _dispatch_trace_item_complete(dc);
    _dispatch_sync_complete_recurse(dq._dq, NULL, dc_flags);
}

进入 _dispatch_sync_function_invoke_inline

static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_client_callout(ctxt, func); // callout
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

小结
根据 dispatch_barrier_sync 底层 探索得知👇

我探索得出的结论是这样的一个流程,其实我也不确定真假,或许这只是 dispatch_barrier_sync一种情况而已.

梳理一下把疑问点:为何栅栏函数能让前面的任务执行完毕,再执行自己的任务

在调用dispatch_barrier_sync 会进入一个死循环,当栅栏函数前面的任务都唤醒完毕后会进入_dispatch_lane_non_barrier_complete 里面,目的是为了删除栅栏,删除完栅栏跳出循环发起 callout 再去执行栅栏函数里任务。
而删除栅栏的目的就为了不影响后面任务的执行。

探索的代码👇

  dispatch_queue_t concurrentQueue = dispatch_queue_create("xkQueue", DISPATCH_QUEUE_CONCURRENT);
    
    dispatch_async(concurrentQueue, ^{
        sleep(10);
        NSLog(@">>>>>>> 执行任务1");
    });

    dispatch_async(concurrentQueue, ^{
        sleep(10);
        NSLog(@">>>>>>> 执行任务2");
    });
    
    /* 2. 栅栏函数 */ // - dispatch_barrier_sync
    dispatch_barrier_sync(concurrentQueue, ^{
        NSLog(@">>>>>>>> 栅栏函数任务 ----%@-----",[NSThread currentThread]);
    });

回到 _dispatch_sync_f_slow

把疑问点:为何全局并发设置栅栏函数无效给解决了

代码👇

    dispatch_queue_t concurrentQueue = dispatch_get_global_queue(0, 0);
    
    dispatch_async(concurrentQueue, ^{
        sleep(10);
        NSLog(@">>>>>>> 执行任务1");
    });

    dispatch_async(concurrentQueue, ^{
        sleep(10);
        NSLog(@">>>>>>> 执行任务2");
    });
    
    /* 2. 栅栏函数 */ // - dispatch_barrier_sync
    dispatch_barrier_sync(concurrentQueue, ^{
        NSLog(@">>>>>>>> 栅栏函数任务 ----%@-----",[NSThread currentThread]);
    });

添加符号断点

发现全局并发直接来到了 _dispatch_sync_function_invoke
进入 _dispatch_sync_function_invoke

DISPATCH_NOINLINE
static void
_dispatch_sync_function_invoke(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    _dispatch_sync_function_invoke_inline(dq, ctxt, func);
}

进入 _dispatch_sync_function_invoke_inline

static inline void
_dispatch_sync_function_invoke_inline(dispatch_queue_class_t dq, void *ctxt,
        dispatch_function_t func)
{
    dispatch_thread_frame_s dtf;
    _dispatch_thread_frame_push(&dtf, dq);
    _dispatch_client_callout(ctxt, func);
    _dispatch_perfmon_workitem_inc();
    _dispatch_thread_frame_pop(&dtf);
}

小结:全局并发队列根本没有做 barrier 的处理 ,而是直接调用了 _dispatch_sync_function_invoke_inline -> _dispatch_client_callout
所以栅栏函数无效


dispatch_semaphore_t

dispatch_semaphore_t 它是多元信号量,允许多个线程并发访问资源。

dispatch_semaphore_t 应用

    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    //  创建信号量 ,允许通过最大并发数为 1
    dispatch_semaphore_t sem = dispatch_semaphore_create(1); 
 //任务1
     dispatch_async(queue, ^{
         //  等待
         dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); 
         NSLog(@"执行任务1");
         NSLog(@"任务1完成");
        // 发信号
         dispatch_semaphore_signal(sem);
     });
 
     //任务2
     dispatch_async(queue, ^{
        // 等待
         dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); 
         sleep(2);
         NSLog(@"执行任务2");
         NSLog(@"任务2完成");
        // 发信号
         dispatch_semaphore_signal(sem); 
     });

输出👇

dispatch_semaphore_wait 同步

   dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    dispatch_semaphore_t sem = dispatch_semaphore_create(0);
   
    //任务1
     dispatch_async(queue, ^{
         dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER); // 等待
         NSLog(@"执行任务1");
         NSLog(@"任务1完成");
     });
 
     //任务2
     dispatch_async(queue, ^{
         sleep(2);
         NSLog(@"执行任务2");
         NSLog(@"任务2完成");
         dispatch_semaphore_signal(sem); // 发信号
     });

输出👇

dispatch_semaphore_wait 是如何控制同步的呢?当任务2执行后,为何当发送一个 signal 就能执行任务1 呢?接下来我们带着疑问开始对 dispatch_semaphore_t 底层原理进行探索。


dispatch_semaphore_t 底层原理

进入源码,全局搜索 dispatch_semaphore_wait

intptr_t
dispatch_semaphore_wait(dispatch_semaphore_t dsema, dispatch_time_t timeout)
{
    //  对 dsema_value 进行 -1 操作
    long value = os_atomic_dec2o(dsema, dsema_value, acquire);
    if (likely(value >= 0)) {
        return 0;
    }
    return _dispatch_semaphore_wait_slow(dsema, timeout);
}

情况1:dsema_value 进行 -1value >= 0, 直接 return 0
情况2:dsema_value = 0 ,进行 -1value < 0, 就会执行 _dispatch_semaphore_wait_slow
进入 _dispatch_semaphore_wait_slow

static intptr_t
_dispatch_semaphore_wait_slow(dispatch_semaphore_t dsema,
        dispatch_time_t timeout)
{
    long orig;

    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
    switch (timeout) {
    default:
        if (!_dispatch_sema4_timedwait(&dsema->dsema_sema, timeout)) {
            break;
        }
        // Fall through and try to undo what the fast path did to
        // dsema->dsema_value
    case DISPATCH_TIME_NOW:
        orig = dsema->dsema_value;
        while (orig < 0) {
            if (os_atomic_cmpxchgv2o(dsema, dsema_value, orig, orig + 1,
                    &orig, relaxed)) {
                return _DSEMA4_TIMEOUT();
            }
        }
        // Another thread called semaphore_signal().
        // Fall through and drain the wakeup.
    case DISPATCH_TIME_FOREVER:
        _dispatch_sema4_wait(&dsema->dsema_sema);
        break;
    }
    return 0;
}

timeout = DISPATCH_TIME_NOW ,发出一个超时处理
timeout = DISPATCH_TIME_FOREVER,进入 _dispatch_sema4_wait

void
_dispatch_sema4_wait(_dispatch_sema4_t *sema)
{
    int ret = 0;
    do {
        ret = sem_wait(sema);
    } while (ret == -1 && errno == EINTR);
    DISPATCH_SEMAPHORE_VERIFY_RET(ret);
}

发现在 dispatch_semaphore_wait 底层就是一个do while 循环,当创建信号量的 value < 0 时,就会进入等待状态

我们在全局搜索一下另一个函数 dispatch_semaphore_signal

intptr_t
dispatch_semaphore_signal(dispatch_semaphore_t dsema)
{
    // os_atomic_inc2o 进行 +1 操作
    long value = os_atomic_inc2o(dsema, dsema_value, release);
    if (likely(value > 0)) {
        return 0;
    }
    if (unlikely(value == LONG_MIN)) {
        DISPATCH_CLIENT_CRASH(value,
                "Unbalanced call to dispatch_semaphore_signal()");
    }
    return _dispatch_semaphore_signal_slow(dsema);
}

情况1: dsema_value 进行 +1value > 0, 直接 return 0
情况2: dsema_value 进行 +1value == LONG_MIN ,抛出 crash
情况3: dsema_value 进行 +1value <0 && value != LONG_MIN进入 _dispatch_semaphore_signal_slow

ISPATCH_NOINLINE
intptr_t
_dispatch_semaphore_signal_slow(dispatch_semaphore_t dsema)
{
            // 创建
    _dispatch_sema4_create(&dsema->dsema_sema, _DSEMA4_POLICY_FIFO);
          // 循环+1,直到能正常执行
    _dispatch_sema4_signal(&dsema->dsema_sema, 1);
    return 1;
}

_dispatch_semaphore_signal_slow 创建新信号量,循环+1,直到能正常执行.

小结

  • dispatch_semaphore_wait 将信号量的值 -1
  • 如果信号的值 <0,则进入等待状态,否则继续执行。
  • dispatch_semaphore_signal 将信号量的值 +1, 如果信号的值 >0,唤起一个等待中的线程,否则就创建新信号量,循环+1,直到能正常执行

dispatch_group_t

dispatch_group_t 应用

      //创建调度组
    dispatch_group_t group = dispatch_group_create();
    dispatch_queue_t queue = dispatch_get_global_queue(0, 0);
    
    // 进组任务
    dispatch_group_async(group, queue, ^{
        NSLog(@"组任务1");
    });
//    // 方式1
//    dispatch_group_async(group, queue, ^{
//        NSLog(@"组任务2");
//    });
    // 方式2:
    // 进组
    dispatch_group_enter(group);
    dispatch_async(queue, ^{
        NSLog(@"组任务2");
    });
    // 出组
    dispatch_group_leave(group);
    
    //进组任务执行完毕通知
    dispatch_group_notify(group, dispatch_get_main_queue(), ^{
        NSLog(@"组任务1,2 执行完毕通知");
    });

打印输出👇

dispatch_group_t 底层探索

先从创建组开始,全局搜索 dispatch_group_create

dispatch_group_t
dispatch_group_create(void)
{
    return _dispatch_group_create_with_count(0);
}
// 传入 0  进入 _dispatch_group_create_with_count
static inli ne dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
    dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
            sizeof(struct dispatch_group_s));
    dg->do_next = DISPATCH_OBJECT_LISTLESS;
    dg->do_targetq = _dispatch_get_default_queue(false);
    if (n) {
        // 把 n 存进去
        os_atomic_store2o(dg, dg_bits,
                (uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
        os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
    }
    return dg;
}

可知 dispatch_group_create 就是创建了一个信号标记为 0
再全局搜索一下 dispatch_group_enter

void
dispatch_group_enter(dispatch_group_t dg)
{
    // The value is decremented on a 32bits wide atomic so that the carry
    // for the 0 -> -1 transition is not propagated to the upper 32bits.
    uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
            DISPATCH_GROUP_VALUE_INTERVAL, acquire);
    uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
    if (unlikely(old_value == 0)) {
        _dispatch_retain(dg); // <rdar://problem/22318411>
    }
    if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
        DISPATCH_CLIENT_CRASH(old_bits,
                "Too many nested calls to dispatch_group_enter()");
    }
}

根据注解可知 dispatch_group_entervalue值 从 0 -> -1
全局搜索 dispatch_group_leave

void
dispatch_group_leave(dispatch_group_t dg)
{
    // The value is incremented on a 64bits wide atomic so that the carry for
    // the -1 -> 0 transition increments the generation atomically.
    uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
            DISPATCH_GROUP_VALUE_INTERVAL, release);
    uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);

    if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
        old_state += DISPATCH_GROUP_VALUE_INTERVAL;
        do {
            new_state = old_state;
            if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
                new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            } else {
                // If the group was entered again since the atomic_add above,
                // we can't clear the waiters bit anymore as we don't know for
                // which generation the waiters are for
                new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
            }
            if (old_state == new_state) break;
        } while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
                old_state, new_state, &old_state, relaxed)));
        return _dispatch_group_wake(dg, old_state, true);
    }

    if (unlikely(old_value == 0)) {
        DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
                "Unbalanced call to dispatch_group_leave()");
    }
}

根据注解可知 dispatch_group_leavevalue值 从 -1 -> 0
old_state = -1 , old_value =-1 & DISPATCH_GROUP_VALUE_MASK
=> old_value = DISPATCH_GROUP_VALUE_MASK
=> if (old_state == new_state) break;
=> old_state = 0
=> _dispatch_group_wake

再来看一下 _dispatch_group_notify

static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dsn)
{
    uint64_t old_state, new_state;
    dispatch_continuation_t prev;

    dsn->dc_data = dq;
    _dispatch_retain(dq);

    prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
    os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
    if (os_mpsc_push_was_empty(prev)) {
        os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
            new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
            if ((uint32_t)old_state == 0) {
                os_atomic_rmw_loop_give_up({
                    return _dispatch_group_wake(dg, new_state, false);
                });
            }
        });
    }
}

从源码可知:
=> if old_state == 0
=> _dispatch_group_wake
只要状态 为 0 ,就会唤起组任务。
而在 dispatch_group_leave 也会唤起组任务
进入 _dispatch_group_wake 看一下,里面应该会有 callout 处理

static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
    uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>

    if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
        dispatch_continuation_t dc, next_dc, tail;

        // Snapshot before anything is notified/woken <rdar://problem/8554546>
        dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
        do {
            dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
            next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
            _dispatch_continuation_async(dsn_queue, dc,
                    _dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
            _dispatch_release(dsn_queue);
        } while ((dc = next_dc));

        refs++;
    }

    if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
        _dispatch_wake_by_address(&dg->dg_gen);
    }

    if (refs) _dispatch_release_n(dg, refs);

进入 _dispatch_continuation_async

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}

来到了 dx_push 就完全明白了,之后就是 callout 执行 block()
->dx_push
-> dq_push
-> _dispatch_root_queue_push
-> _dispatch_client_callout

小结:
dispatch_group_create => 创建了一个信号标识为 0
dispatch_group_enter => 信号标识从 0 -> -1
dispatch_group_leave => 信号标识从 -1 -> 0,如果标识为0
唤起 _dispatch_group_wake
_dispatch_group_notify 如果标识为 0
唤起 _dispatch_group_wake

这里的精髓之处就是 dispatch_group_leave_dispatch_group_notify
调用了 _dispatch_group_notify
一般来说只要在 _dispatch_group_notify 调用就可以了。
因为考虑到是异步操作,任务执行的时间会很长。在任务执行完毕后调用dispatch_group_leave 时,又做了一次 callout

dispatch_group_async

在上面应用的之后已知 dispatch_group_async 做了enterleave 的封装,我们在底层源码中验证一下
全局搜索 dispatch_group_async

void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_block_t db)
{
    dispatch_continuation_t dc = _dispatch_continuation_alloc();
    uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
    dispatch_qos_t qos;

    qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
    _dispatch_continuation_group_async(dg, dq, dc, qos);
}

进入 _dispatch_continuation_group_async

static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
        dispatch_continuation_t dc, dispatch_qos_t qos)
{
    dispatch_group_enter(dg);
    dc->dc_data = dg;
    _dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}

在这里做了 dispatch_group_enter 操作
继续往下寻找到 dispatch_group_leave
进入 _dispatch_continuation_async

static inline void
_dispatch_continuation_async(dispatch_queue_class_t dqu,
        dispatch_continuation_t dc, dispatch_qos_t qos, uintptr_t dc_flags)
{
#if DISPATCH_INTROSPECTION
    if (!(dc_flags & DC_FLAG_NO_INTROSPECTION)) {
        _dispatch_trace_item_push(dqu, dc);
    }
#else
    (void)dc_flags;
#endif
    return dx_push(dqu._dq, dc, qos);
}

又看到 dx_push,继续探索来到 _dispatch_continuation_with_group_invoke

DISPATCH_ALWAYS_INLINE
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
    struct dispatch_object_s *dou = dc->dc_data;
    unsigned long type = dx_type(dou);
    if (type == DISPATCH_GROUP_TYPE) {
        _dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
        _dispatch_trace_item_complete(dc);
        dispatch_group_leave((dispatch_group_t)dou);
    } else {
        DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
    }
}

源码得知:
-> if (type == DISPATCH_GROUP_TYPE)
->callout
-> dispatch_group_leave


dispatch_Source_t 应用

dispatch_Source 里的 DISPATCH_SOURCE_TYPE_TIMER 定义了一个计时器的功能
核心代码

- (void) createTimer {
    dispatch_queue_t queue = dispatch_queue_create("timeQueue", DISPATCH_QUEUE_CONCURRENT);
    // 创建源
    self.t_source =  dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER,0,0, queue);
    dispatch_time_t start = DISPATCH_TIME_NOW; //dispatch_walltime(NULL,0);
    dispatch_source_set_timer(self.t_source,start,1.0*NSEC_PER_SEC,0);
    __weak typeof(self) weakSelf = self;
    // 设置源数据回调
    dispatch_source_set_event_handler(self.t_source, ^{
        __strong typeof(self) strongSelf = weakSelf;
        strongSelf.seconds++;
        dispatch_async(dispatch_get_main_queue(), ^{
            // 更新 UI
            strongSelf.secondLabel.text = [NSString stringWithFormat:@"%.f",strongSelf.seconds];
        });
    });
}
// 重置
- (void) stop {
    if(self.t_source) {
        if(_isSuspend){ // 已挂起
            NSLog(@">>>>>> 已挂起,无法继续挂起");
            return;
        }
        dispatch_suspend(self.t_source); // 挂起
        _isSuspend = true;
        [_sbutton setEnabled:YES];
        [_sbutton setTitle:@"继续" forState:UIControlStateNormal];
    }
}

dispatch_Source_t 作为计时器, 时间准确可以使用子线程,解决定时间跑在主线程上卡UI问题,且 CPU 负荷⾮常⼩,不占⽤资源。

使用注意点:

  • dispatch_source_set_event_handler 里要避免循环引用
  • dispatch_source_tsuspendresume要依次进行,不然会crash,我在这里做了按钮状态的判断
  • resume的状态下,才能执行cancel,不然也会 crash
    最后附上 源码地址

总结

今天主要分享了

  • barrier 函数的原理。
    简单理解: 就是一个死循环去唤醒队列里在栅栏函数前面的任务,当任务都唤醒完毕后再去执行栅栏函数里的任务, 之后再删除栅栏以免影响后面任务的执行。
  • dispatch_semaphore_t底层原理。
    dispatch_semaphore_wait 将信号量的值 -1如果信号的值 <0,则进入等待状态,否则继续执行。
    dispatch_semaphore_signal将信号量的值 +1, 如果信号的值 >0,唤起一个等待中的线程
  • dispatch_group_t 底层原理
    • dispatch_group_create => state = 0
    • dispatch_group_enter => state => 0 -> -1
    • dispatch_group_leave => state=> -1 -> 0,如果标识为0
      唤起 _dispatch_group_wake => callout
    • _dispatch_group_notify 如果标识为 0
      唤起 _dispatch_group_wake => callout
  • dispatch_Source_t 的应用和一些坑点。

知识点补充

可变数组线程安全

问:可变数组线程安全吗?
答:不安全
为什么不安全呢?

运行👇

- (void)demo4{
    // 可变数组线程安全?
    dispatch_queue_t concurrentQueue = dispatch_queue_create("cooci", DISPATCH_QUEUE_CONCURRENT);
    // 多线程 操作marray
    for (int i = 0; i<1000; i++) {
        dispatch_async(concurrentQueue, ^{
            
            NSString *name = [NSString stringWithFormat:@"%d", i];
            [self.mArray addObject:name];
         });
    }
}

多线程同时对 可变数据 self.mArray 进行操作,最后 crash
为什么会 crash 了 ? 开始分析
假设 mArray 里已经有 2 个数据 12, 要添加一个新的数据
多线程同时读
线程1 -> 读 mArray(1,2)
线程2 -> 读mArray(1,2)
多线程同时写
线程1 -> mArray3 => 写成功了 => mArray(1,2,3)
线程2 -> mArray(1,2)4
假设线程1已经执行完了 ,但线程2不知道,还在对之前读的数据 mArray(1,2) 进行写入 4 => mArray(1,2,4) ,这时就造成了数据的混乱,因为这时的 mArray(1,2,3) 已经有3个数据了 。

小结:mArray只有一片内存空间,多线程同时对同一片内存空间进行读写操作,不安全。

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容