Go net (TCP Service) 的使用及源码分析

注:本文分析基于Go1.14

简单实例

一个简单的tcp server例子

package main

import (
    "context"
    "fmt"
    "net"
    "time"
)

func main() {
    listener, err := net.Listen("tcp", "0.0.0.0:7777")
    if err != nil {
        fmt.Println("Listen error:", err)
        return
    }
    defer listener.Close()

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    for {
        conn, err := listener.Accept()
        if err != nil {
            if ne, ok := err.(net.Error); ok && ne.Temporary() {
                fmt.Println("Accept temp error:", ne)
                time.Sleep(time.Second)
                continue
            }
            fmt.Println("Accept error:", err)
            break
        }
        go handleConn(ctx, conn)
    }
}

func handleConn(ctx context.Context, conn net.Conn) {
    defer conn.Close()

    buf := make([]byte, 1024)
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }
        n, err := conn.Read(buf)
        if err != nil {
            fmt.Println("Read error:", err)
        }
        _, err = conn.Write(buf[:n])
        if err != nil {
            fmt.Println("Write error:", err)
        }
    }
}

完整实现

上面是简单的TCP Server使用示例,如果想获取完整的服务实现,可以使用或者参考github上的一个go net 开源项目

https://github.com/izhw/gnet

该项目使用go原生net package实现,不依赖其他第三方库,可以帮助开发者快速搭建一个net服务(TCP Server or Client)。
比如,开发者只要实现gnet.NetEventHandler事件回调(也可内置默认实现),使用可选的Functional options,就能快速搭建一个TCP Server,简单使用代码如下:

package main

import (
    "fmt"
    "log"

    "github.com/izhw/gnet"
    "github.com/izhw/gnet/tcp/tcpserver"
)

type ServerHandler struct {
    *gnet.NetEventHandler
}

func (h *ServerHandler) OnReadMsg(c gnet.Conn, data []byte) error {
    fmt.Println("read msg:", string(data))
    c.Write(data)
    return nil
}

func main() {
    s := tcpserver.NewServer("0.0.0.0:7777", &ServerHandler{})
    log.Fatal("Exit:", s.Serve())
}

Go net源码分析 (go1.14)

开始的简单示例中,我们看到几个关键的调用:net.Listen()listener.Accept()conn.Read()conn.Write,下面分别进行源码分析。

Listen

  • net.Listen返回的是实现了net.Listener接口的*TCPListener
  • 其中,Listen方法内,生成系统文件描述符sysfd,使用该sysfd设置参数、调用syscall.Bindsyscall.Listen完成绑定、监听,并初始化一些重要的结构信息,创建epoll句柄、注册epoll事件,然后构造返回TCPListener

调用链源码如下(只贴了关键代码)。

// `net/net.go`
// Listener接口
type Listener interface {
    // 阻塞等待,有新连接事件的时候,返回一个net.Conn
    Accept() (Conn, error)
    // 关闭listener,任何阻塞的Accept操作变为不阻塞,并返回错误
    Close() error
    // 返回listener的网络地址
    Addr() Addr
}

// `net/tcpsock.go`
type TCPListener struct {
    // Go封装的网络描述符,后面会具体讲
    fd *netFD
    // Listen配置
    lc ListenConfig
}

// `net/dial.go`
// 根据不同的 'network'和'address'构建相应的'Listener'
func Listen(network, address string) (Listener, error) {
    var lc ListenConfig
    return lc.Listen(context.Background(), network, address)
}

func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
    // resolve解析addrs
    // ...
    sl := &sysListener{
        ListenConfig: *lc,
        network:      network,
        address:      address,
    }
    // 根据不同的addrs类型调用不同的listen
    var l Listener
    la := addrs.first(isIPv4)
    switch la := la.(type) {
    case *TCPAddr:
        l, err = sl.listenTCP(ctx, la)
    case *UnixAddr:
        l, err = sl.listenUnix(ctx, la)
    default:
        // return error...
    }
    // return error...
    return l, nil
}

