channel 源码解析

设计原理

image.png

目前的 Channel 收发操作均遵循了先进先出的设计,具体规则如下:

  • 先从 Channel 读取数据的 Goroutine 会先接收到数据;
  • 先向 Channel 发送数据的 Goroutine 会得到先发送数据的权利;

Go 语言社区也在 2014 年提出了无锁 Channel 的实现方案,该方案将 Channel 分成了以下三种类型:

  • 同步 Channel — 不需要缓冲区,发送方会直接将数据交给(Handoff)接收方;
  • 异步 Channel — 基于环形缓存的传统生产者消费者模型;
  • chan struct{} 类型的异步 Channel — struct{} 类型不占用内存空间,不需要实现缓冲区和直接发送(Handoff)的语义;

channel 的构造语句 make(chan int),将会被 golang 编译器翻译为 runtime.makechan 函数

func makechan(t *chantype, size int) *hchan

type hchan struct {
   qcount   uint                // buffer 中已放入的元素个数       
   dataqsiz uint               //  用户构造 channel 时指定的 buf 大小    
   buf      unsafe.Pointer     //  buffer
   elemsize uint16             //  buffer 中每个元素的大小
   closed   uint32             //  channel 是否关闭,== 0 代表未 closed
   elemtype *_type              // channel 元素的类型信息    
   sendx    uint                // buffer 中已发送的索引位置 send index
   recvx    uint                // buffer 中已接收的索引位置 receive index  
   recvq    waitq              //  等待接收的 goroutine  list of recv waiters
   sendq    waitq               // 等待发送的 goroutine list of send waiters

   lock mutex
}

type waitq struct {
   first *sudog                // 链表 
   last  *sudog
}

Channel 的 ring buffer 实现

channel 中使用了 ring buffer(环形缓冲区) 来缓存写入的数据。ring buffer 有很多好处,而且非常适合用来实现 FIFO 式的固定长度队列。
image.png

hchan 中有两个与 buffer 相关的变量: recvx 和 sendx。

  • sendx 表示 buffer 中可写的 index。
  • recvx 表示 buffer 中可读的 index。
  • 从 recvx 到 sendx 之间的元素,表示已正常存放入 buffer 中的数据。

makechan

代码根据 Channel 中收发元素的类型和缓冲区的大小初始化 runtime.hchan 和缓冲区

  • 如果当前 Channel 中不存在缓冲区,那么就只会为 runtime.hchan 分配一段内存空间;
  • 如果当前 Channel 中存储的类型不是指针类型,会为当前的 Channel 和底层的数组分配一块连续的内存空间;
  • 在默认情况下会单独为 runtime.hchan 和缓冲区分配内存;

在函数的最后会统一更新 runtime.hchan 的 elemsize、elemtype 和 dataqsiz 几个字段。

发送数据

channel 的发送过程 (如 c <- 1), 对应于 runtime.chansend 函数的实现。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool

在尝试向 channel 中发送数据时,如果 recvq 队列不为空,则首先会从 recvq 中头部取出一个等待接收数据的 goroutine 出来。并将数据直接发送给该 goroutine。代码如下

lock(&c.lock)
直接发送
if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
}

recvq 中是正在等待接收数据的 goroutine。当某个 goroutine 使用 recv 操作 (例如,x := <- c),如果此时 channel 的缓存中没有数据,且没有其他 goroutine 正在等待发送数据 (即 sendq 为空),会将该 goroutine 以及要接收的数据地址打包成 sudog 对象,并放入到 recvq 中。
如果此时 recvq 不为空,则调用 send 函数将数据拷贝到对应的 goroutine 的堆栈上。

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int)
if sg.elem != nil {
   sendDirect(c.elemtype, sg, ep)
   sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
   sg.releasetime = cputicks()
}
goready(gp, skip+1)

send 函数的实现主要包含两点:

  • memmove(dst, src, t.size) 进行数据的转移,本质上就是一个内存拷贝,将发送的数据直接拷贝到 x = <-c 表达式中变量 x 所在的内存地址上;。
  • goready(gp, skip+1) 将等待接收数据的 Goroutine 标记成可运行状态 Grunnable 并把该 Goroutine 放到发送方所在的处理器的 runnext 上等待执行,该处理器在下一次调度时会立刻唤醒数据的接收方;
image.png
缓冲区

如果创建的 Channel 包含缓冲区并且 Channel 中的数据没有装满,会执行下面这段代码

