设计原理
目前的 Channel 收发操作均遵循了先进先出的设计,具体规则如下:
- 先从 Channel 读取数据的 Goroutine 会先接收到数据;
- 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;
Go 语言社区也在 2014 年提出了无锁 Channel 的实现方案,该方案将 Channel 分成了以下三种类型:
- 同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
- 异步 Channel — 基于环形缓存的传统生产者消费者模型;
- chan struct{} 类型的异步 Channel — struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;
channel 的构造语句 make(chan int),将会被 golang 编译器翻译为 runtime.makechan 函数
func makechan(t *chantype, size int) *hchan
type hchan struct {
qcount uint // buffer 中已放入的元素个数
dataqsiz uint // 用户构造 channel 时指定的 buf 大小
buf unsafe.Pointer // buffer
elemsize uint16 // buffer 中每个元素的大小
closed uint32 // channel 是否关闭,== 0 代表未 closed
elemtype *_type // channel 元素的类型信息
sendx uint // buffer 中已发送的索引位置 send index
recvx uint // buffer 中已接收的索引位置 receive index
recvq waitq // 等待接收的 goroutine list of recv waiters
sendq waitq // 等待发送的 goroutine list of send waiters
lock mutex
}
type waitq struct {
first *sudog // 链表
last *sudog
}
Channel 的 ring buffer 实现
channel 中使用了 ring buffer(环形缓冲区) 来缓存写入的数据。ring buffer 有很多好处,而且非常适合用来实现 FIFO 式的固定长度队列。hchan 中有两个与 buffer 相关的变量: recvx 和 sendx。
- sendx 表示 buffer 中可写的 index。
- recvx 表示 buffer 中可读的 index。
- 从 recvx 到 sendx 之间的元素,表示已正常存放入 buffer 中的数据。
makechan
代码根据 Channel 中收发元素的类型和缓冲区的大小初始化 runtime.hchan 和缓冲区
- 如果当前 Channel 中不存在缓冲区,那么就只会为 runtime.hchan 分配一段内存空间;
- 如果当前 Channel 中存储的类型不是指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间;
- 在默认情况下会单独为 runtime.hchan 和缓冲区分配内存;
在函数的最后会统一更新 runtime.hchan 的 elemsize、elemtype 和 dataqsiz 几个字段。
发送数据
channel 的发送过程 (如 c <- 1), 对应于 runtime.chansend 函数的实现。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
在尝试向 channel 中发送数据时,如果 recvq 队列不为空,则首先会从 recvq 中头部取出一个等待接收数据的 goroutine 出来。并将数据直接发送给该 goroutine。代码如下
lock(&c.lock)
直接发送
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
recvq 中是正在等待接收数据的 goroutine。当某个 goroutine 使用 recv 操作 (例如,x := <- c),如果此时 channel 的缓存中没有数据,且没有其他 goroutine 正在等待发送数据 (即 sendq 为空),会将该 goroutine 以及要接收的数据地址打包成 sudog 对象,并放入到 recvq 中。
如果此时 recvq 不为空,则调用 send 函数将数据拷贝到对应的 goroutine 的堆栈上。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int)
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
send 函数的实现主要包含两点:
- memmove(dst, src, t.size) 进行数据的转移,本质上就是一个内存拷贝,将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上;。
- goready(gp, skip+1) 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
缓冲区
如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,会执行下面这段代码
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
- 首先会使用 runtime.chanbuf 计算出下一个可以存储数据的位置,相当于 c.buf[c.sendx]
- 然后通过 runtime.typedmemmove 将发送的数据拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器
阻塞发送
如果用户使用的是无缓冲 channel 或者此时 buffer 已满,则 c.qcount < c.dataqsiz 条件不会满足,以上流程也并不会执行到。
- 调用 runtime.getg 获取发送数据使用的 Goroutine;
- 执行 runtime.acquireSudog 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在 select 中和待发送数据的内存地址等;
- 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列,并设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪;
- 调用 runtime.goparkunlock 将当前的 Goroutine 陷入沉睡等待唤醒;
- 被调度器唤醒后会执行一些收尾工作,将一些属性置零并且释放 runtime.sudog 结构体;
我们在这里可以简单梳理和总结一下使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:
- 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
- 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
- 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;
接收数据
Channel 的接收过程使用两种不同的方式去接收
i <- ch
i, ok <- ch
这两种不同的方法经过编译器的处理都会变成 ORECV 类型的节点,后者会在类型检查阶段被转换成 OAS2RECV 类型。数据的接收操作遵循以下的路线图:func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
直接接收
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int){
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
}
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
ep 接收数据的变量对应的地址。例如,在 x := <- c 中,ep表示变量 x 的地址。
而 sg 代表从 sendq 中取出的第一个 sudog
- typedmemmove(c.elemtype, ep, qp) 表示 buffer 中的当前可读元素拷贝到接收变量的地址处。
- typedmemmove(c.elemtype, qp, sg.elem) 表示将 sendq 中 goroutine 等待发送的数据拷贝到 buffer 中。因为此后进行了 recv++, 因此相当于把 sendq 中的数据放到了队尾。
c.sendx = c.recvx, 这句话实际的作用相当于 c.sendx = (c.sendx+1) % c.dataqsiz,因为此时 buffer 依然是满的,所以 sendx == recvx 是成立的。
如果 Channel 不存在缓冲区;
- 调用 runtime.recvDirect 将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中;
如果 Channel 存在缓冲区; - 将队列中的数据拷贝到接收方的内存地址;
- 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;
上图展示了 Channel 在缓冲区已经没有空间并且发送队列中存在等待的 Goroutine 时,运行 <-ch 的执行过程。发送队列头的 runtime.sudog 中的元素会替换接收索引 recvx 所在位置的元素,原有的元素会被拷贝到接收数据的变量对应的内存空间上。
缓存区
当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 recvx 的索引位置中取出数据进行处理:
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中、清除队列中的数据并完成收尾工作。
阻塞接收
当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作:
在正常的接收场景中,我们会使用 runtime.sudog 将当前 Goroutine 包装成一个处于等待状态的 Goroutine 并将其加入到接收队列中。
完成入队之后,上述代码还会调用 runtime.goparkunlock 立刻触发 Goroutine 的调度,让出处理器的使用权并等待调度器的调度。
- 如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
- 如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回;
- 如果 Channel 的 sendq 队列中存在挂起的 Goroutine,会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区;
- 如果 Channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据;
- 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;
从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:
- 当 Channel 为空时;
- 当缓冲区中不存在数据并且也不存在数据的发送者时;
关闭管道
编译器会将用于关闭管道的 close 关键字转换成 OCLOSE 节点以及 runtime.closechan 函数。
当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常:
func closechan(c *hchan)
当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常:
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
将 recvq 和 sendq 两个队列中的数据加入到 Goroutine 列表 gList 中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。