// `net/tcpsock_posix.go`
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
    // internetSocket内部解析AddrFamily并调用socket()
    // socket内部创建关键的结构对象netFD,并初始化绑定、监听等
    // 注意这里的syscall.SOCK_STREAM,后面会用到
    fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
    if err != nil {
        return nil, err
    }
    // 用上面创建并初始化好的fd构造TCPListener
    return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}

// `net/sock_posix.go`
// 返回一个初始化好的异步I/O `netFD` 网络描述符
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
    // `sysSocket`内部调用syscall.Socket,
    // 并置为非阻塞和close-on-exec (Syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC),
    // 返回系统描述符: s
    s, err := sysSocket(family, sotype, proto)
    if err != nil {
        return nil, err
    }
    // ...
    // 用`sysSocket`创建的`s`创建`netFD`
    if fd, err = newFD(s, family, sotype, net); err != nil {
        poll.CloseFunc(s)
        return nil, err
    }
    // ...
    // 上面提到过,在listenTCP中传入的参数为syscall.SOCK_STREAM
    // 此处判断该sotype类型,调用fd.listenStream()
    if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
        fd.Close()
        return nil, err
    }
    // ...
    return fd, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
    var err error
    // 设置默认参数 SO_REUSEADDR
    if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
        return err
    }
    var lsa syscall.Sockaddr
    if lsa, err = laddr.sockaddr(fd.family); err != nil {
        return err
    }
    if ctrlFn != nil {
        // ...
    }
    // syscall.Bind 绑定
    if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
        return os.NewSyscallError("bind", err)
    }
    // syscall.Listen 监听
    if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
        return os.NewSyscallError("listen", err)
    }
    // netFD初始化,fd.init()->fd.pfd.Init()->fd.pfd.pd.init()`
    // 最终调用的是`runtime_pollServerInit`、`runtime_pollOpen`
    // netFD初始化部分后面会接着讲
    if err = fd.init(); err != nil {
        return err
    }
    lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
    fd.setAddr(fd.addrFunc()(lsa), nil)
    return nil
}

为了更好的理解,需要讲几个重要的struct:

  • netFD是网络描述符,其结构体中有一个poll.FD对象。
  • poll.FD是文件描述符,表示一个网络连接或者OS文件。
  • poll.FD结构中主要看Sysfd intpd pollDesc两个变量,前者是系统返回的文件描述符,后者其内部封装了运行时上下文,包括读、写goroutine及其状态,读写超时等基本信息。通过将pollDesc指针信息存入epollevent.data(8字节数组)中,然后调用epollctl(epoll_ctl)将fdepollevent信息注册到epoll实例上,实现epoll事件回调和用户态协程调用的关联。

相关结构源码如下:

// `net/fd_unix.go`
type netFD struct {
    pfd poll.FD

    // Close前不可变
    family      int
    sotype      int
    isConnected bool // handshake completed or use of association with peer
    net         string
    laddr       Addr
    raddr       Addr
}

// `internal/poll/fd_unix.go`
type FD struct {
    // sysfd和Read/Write方法锁
    fdmu fdMutex
    // 系统文件描述符
    Sysfd int
    // I/O poller,封装了运行时上下文
    pd pollDesc
    // Writev 缓存.
    iovecs *[]syscall.Iovec
    // 文件关闭的时候发送信号量
    csema uint32
    // 非阻塞模式时非零
    isBlocking uint32
    // 是streaming还是packet-based UDP
    IsStream bool
    // 读到零字节是否表示EOF,对于基于消息的套接字连接为false
    ZeroReadIsEOF bool
    // 是一个file而并非network socket
    isFile bool
}

// internal/poll/fd_poll_runtime.go
// `pollDesc`结构中只有一个`uintptr`变量,`runtimeCtx`封装了运行时上下文,其具体信息后面会讲
type pollDesc struct {
    runtimeCtx uintptr
}

接着看netFD初始化源码:

// `net/fd_unix.go`
//  netFD初始化
func (fd *netFD) init() error {
    // 调用 pfd (poll.FD) 的Init方法
    return fd.pfd.Init(fd.net, true)
}

// `internal/poll/fd_unix.go`
// FD初始化
func (fd *FD) Init(net string, pollable bool) error {
    // ...
    // 调用 pd (pollDesc) 的init方法
    err := fd.pd.init(fd)
    // ...
    return err
}

// internal/poll/fd_poll_runtime.go
// serverInit全局变量,只执行一次runtime_pollServerInit,
// 并在其内部调用runtime.netpollinit()创建epoll实例;
// runtime_pollOpen内部调用runtime.netpollopen,
// 将listener fd注册到epoll实例中,初始化pollDesc并返回ctx,赋值runtimeCtx
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
    serverInit.Do(runtime_pollServerInit)
    ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
    if errno != 0 {
        if ctx != 0 {
            runtime_pollUnblock(ctx)
            runtime_pollClose(ctx)
        }
        return errnoErr(syscall.Errno(errno))
    }
    pd.runtimeCtx = ctx
    return nil
}

上面pollDescinit()方法中,runtime_pollServerInitruntime_pollOpen实际link的是runtime包下的poll_runtime_pollServerInitpoll_runtime_pollOpen函数,具体实现在runtime/netpoll.go中。
首先,看一下pollDesc在runtime包下的具体封装信息:

// `runtime/netpoll.go`
// Network poller descriptor.
// No heap pointers.
//go:notinheap
type pollDesc struct {
    link *pollDesc // in pollcache, protected by pollcache.lock

    lock    mutex // protects the following fields
    fd      uintptr
    closing bool
    everr   bool    // marks event scanning error happened
    user    uint32  // user settable cookie
    rseq    uintptr // protects from stale read timers
    rg      uintptr // pdReady, pdWait, G waiting for read or nil
    rt      timer   // read deadline timer (set if rt.f != nil)
    rd      int64   // read deadline
    wseq    uintptr // protects from stale write timers
    wg      uintptr // pdReady, pdWait, G waiting for write or nil
    wt      timer   // write deadline timer
    wd      int64   // write deadline
}

pollDesc结构中的重要变量:

  • lock锁 防止内部成员变量并发读写问题
  • fd为文件描述符
  • rtwt分别表示读写定时器,用来防止读写超时
  • rgwg分别保存了用户态操作pollDesc的读、写goroutine地址,以及goroutine的ready/wait状态,用于goroutine读写阻塞时挂起、就绪时恢复运行

接着看一下runtime包下面的InitOpenInit全局只初始化一次。

// `runtime/netpoll.go`
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
    netpollGenericInit()
}

func netpollGenericInit() {
    // 判断netpoll是否已经初始化过
    if atomic.Load(&netpollInited) == 0 {
        // 全局锁
        lock(&netpollInitLock)
        if netpollInited == 0 {
            // 调用netpollinit()
            netpollinit()
            atomic.Store(&netpollInited, 1)
        }
        unlock(&netpollInitLock)
    }
}

type epollevent struct {
    // 事件
    events uint32
    data   [8]byte // unaligned uintptr
}

// `runtime/netpoll_epoll.go`
func netpollinit() {
    // 调用 OS epoll_create,创建一个epoll实例,
    // 把生成的epoll fd赋值给全局变量 `epfd`
    // 后续listener以及accept的所有sockets相关的epoll操作都是基于这个`epfd`
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    // ...
    ev := epollevent{
        // 读事件
        events: _EPOLLIN,
    }
    // netpollBreakRd: for netpollBreak
    // 在后面有事件回调时会用到,判断是否为netpollBreakRd
    *(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
    // 系统调用 epoll_ctl,注册epoll事件
    errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
    // ...
}

接着看Open

//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
    // pollcache为全局pollDesc链表缓存,调用 alloc()获取一个*pollDesc
    pd := pollcache.alloc()
    lock(&pd.lock)
    // 锁住,初始化、赋值成员变量
    // ...
    unlock(&pd.lock)

    var errno int32
    // 调用 netpollopen(),实现见下面
    errno = netpollopen(fd, pd)
    return pd, int(errno)
}

