2.1、channel创建
channel := make(chan int, 6)
channel创建,本质上就是初始化一个hchan结构体,使用make()创建channel,是如何调用到具体的实例化hchan的函数makechan()的?
在编译期根据make()中的不同类型,将其转换为不同类型的节点,
见/usr/local/go/src/cmd/compile/internal/gc/typecheck.go:327 OMAKE类型节点
func typecheck1(n *Node, top int) (res *Node) {
// ...
switch n.Op{
case OMAKE:
switch t.Etype {
case TCHAN:
l = nil
if i < len(args){
// ... 对缓冲区大小进行检测
n.Left = l // 带缓冲区,赋值缓冲区大小
}else{
n.Left = nodintconst(0) // 不带缓冲区
}
n.Op = OMAKECHAN
}
}
}
然后OMAKECHAN节点会/usr/local/go/src/cmd/compile/internal/gc/walk.go:416中转换成调用makechan或者makechan64的函数
// src/cmd/compile/internal/gc/walk.go
func walkexpr(n *Node, init *Nodes) *Node {
switch n.Op {
case OMAKECHAN:
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
}
}
makechan函数
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize % maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
}
return c
}
2.2、写channel
image.jpeg
流程:
- 首先判断recvq接收队列是否为空,若不为空,说明有消费者goroutine阻塞在队列里,即channel为空或者channel没有缓冲区。直接从recvq中取出一个G,写入数据,唤醒该G,结束写入流程。
- 若recvq为空,判断当前环形队列是否有空位,如果有,将数据写入队列尾部,如果队列已满,则将当前写数据的goroutine加入到sendq队列中,等待被唤醒
channel := make(chan int, 6) channel <- 10
<- 发送语句实际会被转换为chansend1
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
/*
* generic single channel send/recv
* If block is not nil,
* then the protocol will not
* sleep but return if it could
* not complete.
*
* sleep can wake up with g.param == nil
* when a channel involved in the sleep has
* been closed. it is easiest to loop and re-run
* the operation; we'll see that it's now closed.
*/
/*
*c: 操作的channel
ep:指针,指向发送的数据ch<-i
block:是否阻塞调用,在select case中才会设置为false
return:代表是否发送成功
*/
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
//channel为nil的情况,
if c == nil {
//block为false,直接返回false,表示发送失败
if !block {
return false
}
//对于nil channel,直接挂起当前goroutine,永久阻塞
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
//加锁
lock(&c.lock)
//如果channel关闭,panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
//从待接收队列中获取等待的goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
//只要可以从待接收队列中获取到goroutine,那么发送操作都是只需要copy一次,见code1:send
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
typedmemmove(c.elemtype, qp, ep)
//递增存放发送数据的索引
c.sendx++
if c.sendx == c.dataqsiz {
//调整索引
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
//如果是非阻塞发送,那么可以直接解锁返回,未发送成功
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
//阻塞发送,挂起当前goroutine
gp := getg()
//生成配置sudo
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
//入队待发送队列
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
//挂起goroutine,等待唤醒
//chanparkcommit函数会解锁,ch.lock
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
code1:send
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
//sg.elem是指向接收goroutine中接收数据的指针 s<-ch
//如果待接收goroutine需要接收具体的数据,那么直接将数据copy到sg.elem
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
//赋值param,待接收者被唤醒后会根据 param 来判断是否是被发送至唤醒的
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1) //唤醒待接收者
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
// src is on our stack, dst is a slot on another stack.
//src 是发送的数据源地址,dst是接收数据的地址
//src 在当前的goroutine栈中,dst在其他栈上
// Once we read sg.elem out of sg, it will no longer
// be updated if the destination's stack gets copied (shrunk).
// So make sure that no preemption points can happen between read & use.
dst := sg.elem
// No need for cgo write barrier checks because dst is always
// Go memory.
//使用memove直接进行内存copy
//因为dst指向其他goroutine的栈,如果它发生了栈收缩,那就没有修改真正的dst位置。所以需要加一个写屏障
typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
memmove(dst, src, t.size)
}
写channel的特性:
- 向nil channel发送数据会被永久阻塞,并且不会被select语句选中
- 如果channel未关闭,非缓冲区没有待接收的goroutine,或者缓冲区已满,那么不会被select语句选中
- 向关闭的channel发送数据,会panic,并且可以被select语句选中,这也就意味着select语句中可能会panic
- 如果有待接收者,那么会将发送的数据直接copy到待接收者的接收位置,然后唤醒接收者
- 如果有缓冲区,并且缓冲区未满,那么就直接把发送的数据copy到缓冲区中
- 如果channel未关闭,缓冲区为空并且没有待接收者,那么直接阻塞当前goroutine,等待被唤醒
- 发送者被阻塞后,可以被关闭channel操作或者被接收者操作唤醒,关闭channel导致发送者被唤醒后,会panic
- 当channel中有待接收goroutine,那么channel的状态必然是费缓冲或者缓冲区为空
发送数据,可以被select选中的情况:
- channel已关闭
- channel未关闭,channel有待接收的goroutine,或者缓冲区不为空且缓冲区未满