1)chan结构体-hchan
- channel内部是固定长度的双向循环链表,make时确认size大小
- 环形队列有关的变量:
qcount 入队元素数
dataqsiz 队列容量
sendx发送索引
recvx接收索引 - 关于发送缓冲队列和接收缓冲队列:
当写阻塞队列不空,说明chan的buf已经无法写了,并且读阻塞队列为空
当读阻塞队列不空,说明chan的buf和sendq均为空
type hchan struct {
qcount uint // 当前队列中总元素个数
dataqsiz uint // 环形队列长度,即缓冲区大小(申明channel时指定的大小)
buf unsafe.Pointer // 环形队列指针
elemsize uint16 // buf中每个元素的大小
closed uint32 // 0为正常,关闭时字段为1
elemtype *_type // 元素类型,用于传值过程的赋值
// 环形缓冲区中已发送位置索引
sendx uint // send index
// 环形缓冲区中已接收位置索引
recvx uint // receive index
// 等待读消息的groutine队列。
// 当读阻塞队列不空,说明chan的buf和sendq均为空
recvq waitq // list of recv waiters,放的是等待接收的睡眠协程
// 等待写消息的groutine队列
// 当写阻塞队列不空,说明chan的buf已经无法写了,并且读阻塞队列为空
sendq waitq // list of send waiters,放的是等待发送的睡眠协程
// 互斥锁,为每个读写操作锁定通道(发送和接收必须互斥)
lock mutex
}
// 等待读写的队列数据结构,保证先进先出
type waitq struct {
first *sudog
last *sudog
}
2)make chan
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
//hchanSize返回实际占用的内存大小,扣去字节对齐部分
// 对应的源码为 c := make(chan int, size)
// c := make(chan int) 这种情况下,size = 0
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
//判断a*b是否会溢出(对于64位机器,大于2的64次方)
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// 空队列或队列中元素为空
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// raceaddr内部实现为:return unsafe.Pointer(&c.buf)
//c.buf存c.buf自己的地址,用于竞争探测。
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// elem不包含指针,直接申请hchan+mem内存大小
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
//包含指针,则为buf单独开辟空间
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
3) 发送数据
sudog 存储
sudoG 结构体:
// sudog 代表在等待列表里的 g,比如向 channel 发送/接收内容时
// 之所以需要 sudog 是因为 g 和同步对象之间的关系是多对多的
// 一个 g 可能会在多个等待队列中,所以一个 g 可能被打包为多个 sudog
// 多个 g 也可以等待在同一个同步对象上
// 因此对于一个同步对象就会有很多 sudog 了
// sudog 是从一个特殊的池中进行分配的。用 acquireSudog 和 releaseSudog 来分配和释放 sudog
//block表示是否阻塞(正常都是阻塞的,在select中有default就是非阻塞的)
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//第一种情况,空chan
if c == nil {
//非阻塞立即返回
if !block {
return false
}
//协程挂起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
//未关闭的chan且非阻塞状态如果满了就立即返回
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//互斥锁上锁
lock(&c.lock)
//向已经关闭的chan发数据会panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//从接收者队列中拿一个goroutine,把数据发给对应的goroutine,直接返回
//接收队列有数据,说明buf环为空
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
//没有等待接收的goroutine,如果队列不空,则放入到buf中
if c.qcount < c.dataqsiz {
//缓存环形buf未满,直接放进去,返回
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
//非阻塞,缓存满了,则放入失败
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
//获取一个sudog,把send协程信息和send内容放入到sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
//把sudong放入chan的发送阻塞队列中
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
//休眠当前协程,等待某次接收流程被唤醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
//保持ep不被gc掉
KeepAlive(ep)
// someone woke us up.
//判断协程是否被其他处唤醒过
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)//释放sudog
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if sg.elem != nil {
//直接发送的recv goroutine
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
//释放chan的互斥锁
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
//唤醒recv 协程
goready(gp, skip+1)
}
发送过程:
1当对一个nil chan进行写操作时,如果是非阻塞调用,直接返回;否则将当前协程挂起
2 非阻塞模式且chan未close,没有缓冲区且没有等待接收或者缓冲区满的情况下,直接return false。
3 c.recvq中有等待读的接收者(说明唤醒buf为空),将其出队,将数据直接copy给接收者,并唤醒接收者。 返回
4 缓冲区未满的情况下,数据放入环形缓冲区即可。 返回
5缓冲区满了,把go协程信息和element指针放入到新申请sudog,把sudog放入到sendq(发送阻塞队列)。
总结一下:先看接收阻塞队列,再看环形缓冲队列,最后不行就挂起当前协程放入到发送阻塞队列(对于block类型)。
4) chan读操作
1 读空chan直接阻塞当前协程
2 非阻塞状态且chan为空的话已经close直接返回0值,没close的直接返回
3 对于阻塞状态,如果已经关闭且队列为空的话,直接返回0值
开始正常步骤:
4 如果有等待发送数据的groutine,从sendq中取出一个等待发送数据的Groutine,取出数据
5 如果没有等待的groutine,且环形队列中有数据,从队列中取出数据
6 如果没有等待的groutine,且环形队列中也没有数据,则阻塞该Groutine,并将groutine打包为sudogo加入到recevq等待队列中
总结一下:先从发送阻塞队列取,没有就从缓冲区取,还没有就挂起当前协程到接收缓冲队列(block型).
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//非阻塞时 先检查chan中是否有可以接收的数据(empty())
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
//chan没被关闭,直接返回
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
//已经关闭且无任何缓存数据的chan,直接返回
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
//已经关闭的chan,队列没有数据,存放数据清零直接返回
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
//如果发送队列有阻塞go协程(说明buf队列已经满了),则拿一个进行接收
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
//buf队列中有数据,直接取出来一个
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
//未关闭且没数据可以取,非阻塞直接返回
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
//没任何数据可读,当前协程放入读阻塞队列
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
5)关闭chan操作
把读阻塞队列所有协程释放,写阻塞队列所有协程释放(并把数据丢弃)
func closechan(c *hchan) {
//关闭空chan,直接panic
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
//已经关闭的chan,直接panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
var glist gList
// 释放读阻塞队列
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// 释放写阻塞队列,同时elem都直接释放(数据丢弃,无法被读)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
//释放所有被阻塞的协程
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
https://juejin.cn/post/7036279988131201054#heading-4
https://juejin.cn/post/6875325172249788429