// alloc 如果链表头`first`为空,则分配内存并初始化n个`*pollDesc`节点链表,然后pop出头节点;
// 如果`first`不为空则直接pop出头部节点。
func (c *pollCache) alloc() *pollDesc {
    lock(&c.lock)
    if c.first == nil {
        const pdSize = unsafe.Sizeof(pollDesc{})
        n := pollBlockSize / pdSize
        if n == 0 {
            n = 1
        }
        // Must be in non-GC memory because can be referenced
        // only from epoll/kqueue internals.
        mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
        for i := uintptr(0); i < n; i++ {
            pd := (*pollDesc)(add(mem, i*pdSize))
            pd.link = c.first
            c.first = pd
        }
    }
    pd := c.first
    c.first = pd.link
    unlock(&c.lock)
    return pd
}

// netpollopen
func netpollopen(fd uintptr, pd *pollDesc) int32 {
    var ev epollevent
    // 触发事件,读、写、挂起关闭、边缘触发
    ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
    *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
    // 调用`epollctl`注册fd到epoll实例
    // 同时把`*pollDesc`保存到`epollevent.data`里传入内核
    // 实现内核态事件和用户态协程的关联
    return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

从上面的源码可以看到,Listen实现了TCP Server的绑定、监听,通过调用epoll_createepoll_ctl 创建epoll句柄、注册epoll事件,并将goroutine信息与回调事件相关联。

Accept

listener.AcceptListener的接口方法,TCPListener实现了该方法,它阻塞等待下一次调用并返回一个Conn。

// `net/tcpsock.go`
func (l *TCPListener) Accept() (Conn, error) {
    if !l.ok() {
        return nil, syscall.EINVAL
    }
    c, err := l.accept()
    if err != nil {
        return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
    }
    return c, nil
}

// `net/tcpsock_posix.go`
func (ln *TCPListener) accept() (*TCPConn, error) {
    // 关注1:
    fd, err := ln.fd.accept()
    if err != nil {
        return nil, err
    }
    // 关注2:
    tc := newTCPConn(fd)
    if ln.lc.KeepAlive >= 0 {
        setKeepAlive(fd, true)
        ka := ln.lc.KeepAlive
        if ln.lc.KeepAlive == 0 {
            ka = defaultTCPKeepAlive
        }
        setKeepAlivePeriod(fd, ka)
    }
    return tc, nil
}

// `net/fd_unix.go`
func (fd *netFD) accept() (netfd *netFD, err error) {
    // 调用`poll.FD`的`Accept`方法接受新的socket连接,返回socket的fd
    d, rsa, errcall, err := fd.pfd.Accept()
    // ...
    // 用上面返回的fd(d)创建一个netFD
    if netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
    // ...
    // 调用`netFD`的`init`方法完成`pollDesc`初始化,并将事件加入epoll实例
    if err = netfd.init(); err != nil {
        netfd.Close()
        return nil, err
    }
    // ...
    return netfd, nil
}

// `internal/poll/fd_unix.go`
// Accept 封装了accept网络调用
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
    // ...
    for {
        // 其内调用syscall.Accept4/syscall.Accept,设置为syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC
        s, rsa, errcall, err := accept(fd.Sysfd)
        // listener fd在创建的时候设置为非阻塞模式,accept()会立即返回,
        // 判断err,为nil则说明有新连接事件,直接return
        if err == nil {
            return s, rsa, "", err
        }
        switch err {
        case syscall.EAGAIN:
            if fd.pd.pollable() {
                // 如果err为syscall.EAGAIN,并且pollDesc的runtimeCtx不为空,则调用pollDesc.waitRead,
                // 其中调用了`runtime_pollWait`,实际连接调用的是`runtime.poll_runtime_pollWait`
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
        case syscall.ECONNABORTED:
            continue
        }
        return -1, nil, errcall, err
    }
}

func (pd *pollDesc) waitRead(isFile bool) error {
    return pd.wait('r', isFile)
}

func (pd *pollDesc) waitWrite(isFile bool) error {
    return pd.wait('w', isFile)
}

func (pd *pollDesc) wait(mode int, isFile bool) error {
    if pd.runtimeCtx == 0 {
        return errors.New("waiting for unsupported file type")
    }
    res := runtime_pollWait(pd.runtimeCtx, mode)
    return convertErr(res, isFile)
}

  • Accept接收客户端连接请求建立新连接,通过netFDaccept()创建系统fd,将socket设置为非阻塞I/O模式。
  • 后面逻辑与前面讲的一样,创建并初始化netFD,其内完成pollDesc初始化、调用runtime.netpollopenfdepollevent添加到epoll实例。
  • 因为是非阻塞模式,当accept()返回err为syscall.EAGAIN时,若pollDescruntimeCtx不为空,则调用pollDesc.waitRead,其中调用了runtime_pollWait
  • runtime_pollWait实际link的是runtime.poll_runtime_pollWait,其中调用netpollblock,源码如下:
// `runtime/netpoll.go`
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
    // netpollcheckerr and check GOOS
    ///...
    // for循环等待 IO ready
    for !netpollblock(pd, int32(mode), false) {
        err = netpollcheckerr(pd, int32(mode))
        if err != 0 {
            return err
        }
    }
    return 0
}

