GO阅读-同步编程-channel和select

非原创,搬运工

目录总汇

链接

思考

  • chanel的零值是什么,对其发送数据会怎么样
  • channel是并发安全的吗
  • channel使用不当会引发协程泄漏,请举例

1.Channel的使用场景和基本用法

在GO语言中流行一句很广的谚语

Don’t communicate by sharing memory, share memory by communicating.
--Go Proverbs by Rob Pike
直白来讲就是执行业务处理的协程不要通过共享内存的方式通信,而是要通过channel的通信方式分享数据

  • communicate by sharing memory是传统的并发编程处理方式,即对临界区加锁
  • share memory by communicating是类似CSP模型的方式,通过通信的方式,一个协程g可以把数据的“所有权”交给另一个g

也就是说channel类型可应用于一下场景

  • 数据交流和传递:类似消息中间件的生产者和消费者,传递的数据也可以是信号
  • 提供并发保护:类似锁
  • 任务编排:既然可以传递信号,意味着可以根据信号来让g按一定顺序执行,这就是编排功能

channel的元语定义如下

ChannelType = ( "chan" | "chan" "<-" | "<-" "chan" ) ElementType 

当然这个elem也可以是一个通道

chan<- chan int 
chan (<-chan int)

箭头“<-”遵循最左原则,总是尽量和左边的chan结合,所以需要注意配合括号使用
特别需要注意的是未初始化的chan的零值是nil(nil是chan的零值),可以后续使用make初始化

var c chan int //nil
c=make(chan int)

chan还可用于for-range

    for v := range ch {
        fmt.Println(v)
    }

那么忽略读取的值,只是清空chan就是

   for range ch {
    }

2.源码走读

2.1.创建通道makechan

后续就以该demo代码进行逻辑梳理
我们分别创建了带缓存和不带缓存的chan

func main() {
    c := make(chan int)
    c1 := make(chan int, 5)
    close(c)
    close(c1)
    fmt.Println("vim-go")
}

对变量c定义的那一行打断点

dlv debug main.go
b main.go:6
c
disass

channel汇编代码

通过查看汇编代码我们发现实际上是调用runtime.makechan方法(在之前学习make方法的时候我们就知道底层是这样调用的,现在是验证)
runtime/chan.go/makechan函数的定义

func makechan(t *chantype, size int) *hchan {
}

这个方法有点难看懂,但是看签名可以知道,最后得到了一个hchan的结构体(1.17和1.20没区别)

2.2chan数据结构

type hchan struct {
        qcount   uint           // total data in the queue
        dataqsiz uint           // size of the circular queue
        buf      unsafe.Pointer // points to an array of dataqsiz elements
        elemsize uint16
        closed   uint32
        elemtype *_type // element type
        sendx    uint   // send index
        recvx    uint   // receive index
        recvq    waitq  // list of recv waiters
        sendq    waitq  // list of send waiters

        lock mutex
}
  • qcount 记录循环队列的元素数量,直接看成chan中元素个数就行
  • datasize 循环队列大小,就是make创建指定的长度
  • buf 循环队列的指针
  • elemtype chan中元素类型
  • elemtsize chan中单个元素大小
  • sendx 发送指针send在buf中位置/索引
  • recvx 接收指针recv在buf中位置/索引
  • sendq和recvq 使用双向链表(循环队列)用于存储等待的gorutine
  • lock 使用锁保护所有字段,所有chan是并发安全的

2.3初始化

实际上go在编译时会根据容量大小选择调用makechan64或者makechan,但两者逻辑基本相同(makechan64只做了size检查,底层还是调用makechan)
makechan会根据是否有缓冲区和元素类型是否为指针来初始化成不同的runtime.hchan

