下载Go源码后,根目录结构如下:
VERSION-- 文件,当前Go版本
api-- 目录,包含所有API列表
doc-- 目录,Go语言的各种文档,官网上有的,这里基本会有
favicon.ico-- 文件,官网logo
include-- 目录,Go 基本工具依赖的库的头文件
lib-- 目录,文档模板
misc-- 目录,其他的一些工具,大部分是各种编辑器的Go语言支持,还有cgo的例子等
robots.txt-- 文件,搜索引擎 robots文件
src -- 目录,Go语言源码:基本工具(编译器等)、标准库
test-- 目录,包含很多测试程序(并非_test.go方式的单元测试,而是包含main包的测试)包括一些fixbug测试;可以通过这个学到一些特性的使用
channel实现文件目录:
/go/src/runtime/chan.go
channel 数据结构
channel 是 goroutine 之间通信的一种方式,CSP 模型中 消息通道对应的就是 channel;channel 结构体定义如下:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
结构体元素解析:
- qcount 缓冲通道中的元素个数
- dataqsiz 缓冲通道中的容量
- buf 有缓冲channel的缓冲区,一个定长环形数组
- elemsize 通道中存储元素的长度
- closed 关闭通道使用非0表示关闭
- elemtype 通道中存储元素的类型
- sendx 当前发送元素指向buf环形数组的下标指针
- recvx 当前接收元素指向buf环形数组的下标指针
- recvq 因消费者而阻塞的等待队列
- sendq 因生产者而阻塞的等待队列
- lock 锁保护 hchan 中的所有字段
核心的部分是存放 channel 数据的环形队列,dataqsiz、qcount 分别指定了队列的容量和当前使用量;另一个重要部分就是recvq 和 sendq 两个链表,recvq 是因读这个通道而导致阻塞的 goroutine,sendq 是因写这个通道而阻塞的 goroutine;如果一个 goroutine 阻塞于 channel 了,那么它就被挂在 recvq 或 sendq 中;waitq是链表的定义,包含一个头结点和一个尾结点:
type waitq struct {
first *sudog
last *sudog
}
链表中每个元素都是sudog结构体如下:
type sudog struct {
// The following fields are protected by the hchan.lock of the
// channel this sudog is blocking on. shrinkstack depends on
// this for sudogs involved in channel ops.
g *g
// isSelect indicates g is participating in a select, so
// g.selectDone must be CAS'd to win the wake-up race.
isSelect bool
next *sudog
prev *sudog
elem unsafe.Pointer // data element (may point to stack)
// The following fields are never accessed concurrently.
// For channels, waitlink is only accessed by g.
// For semaphores, all fields (including the ones above)
// are only accessed when holding a semaRoot lock.
acquiretime int64
releasetime int64
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel
}
结构中主要的就是 g、elem:
- g 代表着 G-M-P模型中的 G,sudog 是对g的封装便于在 csp 模型中 g 可以同时阻塞在不同的 channel 上
- elem 用于存储 goroutine 的数据;读通道时,数据会从 hchan 的队列中拷贝到 sudog 的 elem 域;写通道时,数据则是由 sudog 的elem 域拷贝到 hchan 的队列中
创建channel实现
创建方法:
ch := make(chan string,5)
实现函数如下:
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
// 异常判断 元素类型大小限制
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
// 异常判断 对齐限制
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// maxAlloc 是 Arena 区域的最大值,缓冲元素的大小与hchan相加不能超过 缓冲槽大小
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
// 无缓冲channel
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
// buf 是不分配空间 缓存地址就指向自己
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
// 分配一整块内存 存储hchan和 buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
// 是指针类型 分配hchan结构体 buf单独分配
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
// 初始化元素类型的大小
c.elemsize = uint16(elem.size)
// 初始化元素的类型
c.elemtype = elem
// 初始化 channel 的容量
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
通道缓冲chanbuf实现
实现函数如下:
// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}
传入 hchan 对象及元素在缓冲区环形数组中的下标计算该下标槽点内存地址并返回
发送数据channelsend实现
实现函数如下:
// 传入参数 hchan ,发送数据地址,是否阻塞发送, select中的通道操作使用
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 判断 channel 为空 向其中发送数据将会永久阻塞
if c == nil {
// 如果非阻塞返回 false
if !block {
return false
}
// 如果阻塞
// gopark 会使当前 goroutine 挂起,通过 unlockf 唤醒;调用gopark时传入的unlockf为nil,会被一直休眠
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
// 如果开启竞争检测
if raceenabled {
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not closed, we observe that the channel is
// not ready for sending. Each of these observations is a single word-sized read
// (first c.closed and second c.recvq.first or c.qcount depending on kind of channel).
// Because a closed channel cannot transition from 'ready for sending' to
// 'not ready for sending', even if the channel is closed between the two observations,
// they imply a moment between the two when the channel was both not yet closed
// and not ready for sending. We behave as if we observed the channel at that moment,
// and report that the send cannot proceed.
//
// It is okay if the reads are reordered here: if we observe that the channel is not
// ready for sending and then observe that it is not closed, that implies that the
// channel wasn't closed during the first observation.
// 写入数据到 channel
// 1.非阻塞写 2.没有关闭channel 3.无缓冲channel并且消费者环形队列头结点为空 或 有缓冲channel中存储的元素数量与容量相等 返回false
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
// 计时器
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 获取同步锁
lock(&c.lock)
// 判断 channel 关闭,解锁并 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 当有 goroutine 在 recvq 队列上等待时,跳过缓存队列,将消息直接发给 reciever goroutine;dequeue 从等待接受的 goroutine 队列链表获取一个sudog,goready 唤醒阻塞的 goroutine
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// 缓存队列未满,将消息复制到缓存队列上并移动 sendx 下标,hchan buf 数据量增加
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 数据拷贝到 buf 中
typedmemmove(c.elemtype, qp, ep)
// index 移动
c.sendx++
// 环形队列如果已经加到最大就置 0
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 缓冲元素数量加 1
c.qcount++
// 解锁返回
unlock(&c.lock)
return true
}
// 阻塞 解锁直接返回 false
if !block {
unlock(&c.lock)
return false
}
// chan队列已满,阻塞 将本协程放入等待协程中,同时休眠此协程
// Block on the channel. Some receiver will complete our operation for us.
// 创建 goroutine
gp := getg()
// 创建 sudog
mysg := acquireSudog()
// 初始化 释放时间
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 初始化写入的数据
mysg.elem = ep
mysg.waitlink = nil
// 初始化 goroutine
mysg.g = gp
mysg.isSelect = false
// 初始化 hchan
mysg.c = c
// goroutine 设置的休眠 sudog
gp.waiting = mysg
gp.param = nil
// 加入到写阻塞的等待队列
c.sendq.enqueue(mysg)
// 挂起休眠
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
// Ensure the value being sent is kept alive until the
// receiver copies it out. The sudog has a pointer to the
// stack object, but sudogs aren't considered as roots of the
// stack tracer.
// 保证数据不被回收
KeepAlive(ep)
// 此时被唤醒 gp.waiting不是当前的 mysg 直接 panic
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
// 等待的 sudog 置为 nil
gp.waiting = nil
// 唤醒时传递的参数为 nil 说明出问题了直接 panic
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
// 将 hchan 置为 nil
mysg.c = nil
// 释放 sudog
releaseSudog(mysg)
return true
}
生产者数据发送send实现如下:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
// Pretend we go through the buffer, even though
// we copy directly. Note that we need to increment
// the head/tail locations only when raceenabled.
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
// 写入的数据不为空
if sg.elem != nil {
// 将数据拷贝到 hchan
sendDirect(c.elemtype, sg, ep)
// sudog 中数据置为 nil
sg.elem = nil
}
// 取数 goroutine
gp := sg.g
unlockf()
// 传入 sudug 使 param 不为空
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒 goroutine
goready(gp, skip+1)
}
接收数据chanrecv实现
实现函数如下:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// raceenabled: don't need to check ep, as it is always on the stack
// or is new memory allocated by reflect.
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// hchan 为 nil
if c == nil {
if !block {
return
}
// hchan 中接收消息永久阻塞
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// Fast path: check for failed non-blocking operation without acquiring the lock.
//
// After observing that the channel is not ready for receiving, we observe that the
// channel is not closed. Each of these observations is a single word-sized read
// (first c.sendq.first or c.qcount, and second c.closed).
// Because a channel cannot be reopened, the later observation of the channel
// being not closed implies that it was also not closed at the moment of the
// first observation. We behave as if we observed the channel at that moment
// and report that the receive cannot proceed.
//
// The order of operations is important here: reversing the operations can lead to
// incorrect behavior when racing with a close.
// 1.非阻塞读 2.无缓冲channel并且消费者环形队列头结点为空 或 有缓冲channel中存储的元素数量为0 3.没有关闭channel 直接返回
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
// 计时器
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// 获取同步锁
lock(&c.lock)
// channel 关闭 且 缓冲元素为0 返回空值
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
// typedmemclr 使返回值 ep 变成了零值
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 如果有 send 生产者阻塞在队列中,直接从 send 生产者取数据
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// 缓存队列不为空,从队列头取出元素
if c.qcount > 0 {
// Receive directly from queue
// 根据hchan 索引获取数据地址
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
// 数据拷贝到 ep 中
typedmemmove(c.elemtype, ep, qp)
}
// 清空环形数组己经读取的 gp
typedmemclr(c.elemtype, qp)
// 移动索引
c.recvx++
// 环形队列如果已经加到最大就置 0
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 缓存队列元素数量减 1
c.qcount--
unlock(&c.lock)
return true, true
}
// 没有数据 读非阻塞 直接解锁返回
if !block {
unlock(&c.lock)
return false, false
}
// chan队列为空,阻塞 将本协程放入等待协程中,同时休眠此协程
// no sender available: block on this channel.
// 获取 goroutine
gp := getg()
// 获取 SudoG
mysg := acquireSudog()
// 初始化释放时间
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// goroutine 加入到读阻塞等待队列
c.recvq.enqueue(mysg)
// 休眠
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// 此时被唤醒 gp.waiting不是当前的 mysg 直接 panic
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
// 将 goroutine 中 param 参数置为 nil
gp.param = nil
// SudoG 中的 hchan 置为 nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
消费者数据接收recv实现如下:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 缓存队列不为空,直接从生产者获取数据
if c.dataqsiz == 0 {
if raceenabled {
racesync(c, sg)
}
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
// 有 send 阻塞在这里,从 buf 中获取数据
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
}
// copy data from queue to receiver
if ep != nil {
// 将 buf 中未读的当前位置数据拷贝给消费者
typedmemmove(c.elemtype, ep, qp)
}
// 将阻塞的生产者数据拷贝此位置
typedmemmove(c.elemtype, qp, sg.elem)
// 接收元素索引向后移动
c.recvx++
// 环形队列如果已经加到最大就置 0
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 环形队列读取的索引位置就是写入数据环形的末端
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
// 数据置为 nil
sg.elem = nil
// 获取 SudoG 中的 goroutine 传递给 param 参数
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 唤醒 sendq 里面 SudoG 对应的 g
goready(gp, skip+1)
}
closechan 实现
关闭通道设置chan关闭标志位,closed=1;函数如下:
func closechan(c *hchan) {
// 关闭为 nil 的 hchan 直接 panic
if c == nil {
panic(plainError("close of nil channel"))
}
// 获取同步锁
lock(&c.lock)
// 已关闭 hchan 释放锁 panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
// 将 closed 置为 1
c.closed = 1
var glist gList
// 遍历接收队列
// release all readers
for {
// 取出读阻塞队列中的 SudoG
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
// typedmemclr 使返回值 ep 变成了零值
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 获取 goroutine 将参数 param 值为空,在接收方法中根据 param 是否为空判断是否为close
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 将 goroutine 加入到 glist
glist.push(gp)
}
// 遍历发送队列
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
// 将接收队列和发送队列全部唤醒
goready(gp, 3)
}
}