手摸手Go 深入理解sync.Cond

sync.Cond实现了一个条件变量,用于等待一个或一组goroutines满足条件后唤醒的场景。每个Cond关联一个Locker通常是一个*MutexRWMutex`根据需求初始化不同的锁。

基本用法

老规矩正式剖析源码前,先来看看sync.Cond如何使用。比如我们实现一个FIFO的队列

package main

import (
    "fmt"
    "math/rand"
    "os"
    "os/signal"
    "sync"
    "time"
)

type FIFO struct {
    lock  sync.Mutex
    cond  *sync.Cond
    queue []int
}

type Queue interface {
    Pop() int
    Offer(num int) error
}

func (f *FIFO) Offer(num int) error {
    f.lock.Lock()
    defer f.lock.Unlock()
    f.queue = append(f.queue, num)
    f.cond.Broadcast()
    return nil
}
func (f *FIFO) Pop() int {
    f.lock.Lock()
    defer f.lock.Unlock()
    for {
        for len(f.queue) == 0 {
            f.cond.Wait()
        }
        item := f.queue[0]
        f.queue = f.queue[1:]
        return item
    }
}

func main() {
    l := sync.Mutex{}
    fifo := &FIFO{
        lock:  l,
        cond:  sync.NewCond(&l),
        queue: []int{},
    }
    go func() {
        for {
            fifo.Offer(rand.Int())
        }
    }()
    time.Sleep(time.Second)
    go func() {
        for {
            fmt.Println(fmt.Sprintf("goroutine1 pop-->%d", fifo.Pop()))
        }
    }()
    go func() {
        for {
            fmt.Println(fmt.Sprintf("goroutine2 pop-->%d", fifo.Pop()))
        }
    }()

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, os.Interrupt)
    <-ch
}

我们定一个FIFO 队列有OfferPop两个操作,我们起一个gorountine不断向队列投放数据,另外两个gorountine不断取拿数据。

  1. Pop操作会判断如果队列里没有数据len(f.queue) == 0则调用f.cond.Wait()goroutine挂起。
  2. 等到Offer操作投放数据成功,里面调用f.cond.Broadcast()来唤醒所有挂起在这个mutex上的goroutine。当然sync.Cond也提供了一个Signal(),有点儿类似Java中的notify()notifyAll()的意思 主要是唤醒一个和唤醒全部的区别。

总结一下sync.Mutex的大致用法

  1. 首先声明一个mutex,这里sync.Mutex/sync.RWMutex可根据实际情况选用
  2. 调用sync.NewCond(l Locker) *Cond 使用1中的mutex作为入参 注意 这里传入的是指针 为了避免c.L.Lock()c.L.Unlock()调用频繁复制锁 导致死锁
  3. 根据业务条件 满足则调用cond.Wait()挂起goroutine
  4. cond.Broadcast()唤起所有挂起的gorotune 另一个方法cond.Signal()唤醒一个最先挂起的goroutine

需要注意的是cond.wait()的使用需要参照如下模版 具体为啥我们后续分析

    c.L.Lock()
    for !condition() {
        c.Wait()
    }
    ... make use of condition ...
   c.L.Unlock()

源码分析

数据结构

分析具体方法前,我们先来了解下sync.Cond的数据结构。具体源码如下:

type Cond struct {
    noCopy noCopy // Cond使用后不允许拷贝
    // L is held while observing or changing the condition
    L Locker
  //通知列表调用wait()方法的goroutine会被放到notifyList中
    notify  notifyList
    checker copyChecker //检查Cond实例是否被复制
}

noCopy之前讲过 不清楚的可以看下《你真的了解mutex吗》,除此之外,Locker是我们刚刚谈到的mutexcopyChecker是用来检查Cond实例是否被复制的,就有一个方法 :

func (c *copyChecker) check() {
    if uintptr(*c) != uintptr(unsafe.Pointer(c)) &&
        !atomic.CompareAndSwapUintptr((*uintptr)(c), 0, uintptr(unsafe.Pointer(c))) &&
        uintptr(*c) != uintptr(unsafe.Pointer(c)) {
        panic("sync.Cond is copied")
    }
}

大致意思是说,初始type copyChecker uintptr默认为0,当第一次调用check()会将copyChecker自身的地址复制给自己,至于为什么uintptr(*c) != uintptr(unsafe.Pointer(c))会被调用2次,因为期间goroutine可能已经改变copyChecker。二次调用如果不相等,则说明sync.Cond被复制,重新分配了内存地址。

sync.Cond比较有意思的是notifyList

type notifyList struct {
    // wait is the ticket number of the next waiter. It is atomically
    // incremented outside the lock.
    wait uint32 // 等待goroutine操作的数量

    // notify is the ticket number of the next waiter to be notified. It can
    // be read outside the lock, but is only written to with lock held.
    //
    // Both wait & notify can wrap around, and such cases will be correctly
    // handled as long as their "unwrapped" difference is bounded by 2^31.
    // For this not to be the case, we'd need to have 2^31+ goroutines
    // blocked on the same condvar, which is currently not possible.
    notify uint32 // 唤醒goroutine操作的数量

    // List of parked waiters.
    lock mutex
    head *sudog
    tail *sudog
}

包含了3类字段:

  • waitnotify两个无符号整型,分别表示了Wait()操作的次数和goroutine被唤醒的次数,wait应该是恒大于等于notify
  • lock mutex 这个跟sync.Mutex我们分析信号量阻塞队列时semaRoot里的mutex一样,并不是Go提供开发者使用的sync.Mutex,而是系统内部运行时实现的一个简单版本的互斥锁。
  • headtail看名字,我们就能脑补出跟链表很像 没错这里就是维护了阻塞在当前sync.Cond上的goroutine构成的链表

整体来讲sync.Cond大体结构为:

cond architecture

操作方法

Wait()操作

func (c *Cond) Wait() {
  //1. 检查cond是否被拷贝
    c.checker.check()
  //2. notifyList.wait+1
    t := runtime_notifyListAdd(&c.notify)
  //3. 释放锁 让出资源给其他goroutine
    c.L.Unlock()
  //4. 挂起goroutine
    runtime_notifyListWait(&c.notify, t)
  //5. 尝试获得锁
    c.L.Lock()
}

Wait()方法源码很容易看出它的操作大概分了5步:

  1. 调用copyChecker.check()保证sync.Cond不会被拷贝
  2. 每次调用Wait()会将sync.Cond.notifyList.wait属性进行加一操作,这也是它完成FIFO的基石,根据wait来判断`goroutine1等待的顺序
