通道的原理:
向通道写数据的协程是发送协程,从通道里读数据的是接收协程。
由于通道存在互斥锁,每次只有一个协程进行发送或者接收数据的操作。在通道里数据是严格按照先进先出的规则对数据进行写和读。
发送协程发送数据的大体过程:
优先看是否开以发送给接收的协程,同时唤醒第一个接收的协程
其次看是否可以发送到缓冲区
最后看是否需要阻塞等待
接收协程的大体流程:
优先看是否可以有等待发送的协程,如果有,先按照缓冲区接收下标,获取缓冲区中的数据,然后用发送协程携带的数据替换缓冲区中接收到的数据,遵守数据先进先出的原则。唤醒发送协程。
其次看缓冲区是否有数据可以接收
最后看是否需要阻塞等待
通道结构体
type hchan struct {
qcount uint // 如果有缓冲区,这个代表缓冲区中保存的总数据个数
dataqsiz uint // 如果有缓冲区,缓冲区可以保存的数据的个数
buf unsafe.Pointer // 指向缓冲区的地址,是一个环形队列,元素个数由make里面的参数指定,不指定就是0,每个元素大小为数据的大小,如make(chan uint8 ,6),用于生成6个uint8大小的缓冲区队列,每个发送协程发送的数据大小为1字节,每个接收协程也从缓冲区接收1字节的数据
elemsize uint16 //缓冲区中每个数据的大小
closed uint32 //通道是否关闭,0为没有关闭,1为已经关闭
elemtype *_type // element type //通道的元素类型
sendx uint // send index //缓冲区中发送的下标,下标从0开始,缓冲区类似于一个定长数组,当到达数组尾部,又会重新从0开始,从而实现先进先出的环形队列的效果
recvx uint // receive index //缓冲区中从该下标开始接收数据,下标从0开始,缓冲区类似于一个定长数组,当到达数组尾部,又会重新从0开始,从而实现先进先出的环形队列的效果
recvq waitq //等待的接收协程队列,先进先出,假如不存在缓冲区或者缓冲区中没有数据,并且是阻塞方式,接收协程就进入先进先出的队列进行等待,直到有发送协程将其唤醒,否则就会阻塞发生死锁现象或者一直等待导致协程内存泄露,等待接收者和发送者队列不能同时存在
sendq waitq // 等待发送的协程列表,假如通道不存在缓冲区,或者缓冲区已经满了,并且是阻塞方式,那么发送的协程就会进入先进先出的发送队列,直到有接收协程将其唤醒,否则就会阻塞发生死锁现象或者一直等待导致协程内存泄露,等待接收者和发送者队列不能同时存在
lock mutex //互斥锁保证每次只有一个协程对数据进行操作,保存数据先进先出的顺序
}
创建一个通道变量,通道变量是一个指向hchan结构体的指针
通道分为带缓冲区和不带缓冲区的
当通道元素不包含指针,缓冲区在hchan下面
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
// compiler checks this but be safe.
if elem.Size_ >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {
throw("makechan: bad alignment")
}
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
// 带缓冲区,通道元素不包含指针
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 带缓冲区通道元素包含指针
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")
}
return c
}
阻塞型通道和非阻塞型通道:
假如一个通道在select的case里面,并且该select里面包含了default分支,那么该通道为非阻塞型通道,否则为阻塞型通道。
下面的c是非阻塞型通道,因为select里面有default分支
当发现c通道里面没有缓冲区不能塞进数据的时候,发送协程不会进入发送者等待队列,而是直接返回
package main
import (
"fmt"
)
func main() {
var c = make(chan int)
select {
case c <- 4:
fmt.Println("case")
default:
fmt.Println("不会发生阻塞")
}
}
向通道发送数据的过程
1.假如通道为nil(一个通道没有用make进行初始化,或者是初始化后将其设置为nil)的时候,假如是非阻塞的方式,这个时候就会直接返回,告诉调用者不会选中该通道。如果是阻塞的方式,那么当前协程就会阻塞等待,可能会发生死锁的情况
2.假如通道是非阻塞的,没有关闭,缓冲区已经满了或者没有等待的接收协程,那么直接返回,并且不选择该通道
3.开始加锁
4.假如通道已经关闭,那么就解锁,并且panic.因为不能向一个已经关闭的通道发送数据
5.假如等待接收协程队列不为空,从等待接收协程队列里面取出第一个等待接收的协程,将数据发送给它,将其唤醒。返回。告诉调用者已经选中该通道
6.假如等待接收协程队列为空,那么看一下缓冲区是否已经满了。
如果缓冲区没有满,那么向发送下标标记的缓冲区写入数据。发送下标移动到下一个缓冲区,假如下标已经等于缓冲区长度,那么将发送下标设置为0,从头开始,解锁,返回,告诉调用者选中了该通道
如果缓冲区满了,假如是非阻塞的形式,那么直接返回,告诉调用者没有选中该通道,否则将进入先进先出的发送者队列进行等待,等待唤醒。唤醒后判断通道是否关闭,如果关闭就会panic。因为不能向关闭的通道写数据,如果没有关闭,就告诉调用者已经选中该通道。
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled {
racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second full()).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation. However, nothing here
// guarantees forward progress. We rely on the side effects of lock release in
// chanrecv() and closechan() to update this thread's view of c.closed and full().
if !block && c.closed == 0 && full(c) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
KeepAlive(ep)
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
return true
}
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
racenotify(c, c.recvx, nil)
racenotify(c, c.recvx, sg)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
关闭通道
1.假如通道是nil,不能将其关闭,否则会发生panic
2.假如通道之前已经关闭,不能重复关闭,此时也会发生panic
3.关闭时,将获取锁,将closed设置为1
4.关闭的通道只能读,不能写。
5.关闭后会唤醒所有的等待接收协程队列,因为有等待接收协程队列,所以缓冲区是没有数据的,这个时候会返回一个空值,和一个通道已经关闭的通知
6.关闭后会唤醒所有的等待发送协程列表,因为通道已经关闭,不能向已经关闭的通道发送数据,当发送协程运行后,就会发生panic
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = unsafe.Pointer(sg)
sg.success = false
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
关闭通道后,唤醒发送协程,发生panic的例子
package main
import (
"fmt"
"sync"
)
func main() {
var c = make(chan int)
var finish = make(chan int, 1)
var wait sync.WaitGroup
wait.Add(4)
for i := 0; i < 4; i++ {
go func(i int) {
wait.Done()
c <- i
}(i)
}
go func() {
wait.Wait()
close(c)
finish <- 1
}()
fmt.Println("finish", <-finish)
}
从通道接收数据
1.假如通道变量是nil
当通道是是非阻塞方式时,会直接返回,并且告诉调用者该通道没有被选中并且没有接收到数据
当通道是阻塞方式时,接收协程会被阻塞,可能会发生死锁的情况
2.当通道是非阻塞的,并且没有数据可以接收
假如通道没有被关闭,会立即返回,告诉调用者该通道没有被选中,并且没有接收到数据
当通道已经关闭,那么就得到一个零值,告诉调用者该通道被选中,并且没有接收到数据
3.加锁
4.假如通道已经关闭
此时等待发送协程队列一定为空,因为关闭通道的时候会将它们都唤醒
假如缓冲队列里面没有数据,那么就解锁,得到一个零值,返回,告诉调用者该通道被选中,没有接收到数据
5.假如通道没有关闭
首先看等待发送协程队列是否为空,如果不为空,那么就取第一个等待发送协程。
为了遵守数据先进先出的规则。
如果存在缓冲区,此时缓冲区的数据都是满的,那么先得到缓冲区中接收下标处的数据,然后将第一个等待发送协程携带的数据填充到由接收下标指定的位置,替换缓冲区中由接收下标指定位置的数据。接收下标加1指向缓冲区的下一个位置,如果接收下标为缓冲区长度,那么接收下标就需要变为0,从头开始。接着解锁唤醒该等待接收协程
如果不存在缓冲区,那么直接将取到发送协程所携带的数据,然后解锁,将该发送协程唤醒
在唤醒发送协程后,返回,告诉调用者选中该通道,已经接收到数据
6.假如缓冲区里面有数据
得到缓冲区中由接收下标指定位置的数据,将缓冲区中接收下标处的数据清零,将接收下标加1,移动到下一个位置,如果接收下标的值等于缓冲区的长度,那么将其设置为0,从缓冲区的头部开始接收数据,缓冲区中的数据量减少1,解锁。返回调用,告诉调用者选中了该通道,并且接收到了数据
7.假如发送队列为空,并且缓冲区里面也没有数据,那么就接收不到数据。
假如是非阻塞的方式,那么直接返回,解锁。告诉调用者没有选中该通道,没有接收到数据
假如是阻塞的方式,那么将按照先进先出的方式进入等待接收者对列等待被唤醒,被唤醒时获取了数据,返回,告诉调用者选中该通道,已经接收到数据
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
if !block && empty(c) {
// After observing that the channel is not ready for receiving, we observe whether the
// channel is closed.
//
// Reordering of these checks could lead to incorrect behavior when racing with a close.
// For example, if the channel was open and not empty, was closed, and then drained,
// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
// we use atomic loads for both checks, and rely on emptying and closing to happen in
// separate critical sections under the same lock. This assumption fails when closing
// an unbuffered channel with a blocked send, but that is an error condition anyway.
if atomic.Load(&c.closed) == 0 {
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
return
}
// The channel is irreversibly closed. Re-check whether the channel has any pending data
// to receive, which could have arrived between the empty and closed checks above.
// Sequential consistency is also required here, when racing with such a send.
if empty(c) {
// The channel is irreversibly closed and empty.
if raceenabled {
raceacquire(c.raceaddr())
}
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
if c.closed != 0 {
if c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// The channel has been closed, but the channel's buffer have data.
} else {
// Just found waiting sender with not closed.
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
if !block {
unlock(&c.lock)
return false, false
}
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
gp.activeStackChans = false
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}