netpoller

How epoll and I/O multiplexing work is classic interview material with plenty of explanations online, so it is not repeated here.

This post is based on Go 1.17. It is a set of study notes on Go's network model, the netpoller, and only covers the epoll-based implementation used on Linux. The focus is on understanding how Go implements the user-space side of the epoll model, and how Go hooks the wait for data to become available in the kernel buffer (before it is copied to user space) into its own scheduler.

Let's start with a simple piece of code. It listens on local port 8888 and calls the Accept method, which returns a net.TCPConn (given that the listener was declared as a TCP server) whenever a connection arrives; that struct ends up holding the socket of the new connection.

package main

import (
    "fmt"
    "net"
)

func Run() {
    listen, err := net.Listen("tcp", "0.0.0.0:8888")
    if err != nil {
        fmt.Println(err)
        return
    }
    for {
        accept, err := listen.Accept()
        if err != nil {
            fmt.Println(err)
            return
        }
        go handle(accept)
    }
}

func handle(conn net.Conn) {
    defer conn.Close() // don't leak the connection
    data := make([]byte, 1024)
    read, err := conn.Read(data)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("read:", read)
    fmt.Println(data[:read])
    _, err = conn.Write([]byte("hello"))
    if err != nil {
        fmt.Println(err)
        return
    }
}
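
To exercise this server, a minimal client (my own sketch, not part of the original walkthrough; it assumes the server above is running locally on port 8888) could look like this, in a separate program:

package main

import (
    "fmt"
    "net"
)

func main() {
    conn, err := net.Dial("tcp", "127.0.0.1:8888")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer conn.Close()

    // write a request, then read the server's "hello" reply
    if _, err := conn.Write([]byte("ping")); err != nil {
        fmt.Println(err)
        return
    }
    buf := make([]byte, 16)
    n, err := conn.Read(buf)
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Println("reply:", string(buf[:n]))
}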

Tracing into the Accept method, we find it is essentially a call to accept:

// Accept implements the Accept method in the Listener interface; it
// waits for the next call and returns a generic Conn.
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}  

Tracing further, the code splits into two parts:

func (ln *TCPListener) accept() (*TCPConn, error) {
    // part one: returns a *netFD, Go's representation of a network file descriptor
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    // part two: wraps the file descriptor in a new TCPConn
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}
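
The ln.lc.KeepAlive checked above comes from net.ListenConfig. As a usage sketch (my own example, not from the walkthrough), a caller can set the keep-alive period explicitly when creating the listener instead of relying on defaultTCPKeepAlive:

package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

func main() {
    // ListenConfig.KeepAlive is what ends up in ln.lc.KeepAlive above
    lc := net.ListenConfig{KeepAlive: 30 * time.Second}
    listen, err := lc.Listen(context.Background(), "tcp", "0.0.0.0:8888")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer listen.Close()
    fmt.Println("listening on", listen.Addr())
}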

Continuing into ln.fd.accept():

func (fd *netFD) accept() (netfd *netFD, err error) {
    // d is a plain int here, so we must be getting close to the raw system call
    d, rsa, errcall, err := fd.pfd.Accept()
    if err != nil {
        if errcall != "" {
            err = wrapSyscallError(errcall, err)
        }
        return nil, err
    }
    // build and initialize the new netFD
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net); err != nil {
        poll.CloseFunc(d)
        return nil, err
    }
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    lsa, _ := syscall.Getsockname(netfd.pfd.Sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}
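
Before moving on to fd.pfd.Accept(), note the netfd.init() call above: that is what registers the new descriptor with the netpoller. Paraphrasing internal/poll/fd_poll_runtime.go from memory (go1.17-era, so treat the exact code as approximate), it boils down to:

// paraphrased, not verbatim source
func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)             // lazily runs netpollinit (epoll_create) exactly once
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd)) // runs netpollopen (epoll_ctl ADD) for this fd
    if errno != 0 {
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx // the runtime-side pollDesc used later by waitRead/waitWrite
    return nil
}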

Further down, the fd.pfd.Accept() method:

// Accept wraps the accept network call.
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    // take the read lock
    if err := fd.readLock(); err != nil {
        return -1, nil, "", err
    }
    defer fd.readUnlock()
    // does what the name says; ignore it for now
    if err := fd.pd.prepareRead(fd.isFile); err != nil {
        return -1, nil, "", err
    }
    // the core of the method
    for { 
        // system call: accept a new connection and get a new socket fd
        s, rsa, errcall, err := accept(fd.Sysfd)
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EINTR:
            continue
        // The important case: if the kernel is not ready when user code performs the operation
        // (no pending connection for accept, no data for a read), the call returns EAGAIN.
        // This is exactly where the Go runtime steps in.
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                                
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            // This means that a socket on the listen
            // queue was closed before we Accept()ed it;
            // it's a silly error, so try again.
            continue
        }
        return -1, nil, errcall, err
    }
}
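
One detail worth noting before moving on: the accept helper called above (in internal/poll) uses accept4 with SOCK_NONBLOCK and SOCK_CLOEXEC (falling back to accept plus SetNonblock on old kernels), so every accepted socket is already non-blocking; that is why the EAGAIN branch exists at all. A rough user-space equivalent of that single call, as my own sketch (Linux only; lfd is assumed to be a non-blocking listening descriptor):

// rawAccept sketches what the runtime's accept helper roughly does for one call (uses the syscall package).
func rawAccept(lfd int) (int, syscall.Sockaddr, error) {
    nfd, sa, err := syscall.Accept4(lfd, syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC)
    if err == syscall.EAGAIN {
        // nothing queued yet; the real code parks the goroutine via waitRead instead of returning
        return -1, nil, err
    }
    return nfd, sa, err
}

conn.Read follows the same pattern: syscall.Read returns EAGAIN when the kernel buffer is empty, internal/poll calls fd.pd.waitRead, and the goroutine parks until the netpoller reports readiness.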

Next, the fd.pd.waitRead() method (TCPConn's Read method eventually ends up in this function as well):

func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    // finally calls a function that is go:linkname'd into the runtime package
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

runtime_pollWait corresponds to the following code:

// poll_runtime_pollWait, which is internal/poll.runtime_pollWait,
// waits for a descriptor to be ready for reading or writing,
// according to mode, which is 'r' or 'w'.
// This returns an error code; the codes are defined above.
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    // ignore what netpollcheckerr does for now
    errcode := netpollcheckerr(pd, int32(mode))
    if errcode != pollNoError {
        return errcode
    }
    // As for now only Solaris, illumos, and AIX use level-triggered IO.
    if GOOS == "solaris" || GOOS == "illumos" || GOOS == "aix" {
        netpollarm(pd, mode)
    }
       
    for !netpollblock(pd, int32(mode), false) {
        // the familiar double-check seen in concurrent code
        errcode = netpollcheckerr(pd, int32(mode))
        if errcode != pollNoError {
            return errcode
        }
        // Can happen if timeout has fired and unblocked us,
        // but before we had a chance to run, timeout has been reset.
        // Pretend it has not happened and retry.
    }
    return pollNoError
}
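
For reference, the error codes mentioned in that comment are small integers defined in runtime/netpoll.go; from memory (go1.17-era, verify against the source) they are roughly the following, and internal/poll's convertErr maps them back to errors such as ErrNetClosing and ErrDeadlineExceeded:

// paraphrased from runtime/netpoll.go, not verbatim
const (
    pollNoError        = 0 // no error
    pollErrClosing     = 1 // descriptor is closed
    pollErrTimeout     = 2 // I/O deadline reached
    pollErrNotPollable = 3 // general error polling descriptor
)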

Since this is where the blocking happens, the logic must be inside netpollblock:

// Notes from the source:
// returns true if IO is ready, or false if timedout or closed
// waitio - wait only for completed IO, ignore errors
// Concurrent calls to netpollblock in the same mode are forbidden, as pollDesc
// can hold only a single waiting goroutine for each mode.

// The main thing to note is that gpp doubles as both the slot that will hold the blocked goroutine's
// address and a semaphore, as documented on pollDesc (rg uintptr // pdReady, pdWait, G waiting for read or nil).
// Slightly mind-bending; a simplified standalone sketch of this state machine follows netpollblockcommit below.
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // pick the read slot or the write slot
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    // set the gpp semaphore to pdWait
    for {
        // Consume notification if already ready.
        if atomic.Casuintptr(gpp, pdReady, 0) {
            return true
        }
        // in the common case this CAS succeeds and we leave the loop
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }

        // Double check that this isn't corrupt; otherwise we'd loop
        // forever.
        if v := atomic.Loaduintptr(gpp); v != pdReady && v != 0 {
            throw("runtime: double wait")
        }
    }

    // need to recheck error states after setting gpp to pdWait
    // this is necessary because runtime_pollUnblock/runtime_pollSetDeadline/deadlineimpl
    // do the opposite: store to closing/rd/wd, publishInfo, load of rg/wg
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // This is the crucial step: gopark parks the current goroutine. On the way to sleep,
        // gopark calls netpollblockcommit, which stores the current goroutine's g into gpp
        // (that is, pollDesc.rg or pollDesc.wg). Since every pollDesc's pointer is registered
        // with epoll (see netpollopen below), sysmon or the scheduler can later find this g
        // and resume it from here.
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // be careful to not lose concurrent pdReady notification
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}
 
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    // store the goroutine's address into gpp (only if it is still pdWait)
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the count of goroutines waiting for the poller.
        // The scheduler uses this to decide whether to block
        // waiting for the poller if there is nothing else to do.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}
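
To make the rg/wg trick more tangible, here is a standalone toy version of the same state machine (my own sketch, not runtime code): a single atomic word that is either nil, pdReady, pdWait, or "a waiting goroutine", represented here by a channel instead of a real *g:

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

const (
    pdNil   uintptr = 0
    pdReady uintptr = 1
    pdWait  uintptr = 2
)

func main() {
    var gpp uintptr              // plays the role of pollDesc.rg
    woken := make(chan struct{}) // stands in for "ready the parked g"
    done := make(chan struct{})

    // the waiter: netpollblock in miniature
    go func() {
        defer close(done)
        for {
            if atomic.CompareAndSwapUintptr(&gpp, pdReady, pdNil) {
                fmt.Println("waiter: notification already there, no need to park")
                return
            }
            if atomic.CompareAndSwapUintptr(&gpp, pdNil, pdWait) {
                break // announced intent to wait
            }
        }
        // the real code parks the goroutine here and netpollblockcommit stores its *g into gpp;
        // this toy simply blocks on a channel instead
        <-woken
        old := atomic.SwapUintptr(&gpp, pdNil)
        fmt.Println("waiter: woken, previous state:", old) // expect pdReady
    }()

    time.Sleep(10 * time.Millisecond) // give the waiter time to reach pdWait (demo only)

    // the notifier: netpollready/netpollunblock in miniature
    if old := atomic.SwapUintptr(&gpp, pdReady); old == pdWait {
        close(woken) // a parked waiter exists: wake it (the runtime would ready the stored g)
    }
    <-done
}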

Now for the gopark logic:

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    if reason != waitReasonSleep {
        checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
    }
    mp := acquirem()  // pin the current m so it is not handed over to run another g
    gp := mp.curg // the g running on this m, i.e. the goroutine that issued the read or write
    status := readgstatus(gp) 
    if status != _Grunning && status != _Gscanrunning {
        throw("gopark: bad g status")
    }
    mp.waitlock = lock // this is gpp
    mp.waitunlockf = unlockf // this is netpollblockcommit
    gp.waitreason = reason
    mp.waittraceev = traceEv
    mp.waittraceskip = traceskip
    releasem(mp) // unpin the m
    // can't do anything that might move the G between Ms here.
    mcall(park_m) // switch to the g0 stack and run park_m
}

Here mcall switches to the system stack (g0) to run park_m, passing the calling g to it.

// park continuation on g0.
func park_m(gp *g) {
    _g_ := getg() // _g_ is g0 at this point

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    casgstatus(gp, _Grunning, _Gwaiting) // mark the incoming g as waiting
    dropg() // unbind g and m from each other

    if fn := _g_.m.waitunlockf; fn != nil {
        ok := fn(gp, _g_.m.waitlock) // this runs netpollblockcommit
        _g_.m.waitunlockf = nil
        _g_.m.waitlock = nil
        if !ok {
            if trace.enabled {
                traceGoUnpark(gp, 2)
            }
            casgstatus(gp, _Gwaiting, _Grunnable)
            execute(gp, true) // Schedule it back, never returns.
        }
    }
    schedule() // start a new scheduling round; never returns
}

func dropg() {
    _g_ := getg()

    setMNoWB(&_g_.m.curg.m, nil)
    setGNoWB(&_g_.m.curg, nil)
}

At this point the chain is complete, from the kernel returning syscall.EAGAIN all the way to the final call to schedule(). Instead of letting the execution flow the OS sees (a thread) block or poll, the Go runtime puts its own goroutine to sleep, and scheduling goroutines is precisely what Go is good at.
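
This sleep is visible from the outside: a goroutine parked in waitRead shows up as "[IO wait]" (waitReasonIOWait) in a goroutine dump. A small self-contained demo (my own sketch) that blocks one goroutine on conn.Read and then prints all stacks:

package main

import (
    "fmt"
    "net"
    "runtime"
    "time"
)

func main() {
    ln, err := net.Listen("tcp", "127.0.0.1:0")
    if err != nil {
        fmt.Println(err)
        return
    }
    defer ln.Close()

    go func() {
        conn, err := ln.Accept()
        if err != nil {
            return
        }
        buf := make([]byte, 1)
        conn.Read(buf) // blocks: the peer never writes, so this goroutine parks in waitRead
    }()

    conn, err := net.Dial("tcp", ln.Addr().String())
    if err != nil {
        fmt.Println(err)
        return
    }
    defer conn.Close()

    time.Sleep(100 * time.Millisecond) // give the reader time to park
    buf := make([]byte, 1<<16)
    n := runtime.Stack(buf, true) // dump all goroutines; the reader appears as "[IO wait]"
    fmt.Printf("%s\n", buf[:n])
}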

Next, how is the goroutine woken up? Since the goroutine was parked above, we can expect the logic that wakes goroutines blocked on network I/O to live where Go's scheduler looks for work. In findrunnable, the function that searches for a runnable goroutine, we see the following code:

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        // netpoll returns a list of goroutines that are now runnable
        if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }
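
Besides findrunnable, the comment inside netpollblock earlier also mentioned sysmon. sysmon indeed calls netpoll periodically when nobody has polled for more than 10ms; paraphrasing the go1.17 source from memory (details approximate):

// paraphrased from runtime.sysmon, not verbatim
lastpoll := int64(atomic.Load64(&sched.lastpoll))
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
    atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
    list := netpoll(0) // non-blocking poll
    if !list.empty() {
        // inject the ready goroutines back into the run queues
        incidlelocked(-1)
        injectglist(&list)
        incidlelocked(1)
    }
}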

In both places the main call is netpoll:

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
    if epfd == -1 {
        return gList{}
    }
    var waitms int32
    if delay < 0 {
        waitms = -1
    } else if delay == 0 {  
        // the non-blocking call from findrunnable passes delay == 0, so this is the usual branch
        waitms = 0
    } else if delay < 1e6 {
        waitms = 1
    } else if delay < 1e15 {
        waitms = int32(delay / 1e6)
    } else {
        // An arbitrary cap on how long to wait for a timer.
        // 1e9 ms == ~11.5 days.
        waitms = 1e9
    }
    var events [128]epollevent // why 128? presumably just a fixed per-call batch size; netpoll runs repeatedly, so a bounded buffer is enough
retry:
    // here is the actual wait
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        if n != -_EINTR {
            println("runtime: epollwait on fd", epfd, "failed with", -n)
            throw("runtime: netpoll failed")
        }
        // If a timed sleep was interrupted, just return to
        // recalculate how long we should sleep now.
        if waitms > 0 {
            return gList{}
        }
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }

        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            if ev.events != _EPOLLIN {
                println("runtime: netpoll: break fd ready for", ev.events)
                throw("runtime: netpoll: break fd ready for something unexpected")
            }
            if delay != 0 {
                // netpollBreak could be picked up by a
                // nonblocking poll. Only read the byte
                // if blocking.
                var tmp [16]byte
                read(int32(netpollBreakRd), noescape(unsafe.Pointer(&tmp[0])), int32(len(tmp)))
                atomic.Store(&netpollWakeSig, 0)
            }
            continue
        }
        // decide whether the event means readable, writable, or both
        var mode int32
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // recover the pollDesc pointer stored in the epoll event data
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.setEventErr(ev.events == _EPOLLERR)
            // netpollready pulls out the g saved earlier and appends it to the to-run list
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

epollwait itself is a thin assembly wrapper over the epoll_pwait system call (runtime/sys_linux_amd64.s):

// int32 runtime·epollwait(int32 epfd, EpollEvent *ev, int32 nev, int32 timeout);
TEXT runtime·epollwait(SB),NOSPLIT,$0
    // This uses pwait instead of wait, because Android O blocks wait.
    // the first four MOVs load the arguments into the registers of the syscall ABI
    MOVL    epfd+0(FP), DI
    MOVQ    ev+8(FP), SI
    MOVL    nev+16(FP), DX
    MOVL    timeout+20(FP), R10
    MOVQ    $0, R8 // fifth argument: the sigmask of epoll_pwait; 0 means no signal mask
    MOVL    $SYS_epoll_pwait, AX // syscall number 281 is epoll_pwait
    SYSCALL
    MOVL    AX, ret+24(FP) // store the return value
    RET

So where are epoll_create and epoll_ctl called?

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC) // system call
    if epfd < 0 {
        epfd = epollcreate(1024) // system call: fallback for kernels without epoll_create1
        if epfd < 0 {
            println("runtime: epollcreate failed with", -epfd)
            throw("runtime: netpollinit failed")
        }
        closeonexec(epfd)
    }
    r, w, errno := nonblockingPipe()
    if errno != 0 {
        println("runtime: pipe failed with", -errno)
        throw("runtime: pipe failed")
    }
    ev := epollevent{
        events: _EPOLLIN,
    }
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    if errno != 0 {
        println("runtime: epollctl failed with", -errno)
        throw("runtime: epollctl failed")
    }
    netpollBreakRd = uintptr(r)
    netpollBreakWr = uintptr(w)
}

Go wraps epoll_ctl in netpollopen (note _EPOLLET: descriptors are registered in edge-triggered mode). It is reached through pollDesc initialization (runtime_pollOpen), which runs whenever a netFD is initialized: on the dial path when netFD's connect sets up the socket, and equally for listeners and for the netfd.init() call we saw in accept above.

func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) // system call
}

Finally, it is worth noting that all of Go's interaction with the epoll interface goes through the pollDesc struct, and that pollDesc memory is allocated and managed by the runtime itself (it is not under garbage collection); no mmap-style sharing of memory with the kernel is involved.
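
To make the kernel side concrete, here is a standalone sketch (my own example, Linux only) that drives the same three system calls, epoll_create1, epoll_ctl and epoll_wait, directly through the syscall package; this is roughly the work the runtime does for us behind every listener and connection:

package main

import (
    "fmt"
    "syscall"
)

func main() {
    // listening socket, set non-blocking like the runtime does
    lfd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_STREAM, 0)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer syscall.Close(lfd)
    syscall.SetNonblock(lfd, true)
    addr := syscall.SockaddrInet4{Port: 8888} // 0.0.0.0:8888
    if err := syscall.Bind(lfd, &addr); err != nil {
        fmt.Println(err)
        return
    }
    syscall.Listen(lfd, 128)

    // epoll_create1 + epoll_ctl(ADD), the counterparts of netpollinit/netpollopen
    epfd, err := syscall.EpollCreate1(0)
    if err != nil {
        fmt.Println(err)
        return
    }
    defer syscall.Close(epfd)
    ev := syscall.EpollEvent{Events: syscall.EPOLLIN, Fd: int32(lfd)}
    if err := syscall.EpollCtl(epfd, syscall.EPOLL_CTL_ADD, lfd, &ev); err != nil {
        fmt.Println(err)
        return
    }

    // epoll_wait, the counterpart of netpoll/epollwait: block until the listener is readable
    events := make([]syscall.EpollEvent, 128)
    n, err := syscall.EpollWait(epfd, events, -1)
    if err != nil {
        fmt.Println(err)
        return
    }
    for i := 0; i < n; i++ {
        if int(events[i].Fd) == lfd {
            nfd, _, err := syscall.Accept(lfd)
            if err == nil {
                fmt.Println("accepted fd", nfd)
                syscall.Close(nfd)
            }
        }
    }
}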

