浅谈 Golang sync 包的相关使用方法

更多精彩文章:https://deepzz.com

Desc:Go sync 包的使用方法,sync.Mutex,sync.RMutex,sync.Once,sync.Cond,sync.Waitgroup
尽管 Golang 推荐通过 channel 进行通信和同步,但在实际开发中 sync 包用得也非常的多。另外 sync 下还有一个 atomic 包,提供了一些底层的原子操作(这里不做介绍)。本篇文章主要介绍该包下的锁的一些概念及使用方法。

整个包都围绕这 Locker 进行,这是一个 interface:

type Locker interface {
        Lock()
        Unlock()
}

只有两个方法,Lock()Unlock()

另外该包下的对象,在使用过之后,千万不要复制。

有许多同学不理解锁的概念,下面会一一介绍到:

为什么需要锁?

并发的情况下,多个线程或协程同时去修改一个变量,可能会出现如下情况:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var a = 0

    // 启动 100 个协程,需要足够大
    // var lock sync.Mutex
    for i := 0; i < 100; i++ {
        go func(idx int) {
            // lock.Lock()
            // defer lock.Unlock()
            a += 1
            fmt.Printf("goroutine %d, a=%d\n", idx, a)
        }(i)
    }

    // 等待 1s 结束主程序
    // 确保所有协程执行完
    time.Sleep(time.Second)
}

观察打印结果,是否出现 a 的值是相同的情况(未出现则重试或调大协程数),答案:是的。

显然这不是我们想要的结果。出现这种情况的原因是,协程依次执行:从寄存器读取 a 的值 -> 然后做加法运算 -> 最后写会寄存器。试想,此时一个协程取出 a 的值 3,正在做加法运算(还未写回寄存器)。同时另一个协程此时去取,取出了同样的 a 的值 3。最终导致的结果是,两个协程产出的结果相同,a 相当于只增加了 1。

所以,锁的概念就是,我正在处理 a(锁定),你们谁都别和我抢,等我处理完了(解锁),你们再处理。这样就实现了,同时处理 a 的协程只有一个,就实现了同步。

把上面代码里的注释取消掉再试下。

什么是互斥锁 Mutex?

什么是互斥锁?它是锁的一种具体实现,有两个方法:

func (m *Mutex) Lock()
func (m *Mutex) Unlock()

在首次使用后不要复制该互斥锁。对一个未锁定的互斥锁解锁将会产生运行时错误。

一个互斥锁只能同时被一个 goroutine 锁定,其它 goroutine 将阻塞直到互斥锁被解锁(重新争抢对互斥锁的锁定)。如:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    ch := make(chan struct{}, 2)

    var l sync.Mutex
    go func() {
        l.Lock()
        defer l.Unlock()
        fmt.Println("goroutine1: 我会锁定大概 2s")
        time.Sleep(time.Second * 2)
        fmt.Println("goroutine1: 我解锁了,你们去抢吧")
        ch <- struct{}{}
    }()

    go func() {
        fmt.Println("groutine2: 等待解锁")
        l.Lock()
        defer l.Unlock()
        fmt.Println("goroutine2: 哈哈,我锁定了")
        ch <- struct{}{}
    }()

    // 等待 goroutine 执行结束
    for i := 0; i < 2; i++ {
        <-ch
    }
}

注意,平时所说的锁定,其实就是去锁定互斥锁,而不是说去锁定一段代码。也就是说,当代码执行到有锁的地方时,它获取不到互斥锁的锁定,会阻塞在那里,从而达到控制同步的目的。

什么是读写锁 RWMutex?

那么什么是读写锁呢?它是针对读写操作的互斥锁,读写锁与互斥锁最大的不同就是可以分别对 进行锁定。一般用在大量读操作、少量写操作的情况:

func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()

func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()

由于这里需要区分读写锁定,我们这样定义:

  • 读锁定(RLock),对读操作进行锁定
  • 读解锁(RUnlock),对读锁定进行解锁
  • 写锁定(Lock),对写操作进行锁定
  • 写解锁(Unlock),对写锁定进行解锁

