一文读懂 Go sync.Cond 设计

Go 语言通过 go 关键字开启 goroutine 让开发者可以轻松地实现并发编程,而并发程序的有效运行,往往离不开 sync 包的保驾护航。目前,sync 包的赋能列表包括: sync.atomic 下的原子操作、sync.Map 并发安全 map、sync.Mutexsync.RWMutex 提供的互斥锁与读写锁、sync.Pool 复用对象池、sync.Once 单例模式、 sync.Waitgroup 的多任务协作模式、sync.Cond 的监视器模式。当然,除了 sync 包,还有封装层面更高的 channelcontext

要想写出合格的 Go 程序,以上的这些并发原语是必须要掌握的。对于大多数 Gopher 而言,sync.Cond 应该是最为陌生,本文将一探究竟。

初识 sync.Cond

sync.Cond 字面意义就是同步条件变量,它实现的是一种监视器(Monitor)模式。

In concurrent programming(also known as parallel programming), a monitor is a synchronization construct that allows threads to have both mutual exclusion and the ability to wait (block) for a certain condition to become false.

对于 Cond 而言,它实现一个条件变量,是 goroutine 间等待和通知的点。条件变量与共享的数据隔离,它可以同时阻塞多个 goroutine,直到另外的 goroutine 更改了条件变量,并通知唤醒阻塞着的一个或多个 goroutine。

初次接触的读者,可能会不太明白,那么下面我们看一下 GopherCon 2018 上《Rethinking Classical Concurrency Patterns》 中的演示代码例子。

type Item = int

type Queue struct {
   items     []Item
   itemAdded sync.Cond
}

func NewQueue() *Queue {
   q := new(Queue)
   q.itemAdded.L = &sync.Mutex{} // 为 Cond 绑定锁
   return q
}

func (q *Queue) Put(item Item) {
   q.itemAdded.L.Lock()
   defer q.itemAdded.L.Unlock()
   q.items = append(q.items, item)
   q.itemAdded.Signal()        // 当 Queue 中加入数据成功,调用 Singal 发送通知
}

func (q *Queue) GetMany(n int) []Item {
   q.itemAdded.L.Lock()
   defer q.itemAdded.L.Unlock()
   for len(q.items) < n {     // 等待 Queue 中有 n 个数据
      q.itemAdded.Wait()      // 阻塞等待 Singal 发送通知
   }
   items := q.items[:n:n]
   q.items = q.items[n:]
   return items
}

func main() {
   q := NewQueue()

   var wg sync.WaitGroup
   for n := 10; n > 0; n-- {
      wg.Add(1)
      go func(n int) {
         items := q.GetMany(n)
         fmt.Printf("%2d: %2d\n", n, items)
         wg.Done()
      }(n)
   }

   for i := 0; i < 100; i++ {
      q.Put(i)
   }

   wg.Wait()
}

在这个例子中,Queue 是存储数据 Item 的结构体,它通过 Cond 类型的 itemAdded 来控制数据的输入与输出。可以注意到,这里通过 10 个 goroutine 来消费数据,但它们所需的数据量并不相等,我们可以称之为 batch,依次在 1-10 之间。之后,逐步添加 100 个数据至 Queue 中。最后,我们能够看到 10 个 gotoutine 都能被唤醒,得到它想要的数据。

程序运行结果如下

 6: [ 7  8  9 10 11 12]
 5: [50 51 52 53 54]
 9: [14 15 16 17 18 19 20 21 22]
 1: [13]
 2: [33 34]
 4: [35 36 37 38]
 3: [39 40 41]
 7: [ 0  1  2  3  4  5  6]
 8: [42 43 44 45 46 47 48 49]
10: [23 24 25 26 27 28 29 30 31 32]

当然,程序每次运行结果都不会相同,以上输出只是某一种情况。

sync.Cond 实现

$GOPATH/src/sync/cond.go 中,Cond 的结构体定义如下

type Cond struct {
   noCopy noCopy
   L Locker
   notify  notifyList
   checker copyChecker
}

其中,noCopychecker 字段均是为了避免 Cond 在使用过程中被复制,详见小菜刀的 《no copy 机制》 一文。

L 是 Locker 接口,一般该字段的实际对象是 *RWmutex 或者 *Mutex

type Locker interface {
   Lock()
   Unlock()
}

notifyList 记录的是一个基于票号的通知列表,这里初次看注释看不懂没关系,和下文来回连贯着看。