if c.qcount < c.dataqsiz {
   // Space is available in the channel buffer. Enqueue the element to send.
   qp := chanbuf(c, c.sendx)
   if raceenabled {
      racenotify(c, c.sendx, nil)
   }
   typedmemmove(c.elemtype, qp, ep)
   c.sendx++
   if c.sendx == c.dataqsiz {
      c.sendx = 0
   }
   c.qcount++
   unlock(&c.lock)
   return true
}
  • 首先会使用 runtime.chanbuf 计算出下一个可以存储数据的位置,相当于 c.buf[c.sendx]
  • 然后通过 runtime.typedmemmove 将发送的数据拷贝到缓冲区中并增加 sendx 索引和 qcount 计数器
image.png
阻塞发送

如果用户使用的是无缓冲 channel 或者此时 buffer 已满,则 c.qcount < c.dataqsiz 条件不会满足,以上流程也并不会执行到。

  1. 调用 runtime.getg 获取发送数据使用的 Goroutine;
  2. 执行 runtime.acquireSudog 获取 runtime.sudog 结构并设置这一次阻塞发送的相关信息,例如发送的 Channel、是否在 select 中和待发送数据的内存地址等;
  3. 将刚刚创建并初始化的 runtime.sudog 加入发送等待队列,并设置到当前 Goroutine 的 waiting 上,表示 Goroutine 正在等待该 sudog 准备就绪;
  4. 调用 runtime.goparkunlock 将当前的 Goroutine 陷入沉睡等待唤醒;
  5. 被调度器唤醒后会执行一些收尾工作,将一些属性置零并且释放 runtime.sudog 结构体;

我们在这里可以简单梳理和总结一下使用 ch <- i 表达式向 Channel 发送数据时遇到的几种情况:

  1. 如果当前 Channel 的 recvq 上存在已经被阻塞的 Goroutine,那么会直接将数据发送给当前 Goroutine 并将其设置成下一个运行的 Goroutine;
  2. 如果 Channel 存在缓冲区并且其中还有空闲的容量,我们会直接将数据存储到缓冲区 sendx 所在的位置上;
  3. 如果不满足上面的两种情况,会创建一个 runtime.sudog 结构并将其加入 Channel 的 sendq 队列中,当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据;

接收数据

Channel 的接收过程使用两种不同的方式去接收

i <- ch
i, ok <- ch

这两种不同的方法经过编译器的处理都会变成 ORECV 类型的节点,后者会在类型检查阶段被转换成 OAS2RECV 类型。数据的接收操作遵循以下的路线图:
image.png
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)

如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;

if c == nil {
   if !block {
      return
   }
   gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
   throw("unreachable")
}
lock(&c.lock)

如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回

if c.closed != 0 && c.qcount == 0 {
   if raceenabled {
      raceacquire(c.raceaddr())
   }
   unlock(&c.lock)
   if ep != nil {
      typedmemclr(c.elemtype, ep)
   }
   return true, false
}
直接接收
if sg := c.sendq.dequeue(); sg != nil {
   recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
   return true, true
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int){
   if c.dataqsiz == 0 {
      if raceenabled {
         racesync(c, sg)
      }
      if ep != nil {
         // copy data from sender
         recvDirect(c.elemtype, sg, ep)
      }
   } else {
      // Queue is full. Take the item at the
      // head of the queue. Make the sender enqueue
      // its item at the tail of the queue. Since the
      // queue is full, those are both the same slot.
      qp := chanbuf(c, c.recvx)
      if raceenabled {
         racenotify(c, c.recvx, nil)
         racenotify(c, c.recvx, sg)
      }
      // copy data from queue to receiver
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      // copy data from sender to queue
      typedmemmove(c.elemtype, qp, sg.elem)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   }
   sg.elem = nil
   gp := sg.g
   unlockf()
   gp.param = unsafe.Pointer(sg)
   sg.success = true
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   goready(gp, skip+1)
}

ep 接收数据的变量对应的地址。例如,在 x := <- c 中,ep表示变量 x 的地址。
而 sg 代表从 sendq 中取出的第一个 sudog

  • typedmemmove(c.elemtype, ep, qp) 表示 buffer 中的当前可读元素拷贝到接收变量的地址处。
  • typedmemmove(c.elemtype, qp, sg.elem) 表示将 sendq 中 goroutine 等待发送的数据拷贝到 buffer 中。因为此后进行了 recv++, 因此相当于把 sendq 中的数据放到了队尾。

c.sendx = c.recvx, 这句话实际的作用相当于 c.sendx = (c.sendx+1) % c.dataqsiz,因为此时 buffer 依然是满的,所以 sendx == recvx 是成立的。
如果 Channel 不存在缓冲区;

  • 调用 runtime.recvDirect 将 Channel 发送队列中 Goroutine 存储的 elem 数据拷贝到目标内存地址中;
    如果 Channel 存在缓冲区;
  • 将队列中的数据拷贝到接收方的内存地址;
  • 将发送队列头的数据拷贝到缓冲区中,释放一个阻塞的发送方;

