Golang并发模型

控制并发有三种种经典的方式,一种是通过channel通知实现并发控制 一种是WaitGroup,另外一种就是Context。

1. 使用最基本通过channel通知实现并发控制

无缓冲通道

无缓冲的通道指的是通道的大小为0,也就是说,这种类型的通道在接收前没有能力保存任何值,它要求发送 goroutine 和接收 goroutine 同时准备好,才可以完成发送和接收操作。

从上面无缓冲的通道定义来看,发送 goroutine 和接收 gouroutine 必须是同步的,同时准备后,如果没有同时准备好的话,先执行的操作就会阻塞等待,直到另一个相对应的操作准备好为止。这种无缓冲的通道我们也称之为同步通道。

正式通过无缓冲通道来实现多 goroutine 并发控制

func main() {
    ch := make(chan struct{})
    go func() {
        fmt.Println("do something..")
        time.Sleep(time.Second * 1)
        ch <- struct{}{}
    }()

    <-ch

    fmt.Println("I am finished")
}

当主 goroutine 运行到 <-ch 接受 channel 的值的时候,如果该 channel 中没有数据,就会一直阻塞等待,直到有值。 这样就可以简单实现并发控制

2. 通过sync包中的WaitGroup实现并发控制

sync 包中,提供了 WaitGroup ,它会等待它收集的所有 goroutine 任务全部完成。在WaitGroup里主要有三个方法

  • Add, 可以添加或减少 goroutine的数量
  • Done, 相当于Add(-1)
  • Wait, 执行后会堵塞主线程,直到WaitGroup 里的值减至0

在主 goroutineAdd(delta int) 索要等待goroutine 的数量。在每一个 goroutine 完成后 Done() 表示这一个goroutine 已经完成,当所有的 goroutine 都完成后,在主 goroutineWaitGroup 返回返回。

func main(){
    var wg sync.WaitGroup
    var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
    }
    for _, url := range urls {
        wg.Add(1)
        go func(url string) {
            defer wg.Done()
            http.Get(url)
        }(url)
    }
    wg.Wait()
}

但是在Golang官网中,有这么一句话

  • A WaitGroup must not be copied after first use.

翻译够来过来就是,在 WaitGroup 第一次使用后,不能被拷贝,因为会出现一下问题

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(wg sync.WaitGroup, i int) {
            log.Printf("i:%d", i)
            wg.Done()
        }(wg, i)
    }
    wg.Wait()
    log.Println("exit")
}

运行结果如下

2009/11/10 23:00:00 i:4
2009/11/10 23:00:00 i:0
2009/11/10 23:00:00 i:1
2009/11/10 23:00:00 i:2
2009/11/10 23:00:00 i:3
fatal error: all goroutines are asleep - deadlock!

goroutine 1 [semacquire]:
sync.runtime_Semacquire(0x1040a13c, 0x44bc)
    /usr/local/go/src/runtime/sema.go:47 +0x40
sync.(*WaitGroup).Wait(0x1040a130, 0x121460)
    /usr/local/go/src/sync/waitgroup.go:131 +0x80
main.main()
    /tmp/sandbox894380819/main.go:19 +0x120

它提示我所有的 goroutine 都已经睡眠了,出现了死锁。这是因为 wg 给拷贝传递到了 goroutine 中,导致只有 Add 操作,其实 Done操作是在 wg 的副本执行的。因此 Wait 就死锁了。

  • 改正方法一:
    将匿名函数中 wg 的传入类型改为 *sync.WaitGrou,这样就能引用到正确的WaitGroup了。

  • 改正方法二:
    将匿名函数中的 wg 的传入参数去掉,因为Go支持闭包类型,在匿名函数中可以直接使用外面的 wg 变量

go 中五种引用类型有 slice, channel, function, map, interface

interface是Go语言中最成功的设计之一,空的interface可以被当作“鸭子”类型使用,它使得Go这样的静态语言拥有了一定的动态性,但却又不损失静态语言在类型安全方面拥有的编译时检查的优势。依赖于接口而不是实现,优先使用组合而不是继承,这是程序抽象的基本原则。但是长久以来以C++为代表的“面向对象”语言曲解了这些原则,让人们走入了误区。为什么要将方法和数据绑死?为什么要有多重继承这么变态的设计?面向对象中最强调的应该是对象间的消息传递,却为什么被演绎成了封装继承和多态。面向对象是否实现程序程序抽象的合理途径,又或者是因为它存在我们就认为它合理了。历史原因,中间出现了太多的错误。不管怎么样,Go的interface给我们打开了一扇新的窗。