func makechan(t *chantype, size int) *hchan {
// elem是channel元素,这里我们是chan int
        elem := t.elem

        ...略过一连串检测代码...
//math.MulUintptr是判断申请的内存空间会不会超过最大申请限制
        mem, overflow := math.MulUintptr(elem.size, uintptr(size))
       ......  

        var c *hchan
        switch {
//无缓冲区情况分支,只给hchan分配一段内存
//即不创建buf
        case mem == 0:
                c = (*hchan)(mallocgc(hchanSize, nil, true))
                c.buf = c.raceaddr()
//chan元素不是指针的情况
//给hchan和缓冲区buf分配一块连续内存
        case elem.ptrdata == 0:
                c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
//hchan数据结构后面紧接着就是buf
                c.buf = add(unsafe.Pointer(c), hchanSize)
//元素包含指针
//buf单独分配一块内存
        default:
                c = new(hchan)
                c.buf = mallocgc(mem, elem, true)
        }

        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        c.dataqsiz = uint(size)
//locakinit不太重要,根据函数名字可以推测是锁初始化
        lockInit(&c.lock, lockRankHchan)

        return c
}

func (c *hchan) raceaddr() unsafe.Pointer {
        return unsafe.Pointer(&c.buf)
}

makechan无非是干了两件数:

  • 参数校验
  • 初始化hchan结构体,其中根据元素是否为指针又对buf进行了不同的内存分配处理

扩展-在线方法查看汇编代码

ssa是编译器经过优化生成的中间代码,在网页写代码之后直接查看,跟dlv一样,只是多一种同类型工具

2.4数据发送chansend

接下来会经常遇到这两个函数,先有个映像:

  • gopark阻塞当前协程
  • goready唤醒一个协程

查看底层汇编


image.png

ssa网页查看结果

chan的发送实际上是调用runtime.chansend1方法,是chansend的封装,分段学习他的逻辑(同样略过一堆参数验证)

func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc())
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

其中参数block为true时表示发送操作是阻塞的,反之false就是想要发送是不阻塞的(遇到阻塞情况就直接返回false)

  • 第一部分判断是否为nil,前面说过chan的零值是nil,会调用gopark让调用者永久阻塞(所以接下来的throw也永远不会执行)
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

验证阻塞,测试结果显示超时

func Test_chan(t *testing.T) {
    var c chan int
    c <- 1
    t.Log("hi")
}
  • 第二部分,如果chan没被关闭,但是满了,又要求不阻塞发送,就直接返回false
    if !block && c.closed == 0 && full(c) {
        return false
    }
  • 第三部分,若对一个已经关闭的chan发送数据直接panic
     lock(&c.lock)
//验证通道是否关闭
//这里可以得知往已经关闭的通道发送会引发panic
        if c.closed != 0 {
                unlock(&c.lock)
                panic(plainError("send on closed channel"))
        }

至此我们对chan的nil、close的情况进行了检查,接下里就开始发送数据

  • 第四部分,如果循环队列有等待的人就直接扔给他(实际就是把通道内的元素拷贝到接收变量的内存地址上去)
     if sg := c.recvq.dequeue(); sg != nil {
                send(c, sg, ep, func() { unlock(&c.lock) }, 3)
                return true
        }

以下是扩展阅读
关于recvq是用于存储等待的gorutine的,是waitq类型,实际就是一个循环队列/双向链表

type waitq struct {
        first *sudog
        last  *sudog
}

waitq.dequeue逻辑很简单,就是递归读取链表

func (q *waitq) dequeue() *sudog {
        for {
//先获取头部元素,这个是最后要被弹出去的
//头部都没有就说明是个空链表
                sgp := q.first
                if sgp == nil {
                        return nil
                }
                y := sgp.next
//弹出头部后整理链表,更新first
//没有下一个就意味着弹出头部后是个空链表
//有就将下一个作为头部元素
                if y == nil {
                        q.first = nil
                        q.last = nil
                } else {
                        y.prev = nil
                        q.first = y
                        sgp.next = nil 
                }
//如果第一个不满足条件就扔掉重新来
                if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
                        continue
                }

                return sgp
        }
}