// IO reday返回true,超时或者关闭返回false
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
    // gpp根据mode的值取rg或者wg,后面调用gopark时,会把当前的goroutine的g结构指针存入gpp
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }
    // set the gpp semaphore to WAIT
    // 判断状态为 pdReady 则 return, 否则设置为 pdWait
    for {
        old := *gpp
        // old == pdReady 表示此时已有 I/O 事件发生,
        // 直接返回 unblock 当前 goroutine 并执行相应的 I/O 操作
        if old == pdReady {
            *gpp = 0
            return true
        }
        if old != 0 {
            throw("runtime: double wait")
        }
        // CAS原子操作将gpp置为`pdWait`
        if atomic.Casuintptr(gpp, 0, pdWait) {
            break
        }
    }

    // recheck error states
    if waitio || netpollcheckerr(pd, mode) == 0 {
        // 防止此时可能会有其他的并发操作修改pd里的内容,所以需要再次检查错误状态。
        // gopark将当前goroutine置于等待状态并等待下一次的调度
        // `netpollblockcommit`回调函数在gopark内部回调时,CAS将当前goroutine指针存到传入的参数`gpp`
        gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
    }
    // 通过原子操作将gpp的值设置为0,返回修改前的值并判断是否pdReady
    old := atomic.Xchguintptr(gpp, 0)
    if old > pdWait {
        throw("runtime: corrupted polldesc")
    }
    return old == pdReady
}

func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
    r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
    if r {
        // Bump the count of goroutines waiting for the poller.
        // The scheduler uses this to decide whether to block
        // waiting for the poller if there is nothing else to do.
        atomic.Xadd(&netpollWaiters, 1)
    }
    return r
}

关于Go的调度,此处不深入,简单讲一下。
gopark调用mcall(park_m)mcall是通过汇编实现的,其函数原型及作用为:

func mcall(fn func(*g))
  • 从当前g栈切换到g0栈;
  • 在g0栈上执行函数fn(g),此处为park_m
  • 保存当前g的信息,将PC/SP存储到g.sched中,当被重新调度时能够获取相关信息继续执行。

park_m()中将当前goroutine状态由_GrunningCAS为_Gwaiting、与当前的m解绑,并回调netpollblockcommitgr/gwpdWaitCAS为goroutine指针,然后调用schedule()
schedule函数永远不会返回,其调用逻辑为:schedule() -> execute() -> gogo() -> goroutine 任务 -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()。
当goroutine对应的fd上发生期望的事件时,它就会被重新调度,从g.sched中获取之前保存的信息,继续执行后面的逻辑,此时gpppdReady状态。