3. 在Go 1.7 以后引进的强大的Context上下文,实现并发控制

3.1 简介

在一些简单场景下使用 channelWaitGroup 已经足够了,但是当面临一些复杂多变的网络并发场景下 channelWaitGroup 显得有些力不从心了。比如一个网络请求 Request,每个 Request 都需要开启一个 goroutine 做一些事情,这些 goroutine 又可能会开启其他的 goroutine,比如数据库和RPC服务。所以我们需要一种可以跟踪 goroutine 的方案,才可以达到控制他们的目的,这就是Go语言为我们提供的 Context,称之为上下文非常贴切,它就是goroutine 的上下文。它是包括一个程序的运行环境、现场和快照等。每个程序要运行时,都需要知道当前程序的运行状态,通常Go 将这些封装在一个 Context 里,再将它传给要执行的 goroutine

context 包主要是用来处理多个 goroutine 之间共享数据,及多个 goroutine 的管理。

3.2 package context

context 包的核心是 struct Context,接口声明如下:

// A Context carries a deadline, cancelation signal, and request-scoped values
// across API boundaries. Its methods are safe for simultaneous use by multiple
// goroutines.
type Context interface {
    // Done returns a channel that is closed when this `Context` is canceled
    // or times out.
    Done() <-chan struct{}

    // Err indicates why this Context was canceled, after the Done channel
    // is closed.
    Err() error

    // Deadline returns the time when this Context will be canceled, if any.
    Deadline() (deadline time.Time, ok bool)

    // Value returns the value associated with key or nil if none.
    Value(key interface{}) interface{}
}
  • Done() 返回一个只能接受数据的channel类型,当该context关闭或者超时时间到了的时候,该channel就会有一个取消信号
  • Err()Done() 之后,返回context 取消的原因。
  • Deadline() 设置该context cancel的时间点
  • Value() 方法允许 Context 对象携带request作用域的数据,该数据必须是线程安全的。

Context 对象是线程安全的,你可以把一个 Context 对象传递给任意个数的 gorotuine,对它执行 取消 操作时,所有 goroutine 都会接收到取消信号。

一个 Context 不能拥有 Cancel 方法,同时我们也只能 Done channel 接收数据。
背后的原因是一致的:接收取消信号的函数和发送信号的函数通常不是一个。
一个典型的场景是:父操作为子操作操作启动 goroutine,子操作也就不能取消父操作。

3.3 继承 context

context 包提供了一些函数,协助用户从现有的 Context 对象创建新的 Context 对象。
这些 Context 对象形成一棵树:当一个 Context 对象被取消时,继承自它的所有 Context 都会被取消。

Background 是所有 Context 对象树的根,它不能被取消。它的声明如下:

// Background returns an empty Context. It is never canceled, has no deadline,
// and has no values. Background is typically used in main, init, and tests,
// and as the top-level `Context` for incoming requests.
func Background() Context

WithCancelWithTimeout 函数 会返回继承的 Context 对象, 这些对象可以比它们的父 Context 更早地取消

当请求处理函数返回时,与该请求关联的 Context 会被取消。 当使用多个副本发送请求时,可以使用 WithCancel 取消多余的请求。 WithTimeout 在设置对后端服务器请求超时时间时非常有用。 下面是这三个函数的声明:

// WithCancel returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed or cancel is called.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

// A CancelFunc cancels a Context.
type CancelFunc func()

// WithTimeout returns a copy of parent whose Done channel is closed as soon as
// parent.Done is closed, cancel is called, or timeout elapses. The new
// Context's Deadline is the sooner of now+timeout and the parent's deadline, if
// any. If the timer is still running, the cancel function releases its
// resources.
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithValue 函数能够将请求作用域的数据与 Context 对象建立关系。声明如下:

// WithValue returns a copy of parent whose Value method returns val for key.
func WithValue(parent Context, key interface{}, val interface{}) Context

3.4 context例子

当然,想要知道 Context 包是如何工作的,最好的方法是看一个例子。

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

type Message struct {
    netId int
    Data  string
}

type ServerConn struct {
    sendCh   chan Message
    handleCh chan Message
    wg       *sync.WaitGroup
    ctx      context.Context
    cancel   context.CancelFunc
    netId    int
}