循环队列调用send方法发送数据,主要逻辑是以下部分

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
//拷贝变量地址到接收端的内存地址
        if sg.elem != nil {
                sendDirect(c.elemtype, sg, ep)
                sg.elem = nil
        }
        gp := sg.g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        sg.success = true
        if sg.releasetime != 0 {
                sg.releasetime = cputicks()
        }
        goready(gp, skip+1)

主要干了以下两件事情:

  • 调用sendDirect将发送数据直接拷贝到 接收channel通道元素的内存地址上去
    即 x = c <-a ,拷贝到x内存地址上
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
//memove是内存拷贝
        memmove(dst, src, t.size)
}
  • 调用goready
//这里进行协程调度是为了唤醒那个在等待数据的协程
func goready(gp *g, traceskip int) {
    // 切换到g0的栈
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

该函数主要功能就是切换的g0栈执行ready方法,核心方法是gorutine状态切换到runable,放入队列等待P调度,这个在之前学习GMP的时候已经很熟悉了,这就是唤醒协程(这里是数据接收方)的方法(注意是放入runnext,并没有立即执行,要等待P调度)

至此发送数据的第一种情况分析完毕:若有等待协程则直接发送,具体发送过程是将元素拷贝给接收变量内存地址,唤醒数据接收方进入协程调度
发送过程
  • 第五部分,若没有等待协程,且buf没满,数据放入buf,然后退出
//datasize缓冲区大小,qcount为放入元素数量
//进入该分支时此时缓冲区未满,且接收端没人等
        if c.qcount < c.dataqsiz {
//计算出下一个存储数据位置,sendx是发送指针位置
                qp := chanbuf(c, c.sendx)
//将刚进入channel的元素发送到计算出的位置
                typedmemmove(c.elemtype, qp, ep)
//发送指针自增
                c.sendx++
// 缓冲区满了,
//重置sendx
                if c.sendx == c.dataqsiz {
                        c.sendx = 0
                }
                c.qcount++
                unlock(&c.lock)
                return true
        }

qcount<datasize即buf中还有剩余空间,利用chanbuf计算出下一个存储数据的位置,再利用typedmemove将发送数据拷贝到计算出的位置,发送指针sendx+1,通道元素数量qcount+1

  • 第6部分,也就是第三种发送情况,走到这里说明是条件都不满足:
    1.要么没有缓冲区,也没人消费
    2.要么缓冲区buf大小不够,也没人消费
    需要阻塞当前协程
    if !block {
        unlock(&c.lock)
        return false
    }
//func getg() *g尝试获取发送数据使用的协程
//就是自己了没错
    gp := getg()
//获取sudog结构体并设置一次阻塞发送的配置
//将要发送的元素进行单独特例包装
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
//扔到sendq链表里面去
    c.sendq.enqueue(mysg)
    atomic.Store8(&gp.parkingOnChan, 1)
//你个协程睡觉去吧你
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

acquireSudog返回一个sudog结构体,并设置这一次阻塞发送的相关信息,包括发送的协程,是否在select中发送等
sendq.enqueue就是和recvq的dequeue相反,是将其入队,等待条件满足唤醒
调用gopark切走协程主动让出cpu执行权限
剩下代码就是被唤醒后做一些收尾的工作,如释放内存,return true表示成功发送数据

2.5只发送不接收引发协程泄漏

在上述源码阅读中我们发现,如果到达第三种情况,即要么没缓冲区也没人接收,要么有缓冲区但是满了,最后会阻塞协程进入休眠

gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

在以后学完GMP后我们会知道协程是分配在堆上面的,一直休眠就意味着该堆空间一直没法回收,就引发了协程泄漏,同理只接收不发送也会如此(指到达第三种情况)

2.6小结

至此通过阅读chansend源码,数据发送(chan<-i)的三种情况已经分析完毕:
1.刚好有协程在recvq中等待则直接给他
2.缓冲区有空位置(qcount<datasize)就放上去
3.都不满足就创建sudo结构体,包装元素,扔到sendq链表中,阻塞当前协程等待被唤醒
发送数据时触发Gorutine调度的几个时机:
1.直接发送时会唤醒协程(send调用goready),等待调度
2.发送数据时没找到接收方且缓存已满就将自己入队等待调度(gopark)

2.7.接收数据chanrecv

ssa截图

本质上是调用runtime.chanrecv1(是对chanrecv的封装),
如果返回两个值编译器是调用chanrecv2

    func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
  }
  func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
  }

