Goroutine同步

# Goroutine多线程同步

Goroutine是Go语言特有的并发体,是一种轻量级的线程,由go关键字启动。在真实的Go语言的实现中,goroutine和系统线程也不是等价的。尽管两者的区别实际上只是一个量的区别,但正是这个量变引发了Go语言并发编程质的飞跃。

首先,每个系统级线程都会有一个固定大小的栈(一般默认可能是2MB),这个栈主要用来保存函数递归调用时参数和局部变量。固定了栈的大小导致了两个问题:一是对于很多只需要很小的栈空间的线程来说是一个巨大的浪费,二是对于少数需要巨大栈空间的线程来说又面临栈溢出的风险。针对这两个问题的解决方案是:要么降低固定的栈大小,提升空间的利用率;要么增大栈的大小以允许更深的函数递归调用,但这两者是没法同时兼得的。相反,一个Goroutine会以一个很小的栈启动(可能是2KB或4KB),当遇到深度递归导致当前栈空间不足时,Goroutine会根据需要动态地伸缩栈的大小(主流实现中栈的最大值可达到1GB)。因为启动的代价很小,所以我们可以轻易地启动成千上万个Goroutine。

Go的运行时还包含了其自己的调度器,这个调度器使用了一些技术手段,可以在n个操作系统线程上多工调度m个Goroutine。Go调度器的工作和内核的调度是相似的,但是这个调度器只关注单独的Go程序中的Goroutine。Goroutine采用的是半抢占式的协作调度,只有在当前Goroutine发生阻塞时才会导致调度;同时发生在用户态,调度器会根据具体函数只保存必要的寄存器,切换的代价要比系统线程低得多。运行时有一个runtime.GOMAXPROCS变量,用于控制当前运行正常非阻塞Goroutine的系统线程数目。

在main.main函数执行之前所有代码都运行在同一个goroutine,也就是程序的主系统线程中。因此,如果某个init函数内部用go关键字启动了新的goroutine的话,新的goroutine只有在进入main.main函数之后才可能被执行到。

## 基于原子操作的同步

所谓的原子操作就是并发编程中“最小的且不可并行化”的操作。通常,如果多个并发体对同一个共享资源进行的操作是原子的话,那么同一时刻最多只能有一个并发体对该资源进行操作。从线程角度看,在当前线程修改共享资源期间,其它的线程是不能访问该资源的。原子操作对于多线程并发编程模型来说,不会发生有别于单线程的意外情况,共享资源的完整性可以得到保证。一般情况下,原子操作都是通过“互斥”访问来保证的,通常由特殊的CPU指令提供保护。当然,如果仅仅是想模拟下粗粒度的原子操作,我们可以借助于sync.Mutex来实现:

```Go

import (

        "sync"

        )

var total struct {

        sync.Mutex

        value int

}

func worker(wg *sync.WaitGroup) {

        defer wg.Done()

        for i := 0; i <= 100; i++ {

                total.Lock()

                total.value += i

                total.Unlock()

        }}

func main() {

        var wg sync.WaitGroup

        wg.Add(2)

        go worker(&wg)

        go worker(&wg)

        wg.Wait()

        fmt.Println(total.value)}

```

在worker的循环中,为了保证total.value += i的原子性,我们通过sync.Mutex加锁和解锁来保证该语句在同一时刻只被一个线程访问。对于多线程模型的程序而言,进出临界区前后进行加锁和解锁都是必须的。如果没有锁的保护,total的最终值将由于多线程之间的竞争而可能会不正确。

用互斥锁来保护一个数值型的共享资源,麻烦且效率低下。标准库的sync/atomic包对原子操作提供了丰富的支持。我们可以重新实现上面的例子:

```Go

import (

        "sync"

        "sync/atomic")

var total uint64

func worker(wg *sync.WaitGroup) {

        defer wg.Done()

        var i uint64

        for i = 0; i <= 100; i++ {

                atomic.AddUint64(&total, i)

        }}

func main() {

        var wg sync.WaitGroup

        wg.Add(2)

        go worker(&wg)

        go worker(&wg)

        wg.Wait()

}

```

