go并发编程范式

1. 访问范围约束

通过限制访问约束,减少不必要的同步带来的性能损耗。例如,集中控制channel的写入,对外提供channel的读取,这样本身便提供了对并发安全的支持。

// channel拥有者具有写入权限
chanOwner := func() <-chan int {
    results := make(chan int, 5) //1
    go func() {
        defer close(results)
        for i := 0; i <= 5; i++ {
            results <- i
        }
    }()
    return results
}
// 消费者只具备读取权限
consumer := func(results <-chan int) { //3
    for result := range results {
        fmt.Printf("Received: %d\n", result)
    }
    fmt.Println("Done receiving!")
}

results := chanOwner() //2
consumer(results)

// 对于共享数据的不同数据段的并发访问同样是安全的
printData := func(wg *sync.WaitGroup, data []byte) {
    defer wg.Done()

    var buff bytes.Buffer
    for _, b := range data {
        fmt.Fprintf(&buff, "%c", b)
    }
    fmt.Println(buff.String())
}

var wg sync.WaitGroup
wg.Add(2)
data := []byte("golang")
go printData(&wg, data[:3]) // 1
go printData(&wg, data[3:]) // 2

wg.Wait()

2. for-select

for-select循环模式如下所示:

for { // 无限循环或遍历
    select {
    // 对通道进行操作
    }
}

常见的几种for-select循环的用法

a. 在通道上发送迭代变量

for _, s := range []string{"a", "b", "c"} {
    select {
    case <-done:
        return
    case stringStream <- s:   // slice数据循环迭代写入channel
    }
}

b. 无限循环等待停止

// 第一种方式
for {
    select {
    case <-done: 
        return   // 停止返回
        default:
    }
    // 执行非抢占任务
}

// 
for {
    select {
    case <-done:
        return 
        default:    // 将要执行的任务放入default分支中
                // 执行非抢占任务
    }
}

3. goroutine泄露

goroutine几种终止方式:

  • 完成任务,终止
  • 遇到不可恢复的错误无法继续它的任务,终止。
  • 被告知停止当前的任务。

一个常见的goroutine泄露的例子:

doWork := func(strings <-chan string) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(completed)
        for s := range strings {  // 对于channel的访问,将一直被阻塞
            fmt.Println(s)
        }
    }()
    return completed
}

doWork(nil)
// 这里还有其他任务执行
fmt.Println("Done.")

解决goroutine泄露的一种方法,即向子goroutine发送结束信号,通知其退出。

doWork := func(done <-chan interface{}, strings <-chan string) <-chan interface{} { //1
    terminated := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited.")
        defer close(terminated)
        for {   // for-select 处理一手终止信号
            select {
            case s := <-strings:  // 该case分支将一直被阻塞
                // Do something interesting
                fmt.Println(s)
            case <-done: //2 :接收到结束信号,退出当前goroutine
                return
            }
        }
    }()
    return terminated
}

done := make(chan interface{})
terminated := doWork(done, nil)

go func() { //3
    // Cancel the operation after 1 second.
        // 1s后close channel,向子goroutine广播结束信号
    time.Sleep(1 * time.Second)
    fmt.Println("Canceling doWork goroutine...")
    close(done)
}()

// 一直阻塞,直到子goroutine结束
<-terminated //4
fmt.Println("Done.")

另外一个goroutine泄露的示例:

newRandStream := func() <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.") // 1
        defer close(randStream)
        for {
            randStream <- rand.Int()  // 此处在读取完第三个元素后,将会永久阻塞,导致goroutine泄露
        }
    }()

    return randStream
}

randStream := newRandStream()
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}

针对该goroutine泄露的解决方案:

newRandStream := func(done <-chan interface{}) <-chan int {
    randStream := make(chan int)
    go func() {
        defer fmt.Println("newRandStream closure exited.")
        defer close(randStream)

        for {
            select {
            case randStream <- rand.Int():
            case <-done:   // 结束信号到达,立即结束
                return
            }
        }

    }()

    return randStream
}