Read、Write

TCPListeneraccept()中创建并初始化netFD后,会调用newTCPConn()创建并返回*TCPConn,它实现了net.Conn接口,我们主要看ReadWrite方法。
Read调用链源码:

// `net/tcpsock.go`
func newTCPConn(fd *netFD) *TCPConn {
    c := &TCPConn{conn{fd}}
    setNoDelay(c.fd, true)
    return c
}

type TCPConn struct {
    conn
}

// `net/net.go`
type conn struct {
    fd *netFD
}

// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    n, err := c.fd.Read(b)
    if err != nil && err != io.EOF {
        err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
    }
    return n, err
}

// `net/fd_unix.go`
func (fd *netFD) Read(p []byte) (n int, err error) {
    n, err = fd.pfd.Read(p)
    runtime.KeepAlive(fd)
    return n, wrapSyscallError("read", err)
}

// `internal/poll/fd_unix.go`
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
    // ...
    for {
        // syscall.Read系统调用从socket读取数据,因为 socket在被accept的时候设置为非阻塞 I/O,不会阻塞
        n, err := syscall.Read(fd.Sysfd, p)
        if err != nil {
            n = 0
            if err == syscall.EAGAIN && fd.pd.pollable() {
                // 当err为syscall.EAGAIN,调用waitRead,
                // 从上面的分析知道,其内通过gopark将当前goroutine挂起
                if err = fd.pd.waitRead(fd.isFile); err == nil {
                    continue
                }
            }
            if runtime.GOOS == "darwin" && err == syscall.EINTR {
                continue
            }
        }
        err = fd.eofError(n, err)
        return n, err
    }
}

conn.Writeconn.Read逻辑类似,通过FD.Write调用syscall.Write,因为为非阻塞 I/O,如果返回err为syscall.EAGAIN,也会类似Read,调用pollDesc.waitWrite
执行runtime_pollWait->netpollblock,gopark住当前goroutine,直到有事件发生被重新调度。

netpoll

通过前面的分析,我们了解了Go通过gopark住 goroutine 达到阻塞 Accept/Read/Write 的效果。现在会有两个疑问:

  1. 当相应的 I/O 事件发生之后,如何唤醒这些gopark住的goroutine从而继续调度执行呢?
  2. 我们前面讲到了跟epoll相关的两个调用epoll_createepoll_ctl,还有一个重要的epoll_wait在哪里调用的呢?

通过源码,可以发现,在runtime/netpoll_epoll.go中有一个netpoll()方法,它内部调用 epollwait(epoll_wait) 获取就绪的fd事件epollevent列表,然后将每个epollevent.data值取出转化为*pollDesc,并调用netpollready->netpollunblock, 将rg/wg的状态转化为pdReady(ioready),同时将读、写g指针添加到goroutine列表gList返回。
相关源码如下:

// runtime/proc.go
// A gList is a list of Gs linked through g.schedlink. A G can only be
// on one gQueue or gList at a time.
type gList struct {
    head guintptr
}

// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
    // epfd、delay判断waitms赋值
    // ...
    var events [128]epollevent
retry:
    // 获取就绪的fd事件列表
    n := epollwait(epfd, &events[0], int32(len(events)), waitms)
    if n < 0 {
        // ...
        goto retry
    }
    var toRun gList
    for i := int32(0); i < n; i++ {
        ev := &events[i]
        if ev.events == 0 {
            continue
        }
        // 判断是否为netpollinit注册epoll实例时设置的netpollBreakRd
        if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
            // ...
            continue
        }

        var mode int32
        // 读事件
        if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'r'
        }
        // 写事件
        if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
            mode += 'w'
        }
        if mode != 0 {
            // 取出保存在epollevent.data中的pollDesc
            pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
            pd.everr = false
            if ev.events == _EPOLLERR {
                pd.everr = true
            }
            // 调用netpollready,传入就绪fd的pollDesc
            netpollready(&toRun, pd, mode)
        }
    }
    return toRun
}