//go:linkname notifyListAdd sync.runtime_notifyListAdd
func notifyListAdd(l *notifyList) uint32 {
    // This may be called concurrently, for example, when called from
    // sync.Cond.Wait while holding a RWMutex in read mode.
    return atomic.Xadd(&l.wait, 1) - 1
}
  1. 调用c.L.Unlock()释放锁,因为当前goroutine即将被gopark,让出锁给其他goroutine避免死锁
  2. 调用runtime_notifyListWait(&c.notify, t)可能稍微复杂一点儿
// notifyListWait waits for a notification. If one has been sent since
// notifyListAdd was called, it returns immediately. Otherwise, it blocks.
//go:linkname notifyListWait sync.runtime_notifyListWait
func notifyListWait(l *notifyList, t uint32) {
    lockWithRank(&l.lock, lockRankNotifyList)

    // 如果已经被唤醒 则立即返回
    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    // Enqueue itself.
    s := acquireSudog()
    s.g = getg()
  // 把等待递增序号赋值给s.ticket 为FIFO打基础
    s.ticket = t
    s.releasetime = 0
    t0 := int64(0)
    if blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
  // 将当前goroutine插入到notifyList链表中
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
  // 最终调用gopark挂起当前goroutine
    goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3)
    if t0 != 0 {
        blockevent(s.releasetime-t0, 2)
    }
  // goroutine被唤醒后释放sudog
    releaseSudog(s)
}

主要完成两个任务:

  • 将当前goroutine插入到notifyList链表中
  • 调用gopark将当前goroutine挂起
  1. 当其他goroutine调用了SignalBroadcast方法,当前goroutine被唤醒后 再次尝试获得锁

Signal操作

Signal唤醒一个等待时间最长的goroutine,调用时不要求持有锁。

func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

具体实现也不复杂,先判断sync.Cond是否被复制,然后调用runtime_notifyListNotifyOne