无论发生哪种情况,运行时都会调用 runtime.goready 将当前处理器的 runnext 设置成发送数据的 Goroutine,在调度器下一次调度时将阻塞的发送方唤醒。
image.png

上图展示了 Channel 在缓冲区已经没有空间并且发送队列中存在等待的 Goroutine 时,运行 <-ch 的执行过程。发送队列头的 runtime.sudog 中的元素会替换接收索引 recvx 所在位置的元素,原有的元素会被拷贝到接收数据的变量对应的内存空间上。

缓存区

当 Channel 的缓冲区中已经包含数据时,从 Channel 中接收数据会直接从缓冲区中 recvx 的索引位置中取出数据进行处理:

if c.qcount > 0 {
   qp := chanbuf(c, c.recvx)
   if raceenabled {
      racenotify(c, c.recvx, nil)
   }
   if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
   }
   typedmemclr(c.elemtype, qp)
   c.recvx++
   if c.recvx == c.dataqsiz {
      c.recvx = 0
   }
   c.qcount--
   unlock(&c.lock)
   return true, true
}

如果接收数据的内存地址不为空,那么会使用 runtime.typedmemmove 将缓冲区中的数据拷贝到内存中、清除队列中的数据并完成收尾工作。


image.png
阻塞接收

当 Channel 的发送队列中不存在等待的 Goroutine 并且缓冲区中也不存在任何数据时,从管道中接收数据的操作会变成阻塞的,然而不是所有的接收操作都是阻塞的,与 select 语句结合使用时就可能会使用到非阻塞的接收操作:
在正常的接收场景中,我们会使用 runtime.sudog 将当前 Goroutine 包装成一个处于等待状态的 Goroutine 并将其加入到接收队列中。
完成入队之后,上述代码还会调用 runtime.goparkunlock 立刻触发 Goroutine 的调度,让出处理器的使用权并等待调度器的调度。

  • 如果 Channel 为空,那么会直接调用 runtime.gopark 挂起当前 Goroutine;
  • 如果 Channel 已经关闭并且缓冲区没有任何数据,runtime.chanrecv 会直接返回;
  • 如果 Channel 的 sendq 队列中存在挂起的 Goroutine,会将 recvx 索引所在的数据拷贝到接收变量所在的内存空间上并将 sendq 队列中 Goroutine 的数据拷贝到缓冲区;
  • 如果 Channel 的缓冲区中包含数据,那么直接读取 recvx 索引对应的数据;
  • 在默认情况下会挂起当前的 Goroutine,将 runtime.sudog 结构加入 recvq 队列并陷入休眠等待调度器的唤醒;

从 Channel 接收数据时,会触发 Goroutine 调度的两个时机:

  • 当 Channel 为空时;
  • 当缓冲区中不存在数据并且也不存在数据的发送者时;

关闭管道

编译器会将用于关闭管道的 close 关键字转换成 OCLOSE 节点以及 runtime.closechan 函数。
当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常:

func closechan(c *hchan)

当 Channel 是一个空指针或者已经被关闭时,Go 语言运行时都会直接崩溃并抛出异常:

if c == nil {
   panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
   unlock(&c.lock)
   panic(plainError("close of closed channel"))
}

将 recvq 和 sendq 两个队列中的数据加入到 Goroutine 列表 gList 中,与此同时该函数会清除所有 runtime.sudog 上未被处理的元素

c.closed = 1
var glist gList
// release all readers
for {
   sg := c.recvq.dequeue()
   if sg == nil {
      break
   }
   if sg.elem != nil {
      typedmemclr(c.elemtype, sg.elem)
      sg.elem = nil
   }
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   gp := sg.g
   gp.param = unsafe.Pointer(sg)
   sg.success = false
   if raceenabled {
      raceacquireg(gp, c.raceaddr())
   }
   glist.push(gp)
}
// release all writers (they will panic)
for {
   sg := c.sendq.dequeue()
   if sg == nil {
      break
   }
   sg.elem = nil
   if sg.releasetime != 0 {
      sg.releasetime = cputicks()
   }
   gp := sg.g
   gp.param = unsafe.Pointer(sg)
   sg.success = false
   if raceenabled {
      raceacquireg(gp, c.raceaddr())
   }
   glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
   gp := glist.pop()
   gp.schedlink = 0
   goready(gp, 3)
}

该函数在最后会为所有被阻塞的 Goroutine 调用 runtime.goready 触发调度。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,657评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,662评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,143评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,732评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,837评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,036评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,126评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,868评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,315评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,641评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,773评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,470评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,126评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,859评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,095评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,584评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,676评论 2 351

推荐阅读更多精彩内容