// netpollready 调用netpollunblock,把pollDesc中相应的可读、写goroutine取出
// 并将pollDesc.rg/wg转换状态为pdReady,然后将取出的goroutine push到链表 toRun 中
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
    var rg, wg *g
    if mode == 'r' || mode == 'r'+'w' {
        rg = netpollunblock(pd, 'r', true)
    }
    if mode == 'w' || mode == 'r'+'w' {
        wg = netpollunblock(pd, 'w', true)
    }
    if rg != nil {
        toRun.push(rg)
    }
    if wg != nil {
        toRun.push(wg)
    }
}

func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
    gpp := &pd.rg
    if mode == 'w' {
        gpp = &pd.wg
    }

    for {
        old := *gpp
        if old == pdReady {
            return nil
        }
        if old == 0 && !ioready {
            // Only set READY for ioready. runtime_pollWait
            // will check for timeout/cancel before waiting.
            return nil
        }
        var new uintptr
        if ioready {
            new = pdReady
        }
        // gpp CAS 为 pdReady
        if atomic.Casuintptr(gpp, old, new) {
            if old == pdReady || old == pdWait {
                old = 0
            }
            // 将之前存入pollDesc的 g结构指针 old 转换为 *g
            return (*g)(unsafe.Pointer(old))
        }
    }
}

我们看到Go 在netpoll()中获取触发读写事件的goroutine列表,而netpoll()会在多处被调用(runtime/proc.go):

  • startTheWorldWithSema()
    StartTheWorld时会调用netpoll获取gList进行调度
  • findrunnable()
    该方法会在Go scheduler的核心方法schedule()中被调用,从而调用netpoll获取gList
  • pollWork()
    后台工作循环(比如idle GC)检查时会调用netpoll获取gList
  • sysmon()
    在程序启动时调用,不需要P,使用独立的M作为监控线程,sysmon每 20us~10ms运行一次,调用netpoll获取gList

当上面这些调用获取gList后,都会调用injectglist()方法(findrunnable()中会先pop出一个g,将g状态由_GwaitingCAS为_Grunnable,然后再调用injectglist()),injectglist方法会将gList中的g的状态由_GwaitingCAS为_Grunnable,然后放入全局运行队列(globrunqput(gp)),从而被重新调度,当goroutine被重新调度时,会从g.sched中取出PC/SP信息,继续执行之前的逻辑。

// Injects the list of runnable G's into the scheduler and clears glist.
// Can run concurrently with GC.
func injectglist(glist *gList) {
    if glist.empty() {
        return
    }
    if trace.enabled {
        for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() {
            traceGoUnpark(gp, 0)
        }
    }
    lock(&sched.lock)
    var n int
    // 从glist中循环取出gp
    for n = 0; !glist.empty(); n++ {
        gp := glist.pop()
        // 由 _Gwaiting 变为 _Grunnable
        casgstatus(gp, _Gwaiting, _Grunnable)
        // 放入全局运行队列
        globrunqput(gp)
    }
    unlock(&sched.lock)
    for ; n != 0 && sched.npidle != 0; n-- {
        // 循环获取空闲P,并调度 M 去运行 P
        startm(nil, false)
    }
    *glist = gList{}
}

Go基于epoll/kqueue/iocp和自身的运行时调度机制,实现了自己的I/O多路复用netpoll网络模型,从上面源码可以看到,Accept/Read/Write等方法其底层实现均采用非阻塞方式,而我们在开发过程中调用的方式很显然是同步模式,这就大大降低了网络开发难度,从这也可以看出Go语言设计的初衷之一:简单而高效。

至此,本文关于Go net(TCP service)相关内容就介绍完了,总结一下涉及的内容:

  • TCP Server的简单使用示例
  • 简单介绍了一个可使用(或作为参考)的、基于Go原生net package实现的go net项目(github开源项目
  • 主要进行了 netpoll相关源码的分析(基于Go1.14)
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

友情链接更多精彩内容