本文是goroutine 调度系列的网络io篇
初始化一个全局的epoll实例
网络io时会注册套接字文件描述符的读或者写事件,m放弃g
调度程序(sysmon)监听注册的套接字文件描述符的读写事件,把就绪的goroutine放入全局就绪队列
放弃网络io的g的m也就去尝试去获取goroutine,包括窃取其他p的g或者全局就绪的g(入口就是runtime下的findrunnable函数(调用schedule会调用此函数))
此处是在linux amd64环境下,针对tcp
为了简洁直接从accept开始,跳过listen阶段
net package 中
tcpsock.go中
func (l *TCPListener) Accept() (Conn, error) {
...
c, err := l.accept()
...
}
tcpsock_posix.go中
func (ln *TCPListener) accept() (*TCPConn, error) {
fd, err := ln.fd.accept()
...
}
fd_unix.go中
func (fd *netFD) accept() (netfd *netFD, err error) {
...
if err = netfd.init(); err != nil {
...
}
...
}
func (fd *netFD) init() error {
return fd.pfd.Init(fd.net, true)
}
internal/poll package 中
fd_unix.go中
func (fd *FD) Init(net string, pollable bool) error {
...
return fd.pd.init(fd)
}
fd_poll_runtime.go中
func (pd *pollDesc) init(fd *FD) error {
初始化全局epoll实例
serverInit.Do(runtime_pollServerInit)
注册文件描述符
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
...
}
针对套接字的读
conn.Read
net package 中
fd_unix.go 中
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
...
}
func (fd *FD) Read(p []byte) (int, error) {
...
注册读事件
if err := fd.pd.prepareRead(fd.isFile); err != nil {
...
}
...
设置了非阻塞如果读失败会返回eagain
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
if err = fd.pd.waitRead(fd.isFile); err == nil {
}
runtime package 中
netpoll.go中
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
...
for !netpollblock(pd, int32(mode), false) {
...
}
}
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
...
if waitio || netpollcheckerr(pd, mode) == 0 {
gopark(netpollblockcommit, unsafe.Pointer(gpp), "IO wait", traceEvGoBlockNet, 5)
}
...
}
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
...
mcall(park_m)
}
func park_m(gp *g) {
m放弃g
dropg()
触发调度,也就是m寻找可以可运行的goroutine
schedule()
}
proc.go中
调度程序
func sysmon() {
...
获取套接字文件描述符的就绪事件
gp := netpoll(false)
if gp != nil {
...
injectglist(gp)
....
}
}
func injectglist(glist *g) {
...
放入全局就绪队列
for n = 0; glist != nil; n++ {
gp := glist
glist = gp.schedlink.ptr()
casgstatus(gp, _Gwaiting, _Grunnable)
globrunqput(gp)
}
根据空闲p的数量启动相应的m数量
for ; n != 0 && sched.npidle != 0; n-- {
startm(nil, false)
}
}