type notifyList struct {
   wait   uint32         // 用于记录下一个等待者 waiter 的票号
   notify uint32         // 用于记录下一个应该被通知的 waiter 的票号
   lock   uintptr        // 内部锁
   head   unsafe.Pointer // 指向等待者 waiter 的队列队头
   tail   unsafe.Pointer // 指向等待者 waiter 的队列队尾
}

其中,headtail 是指向 sudog 结构体的指针,sudog 是代表的处于等待列表的 goroutine,它本身就是双向链表。值得一提的是,在 sudog 中有一个字段 ticket 就是用于给当前 goroutine 记录票号使用的。

Cond 实现的核心模式为票务系统(ticket system),每一个想要来买票的 goroutine (调用Cond.Wait())我们称之为 waiter,票务系统会给每个 waiter 分配一个取票码,等供票方有该取票码的号时,就会唤醒 waiter。卖票的 goroutine 有两种,第一种是调用 Cond.Signal() 的,它会按照票号唤醒一个买票的 waiter (如果有的话),第二种是调用 Cond.Broadcast() 的,它会通知唤醒所有的阻塞 waiter。为了方便读者能够比较轻松地理解票务系统,下面我们给出图解示例。

水印图片.png

在 上文中,我们知道 Cond 字段中 notifyList 结构体是一个记录票号的通知列表。这里将 notifyList 比作排队取票买电影票,当 G1 通过 Wait 来买票时,发现此时并没有票可买,因此他只能阻塞等待有票之后的通知,此时他手上已经取得了专属取票码 0。同样的,G2 和 G3 也同样无票可买,它们分别取到了自己的取票码 1和 2。而 G4 是电影票提供商,它是卖票的,它通过两次 Signal 先后带来了两张票,按照票号顺序依次通知了 G1 和 G2 来取票,并把 notify 更新为了最新的 1。G5 也是买票的,它发现此时已经无票可买了,拿了自己的取票码 3 ,就阻塞等待了。G6 是个大票商,它通过 Broadcast 可以满足所有正在等待的买票者都买到票,此时等待的是 G3 和 G5,因此他直接唤醒了 G3 和 G5,并将 notify 更新到和 wait 值相等。

理解了上述取票系统的运作原理后,我们下面来看 Cond 包下四个实际对外方法函数的实现。

  • NewCond 方法
func NewCond(l Locker) *Cond {
   return &Cond{L: l}
}

用于初始化 Cond 对象,就是初始化控制锁。

  • Cond.Wait 方法
func (c *Cond) Wait() {
   c.checker.check()
   t := runtime_notifyListAdd(&c.notify)
   c.L.Unlock()
   runtime_notifyListWait(&c.notify, t)
   c.L.Lock()
}

runtime_notifyListAdd 的实现在 runtime/sema.go 的 notifyListAdd ,它用于原子性地增加等待者的 waiter 票号,并返回当前 goroutine 应该取的票号值 t 。runtime_notifyListWait 的实现在runtime/sema.go 的 notifyListWait,它会尝试去比较此时 goroutine 的应取票号 tnotify 中记录的当前应该被通知的票号。如果 t 小于当前票号,那么直接能得到返回,否则将会则塞等待,通知取号。

同时,这里需要注意的是,由于在进入 runtime_notifyListWait 时,当前 goroutine 通过 c.L.Unlock() 将锁解了,这就意味着有可能会有多个 goroutine 来让条件发生变化。那么,当前 goroutine 是不能保证在 runtime_notifyListWait 返回后,条件就一定是真的,因此需要循环判断条件。正确的 Wait 使用姿势如下:

//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
  • Cond.Signal 方法
func (c *Cond) Signal() {   c.checker.check()   runtime_notifyListNotifyOne(&c.notify)}

runtime_notifyListNotifyOne 的详细实现在 runtime/sema.go 的 notifyListNotifyOne,它的目的就是通知 waiter 取票。具体操作是:如果在上一次通知取票之后没有新的 waiter 取票者,那么该函数会直接返回。否则,它会将取票号 +1,并通知唤醒等待取票的 waiter。

需要注意的是,调用 Signal 方法时,并不需要持有 c.L 锁。

  • Cond.Broadcast 方法
func (c *Cond) Broadcast() {   c.checker.check()   runtime_notifyListNotifyAll(&c.notify)}

runtime_notifyListNotifyAll 的详细实现在 runtime/sema.go 的 notifyListNotifyAll,它会通知唤醒所有的 waiter,并将 notify 值置为 和 wait 值相等。调用 Broadcast 方法时,也不需要持有 c.L 锁。

讨论