atomic.AddUint64函数调用保证了total的读取、更新和保存是一个原子操作,因此在多线程中访问也是安全的。原子操作配合互斥锁可以实现非常高效的单件模式。互斥锁的代价比普通整数的原子读写高很多,在性能敏感的地方可以增加一个数字型的标志位,通过原子检测标志位状态降低互斥锁的使用次数来提高性能。

### 单例模式

基于sync.Once实现单例模式

```Go

        m    Mutex

        done uint32

}

func (o *Once) Do(f func()) {

        if atomic.LoadUint32(&o.done) == 1 {

                return

        }

        o.m.Lock()

        defer o.m.Unlock()

        if o.done == 0 {

                defer atomic.StoreUint32(&o.done, 1)

                f()

        }

}

```

sync/atomic包对基本的数值类型及复杂对象的读写都提供了原子操作的支持。atomic.Value原子对象提供了Load和Store两个原子方法,分别用于加载和保存数据,返回值和参数都是interface{}类型,因此可以用于任意的自定义复杂类型。

```Go

var config atomic.Value // 保存当前配置信息// 初始化配置信息

config.Store(loadConfig())// 启动一个后台线程, 加载更新后的配置信息

go func() {

        for {

                time.Sleep(time.Second)

                config.Store(loadConfig())

        }}()// 用于处理请求的工作者线程始终采用最新的配置信息

        for i := 0; i < 10; i++ {

                go func() {

                        for r := range requests() {

                                c := config.Load()

                                // ...

                        }

                }()

}

```

## 基于Channel的同步

Channel通信是在Goroutine之间进行同步的主要方法。在无缓存的Channel上的每一次发送操作都有与其对应的接收操作相配对,发送和接收操作通常发生在不同的Goroutine上(在同一个Goroutine上执行2个操作很容易导致死锁)。无缓存的Channel上的发送操作总在对应的接收操作完成前发生.

```Go

var done = make(chan bool)

var msg string

func aGoroutine() {

        msg = "你好, 世界"

        done <- true

}

func main() {

        go aGoroutine()

        <-done

        println(msg)

}

```

对于这个程序虽然goroutine和main没有严格的前后关系,但是由于done channel的返送需要发生在接收之后,这样就能保证主线程不会在go没有执行完成之前结束。可保证打印出“hello, world”。该程序首先对msg进行写入,然后在done管道上发送同步信号,随后从done接收对应的同步信号,最后执行println函数。若在关闭Channel后继续从中接收数据,接收者就会收到该Channel返回的零值。因此在这个例子中,用close(c)关闭管道代替done <- false依然能保证该程序产生相同的行为。但是对已经关闭的channel发送数据会导致程序panic,这里建议不关闭,channel也可以实现不同goroutine之间的处理。

对于带缓冲的Channel,对于Channel的第K个接收完成操作发生在第K+C个发送操作完成之前,其中C是Channel的缓存大小。 如果将C设置为0自然就对应无缓存的Channel,也即使第K个接收完成在第K个发送完成之前。因为无缓存的Channel只能同步发1个,也就简化为前面无缓存Channel的规则:对于从无缓冲Channel进行的接收,发生在对该Channel进行的发送完成之前。我们可以根据控制Channel的缓存大小来控制并发执行的Goroutine的最大数目:

```Go

var work = []func(){

        func() { println("1"); time.Sleep(1 * time.Second) },

        func() { println("2"); time.Sleep(1 * time.Second) },

        func() { println("3"); time.Sleep(1 * time.Second) },

        func() { println("4"); time.Sleep(1 * time.Second) },

        func() { println("5"); time.Sleep(1 * time.Second) },

}

func main() {

        for _, w := range work {

                go func(w func()) {

                        limit <- 1

                        w()

                        <-limit

                }(w)

        }

        select{}

}

```

在循环创建Goroutine过程中,使用了匿名函数并在函数中引用了循环变量w,由于w是引用传递的而非值传递,因此无法保证Goroutine在运行时调用的w与循环创建时的w是同一个值,为了解决这个问题,我们可以利用函数传参的值复制来为每个Goroutine单独复制一份w。