done := make(chan interface{})
randStream := newRandStream(done)
fmt.Println("3 random ints:")
for i := 1; i <= 3; i++ {
    fmt.Printf("%d: %d\n", i, <-randStream)
}

close(done)  // close channel,发出通知信号
//模拟正在进行的工作
time.Sleep(1 * time.Second)

防止goroutine泄露遵循的一个原则:如果goroutine负责创建子goroutine,它也必须负责确保它可以停止子goroutine

4. or-channel

or-done-channel:将任意个数的done channel组合成为一个done channel,即N个done channel中任意一个done了,整个组合的done channnel就done了。

// or-channel的一种递归实现
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    var or func(chs ...<-chan interface{}) <-chan interface{}
    or = func(chs ...<-chan interface{}) <-chan interface{} {

        if len(chs) == 0 {
            return nil
        }

        if len(chs) == 1 {
            return chs[0]
        }

        chsLen := len(chs)
        orDone := make(chan interface{}) // done channel
        go func() {
            defer close(orDone)
            select {
            case <-or(chs[:chsLen/2]...): // 0...chsLen/2-1 channel监听
            case <-or(chs[chsLen/2:]...): // chsLen/2...chsLen-1 channel监听
            }
        }()
        return orDone
    }

    var chs []chan interface{}
    for i := 0; i < 5; i++ {
        chs = append(chs, make(chan interface{}))
    }

    go func(chs ...chan interface{}) {
        time.Sleep(1 * time.Second)
        idx := rand.Intn(5)
        fmt.Printf("close channel %d\n", idx)
        close(chs[idx])
    }(chs...)

    //<-or(chs...)
    <-or(chs[0], chs[1], chs[2], chs[3], chs[4])
    fmt.Println("end test")
}

Tips:
Go的一个优点是能够快速创建,调度和运行goroutine,并且在Go中积极鼓励使用goroutines来正确建模问题。

5. 错误处理

Go避开了流行的错误异常模型,Go认为错误处理非常重要,并且在开发程序时,我们应该像关注算法一样关注错误处理。

// 并发情况下的错误处理,潜在的结果和潜在的错误同时返回
type Result struct { //1
    Error    error
    Response *http.Response
}
checkStatus := func(done <-chan interface{}, urls ...string) <-chan Result { //2

    results := make(chan Result)
    go func() {
        defer close(results)

        for _, url := range urls {
            var result Result
            resp, err := http.Get(url)
            result = Result{Error: err, Response: resp} //3:错误和结果包装在一起
            select {
            case <-done:
                return
            case results <- result: //4
            }
        }
    }()

    return results
}
done := make(chan interface{})
defer close(done)

urls := []string{"https://www.baidu.com", "https://badhost"}
for result := range checkStatus(done, urls...) {  // 检查错误和结果
    if result.Error != nil { //5
        fmt.Printf("error: %v", result.Error)
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}

6. 管道

利用channel来实现管道的功能:

generator := func(done <-chan interface{}, integers ...int) <-chan int {
    intStream := make(chan int)
    go func() {
        defer close(intStream)
        for _, i := range integers {
            select {
            case <-done:
                return
            case intStream <- i:
            }
        }
    }()
    return intStream
}

multiply := func(done <-chan interface{}, intStream <-chan int, multiplier int) <-chan int {
    multipliedStream := make(chan int)
    go func() {
        defer close(multipliedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case multipliedStream <- i * multiplier:  // 乘法结果塞入管道中
            }
        }
    }()

    return multipliedStream
}

add := func(done <-chan interface{}, intStream <-chan int, additive int) <-chan int {
    addedStream := make(chan int)
    go func() {
        defer close(addedStream)
        for i := range intStream {
            select {
            case <-done:
                return
            case addedStream <- i + additive:  // 加法结果塞入管道中
            }
        }
    }()
    return addedStream
}

done := make(chan interface{})
defer close(done)

intStream := generator(done, 1, 2, 3, 4)  // 生产数据
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2) // 管道传输数据
 
// 通过关闭done channel,随时终止管道数据的传输
for v := range pipeline {
    fmt.Println(v)
}

利用channel实现的一些generator:

// repeat 会重复传输你给它的值,知道关闭done channel
repeat := func(done <-chan interface{}, values ...interface{}) <-chan interface{} {

    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            for _, v := range values {
                select {
                case <-done:
                    return
                case valueStream <- v:
                }
            }
        }
    }()
    return valueStream
}

// take: 从channel中拿取一定数量的数据,然后返回
take := func(done <-chan interface{}, valueStream <-chan interface{}, num int, ) <-chan interface{} {

    takeStream := make(chan interface{})
    go func() {
        defer close(takeStream)
        for i := 0; i < num; i++ {
            select {
            case <-done:
                return
            case takeStream <- <-valueStream:  // 从传入的管道中接收数据
            }
        }
    }()
    return takeStream
}

// 模拟管道两端的生产方和消费方
done := make(chan interface{})
defer close(done)

for num := range take(done, repeat(done, 1), 10) {
    fmt.Printf("%v ", num)
}

一个指定函数规则的generator:

// fn即为函数产生器
repeatFn := func(done <-chan interface{}, fn func() interface{}) <-chan interface{} {

    valueStream := make(chan interface{})
    go func() {
        defer close(valueStream)
        for {
            select {
            case <-done:
                return
            case valueStream <- fn():
            }
        }
    }()
    return valueStream
}

done := make(chan interface{})
defer close(done)
// 随机数生成器,用于产生随机数
rand := func() interface{} {
    return rand.Int()
}

for num := range take(done, repeatFn(done, rand), 10) {
    fmt.Println(num)
}

7. 扇入/扇出

单个goroutine处理管道的输入/输出变成多个goroutine处理管道的输入/输出,称之为扇入/扇出。
扇出:描述启动多个goroutine以处理来自管道的输入过程。
扇入:描述将多个goroutine的处理结果组合到一个通道中。

启用扇入/扇出操作的时机:

  • 模块当前计算结果不依赖于前面的计算结果(可并行)。
  • 整个管道流程的处理时间较长。

相关扇入/扇出的示例可参考:https://www.kancloud.cn/mutouzhang/go/596844

可以创建一个function来同时处理close done channel和数据流接收操作

orDone := func(done, c <-chan interface{}) <-chan interface{} {

    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:  // 处理done channel
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:  // 数据接收处理
                case <-done:
                                        // return 
                }
            }
        }
    }()

    return valStream
}

// 从返回的channel中读取数据,可随时中断数据的读取,close done channel
for val := range orDone(done, myChan) {
    // Do something with val
}

8. tee-channel

// or-done channel + bridge-channel
orDone := func(done, c <-chan interface{}) <-chan interface{} {

    valStream := make(chan interface{})
    go func() {
        defer close(valStream)
        for {
            select {
            case <-done:  // 处理done channel
                return
            case v, ok := <-c:
                if ok == false {
                    return
                }
                select {
                case valStream <- v:  // 数据接收处理
                case <-done:
                                        // return 
                }
            }
        }
    }()

    return valStream
}

// 从返回的channel中读取数据,可随时中断数据的读取,close done channel
for val := range orDone(done, myChan) {
    // Do something with val
}

// bridge-channel : channel套channel
bridge := func(done <-chan interface{}, chanStream <-chan <-chan interface{}) <-chan interface{} {

    valStream := make(chan interface{}) // 1
    go func() {
        defer close(valStream)
        for { // 2
            var stream <-chan interface{}
            select {
            case maybeStream, ok := <-chanStream:
                if ok == false {
                    return
                }
                stream = maybeStream
            case <-done:
                return
            }
            for val := range orDone(done, stream) { // 3
                select {
                case valStream <- val:
                case <-done:
                }
            }
        }
    }()
    return valStream
}

9. context package

context package提供的一些操作方法如下所示:

var Canceled = errors.New("context canceled")
var Canceled = errors.New("context canceled")

type CancelFunc  // 取消函数
type Context

func Background() Context
func TODO() Context
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
func WithValue(parent Context, key, val interface{}) Context

Context interface定义:

type Context interface {
        // Deadline 返回任务完成时(该 context 被取消)的时间。
        // 如果deadline 未设置,则返回的ok值为false。
        // 连续调用该函数将返回相同的结果。
        Deadline() (deadline time.Time, ok bool)

        // Done 返回任务完成时(该 context 被取消)一个已关闭的通道。
        // 如果该context无法被取消,Done 将返回nil。
        // 连续调用该函数将返回相同的结果。
        //
        // 当cancel被调用时,WithCancel 遍历 Done以执行关闭;
        // 当deadline即将到期时,WithDeadline 遍历 Done以执行关闭;
        // 当timeout时,WithTimeout 遍历 Done以执行关闭。
        //
        // Done 主要被用于 select 语句:
        //
        //  // Stream 使用DoSomething生成值,并将值发送出去
        //  // 直到 DoSomething 返回错误或 ctx.Done 被关闭
        //  func Stream(ctx context.Context, out chan<- Value) error {
        //      for {
        //          v, err := DoSomething(ctx)
        //          if err != nil {
        //              return err
        //          }
        //          select {
        //          case <-ctx.Done():
        //              return ctx.Err()
        //          case out <- v:
        //          }
        //      }
        //  }
        //
        // 查看 https://blog.golang.org/pipelines更多示例以了解如何使用
        // Done通道执行取消操作。
        Done() <-chan struct{}

        // 如果 Done 尚未关闭, Err 返回 nil.
        // 如果 Done 已关闭, Err 返回值不为nil的error以解释为何关闭:
        // 因 context 的关闭导致
        // 或 context 的 deadline 执行导致。
        // 在 Err 返回值不为nil的error之后, 连续调用该函数将返回相同的结果。
        Err() error

        // Value 根据 key 返回与 context 相关的结果,
        // 如果没有与key对应的结果,则返回nil。
        // 连续调用该函数将返回相同的结果。
        //
        // 该方法仅用于传输进程和API边界的请求数据,
        // 不可用于将可选参数传递给函数。
        //
        // 键标识着上Context中的特定值。
        // 在Context中存储值的函数通常在全局变量中分配一个键,
        // 然后使用该键作为context.WithValue和Context.Value的参数。
        // 键可以是系统支持的任何类型;
        // 程序中各包应将键定义为未导出类型以避免冲突。
        //
        // 定义Context键的程序包应该为使用该键存储的值提供类型安全的访问器:
        //
        //  // user包 定义了一个User类型,该类型存储在Context中。
        //  package user
        //
        //  import "context"
        //
        //  // User 类型的值会存储在 Context中。
        //  type User struct {...}
        //
        //  // key是位于包内的非导出类型。
        //  // 这可以防止与其他包中定义的键的冲突。
        //  type key int
        //
        //  // userKey 是user.User类型的值存储在Contexts中的键。
        //  // 它是非导出的; clients use user.NewContext and user.FromContext
        //  // 使用 user.NewContext 和 user.FromContext来替代直接使用键。
        //  var userKey key
        //
        //  // NewContext 返回一个新的含有值 u 的 Context。
        //  func NewContext(ctx context.Context, u *User) context.Context {
        //      return context.WithValue(ctx, userKey, u)
        //  }
        //
        //  // FromContext 返回存储在 ctx中的 User类型的值(如果存在的话)。
        //  func FromContext(ctx context.Context) (*User, bool) {
        //      u, ok := ctx.Value(userKey).(*User)
        //      return u, ok
        //  }
        Value(key interface{}) interface{}

一些终止信号的函数:

// 取消
func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
// deadline
func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)
// 超时
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

其中,
WithCancel返回一个新的Context,它在调用返回的cancel函数时关闭done通道。
WithDeadline返回一个新的Context,当机器的时钟超过给定的最后期限时,它关闭done通道。
WithTimeout返回一个新的Context,它在给定的超时时间后关闭done通道。

具体一些context包的使用示例可参考:https://www.kancloud.cn/mutouzhang/go/596849

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,588评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,456评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,146评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,387评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,481评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,510评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,522评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,296评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,745评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,039评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,202评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,901评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,538评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,165评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,415评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,081评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,085评论 2 352

推荐阅读更多精彩内容