$GOPATH/src/sync/cond.go 下,我们可以发现其代码量非常之少,但它呈现的只是核心逻辑,其实现细节位于 runtime/sema.go 之中,依赖的是 runtime 层的调度原语,对细节感兴趣的读者可以深入学习。

问题来了,为什么在日常开发中,我们很少会使用到 sync.Cond ?

  • 无效唤醒

前文中我们提到,使用 Cond.Wait 正确姿势如下

    c.L.Lock()    for !condition() {        c.Wait()    }    ... make use of condition ...    c.L.Unlock()

以文章开头的例子而言,如果在每次调用 Put 方法时,使用 Broadcast 方法唤醒所有的 waiter,那么很大概率上被唤醒的 waiter 醒来发现条件并不满足,又会重新进入等待。尽管是调用 Signal 方法唤醒指定的 waiter,但是它也不能保证唤醒的 waiter 条件一定满足。因此,在实际的使用中,我们需要尽量保证唤醒操作是有效地,为了做到这点,代码的复杂度难免会增加。

  • 饥饿问题

还是以文章开头例子为例,如果同时有多个 goroutine 执行 GetMany(3) 和 GetMany(3000),执行 GetMany(3) 与执行 GetMany(3000) 的 goroutine 被唤醒的概率是一样的,但是由于 GetMany(3) 只需要 3个数据就能满足条件,那么如果一直存在 GetMany(3) 的 goroutine,执行 GetMany(3000) 的 goroutine 将永远拿不到数据,一直被无效唤醒。

  • 不能响应其他事件

条件变量的意义在于让 goroutine 等待某种条件发生时进入睡眠状态。但是这会让 goroutine 在等待条件时,可能会错过一些需要注意的其他事件。例如,调用 Cond.Wait 的函数中包含了 context 上下文,当 context 传来取消信号时,它并不能像我们期望的一样,获取到取消信号并退出。Cond 的使用,让我们不能同时选择(select)条件和其他事件。

  • 可替代性

通过对 sync.Cond 几个对外方法的分析,我们不难看到,它的使用场景是可以被 channel 所代替的,但是这也会增加代码的复杂性。上文中的例子,可以使用 channel 改写如下。

type Item = inttype waiter struct { n int   c chan []Item}type state struct {   items []Item    wait  []waiter}type Queue struct {  s chan state}func NewQueue() *Queue {   s := make(chan state, 1)    s <- state{}    return &Queue{s}}func (q *Queue) Put(item Item) {   s := <-q.s  s.items = append(s.items, item) for len(s.wait) > 0 {       w := s.wait[0]      if len(s.items) < w.n {         break       }       w.c <- s.items[:w.n:w.n]        s.items = s.items[w.n:]     s.wait = s.wait[1:] }   q.s <- s}func (q *Queue) GetMany(n int) []Item {    s := <-q.s  if len(s.wait) == 0 && len(s.items) >= n {      items := s.items[:n:n]      s.items = s.items[n:]       q.s <- s        return items    }   c := make(chan []Item)  s.wait = append(s.wait, waiter{n, c})   q.s <- s    return <-c}

最后,虽然在上文的讨论中都是列出的 sync.Cond 潜在问题,但是如果开发者能够在使用中考虑到以上的几点问题,对于监视器模型的实现而言,在代码的语义逻辑上,sync.Cond 的使用会比 channel 的模式更易理解和维护。记住一点,通俗易懂的代码模型总是比深奥的炫技要接地气。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,948评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,371评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,490评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,521评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,627评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,842评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,997评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,741评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,203评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,534评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,673评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,339评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,955评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,770评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,000评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,394评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,562评论 2 349

推荐阅读更多精彩内容

  • 锁 Mutex 互斥锁 互斥即不可同时运行。即使用了互斥锁的两个代码片段互相排斥,只有其中一个代码片段执行完成后,...
    Xuenqlve阅读 691评论 0 1
  • 官方包的注释: sync包提供基础的同步原语,sync.Mutext、sync.RWMutex、sync.Wait...
    thepoy阅读 598评论 0 1
  • 本文从上下文Context、同步原语与锁、Channel、调度器四个方面介绍Go语言是如何实现并发的。本文绝大部分...
    彦帧阅读 1,559评论 1 3
  • 版本 go version 1.10.1 使用方法 数据结构 noCopy:noCopy对象,拥有一个Lock方法...
    不就是个名字么不要在意阅读 1,115评论 0 1
  • 16宿命:用概率思维提高你的胜算 以前的我是风险厌恶者,不喜欢去冒险,但是人生放弃了冒险,也就放弃了无数的可能。 ...
    yichen大刀阅读 6,041评论 0 4