同样的也可以同时10个后台线程分别打印:

```Go

func main() {

        done := make(chan int, 10) // 带 10 个缓存

        // 开N个后台打印线程

        for i := 0; i < cap(done); i++ {

                go func(){

                        fmt.Println("你好, 世界")

                        done <- 1

                }()

        }

        // 等待N个后台线程完成

        for i := 0; i < cap(done); i++ {

                <-done

        }

}

```

对于这种要等待N个线程完成后再进行下一步的同步操作有一个简单的做法,就是使用sync.WaitGroup来等待一组事件:

```Go

func main() {

        var wg sync.WaitGroup

        // 开N个后台打印线程

        for i := 0; i < 10; i++ {

                wg.Add(1)

                go func() {

                        fmt.Println("你好, 世界")

                        wg.Done()

                }()

        }

        // 等待N个后台线程完成

        wg.Wait()

}

```

中wg.Add(1)用于增加等待事件的个数,必须确保在后台线程启动之前执行(如果放到后台线程之中执行则不能保证被正常执行到)。当后台线程完成打印工作之后,调用wg.Done()表示完成一个事件。main函数的wg.Wait()是等待全部的事件完成。

循环创建结束后,在main函数中最后一句select{}是一个空的管道选择语句,该语句会导致main线程阻塞,从而避免程序过早退出。还有for{}、<-make(chan int)等诸多方法可以达到类似的效果。因为main线程被阻塞了,如果需要程序正常退出的话可以通过调用os.Exit(0)实现。

### 生产者消费者模型

并发编程最常见的例子就是生产者消费者模式,该模式主要通过生产和消费的能力来提高程序的整体处理数据的速度,简单来说就是生成数据,然后放到成果队列中,消费者从成果队列中获得数据,这样将一个东西拆分解耦成为前置条件的产生和后置消费的异步行为。当成果队列中没有数据时,消费者就进入饥饿的等待中;而当成果队列中数据已满时,生产者则面临因产品挤压导致CPU被剥夺的下岗问题。

Go语言实现生产者消费者并发很简单:

```Go

// 生产者: 生成 factor 整数倍的序列

func Producer(factor int, out chan<- int) {

        for i := 0; ; i++ {

                out <- i*factor

        }

}// 消费者

func Consumer(in <-chan int) {

        for v := range in {

                fmt.Println(v)

        }

}

func main() {

        ch := make(chan int, 64) // 成果队列

        go Producer(3, ch) // 生成 3 的倍数的序列

        go Producer(5, ch) // 生成 5 的倍数的序列

        go Consumer(ch)    // 消费 生成的队列

        // 运行一定时间后退出

        sig := make(chan os.Signal, 1)

        signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

        fmt.Printf("quit (%v)\n", <-sig)

}

```

我们开启了2个Producer生产流水线,分别用于生成3和5的倍数的序列。然后开启1个Consumer消费者线程,打印获取的结果。我们可以让main函数保存阻塞状态不退出,只有当用户输入Ctrl-C时才真正退出程序.我们这个例子中有2个生产者,并且2个生产者之间并无同步事件可参考,它们是并发的。因此,消费者输出的结果序列的顺序是不确定的,这并没有问题,生产者和消费者依然可以相互配合工作。

## 观察者模式

观察者模式也可以叫做发布订阅模式,在对象间定义一对多的依赖关系,且需要一个对象变化的时候多个对象做出相应的响应则可以使用观察者模式。对于被观察者来说,他并不关心谁观察了自己,只需要发布自己发生了变化就可以了,这也是观察者的好处,不变的对象作为被观察者,变的对象作为观察者,使用注册增加或者减少观察者,但是对于被观察者没有影响。

```Go

// 定义消息类型

type MSG string

type ISubject interface {

  Registry(observer IObserver)        // 注册观察

  Remove(observer IObserver)          // 取消观察

  Notify(msg MSG)                    // 通知观察者

}

type IObserver interface {

  Update(msg MSG)                    // 当观察对象发生变化的时候进行响应

  //GetName() string

}

```

具体实现可以参考如下:

```Go

package main

import "log"

type MSG string //这里简单使用string作为msg实际上可以更加复杂

type ISubject interface {

  Registry(observer IObserver)

  Remove(observer IObserver)

  Notify(msg MSG)

}

type IObserver interface {

  Update(msg MSG)

  GetName() string // getName 不是必须但是为了方便我选择使用map保存队列

}

type Subject struct {

  Observers map[string]IObserver

}

func NewSubject() Subject {

  return Subject{

      Observers: make(map[string]IObserver),

  }

}

func (s *Subject) Registry(observer IObserver) {

  s.Observers[observer.GetName()] = observer

}

func (s *Subject) Remove(observer IObserver) {

  delete(s.Observers, observer.GetName())

}

func (s *Subject) Notify(msg MSG) {

  for _, v := range s.Observers {

      v.Update(msg)

  }

}

type Observer struct {

  Name string

}

func (o *Observer) Update(msg MSG) {

  log.Print(o.GetName(), ":")

  log.Printf(

      "%s", msg)

}

func (o *Observer) GetName() string {

  return o.Name

}

func main() {

  subject := NewSubject()

  o1 := Observer{Name: "observer 1"}

  o2 := Observer{Name: "observer 2"}

  subject.Registry(&o1)

  subject.Registry(&o2)

  subject.Notify("发生什么事啦!")

}

```

这种方式会持有对象,很多时候其实不需要持有对象,可以使用函数作为参数输入进行调用,将Update作为函数的call函数进行调用,当数据发生变化进行call调用。

更加复杂的将通知这件事情作为生产内容,将调用作为消费手段实现异步调用,就可以实现异步调用:

```Go

/**

* @Author:Dijiang

* @Description:

* @Date: Created in 11:25 2022/4/6

* @Modified By: Dijiang

*/

package main

import (

  "fmt"

  "log"

  "os"

  "os/signal"

  "syscall"

)

type MSG string //这里简单使用string作为msg实际上可以更加复杂

type IObserver func()

var msgChan = make(chan IObserver, 5)

type ISubject interface {

  Registry(observer IObserver)

  Remove(observer IObserver)

  Notify(msg MSG)

}

type Subject struct {

  Observers []IObserver

  msgChan  chan func(MSG)

}

func NewSubject() Subject {

  return Subject{

      Observers: make([]IObserver, 0),

  }

}

func (s *Subject) Registry(observer IObserver) {

  s.Observers = append(s.Observers, observer)

}

func (s *Subject) Remove(observer IObserver) {

  //for i := 0; i < len(s.Observers); i++ {

  // if s.Observers[i] == observer {

  //    s.Observers = append(s.Observers[:i], s.Observers[i+1:]...)

  //    return

  // }

  //}

  log.Printf("can't find observer %v", observer)

}

func (s *Subject) Notify(msg MSG) {

  for _, v := range s.Observers {

      msgChan <- v

  }

  //println(len(msgChan))

}

type Observer struct {

  Name string

}

func (o *Observer) GetName() string {

  return o.Name

}

func main() {

  subject := NewSubject()

  o1 := func() {

      println("01:")

  }

  o2 := func() {

      println("02: ")

  }

  subject.Registry(o1)

  subject.Registry(o2)

  go func() {

      var v IObserver

      for {

        select {

        case v = <-msgChan:

            v()

        default:

            continue

        }

      }

  }()

  subject.Notify("发生什么事啦!")

  subject.Notify("发生什么事啦!")

  subject.Notify("发生什么事啦!")

  subject.Notify("发生什么事啦!")

  subject.Notify("发生什么事啦!")

  subject.Notify("发生什么事啦!")

  // 运行一定时间后退出

  sig := make(chan os.Signal, 1)

  signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)

  fmt.Printf("quit (%v)\n", <-sig)

}

```

这样的简单改造可以使得观察者模式更加易用。同时需要注意到很多的mvvm模式都可以说是这种的观察者模式,如vue和依赖定时刷新界面的mvvm模式。(但是个人感觉而言,mvvm难点在于数据劫持和树状依赖的构建而不在于观察者模式)

![](undefined)

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容