func main() {

    conn := &ServerConn{
        sendCh:   make(chan Message),
        handleCh: make(chan Message),
        wg:       &sync.WaitGroup{},
        netId:    100,
    }

    conn.ctx, conn.cancel = context.WithCancel(context.WithValue(context.Background(), "key", conn.netId))
    loopers := []func(*ServerConn, *sync.WaitGroup){readLoop, writeLoop, handleLoop}

    for _, looper := range loopers {
        conn.wg.Add(1)
        go looper(conn, conn.wg)
    }

    go func() {
        time.Sleep(time.Second * 3)
        conn.cancel()
    }()

    conn.wg.Wait()

}

func readLoop(c *ServerConn, wg *sync.WaitGroup) {

    netId, _ := c.ctx.Value("key").(int)
    handlerCh := c.handleCh
    ctx, _ := context.WithCancel(c.ctx)
    cDone := ctx.Done()

    defer wg.Done()

    for {
        time.Sleep(time.Second * 1)
        select {
        case <-cDone:
            fmt.Println("readLoop close")
            return
        default:
            handlerCh <- Message{netId, "Hello world"}
        }
    }
}

func handleLoop(c *ServerConn, wg *sync.WaitGroup) {
    handlerCh := c.handleCh
    sendCh := c.sendCh
    ctx, _ := context.WithCancel(c.ctx)
    cDone := ctx.Done()

    defer wg.Done()

    for {
        select {
        case handleData, ok := <-handlerCh:
            if ok {
                handleData.netId++
                handleData.Data = "I am whole world"
                sendCh <- handleData
            }

        case <-cDone:
            fmt.Println("handleLoop close")
            return
        }

    }
}

func writeLoop(c *ServerConn, wg *sync.WaitGroup) {
    sendCh := c.sendCh
    ctx, _ := context.WithCancel(c.ctx)
    cDone := ctx.Done()

    defer wg.Done()

    for {
        select {
        case sendData, ok := <-sendCh:
            if ok {
                fmt.Println(sendData)
            }
        case <-cDone:
            fmt.Println("writeLoop close")
            return
        }
    }
}


在上面的例子中,�模仿了Golang后台程序主要业务流程, 当一个TCP连接到来时通过启动三个goroutine来分别处理收发和处理数据。而这三个goroutine的是并发运行的,通过channelsync.WaitGroupcontext控制数据的处理。

在�每一个循环中产生一个goroutine,每一个goroutine中都传入context,在每个goroutine中通过传入ctx创建一个Context,并且通过select一直监控该Context的运行情况,当在父Context退出的时候,代码中并没有�明显调用子ContextCancel函数,但是分析结果,子Context还是被正确合理的关闭了,这是因为,所有基于这个Context或者衍生的子Context都会收到通知,这时就可以进行清理操作了,最终释放goroutine,这就优雅的解决了goroutine启动后不可控的问题。

下面是运行结果:

Screen Shot 2017-09-17 at 19.29.44.png

3.5 Context 使用原则

  • 不要把Context放在结构体中,要以参数的方式传递
  • Context作为参数的函数方法,应该把Context作为第一个参数,放在第一位。
  • 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO
  • ContextValue相关方法应该传递必须的数据,不要什么数据都使用这个传递
  • Context是线程安全的,可以放心的在多个goroutine中传递
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,163评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,301评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,089评论 0 352
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,093评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,110评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,079评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,005评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,840评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,278评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,497评论 2 332
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,667评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,394评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,980评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,628评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,649评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,548评论 2 352

推荐阅读更多精彩内容

  • Goroutines 模型:和其他goroutine在共享的地址空间中并发执行的函数 资源消耗: 初始时非常小的栈...
    大漠狼道阅读 1,285评论 0 8
  • 本文翻译自Sameer Ajmani的文章《Go Concurrency Patterns: Pipelines ...
    大蟒传奇阅读 3,869评论 0 15
  • 今天介绍一下 go语言的并发机制以及它所使用的CSP并发模型 CSP并发模型 CSP模型是上个世纪七十年代提出的,...
    falm阅读 68,463评论 10 80
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,651评论 18 139
  • 青春里没有返程的旅行 我们喜欢说,我喜欢你,好像我一定会喜欢你一样,好像我出生就为了等你一样,好像无论牵挂谁,思念...
    沉阿姣阅读 312评论 0 2