Explanations of epoll and I/O multiplexing are classic interview material and are covered at length elsewhere, so I won't repeat them here.
This post is a set of study notes on Go's network model, the netpoller, based on Go 1.17 and covering only the epoll-based implementation used on Linux. The focus is on how Go implements the user-space side of the epoll model, and on how the wait for data (the copy from kernel buffers into user space) is hooked into Go's own scheduler.
Let's start with a simple piece of code. It listens on local port 8888 and calls Accept, which returns a *net.TCPConn (since we asked for a TCP listener) whenever a new connection arrives. That struct ultimately wraps the socket of the new connection.
package main

import (
    "fmt"
    "net"
)

func Run() {
    listen, err := net.Listen("tcp", "0.0.0.0:8888")
    if err != nil {
        fmt.Println(err)
        return
    }
    for {
        accept, err := listen.Accept()
        if err != nil {
            fmt.Println(err)
            return
        }
        go handle(accept)
    }
}

func handle(conn net.Conn) {
    data := make([]byte, 1024)
    read, err := conn.Read(data)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("read:", read)
    fmt.Println(data[:read])
    _, err = conn.Write([]byte("hello"))
    if err != nil {
        fmt.Println(err)
        return
    }
}
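To poke at this server, a minimal client sketch could look like the following; the address and the "ping" payload are my own illustration, not part of the traced code.

package main

import (
    "fmt"
    "net"
)

func main() {
    // Dial the listener started by Run(); 127.0.0.1:8888 matches the address above.
    conn, err := net.Dial("tcp", "127.0.0.1:8888")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer conn.Close()
    // Send a few bytes so the server's conn.Read returns, then read its "hello" reply.
    if _, err := conn.Write([]byte("ping")); err != nil {
        fmt.Println(err)
        return
    }
    reply := make([]byte, 16)
    n, err := conn.Read(reply)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("server replied:", string(reply[:n]))
}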
Stepping into Accept, we find it is essentially just a call to the unexported accept method:
// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}
Following that, the code splits into two parts:
func (ln *TCPListener) accept() (*TCPConn, error) {
    // Part one: get back a *netFD, Go's representation of a network file descriptor.
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    // Part two: build a new TCPConn around that descriptor.
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}
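As an aside, the keep-alive branch above is driven by ln.lc, the net.ListenConfig the listener was built from. A rough sketch of how user code feeds it (the 30-second value is arbitrary):

package main

import (
    "context"
    "log"
    "net"
    "time"
)

func main() {
    // ListenConfig.KeepAlive ends up as ln.lc.KeepAlive in the accept path above:
    // 0 means "use the default period", a negative value disables keep-alives.
    lc := net.ListenConfig{KeepAlive: 30 * time.Second}
    ln, err := lc.Listen(context.Background(), "tcp", "0.0.0.0:8888")
    if err != nil {
        log.Fatal(err)
    }
    defer ln.Close()
    // ... accept loop exactly as in Run() above ...
}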
Digging further:
func (fd *netFD) accept() (netfd *netFD, err error) {
    // d is a plain int file descriptor here, so we must be getting close to the system call.
    d, rsa, errcall, err := fd.pfd.Accept()
    if err != nil {
        if errcall != "" {
            err = wrapSyscallError(errcall, err)
        }
        return nil, err
    }
    // Wrap the new descriptor in a netFD.
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    // init() is also where the new fd gets registered with the runtime poller (see netpollopen later).
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}
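One detail buried in newFD/netfd.init() is that the accepted socket ends up in non-blocking mode, so no read, write or accept on it can ever block the OS thread. If you want to convince yourself, here is a hedged sketch that inspects the descriptor's flags through golang.org/x/sys/unix; the probing code is my own, not part of the traced source.

package main

import (
    "fmt"
    "log"
    "net"

    "golang.org/x/sys/unix"
)

func main() {
    ln, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        log.Fatal(err)
    }
    defer ln.Close()
    go net.Dial("tcp", ln.Addr().String()) // a throwaway client so Accept returns

    conn, err := ln.Accept()
    if err != nil {
        log.Fatal(err)
    }
    raw, err := conn.(*net.TCPConn).SyscallConn()
    if err != nil {
        log.Fatal(err)
    }
    // Read the descriptor's flags and check O_NONBLOCK, which the runtime set for us.
    raw.Control(func(fd uintptr) {
        flags, _ := unix.FcntlInt(fd, unix.F_GETFL, 0)
        fmt.Println("O_NONBLOCK set:", flags&unix.O_NONBLOCK != 0)
    })
}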
One level deeper, into the fd.pfd.Accept() method:
// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    // Take the read lock on the fd.
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()
    // Does what the name says; we can skip it for now.
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    // Now for the core part.
    for {
        // The accept system call: take a pending connection and create a new socket for it.
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
        // EAGAIN is the error to watch. A non-blocking socket returns it when the
        // operation cannot complete yet (for a read: the kernel has no data ready;
        // here: no connection is waiting). This is exactly where the Go runtime steps in.
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}
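To see the raw behaviour the runtime is smoothing over, here is a hedged sketch that drives accept on a non-blocking socket directly through the syscall package (the port is arbitrary and this is Linux-only): with nothing in the accept queue it returns EAGAIN immediately instead of blocking.

package main

import (
    "fmt"
    "syscall"
)

func main() {
    // Create a non-blocking TCP socket, then bind and listen on an arbitrary port.
    fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM|syscall.SOCK_NONBLOCK, 0)
    if err != nil {
        panic(err)
    }
    defer syscall.Close(fd)
    addr := &syscall.SockaddrInet4{Port: 8889, Addr: [4]byte{127, 0, 0, 1}}
    if err := syscall.Bind(fd, addr); err != nil {
        panic(err)
    }
    if err := syscall.Listen(fd, 128); err != nil {
        panic(err)
    }
    // No client has connected, so the accept cannot complete and reports EAGAIN.
    _, _, err = syscall.Accept(fd)
    fmt.Println("accept error:", err, "is EAGAIN:", err == syscall.EAGAIN)
}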
On to the fd.pd.waitRead() method (TCPConn's Read method eventually ends up in this same function):
func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    // Ultimately calls a function that is linknamed into the runtime package.
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}
runtime_pollWait corresponds to the following runtime code:
// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    // Ignore what netpollcheckerr does for now.
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
    for !netpollblock(pd, int32(mode), false) {
        // The familiar double-check you see in concurrent code.
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return pollNoError
}
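The //go:linkname directive above is what lets internal/poll reach an unexported runtime function. As a hedged illustration of the mechanism only (runtime.nanotime is just a convenient target here, and the package needs an empty .s file alongside this source so the toolchain accepts the body-less declaration):

package main

import (
    "fmt"
    _ "unsafe" // required for go:linkname
)

// Pull in the unexported runtime.nanotime, the same trick internal/poll uses to
// reach poll_runtime_pollWait. An empty .s file must sit next to this file so the
// compiler accepts a function declaration without a body.
//
//go:linkname nanotime runtime.nanotime
func nanotime() int64

func main() {
    fmt.Println("monotonic nanoseconds:", nanotime())
}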
Since this is where we block, the interesting logic must live inside netpollblock.
// A few notes from the source:
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.
// The main thing to notice is that gpp does double duty: it holds the address of the goroutine being parked and also serves as a semaphore. The pollDesc struct documents this (rg uintptr // pdReady, pdWait, G waiting for read or nil). Slightly outrageous, but effective.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // Pick the read or write slot.
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // set the gpp semaphore to pdWait
    for {
        // Consume notification if already ready.
        if atomic.Casuintptr(gpp, pdReady, 0) {
            return true
        }
        // In the common case this CAS succeeds and we break out of the loop.
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
        // Double check that this isn't corrupt; otherwise we'd loop
        // forever.
        if v := atomic.Loaduintptr(gpp); v != pdReady && v != 0 {
            throw("runtime: double wait")
        }
    }
    // need to recheck error states after setting gpp to pdWait
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // This is the main event: gopark suspends the current goroutine. On the way
        // down, gopark calls netpollblockcommit, which stores the goroutine's g pointer
        // into gpp (i.e. pollDesc.rg or pollDesc.wg). Every pollDesc is known to the
        // netpoller, so sysmon or the scheduler can later resume the goroutine from here.
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    // Store the goroutine's address into gpp (only if it is still pdWait).
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the count of goroutines waiting for the poller.
        // The scheduler uses this to decide whether to block
        // waiting for the poller if there is nothing else to do.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}
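To make the gpp trick more tangible, here is a hedged, stripped-down model of that state word built on plain atomics; the constants, the waiter type and the function names are all my own stand-ins for the runtime's pdReady/pdWait/g machinery.

package main

import (
    "fmt"
    "runtime"
    "sync/atomic"
    "unsafe"
)

// A toy version of pollDesc.rg/wg: nil, ready, wait, or a pointer to the waiter.
const (
    stateNil   uintptr = 0
    stateReady uintptr = 1
    stateWait  uintptr = 2
)

type waiter struct{ name string }

// block mimics netpollblock + netpollblockcommit: consume a ready notification
// if there is one, otherwise publish ourselves as the single waiter.
func block(gpp *uintptr, w *waiter) bool {
    for {
        if atomic.CompareAndSwapUintptr(gpp, stateReady, stateNil) {
            return true // I/O was already ready, no need to park
        }
        if atomic.CompareAndSwapUintptr(gpp, stateNil, stateWait) {
            break
        }
    }
    // The netpollblockcommit step: swap the pdWait marker for the waiter's address.
    // (This pointer-to-uintptr round trip is only OK here because main keeps w alive.)
    atomic.CompareAndSwapUintptr(gpp, stateWait, uintptr(unsafe.Pointer(w)))
    return false
}

// wake mimics the ready side (netpollready/netpollunblock): mark the slot ready
// and hand back whoever was parked there.
func wake(gpp *uintptr) *waiter {
    old := atomic.SwapUintptr(gpp, stateReady)
    if old == stateNil || old == stateReady || old == stateWait {
        return nil
    }
    return (*waiter)(unsafe.Pointer(old))
}

func main() {
    var gpp uintptr
    w := &waiter{name: "reader goroutine"}
    fmt.Println("ready before park:", block(&gpp, w)) // false: we "parked"
    if g := wake(&gpp); g != nil {
        fmt.Println("woke:", g.name)
    }
    runtime.KeepAlive(w)
}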
Now for the gopark logic:
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    if reason != waitReasonSleep {
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    mp := acquirem() // acquire the current M so nothing reschedules us while we set things up
    gp := mp.curg    // the g running on this M; here, the goroutine that issued the read or write
    status := readgstatus(gp)
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock       // this is gpp
    mp.waitunlockf = unlockf // this is netpollblockcommit
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp) // release the M again
    // can't do anything that might move the G between Ms here.
    mcall(park_m) // switch to the g0 stack and run park_m
}
Here mcall switches to the system (g0) stack to run park_m, passing it the g that made the call.
// park continuation on g0.
func park_m(gp *g) {
    _g_ := getg() // _g_ is g0 at this point
    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }
    casgstatus(gp, _Grunning, _Gwaiting) // mark the incoming g as waiting
    dropg()                              // detach g and M from each other
    if fn := _g_.m.waitunlockf; fn != nil {
        ok := fn(gp, _g_.m.waitlock) // this runs netpollblockcommit
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule() // start a new round of scheduling; never returns
}
func dropg() {
    _g_ := getg()
    setMNoWB(&_g_.m.curg.m, nil)
    setGNoWB(&_g_.m.curg, nil)
}
At this point the chain from the kernel reporting syscall.EAGAIN all the way to the final schedule() call is complete: instead of blocking a thread or busy-polling, the Go runtime simply puts its own goroutine to sleep, and scheduling goroutines cheaply is exactly where Go is strong.
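From ordinary user code, the parked state is visible as the "IO wait" wait reason in a goroutine dump. A small hedged sketch (the sleep is only there to give the reader goroutine time to park):

package main

import (
    "fmt"
    "log"
    "net"
    "runtime"
    "time"
)

func main() {
    ln, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        log.Fatal(err)
    }
    conn, err := net.Dial("tcp", ln.Addr().String())
    if err != nil {
        log.Fatal(err)
    }
    srv, err := ln.Accept()
    if err != nil {
        log.Fatal(err)
    }
    defer srv.Close()

    // This goroutine blocks in conn.Read: the kernel reports EAGAIN, waitRead
    // parks it, and it shows up in the dump below as "IO wait".
    go func() {
        buf := make([]byte, 1)
        conn.Read(buf)
    }()

    time.Sleep(100 * time.Millisecond) // give the reader time to park
    stacks := make([]byte, 1<<16)
    n := runtime.Stack(stacks, true) // true = dump all goroutines
    fmt.Printf("%s\n", stacks[:n])   // look for "goroutine N [IO wait]:"
}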
Next, how does the goroutine get woken up again? Since the code above put it to sleep, it's a fair guess that wherever the scheduler looks for runnable work there is logic to wake goroutines blocked on network I/O. In findrunnable, Go's function for finding a runnable goroutine, we see the following:
// Poll network.
// This netpoll is only an optimization before we resort to stealing.
// We can safely skip it if there are no waiters or a thread is blocked
// in netpoll already. If there is any kind of logical race with that
// blocked thread (e.g. it has already returned from netpoll, but does
// not set lastpoll yet), this thread will do blocking netpoll below
// anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
    // netpoll returns the list of goroutines that are ready to run again.
    if list := netpoll(0); !list.empty() { // non-blocking
        gp := list.pop()
        injectglist(&list)
        casgstatus(gp, _Gwaiting, _Grunnable)
        if trace.enabled {
            traceGoUnpark(gp, 0)
        }
        return gp, false
    }
}
The key call in there is netpoll:
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {
        // The usual path when called from findrunnable.
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    var events [128]epollevent // 128 just caps the batch per call; anything left over is picked up on the next poll
retry:
    // Here it is: the actual epoll wait.
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        // If a timed sleep was interrupted, just return to
        // recalculate how long we should sleep now.
        if waitms > 0 {
            return gList{}
        }
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            if ev.events != _EPOLLIN {
                println("runtime: netpoll: break fd ready for", ev.events)
                throw("runtime: netpoll: break fd ready for something unexpected")
            }
            if delay != 0 {
                // netpollBreak could be picked up by a
                // nonblocking poll. Only read the byte
                // if blocking.
                var tmp [16]byte
                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
                atomic.Store(&netpollWakeSig, 0)
            }
            continue
        }
        // Work out whether the event means readable, writable, or both.
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // Recover the pollDesc pointer stored in ev.data.
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.setEventErr(ev.events == _EPOLLERR)
            // Pull out the g parked earlier and add it to the to-run list.
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}
// int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
TEXT runtime·epollwait(SB),NOSPLIT,$0
    // This uses pwait instead of wait, because Android O blocks wait.
    // The next four instructions load the Go arguments into the syscall argument registers.
    MOVL epfd+0(FP), DI
    MOVQ ev+8(FP), SI
    MOVL nev+16(FP), DX
    MOVL timeout+20(FP), R10
    MOVQ $0, R8                // fifth argument of epoll_pwait: a NULL sigmask
    MOVL $SYS_epoll_pwait, AX  // syscall number 281 is epoll_pwait on linux/amd64
    SYSCALL
    MOVL AX, ret+24(FP)        // store the syscall's return value
    RET
So where do epoll_create and epoll_ctl get called?
func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC) // epoll_create1 system call
    if epfd < 0 {
        epfd = epollcreate(1024) // fall back to the older epoll_create system call
        if epfd < 0 {
            println("runtime: epollcreate failed with", -epfd)
            throw("runtime: netpollinit failed")
        }
        closeonexec(epfd)
    }
    r, w, errno := nonblockingPipe()
    if errno != 0 {
        println("runtime: pipe failed with", -errno)
        throw("runtime: pipe failed")
    }
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    if errno != 0 {
        println("runtime: epollctl failed with", -errno)
        throw("runtime: epollctl failed")
    }
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}
Go wraps epoll_ctl in netpollopen. It is reached through pollDesc's init path (runtime_pollOpen) whenever a netFD is initialized: when Dial connects outward, and likewise for every socket returned by Accept via the netfd.init() call we saw earlier.
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) // epoll_ctl system call; note the edge-triggered _EPOLLET
}
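For comparison, here is a hedged sketch of the same create/register/wait cycle done by hand with golang.org/x/sys/unix, using the same edge-triggered interest set as netpollopen; it only mirrors the system calls the runtime issues, not its bookkeeping (a production version would also retry EpollWait on EINTR).

package main

import (
    "fmt"
    "log"
    "net"

    "golang.org/x/sys/unix"
)

func main() {
    // epoll_create1, as in netpollinit.
    epfd, err := unix.EpollCreate1(unix.EPOLL_CLOEXEC)
    if err != nil {
        log.Fatal(err)
    }
    defer unix.Close(epfd)

    // A listening socket to register; port 0 picks a free port.
    ln, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        log.Fatal(err)
    }
    defer ln.Close()
    raw, err := ln.(*net.TCPListener).SyscallConn()
    if err != nil {
        log.Fatal(err)
    }
    raw.Control(func(fd uintptr) {
        // epoll_ctl(EPOLL_CTL_ADD) with the interest set netpollopen uses.
        ev := unix.EpollEvent{
            Events: unix.EPOLLIN | unix.EPOLLOUT | unix.EPOLLRDHUP | unix.EPOLLET,
            Fd:     int32(fd),
        }
        if err := unix.EpollCtl(epfd, unix.EPOLL_CTL_ADD, int(fd), &ev); err != nil {
            log.Fatal(err)
        }
    })

    // Connect so the listener becomes readable, then epoll_wait, as in netpoll.
    go net.Dial("tcp", ln.Addr().String())
    events := make([]unix.EpollEvent, 128)
    n, err := unix.EpollWait(epfd, events, -1) // -1 blocks until something is ready
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println("ready events:", n, "first event mask:", events[0].Events)
}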
Finally, it is worth noting that all of Go's interaction with the epoll interface goes through the pollDesc struct, and the memory for pollDesc is allocated and managed by the Go runtime itself rather than by the garbage collector; there is no mmap-style mapping shared with user code involved.
References
https://blog.csdn.net/qu1993/article/details/111550425
"Go netpoller 原生网络模型之源码全面揭秘" - Strike Freedom
https://zhuanlan.zhihu.com/p/464301587