什么是dispatch_group_t
dispatch_group_t是提交给队列用于异步调用的一组block。可以将队列的任务block进行分组,把队列任务关联到当前的组里面,然后还可以监听已关联的任务是否全部执行完成。dispatch_group_t可以通过dispatch_group_create函数创建:
dispatch_group_t group = dispatch_group_create();
dispatch_group_t的使用
dispatch_group_t的使用有两种方式,一种dispatch_group_enter() 和dispatch_group_leave()配合的方式;另一种是使用dispatch_group_async函数。不管哪种方式,都可以通过dispatch_group_notify函数来监听group里面的任务是否都已经执行完毕,或者可以通过dispatch_group_wait函数等待所有任务执行完毕。dispatch_group_wait和dispatch_group_notify的不同是,dispatch_group_notify可以指定所有任务完成之后执行的队列,它是异步的,不会阻塞当前线程;而dispatch_group_wait是同步的,会阻塞当前线程,但是dispatch_group_wait可以设置超时时间。下面就来看看dispatch_group_t使用示例。
dispatch_group_enter()&dispatch_group_leave()
这两个函数必须成对出现,这很类似信号量。dispatch_group_enter在任务添加之前调用,相当于告诉group要添加任务,而dispatch_group_leave则在任务结束的时候调用。以下是demo示例:
dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
dispatch_group_t group = dispatch_group_create();
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"2");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"3");
dispatch_group_leave(group);
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"任务完成,返回主线程");
});
使用dispatch_group_async
这个函数直接实现了dispatch_group_enter和dispatch_group_leave的作用,以下是demo示例:
dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
dispatch_group_t group = dispatch_group_create();
dispatch_group_async(group, conQueue, ^{
NSLog(@"1");
});
dispatch_group_async(group, conQueue, ^{
NSLog(@"2");
});
dispatch_group_async(group, conQueue, ^{
NSLog(@"3");
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"任务完成,返回主线程");
});
到这里的时候我们可能发现它跟栅栏dispatch_barrier_async函数很像,可以等某个队列的某几个任务都执行了在执行我们想要的操作。但还是group跟栅栏函数是不一样的,group不会阻塞当前队列,而且一个group不止可以关联一个队列,可以同时关联多个队列,下面我们就来看看group关联多个队列的用法。
可以关联多个队列任务、两种方式一起使用
dispatch_queue_t conQueue = dispatch_queue_create("com.hello-world.djx", DISPATCH_QUEUE_CONCURRENT);
dispatch_queue_t globalQueue = dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_DEFAULT, 0);
dispatch_group_t group = dispatch_group_create();
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"1");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(globalQueue, ^{
NSLog(@"2");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"3");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(globalQueue, ^{
NSLog(@"4");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(conQueue, ^{
NSLog(@"5");
dispatch_group_leave(group);
});
dispatch_group_enter(group);
dispatch_async(globalQueue, ^{
NSLog(@"6");
dispatch_group_leave(group);
});
dispatch_group_async(group, conQueue, ^{
NSLog(@"7");
});
dispatch_group_async(group, globalQueue, ^{
NSLog(@"8");
});
dispatch_group_notify(group, dispatch_get_main_queue(), ^{
NSLog(@"任务完成,返回主线程");
});
NSLog(@"开始等待");
dispatch_time_t time = dispatch_walltime(NULL, 5* NSEC_PER_SEC);
dispatch_group_wait(group, time);
NSLog(@"等待结束")
从demo中可以看出,group可以关联多个队列conQueque和globalQueue,而且group的两种用法dispatch_group_enter()&dispatch_group_leave()和dispatch_group_async是可以一起使用的,而且dispatch_group_notify和dispatch_group_wait并不是互斥的,他们也一样可以同时使用。这个给了我们做业务有着很多选择。
Group底层原理
这里先提出几个问题:为什么dispatch_group_enter和dispatch_group_leave要成对出现?它们是怎么关联的?以及dispatch_group_async为什么不需要enter和leave?dispatch_group_notify是如何知道所有任务都执行完成了的?
接下来通过源码分析来解释这些问题。
dispatch_group_create源码解析
dispatch_group_t
dispatch_group_create(void)
{
return _dispatch_group_create_with_count(0);
}
static inline dispatch_group_t
_dispatch_group_create_with_count(uint32_t n)
{
dispatch_group_t dg = _dispatch_object_alloc(DISPATCH_VTABLE(group),
sizeof(struct dispatch_group_s));
dg->do_next = DISPATCH_OBJECT_LISTLESS;
dg->do_targetq = _dispatch_get_default_queue(false);
if (n) {
os_atomic_store2o(dg, dg_bits,
(uint32_t)-n * DISPATCH_GROUP_VALUE_INTERVAL, relaxed);
os_atomic_store2o(dg, do_ref_cnt, 1, relaxed); // <rdar://22318411>
}
return dg;
}
DISPATCH_VTABLE能够获取group的类信息。由于初始值n为0,所以这里的os_atomic_store2o不会被调用,所以这会dg_bits为0。dg_bits用于记录enter和leave的信息。
dispatch_group_enter源码解析
dispatch_group_enter函数:
void
dispatch_group_enter(dispatch_group_t dg)
{
// The value is decremented on a 32bits wide atomic so that the carry
// for the 0 -> -1 transition is not propagated to the upper 32bits.
uint32_t old_bits = os_atomic_sub_orig2o(dg, dg_bits,
DISPATCH_GROUP_VALUE_INTERVAL, acquire);
uint32_t old_value = old_bits & DISPATCH_GROUP_VALUE_MASK;
if (unlikely(old_value == 0)) {
_dispatch_retain(dg); // <rdar://problem/22318411>
}
if (unlikely(old_value == DISPATCH_GROUP_VALUE_MAX)) {
DISPATCH_CLIENT_CRASH(old_bits,
"Too many nested calls to dispatch_group_enter()");
}
}
os_atomic_sub_orig2o调用流程:os_atomic_sub_orig2o
->os_atomic_sub_orig
->_os_atomic_c11_op_orig((p), (v), m, sub, -)
->atomic_fetch_sub_explicit
在enter函数里,首先会对dg->dg_bits进行减一的原子操作,用于统计enter次数。有前面创建group可知,dg->dg_bits初始值应该是为0的,所以减1之后正常情况下不会等于0,所以这里懒得分析_dispatch_retain函数了。
dispatch_group_leave源码解析
dispatch_group_leave函数:
void
dispatch_group_leave(dispatch_group_t dg)
{
// The value is incremented on a 64bits wide atomic so that the carry for
// the -1 -> 0 transition increments the generation atomically.
uint64_t new_state, old_state = os_atomic_add_orig2o(dg, dg_state,
DISPATCH_GROUP_VALUE_INTERVAL, release);
uint32_t old_value = (uint32_t)(old_state & DISPATCH_GROUP_VALUE_MASK);
if (unlikely(old_value == DISPATCH_GROUP_VALUE_1)) {
old_state += DISPATCH_GROUP_VALUE_INTERVAL;
do {
new_state = old_state;
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
new_state &= ~DISPATCH_GROUP_HAS_WAITERS;
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
} else {
// If the group was entered again since the atomic_add above,
// we can't clear the waiters bit anymore as we don't know for
// which generation the waiters are for
new_state &= ~DISPATCH_GROUP_HAS_NOTIFS;
}
if (old_state == new_state) break;
} while (unlikely(!os_atomic_cmpxchgv2o(dg, dg_state,
old_state, new_state, &old_state, relaxed)));
return _dispatch_group_wake(dg, old_state, true);
}
if (unlikely(old_value == 0)) {
DISPATCH_CLIENT_CRASH((uintptr_t)old_value,
"Unbalanced call to dispatch_group_leave()");
}
}
os_atomic_add_orig2o调用流程: os_atomic_add_orig2o
->os_atomic_add_orig
->_os_atomic_c11_op_orig
->_os_atomic_c11_op_orig
->atomic_fetch_add_explicit
实际上这里是实际上就是对dg->dg_state的原子加1操作,并返回旧值old_value。用于统计leave次数。dg_state操作完成之后会就会判断enter和leave是否已经成对,如果是就会调用_dispatch_group_wake唤醒group(dispatch_group_wait),同时去异步执行notify任务。如果old_value==0,说明leave次数调用过多,会导致崩溃。
static void
_dispatch_group_wake(dispatch_group_t dg, uint64_t dg_state, bool needs_release)
{
uint16_t refs = needs_release ? 1 : 0; // <rdar://problem/22318411>
if (dg_state & DISPATCH_GROUP_HAS_NOTIFS) {
dispatch_continuation_t dc, next_dc, tail;
// Snapshot before anything is notified/woken <rdar://problem/8554546>
dc = os_mpsc_capture_snapshot(os_mpsc(dg, dg_notify), &tail);
do {
dispatch_queue_t dsn_queue = (dispatch_queue_t)dc->dc_data;
next_dc = os_mpsc_pop_snapshot_head(dc, tail, do_next);
_dispatch_continuation_async(dsn_queue, dc,
_dispatch_qos_from_pp(dc->dc_priority), dc->dc_flags);
_dispatch_release(dsn_queue);
} while ((dc = next_dc));
refs++;
}
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);
}
if (refs) _dispatch_release_n(dg, refs);
}
_dispatch_group_wake函数中,会循环遍历当前group里是否有notify任务,如果有则交给_dispatch_continuation_async把block提交到队列中异步调用这个block,同时会判断有没有正在等待的group任务:
if (dg_state & DISPATCH_GROUP_HAS_WAITERS) {
_dispatch_wake_by_address(&dg->dg_gen);
}
dispatch_group_notify源码分析
void
dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dsn = _dispatch_continuation_alloc();
_dispatch_continuation_init(dsn, dq, db, 0, DC_FLAG_CONSUME);
_dispatch_group_notify(dg, dq, dsn);
}
static inline void
_dispatch_group_notify(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dsn)
{
uint64_t old_state, new_state;
dispatch_continuation_t prev;
dsn->dc_data = dq;
_dispatch_retain(dq);
prev = os_mpsc_push_update_tail(os_mpsc(dg, dg_notify), dsn, do_next);
if (os_mpsc_push_was_empty(prev)) _dispatch_retain(dg);
os_mpsc_push_update_prev(os_mpsc(dg, dg_notify), prev, dsn, do_next);
if (os_mpsc_push_was_empty(prev)) {
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, release, {
new_state = old_state | DISPATCH_GROUP_HAS_NOTIFS;
if ((uint32_t)old_state == 0) {
os_atomic_rmw_loop_give_up({
return _dispatch_group_wake(dg, new_state, false);
});
}
});
}
}
在这里会首先把当前函数的任务(block)标记位notify,因为一会所有任务执行完之后要调用。会循环(os_atomic_rmw_loop2o)监听状态,状态满足时old_state == 0执行_dispatch_group_wake去执行notify的block和唤醒group的wait。_dispatch_group_wake上面已经有分析过了。
dispatch_group_wait源码解析
dispatch_group_wait函数阻塞当前线程:
long
dispatch_group_wait(dispatch_group_t dg, dispatch_time_t timeout)
{
uint64_t old_state, new_state;
os_atomic_rmw_loop2o(dg, dg_state, old_state, new_state, relaxed, {
if ((old_state & DISPATCH_GROUP_VALUE_MASK) == 0) {
os_atomic_rmw_loop_give_up_with_fence(acquire, return 0);
}
if (unlikely(timeout == 0)) {
os_atomic_rmw_loop_give_up(return _DSEMA4_TIMEOUT());
}
new_state = old_state | DISPATCH_GROUP_HAS_WAITERS;
if (unlikely(old_state & DISPATCH_GROUP_HAS_WAITERS)) {
os_atomic_rmw_loop_give_up(break);
}
});
return _dispatch_group_wait_slow(dg, _dg_state_gen(new_state), timeout);
}
static long
_dispatch_group_wait_slow(dispatch_group_t dg, uint32_t gen,
dispatch_time_t timeout)
{
for (;;) {
int rc = _dispatch_wait_on_address(&dg->dg_gen, gen, timeout, 0);
if (likely(gen != os_atomic_load2o(dg, dg_gen, acquire))) {
return 0;
}
if (rc == ETIMEDOUT) {
return _DSEMA4_TIMEOUT();
}
}
}
dispatch_group_wait等待的原理是它开启了一个循环,检查group任务相关的状态,如国状态满足就返回0,执行dispatch_group_wait后面的代码;如果超时就返回超时提醒,也会继续执行后面的代码。
dispatch_group_async源码解析
dispatch_group_async之所以能等同于dispatch_group_enter()和dispatch_group_leave()的操作,实际上它内部实现也是依赖这两个方法实现的:
void
dispatch_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_block_t db)
{
dispatch_continuation_t dc = _dispatch_continuation_alloc();
uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC;
dispatch_qos_t qos;
qos = _dispatch_continuation_init(dc, dq, db, 0, dc_flags);
_dispatch_continuation_group_async(dg, dq, dc, qos);
}
static inline void
_dispatch_continuation_group_async(dispatch_group_t dg, dispatch_queue_t dq,
dispatch_continuation_t dc, dispatch_qos_t qos)
{
dispatch_group_enter(dg);
dc->dc_data = dg;
_dispatch_continuation_async(dq, dc, qos, dc->dc_flags);
}
这里是先创建一个dc,然后标记为group类型任务uintptr_t dc_flags = DC_FLAG_CONSUME | DC_FLAG_GROUP_ASYNC。然后在把任务添加到队列之前调用dispatch_group_enter(dg),到这里我们其实就可以发现了,dispatch_group_async实现原理实际上就是内部也是依赖dispatch_group_enter和dispatch_group_leave函数来管理的。所以我们也很容易猜到在block任务被调用的时候,也会调用dispatch_group_leave函数来保持平衡的:
static inline void
_dispatch_continuation_with_group_invoke(dispatch_continuation_t dc)
{
struct dispatch_object_s *dou = dc->dc_data;
unsigned long type = dx_type(dou);
if (type == DISPATCH_GROUP_TYPE) {
_dispatch_client_callout(dc->dc_ctxt, dc->dc_func);
_dispatch_trace_item_complete(dc);
dispatch_group_leave((dispatch_group_t)dou);
} else {
DISPATCH_INTERNAL_CRASH(dx_type(dou), "Unexpected object type");
}
}