chanrecv函数签名,同样也是分段学习

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
  • 第一部分同理检查nil,后面的非阻塞也一样就先跳过
    if c == nil {
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
  • 第二部分,讨论被关闭的情况,如果已经关闭且没元素了就返回true和false,映射到开发者操作已关闭的chan,如果已关闭且没元素,接收chan的第二个参数返回false;反之还有元素就返回true和true,映射到开发者就是返回true
    lock(&c.lock)

    if c.closed != 0 {
        if c.qcount == 0 {
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
//这个是1.20新增的代码,但是总体逻辑还是一样的
//如果通道被关闭且缓冲区还有数据
//还能继续接收
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }

因此这也是开发者不能单凭第二个参数来判断通道是否已关闭的原因,实际开发中会类似content.Context再使用一个通道发送关闭信号


又到了真正接收数据的逻辑部分

  • 第三部分,有等待的发送方就直接去找他要数据
    if sg := c.sendq.dequeue(); sg != nil {
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

这里要注意recv会根据缓冲区大小分别处理

  • 不存在则调用recvDirect从sendq队列上取

  • 存在则将数据拷贝到内存,将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方

  • 第四部分,没等待的发送方,buf有数据直接拿

    if c.qcount > 0 {
        qp := chanbuf(c, c.recvx)

        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

逻辑类似上面发送代码,存到缓冲区中

  • 第五部分,buf没元素,阻塞
    if !block {
        unlock(&c.lock)
        return false, false
    }

    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    gp.waiting = mysg
    mysg.g = gp
    mysg.c = c
    c.recvq.enqueue(mysg)
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

还是获取接收协程信息,sudog包装入队然后阻塞,最后收尾工作

2.8.关闭通道close

调用closechan方法
image.png

前面是异常处理,跳过

c.closed = 1

    var glist gList
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        gp := sg.g
        gp.param = nil
        glist.push(gp)
    }

    for {
        sg := c.sendq.dequeue()
        ...
    }
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

清除sendq上的未被处理协程,最后为所有被阻塞的调用goready触发调度

3.channel使用容易犯的错误

使用chan最常见的错误是引发panic和协程泄露(一直gopark),其实我们通过源码学习也知道了部分原因
引发panic的情况:

  • close为nil的chan
  • close已经close的chan
  • send已经close的chan

引发协程泄露的情况

  • 对nil的chan进行发送和接收,也是卡在gopark
  • 只接收不发送和只发送不接收,这很好理解直接卡在gopark了
  • 还有一些特殊情况都是上面第二种的衍生
func process(timeout time.Duration) bool {
    ch := make(chan bool)
    go func() {
        // 模拟处理耗时的业务
        time.Sleep((timeout + time.Second))
        ch <- true // block
        fmt.Println("exit goroutine")
    }()
    select {
    case result := <-ch:
        return result
    case <-time.After(timeout):
        return false
    }
}

此时如果time.After先超时了,那么go开启的协程就会阻塞永远结束不了,解决办法就是给chan增加容量

4.select

在上面分析阻塞接收和阻塞发送时,我们都遇到结构体sudo(进入recvq或sendq队列的结构体,保存协程信息),它有一个字段叫做isSelect,判断当前执行环境是否在select中,其实在学习select的时候我们就知道了,我们常将select和channel搭配使用,这也说明了两者代码是存在一定关系的。
但是当我们想使用dlv对select打断点时发现有些没有效果,这段代码会在编译阶段被优化成其他!

4.1无case的select

select{}
image.png

这个可以通过dlv捕捉到,内部实现是runtime.block方法

func block() {
    gopark(nil, nil, waitReasonSelectNoCases, traceEvGoStop, 1) // 永久阻塞
}

同时内部也调用gopark(让出当前 Goroutine 对处理器的使用权并传入等待原因 waitReasonSelectNoCases,上文的非阻塞发送有遇到过)进行永久阻塞

4.2单一channel

demo.go

func main() {
    c := make(chan int)
    x := 0
    select {
    case c <- x:
        fmt.Println("hi")
    }
}
image.png

实际上是直接被编译器优化成接收方法,并没有select的痕迹

4.3单一channel+default

demo.go

    select {
    case c <- x:
        fmt.Println("hi")
    default:
        fmt.Println("bye")
    }
image.png

调用selectnbsend,将c<-x改为 <-c


image.png

变成了调用selectnbrecv
阻塞发送的情况

func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
        return chansend(c, elem, false, getcallerpc())
}

内部是对chansend和chanrecv的封装,区别在于传入的block=false,还记得上文分析过,当刚好没有等待协程且缓冲区已满时会进入block判断分支:

//chansend函数
if sg:= c.recvq.dequeue
   ....
if c.qucount < c.dataqsize
  ...

        if !block {
                unlock(&c.lock)
                return false
        }

也就是说遇到上面情况(没有消费协程且没缓冲区)会快速失败,而不是继续向下执行阻塞协程,来让select后面的代码有可执行机会

4.3.1 default的应用举例

比如手写一个消息队列(这里我们采用一个发布者对应多个订阅者时使用多个通道的方式实现),有如下结构体

type Msg struct {
    context string
}

type Broker struct {
    l  sync.RWMutex
    ch []chan Msg
}

订阅方法如下

func (b *Broker) SubScribe(c int) (<-chan Msg, error) {
    b.l.Lock()
    defer b.l.Unlock()
    ch := make(chan Msg, c)
    b.ch = append(b.ch, ch)
    return ch, nil
}

发送消息如下

func (b *Broker) Send(m Msg) error {
    b.l.Lock()
    defer b.l.Unlock()
    for _, c := range b.ch {
        c<-m
    }
    return nil
}

这时我们就遇到问题,如果一个订阅者中的消息没有被消费导致他的通道满了(即通道容量本来是10,现在发送第11个消息,前面都没消费),就会导致后面全部卡住,此时我们可以修改成以下方式发送

        select {
        case c <- m:
        default:
            return errors.New("cant send")
        }

4.3.2多路复用(多个channel)

demo.go

    select {
    case c <- x:
        fmt.Println("hi")
    case <-c:
        fmt.Println(x)
    default:
        fmt.Println(x)
    }

image.png

变成了selectgo函数,这是select里面的重头戏(位于runtime/select.go)
先来看函数签名

func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)
  • case0是一个类型为[ncases]scase的数组
  • order0 是一个指向[2*ncases]uint16数组,值都为0

为什么 selectgo 还需要传递一个 order0,而不是直接根据 ncase 直接分配呢
编译转换会使用 temp 函数来构造生成数组的语句,而这个语句便可以保证数据会分配到栈上,而不是堆上,避免了不必要的堆分配

  • selectgo 会返回选中的序号,如果是个接收,还会返回是否接收到一个值

select在go语言不存在相应的结构体,但是使用的分支case在go中使用scase结构体表示

type scase struct {
    c    *hchan         // chan
    elem unsafe.Pointer // data element
}
  • c 类型hchan已经很熟悉了,就是一个通道的结构体,来存储case中使用的channel
    回到selectgo函数
//将case0数组和order0转为slice结构
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
   // [:n:n]的方式会让slice的len和cap相等
    ncases := nsends + nrecvs
    scases := cas1[:ncases:ncases]
    pollorder := order1[:ncases:ncases]
    lockorder := order1[ncases:][:ncases:ncases]

下面看不懂没关系,我也看不懂索性直接copy,只要知道它是随机选取分支就好
首先会进行执行必要的初始化,决定处理case的两个顺序 :

  • 轮询顺序pollorder
  • 加锁顺序 lockorder

order1会被分为pollorder和lockorder,这两个slice将会真正决定select的随机选择以及死锁问题

    norder := 0
    for i := range scases {
        cas := &scases[i]
//对于channel为nil的收发操作,将他们的elem设置为nil
        if cas.c == nil {
            cas.elem = nil // allow GC
            continue
        }
// fastrabdb为生成随机数函数
//porder刚开始是0,循环结束后是随机顺序的scases索引
        j := fastrandn(uint32(norder + 1))
        pollorder[norder] = pollorder[j]
        pollorder[j] = uint16(i)
        norder++
    }
    pollorder = pollorder[:norder]
    lockorder = lockorder[:norder]

     ....
     sellock(scases, lockorder)
     ....

这里的轮询顺序pollorder是随机的,避免channel的饥饿问题,保证公平性,之后根据channel的地址顺序确定加锁顺序来避免死锁发生(sellock函数)

如果多个 goroutine 都需要锁定 ch1 ch2,而他们加锁的顺序不固定,那么很可能会出现死锁问题
这个时候,对加锁的顺序就有要求了,按照同样的顺序的话,没有竞争到 ch1.lock 的 goroutine,会等待加锁 ch1.lcok,而不会直接去加锁 ch2.lock

func sellock(scases []scases, lockorder []int16) {
    var c *hchan
    for _, o := range lockorder {
        c0 := scases[0].c // 根据加锁顺序获取 case

        // c 记录了上次加锁的 hchan 地址,如果和当前 *hchan 相同,那么就不会再次加锁
        if c0 != nil && c0 != c {
            c = c0
            lock(&c.lock)
        }
    }
}

加锁完成,进入selectgo主循环逻辑
第一阶段的主要职责是查找所有 case 中是否有可以立刻被处理的 Channel。无论是在等待的 Goroutine 上还是缓冲区中,只要存在数据满足条件就会立刻处理

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    ...
    gp = getg()
    nextp = &gp.waiting
    for _, casei := range lockorder {
        casi = int(casei)
        cas = &scases[casi]
        c = cas.c
        sg := acquireSudog()
        sg.g = gp
        sg.c = c

        if casi < nsends {
            c.sendq.enqueue(sg)
        } else {
            c.recvq.enqueue(sg)
        }
    }

    gopark(selparkcommit, nil, waitReasonSelect, traceEvGoBlockSelect, 1)
    ...
}

