go Sync.Cond介绍

是什么

sync.Cond 条件变量用来协调想要访问共享资源的那些 goroutine,当共享资源的状态发生变化的时候,它可以用来通知被互斥锁阻塞的 goroutine。

sync.Cond 基于互斥锁/读写锁,经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。

怎么用

sync.Cond 的结构

// Each Cond has an associated Locker L (often a *Mutex or *RWMutex),
// which must be held when changing the condition and
// when calling the Wait method.
//
// A Cond must not be copied after first use.
type Cond struct {
        noCopy noCopy

        // L is held while observing or changing the condition
        L Locker

        notify  notifyList
        checker copyChecker
}

sync.Cond 的四个方法

// NewCond 创建 Cond 实例时,需要关联一个锁。
func NewCond(l Locker) *Cond{}

// Broadcast 唤醒所有等待条件变量 c 的 goroutine,无需锁保护。
// Broadcast wakes all goroutines waiting on c.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Broadcast(){}

// Signal 唤醒一个协程
// Signal wakes one goroutine waiting on c, if there is any.
//
// It is allowed but not required for the caller to hold c.L
// during the call.
func (c *Cond) Signal(){}

// Wait 等待
// Wait atomically unlocks c.L and suspends execution
// of the calling goroutine. After later resuming execution,
// Wait locks c.L before returning. Unlike in other systems,
// Wait cannot return unless awoken by Broadcast or Signal.
//
// Because c.L is not locked when Wait first resumes, the caller
// typically cannot assume that the condition is true when
// Wait returns. Instead, the caller should Wait in a loop:
//
//    c.L.Lock()
//    for !condition() {
//        c.Wait()
//    }
//    ... make use of condition ...
//    c.L.Unlock()
//
func (c *Cond) Wait(){}

使用demo

package main

import (
    "log"
    "sync"
    "time"
)

var done = false

func read(name string, c *sync.Cond) {
    c.L.Lock()
    for !done {
        c.Wait()
    }
    log.Println(name, "starts reading")
    c.L.Unlock()
}

func write(name string, c *sync.Cond) {
    log.Println(name, "starts writing")
    time.Sleep(time.Second)
    c.L.Lock()
    done = true
    c.L.Unlock()
    log.Println(name, "wakes")
    c.Broadcast()
    //c.Signal()
}

func main() {
    cond := sync.NewCond(&sync.Mutex{})

    go read("reader1", cond)
    go read("reader2", cond)
    go read("reader3", cond)
    write("writer", cond)

    time.Sleep(time.Second * 3)
}

应用场景

经常用在多个 goroutine 等待,一个 goroutine 通知(事件发生)的场景。

比如:有一个协程在异步地接收数据,剩下的多个协程必须等待这个协程接收完数据,才能读取到正确的数据

实际使用

https://github.com/panjf2000/ants.
第三方包goroutine池,ants包中pool.go文件

// retrieveWorker returns an available worker to run the tasks.
func (p *Pool) retrieveWorker() (w *goWorker) {
    spawnWorker := func() {
        w = p.workerCache.Get().(*goWorker)
        w.run()
    }

    p.lock.Lock()

    w = p.workers.detach()
    if w != nil { // first try to fetch the worker from the queue
        p.lock.Unlock()
    } else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
        // if the worker queue is empty and we don't run out of the pool capacity,
        // then just spawn a new worker goroutine.
        p.lock.Unlock()
        spawnWorker()
    } else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
        if p.options.Nonblocking {
            p.lock.Unlock()
            return
        }
    retry:
        if p.options.MaxBlockingTasks != 0 && p.blockingNum >= p.options.MaxBlockingTasks {
            p.lock.Unlock()
            return
        }
        p.blockingNum++
        p.cond.Wait() // block and wait for an available worker
        p.blockingNum--
        var nw int
        if nw = p.Running(); nw == 0 { // awakened by the scavenger
            p.lock.Unlock()
            if !p.IsClosed() {
                spawnWorker()
            }
            return
        }
        if w = p.workers.detach(); w == nil {
            if nw < capacity {
                p.lock.Unlock()
                spawnWorker()
                return
            }
            goto retry
        }

        p.lock.Unlock()
    }
    return
}

// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *goWorker) bool {
    if capacity := p.Cap(); (capacity > 0 && p.Running() > capacity) || p.IsClosed() {
        p.cond.Broadcast()
        return false
    }
    worker.recycleTime = time.Now()
    p.lock.Lock()

    // To avoid memory leaks, add a double check in the lock scope.
    // Issue: https://github.com/panjf2000/ants/issues/113
    if p.IsClosed() {
        p.lock.Unlock()
        return false
    }

    err := p.workers.insert(worker)
    if err != nil {
        p.lock.Unlock()
        return false
    }

    // Notify the invoker stuck in 'retrieveWorker()' of there is an available worker in the worker queue.
    p.cond.Signal()
    p.lock.Unlock()
    return true
}

参考

1、sync.Cond 条件变量
2、一文读懂 Go sync.Cond 设计
3、源码剖析sync.Cond(条件变量的实现机制)

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容