//go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne
func notifyListNotifyOne(l *notifyList) {
  // wait==notify 说明没有等待的goroutine了
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }
    lockWithRank(&l.lock, lockRankNotifyList)
    // 锁下二次检查
    t := l.notify
    if t == atomic.Load(&l.wait) {
        unlock(&l.lock)
        return
    }

    // 更新下一个需要被唤醒的ticket number
    atomic.Store(&l.notify, t+1)

    // Try to find the g that needs to be notified.
    // If it hasn't made it to the list yet we won't find it,
    // but it won't park itself once it sees the new notify number.
    //
    // This scan looks linear but essentially always stops quickly.
    // Because g's queue separately from taking numbers,
    // there may be minor reorderings in the list, but we
    // expect the g we're looking for to be near the front.
    // The g has others in front of it on the list only to the
    // extent that it lost the race, so the iteration will not
    // be too long. This applies even when the g is missing:
    // it hasn't yet gotten to sleep and has lost the race to
    // the (few) other g's that we find on the list.
  //这里是FIFO实现的核心 其实就是遍历链表 sudog.ticket查找指定需要唤醒的节点
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            unlock(&l.lock)
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
    unlock(&l.lock)
}

主要逻辑:

  1. 判断是否存在等待需要被唤醒的goroutine 没有直接返回
  2. 递增notify属性,因为是根据notifysudog.ticket匹配来查找需要唤醒的goroutine,因为其是递增生成的,故而有了FIFO语义。
  3. 遍历notifyList持有的链表,从head开始依据next指针依次遍历。这个过程是线性的,故而时间复杂度为O(n),不过官方说法这个过程实际比较快This scan looks linear but essentially always stops quickly.

有个小细节:还记得我们Wait()操作中,wait属性原子更新和goroutine插入等待链表是两个单独的步骤,所以存在竞争的情况下,链表中的节点可能会轻微的乱序产生。但是不要担心,因为ticket是原子递增的 所以唤醒顺序不会乱。

Broadcast操作

Broadcast()Singal()区别主要是它可以唤醒全部等待的goroutine,并直接将wait属性的值赋值给notify

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}
// notifyListNotifyAll notifies all entries in the list.
//go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll
func notifyListNotifyAll(l *notifyList) {
    // Fast-path 无等待goroutine直接返回
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil
    // 直接更新notify=wait
    atomic.Store(&l.notify, atomic.Load(&l.wait))
    unlock(&l.lock)

    // 依次调用goready唤醒goroutine
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

逻辑比较简单不再赘述

总结

  1. sync.Cond一旦创建使用 不允许被拷贝,由noCopycopyChecker来限制保护。
  2. Wait()操作先是递增notifyList.wait属性 然后将goroutine封装进sudog,将notifyList.wait赋值给sudog.ticket,然后将sudog插入notifyList链表中
  3. Singal()实际是按照notifyList.notifynotifyList链表中节点的ticket匹配 来确定唤醒的goroutine,因为notifyList.notifynotifyList.wait都是原子递增的,故而有了FIFO的语义
  4. Broadcast()相对简单 就是唤醒全部等待的goroutine
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,047评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,807评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,501评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,839评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,951评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,117评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,188评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,929评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,372评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,679评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,837评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,536评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,168评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,886评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,129评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,665评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,739评论 2 351

推荐阅读更多精彩内容

  • 本文从上下文Context、同步原语与锁、Channel、调度器四个方面介绍Go语言是如何实现并发的。本文绝大部分...
    彦帧阅读 1,559评论 1 3
  • Select select 可见监听 Channel 上的数据流动; select 结构与 switch 的结构类...
    hellomyshadow阅读 201评论 0 0
  • 版本 go version 1.10.1 使用方法 数据结构 noCopy:noCopy对象,拥有一个Lock方法...
    不就是个名字么不要在意阅读 1,115评论 0 1
  • 如果能够将所有内存都分配到栈上无疑性能是最佳的,但不幸的是我们不可避免需要使用堆上分配的内存。我们可以优化使用堆内...
    光华路程序猿阅读 446评论 0 1
  • 推荐指数: 6.0 书籍主旨关键词:特权、焦点、注意力、语言联想、情景联想 观点: 1.统计学现在叫数据分析,社会...
    Jenaral阅读 5,706评论 0 5