如果不能立刻找到活跃的 Channel 就会进入循环的下一阶段,按照需要将当前 Goroutine 加入到 Channel 的 sendq 或者 recvq 队列中
除了将当前 Goroutine 对应的 runtime.sudog 结构体加入队列之外,这些结构体都会被串成链表附着在 Goroutine 上。在入队之后会调用 runtime.gopark 挂起当前 Goroutine 等待调度器的唤醒。
等到 select 中的一些 Channel 准备就绪之后,当前 Goroutine 就会被调度器唤醒。这时会继续执行 runtime.selectgo 函数的第三部分,从 runtime.sudog 中读取数据:

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    ...
    sg = (*sudog)(gp.param)
    gp.param = nil

    casi = -1
    cas = nil
    sglist = gp.waiting
    for _, casei := range lockorder {
        k = &scases[casei]
        if sg == sglist {
            casi = int(casei)
            cas = k
        } else {
            c = k.c
            if int(casei) < nsends {
                c.sendq.dequeueSudoG(sglist)
            } else {
                c.recvq.dequeueSudoG(sglist)
            }
        }
        sgnext = sglist.waitlink
        sglist.waitlink = nil
        releaseSudog(sglist)
        sglist = sgnext
    }

    c = cas.c
    goto retc
    ...
}