在首次使用之后,不要复制该读写锁。不要混用锁定和解锁,如:Lock 和 RUnlock、RLock 和 Unlock。因为对未读锁定的读写锁进行读解锁或对未写锁定的读写锁进行写解锁将会引起运行时错误。

如何理解读写锁呢?

  1. 同时只能有一个 goroutine 能够获得写锁定。
  2. 同时可以有任意多个 gorouinte 获得读锁定。
  3. 同时只能存在写锁定或读锁定(读和写互斥)。

也就是说,当有一个 goroutine 获得写锁定,其它无论是读锁定还是写锁定都将阻塞直到写解锁;当有一个 goroutine 获得读锁定,其它读锁定任然可以继续;当有一个或任意多个读锁定,写锁定将等待所有读锁定解锁之后才能够进行写锁定。所以说这里的读锁定(RLock)目的其实是告诉写锁定:有很多人正在读取数据,你给我站一边去,等它们读(读解锁)完你再来写(写锁定)。

使用例子:

package main

import (
    "fmt"
    "math/rand"
    "sync"
)

var count int
var rw sync.RWMutex

func main() {
    ch := make(chan struct{}, 10)
    for i := 0; i < 5; i++ {
        go read(i, ch)
    }
    for i := 0; i < 5; i++ {
        go write(i, ch)
    }

    for i := 0; i < 10; i++ {
        <-ch
    }
}

func read(n int, ch chan struct{}) {
    rw.RLock()
    fmt.Printf("goroutine %d 进入读操作...\n", n)
    v := count
    fmt.Printf("goroutine %d 读取结束,值为:%d\n", n, v)
    rw.RUnlock()
    ch <- struct{}{}
}

func write(n int, ch chan struct{}) {
    rw.Lock()
    fmt.Printf("goroutine %d 进入写操作...\n", n)
    v := rand.Intn(1000)
    count = v
    fmt.Printf("goroutine %d 写入结束,新值为:%d\n", n, v)
    rw.Unlock()
    ch <- struct{}{}
}

WaitGroup 例子

WaitGroup 用于等待一组 goroutine 结束,用法很简单。它有三个方法:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()

Add 用来添加 goroutine 的个数。Done 执行一次数量减 1。Wait 用来等待结束:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    for i, s := range seconds {
        // 计数加 1
        wg.Add(1)
        go func(i, s int) {
            // 计数减 1
            defer wg.Done()
            fmt.Printf("goroutine%d 结束\n", i)
        }(i, s)
    }
    
    // 等待执行结束
    wg.Wait()
    fmt.Println("所有 goroutine 执行结束")
}

注意,wg.Add() 方法一定要在 goroutine 开始前执行哦。

Cond 条件变量

Cond 实现一个条件变量,即等待或宣布事件发生的 goroutines 的会合点。

type Cond struct {
    noCopy noCopy
  
    // L is held while observing or changing the condition
    L Locker
  
    notify  notifyList
    checker copyChecker
}

它会保存一个通知列表。

func NewCond(l Locker) *Cond
func (c *Cond) Broadcast()
func (c *Cond) Signal()
func (c *Cond) Wait()

Wait 方法、Signal 方法和 Broadcast 方法。它们分别代表了等待通知、单发通知和广播通知的操作。

我们来看一下 Wait 方法:

func (c *Cond) Wait() {
    c.checker.check()
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

它的操作为:加入到通知列表 -> 解锁 L -> 等待通知 -> 锁定 L。其使用方法是:

c.L.Lock()
for !condition() {
    c.Wait()
}
... make use of condition ...
c.L.Unlock()

举个例子:

// Package main provides ...
package main

import (
    "fmt"
    "sync"
    "time"
)

var count int = 4

func main() {
    ch := make(chan struct{}, 5)

    // 新建 cond
    var l sync.Mutex
    cond := sync.NewCond(&l)

    for i := 0; i < 5; i++ {
        go func(i int) {
            // 争抢互斥锁的锁定
            cond.L.Lock()
            defer func() {
                cond.L.Unlock()
                ch <- struct{}{}
            }()

            // 条件是否达成
            for count > i {
                cond.Wait()
                fmt.Printf("收到一个通知 goroutine%d\n", i)
            }
            
            fmt.Printf("goroutine%d 执行结束\n", i)
        }(i)
    }

    // 确保所有 goroutine 启动完成
    time.Sleep(time.Millisecond * 20)
    // 锁定一下,我要改变 count 的值
    fmt.Println("broadcast...")
    cond.L.Lock()
    count -= 1
    cond.Broadcast()
    cond.L.Unlock()

    time.Sleep(time.Second)
    fmt.Println("signal...")
    cond.L.Lock()
    count -= 2
    cond.Signal()
    cond.L.Unlock()

    time.Sleep(time.Second)
    fmt.Println("broadcast...")
    cond.L.Lock()
    count -= 1
    cond.Broadcast()
    cond.L.Unlock()

    for i := 0; i < 5; i++ {
        <-ch
    }
}

Pool 临时对象池

sync.Pool 可以作为临时对象的保存和复用的集合。其结构为:

type Pool struct {
    noCopy noCopy

    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
}

func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})

新键 Pool 需要提供一个 New 方法,目的是当获取不到临时对象时自动创建一个(不会主动加入到 Pool 中),Get 和 Put 方法都很好理解。

深入了解过 Go 的同学应该知道,Go 的重要组成结构为 M、P、G。Pool 实际上会为每一个操作它的 goroutine 相关联的 P 都生成一个本地池。如果从本地池 Get 对象的时候,本地池没有,则会从其它的 P 本地池获取。因此,Pool 的一个特点就是:可以把由其中的对象值产生的存储压力进行分摊。

它有着以下特点:

  • Pool 中的对象在仅有 Pool 有着唯一索引的情况下可能会被自动删除(取决于下一次 GC 执行的时间)。
  • goroutines 协程安全,可以同时被多个协程使用。

GC 的执行一般会使 Pool 中的对象全部移除。

那么 Pool 都适用于什么场景呢?从它的特点来说,适用与无状态的对象的复用,而不适用与如连接池之类的。在 fmt 包中有一个很好的使用池的例子,它维护一个动态大小的临时输出缓冲区。

官方例子:

package main

import (
    "bytes"
    "io"
    "os"
    "sync"
    "time"
)

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}

func timeNow() time.Time {
    return time.Unix(1136214245, 0)
}

func Log(w io.Writer, key, val string) {
    // 获取临时对象,没有的话会自动创建
    b := bufPool.Get().(*bytes.Buffer)
    b.Reset()
    b.WriteString(timeNow().UTC().Format(time.RFC3339))
    b.WriteByte(' ')
    b.WriteString(key)
    b.WriteByte('=')
    b.WriteString(val)
    w.Write(b.Bytes())
    // 将临时对象放回到 Pool 中
    bufPool.Put(b)
}

func main() {
    Log(os.Stdout, "path", "/search?q=flowers")
}

打印结果:
2006-01-02T15:04:05Z path=/search?q=flowers

Once 执行一次

使用 sync.Once 对象可以使得函数多次调用只执行一次。其结构为:

type Once struct {
    m    Mutex
    done uint32
}

func (o *Once) Do(f func())

用 done 来记录执行次数,用 m 来保证保证仅被执行一次。只有一个 Do 方法,调用执行。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    onceBody := func() {
        fmt.Println("Only once")
    }
    done := make(chan bool)
    for i := 0; i < 10; i++ {
        go func() {
            once.Do(onceBody)
            done <- true
        }()
    }
    for i := 0; i < 10; i++ {
        <-done
    }
}

# 打印结果
Only once
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容