第三次遍历全部 case 时,我们会先获取当前 Goroutine 接收到的参数 sudog 结构,我们会依次对比所有 case 对应的 sudog 结构找到被唤醒的 case,获取该 case 对应的索引并返回。

由于当前的 select 结构找到了一个 case 执行,那么剩下 case 中没有被用到的 sudog 就会被忽略并且释放掉。为了不影响 Channel 的正常使用,我们还是需要将这些废弃的 sudog 从 Channel 中出队。

当我们在循环中发现缓冲区中有元素或者缓冲区未满时就会通过 goto 关键字跳转到 bufrecv 和 bufsend 两个代码段,这两段代码的执行过程都很简单,它们只是向 Channel 中发送数据或者从缓冲区中获取新数据:

bufrecv:
    recvOK = true
    qp = chanbuf(c, c.recvx)
    if cas.elem != nil {
        typedmemmove(c.elemtype, cas.elem, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.qcount--
    selunlock(scases, lockorder)
    goto retc

bufsend:
    typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
    c.sendx++
    if c.sendx == c.dataqsiz {
        c.sendx = 0
    }
    c.qcount++
    selunlock(scases, lockorder)
    goto retc

这里在缓冲区进行的操作和直接调用 runtime.chansendruntime.chanrecv 差不多,上述两个过程在执行结束之后都会直接跳到 retc 字段。

两个直接收发 Channel 的情况会调用运行时函数 runtime.sendruntime.recv,这两个函数会与处于休眠状态的 Goroutine 打交道:

recv:
    recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    recvOK = true
    goto retc

send:
    send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    goto retc

两个直接收发 Channel 的情况会调用运行时函数 runtime.sendruntime.recv,这两个函数会与处于休眠状态的 Goroutine 打交道:

recv:
    recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    recvOK = true
    goto retc

send:
    send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
    goto retc

不过如果向关闭的 Channel 发送数据或者从关闭的 Channel 中接收数据,情况就稍微有一点复杂了:

  • 从一个关闭 Channel 中接收数据会直接清除 Channel 中的相关内容(1.20版本需要判断有无缓冲区);
  • 向一个关闭的 Channel 发送数据就会直接 panic 造成程序崩溃:
rclose:
    selunlock(scases, lockorder)
    recvOK = false
    if cas.elem != nil {
        typedmemclr(c.elemtype, cas.elem)
    }
    goto retc

sclose:
    selunlock(scases, lockorder)
    panic(plainError("send on closed channel"))

总体来看,select 语句中的 Channel 收发操作和直接操作 Channel 没有太多出入,只是由于 select 多出了 default 关键字所以会支持非阻塞的收发。

参考

1.go语言channel
2.go 深入刨析
3.go ready函数
4.go夜读

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 217,734评论 6 505
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,931评论 3 394
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 164,133评论 0 354
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,532评论 1 293
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,585评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,462评论 1 302
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,262评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,153评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,587评论 1 314
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,792评论 3 336
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,919评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,635评论 5 345
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,237评论 3 329
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,855评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,983评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,048评论 3 370
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,864评论 2 354

推荐阅读更多精彩内容

  • 设计理念 执行业务处理的 goroutine 不要通过共享内存的方式通信,而是要通过 Channel 通信的方式分...
    kyo1992阅读 594评论 0 0
  • Don't communicate by sharing memory, share memory by comm...
    IceberGu阅读 605评论 0 0
  • GO 中 Chan 实现原理分享 嗨,我是小魔童哪吒,还记得咱们之前分享过GO 通道 和sync包的使用吗?咱们来...
    阿兵云原生阅读 376评论 0 5
  • channel是golang中特有的一种数据结构,通常与goroutine一起使用,下面我们就介绍一下这种数据结构...
    cfanbo阅读 287评论 0 0
  • 设计原理 目前的 Channel 收发操作均遵循了先进先出的设计,具体规则如下: 先从 Channel 读取数据的...
    Xuenqlve阅读 1,611评论 0 0