注:本文分析基于Go1.14
简单实例
一个简单的tcp server例子
package main
import (
"context"
"fmt"
"net"
"time"
)
func main() {
listener, err := net.Listen("tcp", "0.0.0.0:7777")
if err != nil {
fmt.Println("Listen error:", err)
return
}
defer listener.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for {
conn, err := listener.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
fmt.Println("Accept temp error:", ne)
time.Sleep(time.Second)
continue
}
fmt.Println("Accept error:", err)
break
}
go handleConn(ctx, conn)
}
}
func handleConn(ctx context.Context, conn net.Conn) {
defer conn.Close()
buf := make([]byte, 1024)
for {
select {
case <-ctx.Done():
return
default:
}
n, err := conn.Read(buf)
if err != nil {
fmt.Println("Read error:", err)
}
_, err = conn.Write(buf[:n])
if err != nil {
fmt.Println("Write error:", err)
}
}
}
完整实现
上面是简单的TCP Server使用示例,如果想获取完整的服务实现,可以使用或者参考github上的一个go net 开源项目:
https://github.com/izhw/gnet
该项目使用go原生net package实现,不依赖其他第三方库,可以帮助开发者快速搭建一个net服务(TCP Server or Client)。
比如,开发者只要实现gnet.NetEventHandler事件回调(也可内置默认实现),使用可选的Functional options,就能快速搭建一个TCP Server,简单使用代码如下:
package main
import (
"fmt"
"log"
"github.com/izhw/gnet"
"github.com/izhw/gnet/tcp/tcpserver"
)
type ServerHandler struct {
*gnet.NetEventHandler
}
func (h *ServerHandler) OnReadMsg(c gnet.Conn, data []byte) error {
fmt.Println("read msg:", string(data))
c.Write(data)
return nil
}
func main() {
s := tcpserver.NewServer("0.0.0.0:7777", &ServerHandler{})
log.Fatal("Exit:", s.Serve())
}
Go net源码分析 (go1.14)
开始的简单示例中,我们看到几个关键的调用:net.Listen()、listener.Accept()、conn.Read()、conn.Write,下面分别进行源码分析。
Listen
-
net.Listen返回的是实现了net.Listener接口的*TCPListener。 - 其中,
Listen方法内,生成系统文件描述符sysfd,使用该sysfd设置参数、调用syscall.Bind、syscall.Listen完成绑定、监听,并初始化一些重要的结构信息,创建epoll句柄、注册epoll事件,然后构造返回TCPListener。
调用链源码如下(只贴了关键代码)。
// `net/net.go`
// Listener接口
type Listener interface {
// 阻塞等待,有新连接事件的时候,返回一个net.Conn
Accept() (Conn, error)
// 关闭listener,任何阻塞的Accept操作变为不阻塞,并返回错误
Close() error
// 返回listener的网络地址
Addr() Addr
}
// `net/tcpsock.go`
type TCPListener struct {
// Go封装的网络描述符,后面会具体讲
fd *netFD
// Listen配置
lc ListenConfig
}
// `net/dial.go`
// 根据不同的 'network'和'address'构建相应的'Listener'
func Listen(network, address string) (Listener, error) {
var lc ListenConfig
return lc.Listen(context.Background(), network, address)
}
func (lc *ListenConfig) Listen(ctx context.Context, network, address string) (Listener, error) {
// resolve解析addrs
// ...
sl := &sysListener{
ListenConfig: *lc,
network: network,
address: address,
}
// 根据不同的addrs类型调用不同的listen
var l Listener
la := addrs.first(isIPv4)
switch la := la.(type) {
case *TCPAddr:
l, err = sl.listenTCP(ctx, la)
case *UnixAddr:
l, err = sl.listenUnix(ctx, la)
default:
// return error...
}
// return error...
return l, nil
}
// `net/tcpsock_posix.go`
func (sl *sysListener) listenTCP(ctx context.Context, laddr *TCPAddr) (*TCPListener, error) {
// internetSocket内部解析AddrFamily并调用socket()
// socket内部创建关键的结构对象netFD,并初始化绑定、监听等
// 注意这里的syscall.SOCK_STREAM,后面会用到
fd, err := internetSocket(ctx, sl.network, laddr, nil, syscall.SOCK_STREAM, 0, "listen", sl.ListenConfig.Control)
if err != nil {
return nil, err
}
// 用上面创建并初始化好的fd构造TCPListener
return &TCPListener{fd: fd, lc: sl.ListenConfig}, nil
}
// `net/sock_posix.go`
// 返回一个初始化好的异步I/O `netFD` 网络描述符
func socket(ctx context.Context, net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, ctrlFn func(string, string, syscall.RawConn) error) (fd *netFD, err error) {
// `sysSocket`内部调用syscall.Socket,
// 并置为非阻塞和close-on-exec (Syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC),
// 返回系统描述符: s
s, err := sysSocket(family, sotype, proto)
if err != nil {
return nil, err
}
// ...
// 用`sysSocket`创建的`s`创建`netFD`
if fd, err = newFD(s, family, sotype, net); err != nil {
poll.CloseFunc(s)
return nil, err
}
// ...
// 上面提到过,在listenTCP中传入的参数为syscall.SOCK_STREAM
// 此处判断该sotype类型,调用fd.listenStream()
if err := fd.listenStream(laddr, listenerBacklog(), ctrlFn); err != nil {
fd.Close()
return nil, err
}
// ...
return fd, nil
}
func (fd *netFD) listenStream(laddr sockaddr, backlog int, ctrlFn func(string, string, syscall.RawConn) error) error {
var err error
// 设置默认参数 SO_REUSEADDR
if err = setDefaultListenerSockopts(fd.pfd.Sysfd); err != nil {
return err
}
var lsa syscall.Sockaddr
if lsa, err = laddr.sockaddr(fd.family); err != nil {
return err
}
if ctrlFn != nil {
// ...
}
// syscall.Bind 绑定
if err = syscall.Bind(fd.pfd.Sysfd, lsa); err != nil {
return os.NewSyscallError("bind", err)
}
// syscall.Listen 监听
if err = listenFunc(fd.pfd.Sysfd, backlog); err != nil {
return os.NewSyscallError("listen", err)
}
// netFD初始化,fd.init()->fd.pfd.Init()->fd.pfd.pd.init()`
// 最终调用的是`runtime_pollServerInit`、`runtime_pollOpen`
// netFD初始化部分后面会接着讲
if err = fd.init(); err != nil {
return err
}
lsa, _ = syscall.Getsockname(fd.pfd.Sysfd)
fd.setAddr(fd.addrFunc()(lsa), nil)
return nil
}
为了更好的理解,需要讲几个重要的struct:
-
netFD是网络描述符,其结构体中有一个poll.FD对象。 -
poll.FD是文件描述符,表示一个网络连接或者OS文件。 -
poll.FD结构中主要看Sysfd int和pd pollDesc两个变量,前者是系统返回的文件描述符,后者其内部封装了运行时上下文,包括读、写goroutine及其状态,读写超时等基本信息。通过将pollDesc指针信息存入epollevent.data(8字节数组)中,然后调用epollctl(epoll_ctl)将fd和epollevent信息注册到epoll实例上,实现epoll事件回调和用户态协程调用的关联。
相关结构源码如下:
// `net/fd_unix.go`
type netFD struct {
pfd poll.FD
// Close前不可变
family int
sotype int
isConnected bool // handshake completed or use of association with peer
net string
laddr Addr
raddr Addr
}
// `internal/poll/fd_unix.go`
type FD struct {
// sysfd和Read/Write方法锁
fdmu fdMutex
// 系统文件描述符
Sysfd int
// I/O poller,封装了运行时上下文
pd pollDesc
// Writev 缓存.
iovecs *[]syscall.Iovec
// 文件关闭的时候发送信号量
csema uint32
// 非阻塞模式时非零
isBlocking uint32
// 是streaming还是packet-based UDP
IsStream bool
// 读到零字节是否表示EOF,对于基于消息的套接字连接为false
ZeroReadIsEOF bool
// 是一个file而并非network socket
isFile bool
}
// internal/poll/fd_poll_runtime.go
// `pollDesc`结构中只有一个`uintptr`变量,`runtimeCtx`封装了运行时上下文,其具体信息后面会讲
type pollDesc struct {
runtimeCtx uintptr
}
接着看netFD初始化源码:
// `net/fd_unix.go`
// netFD初始化
func (fd *netFD) init() error {
// 调用 pfd (poll.FD) 的Init方法
return fd.pfd.Init(fd.net, true)
}
// `internal/poll/fd_unix.go`
// FD初始化
func (fd *FD) Init(net string, pollable bool) error {
// ...
// 调用 pd (pollDesc) 的init方法
err := fd.pd.init(fd)
// ...
return err
}
// internal/poll/fd_poll_runtime.go
// serverInit全局变量,只执行一次runtime_pollServerInit,
// 并在其内部调用runtime.netpollinit()创建epoll实例;
// runtime_pollOpen内部调用runtime.netpollopen,
// 将listener fd注册到epoll实例中,初始化pollDesc并返回ctx,赋值runtimeCtx
var serverInit sync.Once
func (pd *pollDesc) init(fd *FD) error {
serverInit.Do(runtime_pollServerInit)
ctx, errno := runtime_pollOpen(uintptr(fd.Sysfd))
if errno != 0 {
if ctx != 0 {
runtime_pollUnblock(ctx)
runtime_pollClose(ctx)
}
return errnoErr(syscall.Errno(errno))
}
pd.runtimeCtx = ctx
return nil
}
上面pollDesc的init()方法中,runtime_pollServerInit和runtime_pollOpen实际link的是runtime包下的poll_runtime_pollServerInit和poll_runtime_pollOpen函数,具体实现在runtime/netpoll.go中。
首先,看一下pollDesc在runtime包下的具体封装信息:
// `runtime/netpoll.go`
// Network poller descriptor.
// No heap pointers.
//go:notinheap
type pollDesc struct {
link *pollDesc // in pollcache, protected by pollcache.lock
lock mutex // protects the following fields
fd uintptr
closing bool
everr bool // marks event scanning error happened
user uint32 // user settable cookie
rseq uintptr // protects from stale read timers
rg uintptr // pdReady, pdWait, G waiting for read or nil
rt timer // read deadline timer (set if rt.f != nil)
rd int64 // read deadline
wseq uintptr // protects from stale write timers
wg uintptr // pdReady, pdWait, G waiting for write or nil
wt timer // write deadline timer
wd int64 // write deadline
}
pollDesc结构中的重要变量:
-
lock锁 防止内部成员变量并发读写问题 -
fd为文件描述符 -
rt和wt分别表示读写定时器,用来防止读写超时 -
rg和wg分别保存了用户态操作pollDesc的读、写goroutine地址,以及goroutine的ready/wait状态,用于goroutine读写阻塞时挂起、就绪时恢复运行
接着看一下runtime包下面的Init和Open,Init全局只初始化一次。
// `runtime/netpoll.go`
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
func poll_runtime_pollServerInit() {
netpollGenericInit()
}
func netpollGenericInit() {
// 判断netpoll是否已经初始化过
if atomic.Load(&netpollInited) == 0 {
// 全局锁
lock(&netpollInitLock)
if netpollInited == 0 {
// 调用netpollinit()
netpollinit()
atomic.Store(&netpollInited, 1)
}
unlock(&netpollInitLock)
}
}
type epollevent struct {
// 事件
events uint32
data [8]byte // unaligned uintptr
}
// `runtime/netpoll_epoll.go`
func netpollinit() {
// 调用 OS epoll_create,创建一个epoll实例,
// 把生成的epoll fd赋值给全局变量 `epfd`
// 后续listener以及accept的所有sockets相关的epoll操作都是基于这个`epfd`
epfd = epollcreate1(_EPOLL_CLOEXEC)
// ...
ev := epollevent{
// 读事件
events: _EPOLLIN,
}
// netpollBreakRd: for netpollBreak
// 在后面有事件回调时会用到,判断是否为netpollBreakRd
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
// 系统调用 epoll_ctl,注册epoll事件
errno = epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
// ...
}
接着看Open
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
// pollcache为全局pollDesc链表缓存,调用 alloc()获取一个*pollDesc
pd := pollcache.alloc()
lock(&pd.lock)
// 锁住,初始化、赋值成员变量
// ...
unlock(&pd.lock)
var errno int32
// 调用 netpollopen(),实现见下面
errno = netpollopen(fd, pd)
return pd, int(errno)
}
// alloc 如果链表头`first`为空,则分配内存并初始化n个`*pollDesc`节点链表,然后pop出头节点;
// 如果`first`不为空则直接pop出头部节点。
func (c *pollCache) alloc() *pollDesc {
lock(&c.lock)
if c.first == nil {
const pdSize = unsafe.Sizeof(pollDesc{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
// Must be in non-GC memory because can be referenced
// only from epoll/kqueue internals.
mem := persistentalloc(n*pdSize, 0, &memstats.other_sys)
for i := uintptr(0); i < n; i++ {
pd := (*pollDesc)(add(mem, i*pdSize))
pd.link = c.first
c.first = pd
}
}
pd := c.first
c.first = pd.link
unlock(&c.lock)
return pd
}
// netpollopen
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
// 触发事件,读、写、挂起关闭、边缘触发
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
// 调用`epollctl`注册fd到epoll实例
// 同时把`*pollDesc`保存到`epollevent.data`里传入内核
// 实现内核态事件和用户态协程的关联
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}
从上面的源码可以看到,Listen实现了TCP Server的绑定、监听,通过调用epoll_create、epoll_ctl 创建epoll句柄、注册epoll事件,并将goroutine信息与回调事件相关联。
Accept
listener.Accept是Listener的接口方法,TCPListener实现了该方法,它阻塞等待下一次调用并返回一个Conn。
// `net/tcpsock.go`
func (l *TCPListener) Accept() (Conn, error) {
if !l.ok() {
return nil, syscall.EINVAL
}
c, err := l.accept()
if err != nil {
return nil, &OpError{Op: "accept", Net: l.fd.net, Source: nil, Addr: l.fd.laddr, Err: err}
}
return c, nil
}
// `net/tcpsock_posix.go`
func (ln *TCPListener) accept() (*TCPConn, error) {
// 关注1:
fd, err := ln.fd.accept()
if err != nil {
return nil, err
}
// 关注2:
tc := newTCPConn(fd)
if ln.lc.KeepAlive >= 0 {
setKeepAlive(fd, true)
ka := ln.lc.KeepAlive
if ln.lc.KeepAlive == 0 {
ka = defaultTCPKeepAlive
}
setKeepAlivePeriod(fd, ka)
}
return tc, nil
}
// `net/fd_unix.go`
func (fd *netFD) accept() (netfd *netFD, err error) {
// 调用`poll.FD`的`Accept`方法接受新的socket连接,返回socket的fd
d, rsa, errcall, err := fd.pfd.Accept()
// ...
// 用上面返回的fd(d)创建一个netFD
if netfd, err = newFD(d, fd.family, fd.sotype, fd.net);
// ...
// 调用`netFD`的`init`方法完成`pollDesc`初始化,并将事件加入epoll实例
if err = netfd.init(); err != nil {
netfd.Close()
return nil, err
}
// ...
return netfd, nil
}
// `internal/poll/fd_unix.go`
// Accept 封装了accept网络调用
func (fd *FD) Accept() (int, syscall.Sockaddr, string, error) {
// ...
for {
// 其内调用syscall.Accept4/syscall.Accept,设置为syscall.SOCK_NONBLOCK|syscall.SOCK_CLOEXEC
s, rsa, errcall, err := accept(fd.Sysfd)
// listener fd在创建的时候设置为非阻塞模式,accept()会立即返回,
// 判断err,为nil则说明有新连接事件,直接return
if err == nil {
return s, rsa, "", err
}
switch err {
case syscall.EAGAIN:
if fd.pd.pollable() {
// 如果err为syscall.EAGAIN,并且pollDesc的runtimeCtx不为空,则调用pollDesc.waitRead,
// 其中调用了`runtime_pollWait`,实际连接调用的是`runtime.poll_runtime_pollWait`
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
case syscall.ECONNABORTED:
continue
}
return -1, nil, errcall, err
}
}
func (pd *pollDesc) waitRead(isFile bool) error {
return pd.wait('r', isFile)
}
func (pd *pollDesc) waitWrite(isFile bool) error {
return pd.wait('w', isFile)
}
func (pd *pollDesc) wait(mode int, isFile bool) error {
if pd.runtimeCtx == 0 {
return errors.New("waiting for unsupported file type")
}
res := runtime_pollWait(pd.runtimeCtx, mode)
return convertErr(res, isFile)
}
-
Accept接收客户端连接请求建立新连接,通过netFD的accept()创建系统fd,将socket设置为非阻塞I/O模式。 - 后面逻辑与前面讲的一样,创建并初始化
netFD,其内完成pollDesc初始化、调用runtime.netpollopen将fd、epollevent添加到epoll实例。 - 因为是非阻塞模式,当
accept()返回err为syscall.EAGAIN时,若pollDesc的runtimeCtx不为空,则调用pollDesc.waitRead,其中调用了runtime_pollWait。 -
runtime_pollWait实际link的是runtime.poll_runtime_pollWait,其中调用netpollblock,源码如下:
// `runtime/netpoll.go`
//go:linkname poll_runtime_pollWait internal/poll.runtime_pollWait
func poll_runtime_pollWait(pd *pollDesc, mode int) int {
// netpollcheckerr and check GOOS
///...
// for循环等待 IO ready
for !netpollblock(pd, int32(mode), false) {
err = netpollcheckerr(pd, int32(mode))
if err != 0 {
return err
}
}
return 0
}
// IO reday返回true,超时或者关闭返回false
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
// gpp根据mode的值取rg或者wg,后面调用gopark时,会把当前的goroutine的g结构指针存入gpp
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
// set the gpp semaphore to WAIT
// 判断状态为 pdReady 则 return, 否则设置为 pdWait
for {
old := *gpp
// old == pdReady 表示此时已有 I/O 事件发生,
// 直接返回 unblock 当前 goroutine 并执行相应的 I/O 操作
if old == pdReady {
*gpp = 0
return true
}
if old != 0 {
throw("runtime: double wait")
}
// CAS原子操作将gpp置为`pdWait`
if atomic.Casuintptr(gpp, 0, pdWait) {
break
}
}
// recheck error states
if waitio || netpollcheckerr(pd, mode) == 0 {
// 防止此时可能会有其他的并发操作修改pd里的内容,所以需要再次检查错误状态。
// gopark将当前goroutine置于等待状态并等待下一次的调度
// `netpollblockcommit`回调函数在gopark内部回调时,CAS将当前goroutine指针存到传入的参数`gpp`
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
// 通过原子操作将gpp的值设置为0,返回修改前的值并判断是否pdReady
old := atomic.Xchguintptr(gpp, 0)
if old > pdWait {
throw("runtime: corrupted polldesc")
}
return old == pdReady
}
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
关于Go的调度,此处不深入,简单讲一下。
gopark调用mcall(park_m),mcall是通过汇编实现的,其函数原型及作用为:
func mcall(fn func(*g))
- 从当前g栈切换到g0栈;
- 在g0栈上执行函数fn(g),此处为
park_m; - 保存当前g的信息,将PC/SP存储到
g.sched中,当被重新调度时能够获取相关信息继续执行。
在park_m()中将当前goroutine状态由_GrunningCAS为_Gwaiting、与当前的m解绑,并回调netpollblockcommit将gr/gw由pdWaitCAS为goroutine指针,然后调用schedule()。
schedule函数永远不会返回,其调用逻辑为:schedule() -> execute() -> gogo() -> goroutine 任务 -> goexit() -> goexit1() -> mcall() -> goexit0() -> schedule()。
当goroutine对应的fd上发生期望的事件时,它就会被重新调度,从g.sched中获取之前保存的信息,继续执行后面的逻辑,此时gpp为pdReady状态。
Read、Write
TCPListener的accept()中创建并初始化netFD后,会调用newTCPConn()创建并返回*TCPConn,它实现了net.Conn接口,我们主要看Read和Write方法。
Read调用链源码:
// `net/tcpsock.go`
func newTCPConn(fd *netFD) *TCPConn {
c := &TCPConn{conn{fd}}
setNoDelay(c.fd, true)
return c
}
type TCPConn struct {
conn
}
// `net/net.go`
type conn struct {
fd *netFD
}
// Read implements the Conn Read method.
func (c *conn) Read(b []byte) (int, error) {
if !c.ok() {
return 0, syscall.EINVAL
}
n, err := c.fd.Read(b)
if err != nil && err != io.EOF {
err = &OpError{Op: "read", Net: c.fd.net, Source: c.fd.laddr, Addr: c.fd.raddr, Err: err}
}
return n, err
}
// `net/fd_unix.go`
func (fd *netFD) Read(p []byte) (n int, err error) {
n, err = fd.pfd.Read(p)
runtime.KeepAlive(fd)
return n, wrapSyscallError("read", err)
}
// `internal/poll/fd_unix.go`
// Read implements io.Reader.
func (fd *FD) Read(p []byte) (int, error) {
// ...
for {
// syscall.Read系统调用从socket读取数据,因为 socket在被accept的时候设置为非阻塞 I/O,不会阻塞
n, err := syscall.Read(fd.Sysfd, p)
if err != nil {
n = 0
if err == syscall.EAGAIN && fd.pd.pollable() {
// 当err为syscall.EAGAIN,调用waitRead,
// 从上面的分析知道,其内通过gopark将当前goroutine挂起
if err = fd.pd.waitRead(fd.isFile); err == nil {
continue
}
}
if runtime.GOOS == "darwin" && err == syscall.EINTR {
continue
}
}
err = fd.eofError(n, err)
return n, err
}
}
conn.Write与conn.Read逻辑类似,通过FD.Write调用syscall.Write,因为为非阻塞 I/O,如果返回err为syscall.EAGAIN,也会类似Read,调用pollDesc.waitWrite,
执行runtime_pollWait->netpollblock,gopark住当前goroutine,直到有事件发生被重新调度。
netpoll
通过前面的分析,我们了解了Go通过gopark住 goroutine 达到阻塞 Accept/Read/Write 的效果。现在会有两个疑问:
- 当相应的 I/O 事件发生之后,如何唤醒这些
gopark住的goroutine从而继续调度执行呢? - 我们前面讲到了跟epoll相关的两个调用
epoll_create、epoll_ctl,还有一个重要的epoll_wait在哪里调用的呢?
通过源码,可以发现,在runtime/netpoll_epoll.go中有一个netpoll()方法,它内部调用 epollwait(epoll_wait) 获取就绪的fd事件epollevent列表,然后将每个epollevent.data值取出转化为*pollDesc,并调用netpollready->netpollunblock, 将rg/wg的状态转化为pdReady(ioready),同时将读、写g指针添加到goroutine列表gList返回。
相关源码如下:
// runtime/proc.go
// A gList is a list of Gs linked through g.schedlink. A G can only be
// on one gQueue or gList at a time.
type gList struct {
head guintptr
}
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
// epfd、delay判断waitms赋值
// ...
var events [128]epollevent
retry:
// 获取就绪的fd事件列表
n := epollwait(epfd, &events[0], int32(len(events)), waitms)
if n < 0 {
// ...
goto retry
}
var toRun gList
for i := int32(0); i < n; i++ {
ev := &events[i]
if ev.events == 0 {
continue
}
// 判断是否为netpollinit注册epoll实例时设置的netpollBreakRd
if *(**uintptr)(unsafe.Pointer(&ev.data)) == &netpollBreakRd {
// ...
continue
}
var mode int32
// 读事件
if ev.events&(_EPOLLIN|_EPOLLRDHUP|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'r'
}
// 写事件
if ev.events&(_EPOLLOUT|_EPOLLHUP|_EPOLLERR) != 0 {
mode += 'w'
}
if mode != 0 {
// 取出保存在epollevent.data中的pollDesc
pd := *(**pollDesc)(unsafe.Pointer(&ev.data))
pd.everr = false
if ev.events == _EPOLLERR {
pd.everr = true
}
// 调用netpollready,传入就绪fd的pollDesc
netpollready(&toRun, pd, mode)
}
}
return toRun
}
// netpollready 调用netpollunblock,把pollDesc中相应的可读、写goroutine取出
// 并将pollDesc.rg/wg转换状态为pdReady,然后将取出的goroutine push到链表 toRun 中
//go:nowritebarrier
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
if mode == 'r' || mode == 'r'+'w' {
rg = netpollunblock(pd, 'r', true)
}
if mode == 'w' || mode == 'r'+'w' {
wg = netpollunblock(pd, 'w', true)
}
if rg != nil {
toRun.push(rg)
}
if wg != nil {
toRun.push(wg)
}
}
func netpollunblock(pd *pollDesc, mode int32, ioready bool) *g {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
for {
old := *gpp
if old == pdReady {
return nil
}
if old == 0 && !ioready {
// Only set READY for ioready. runtime_pollWait
// will check for timeout/cancel before waiting.
return nil
}
var new uintptr
if ioready {
new = pdReady
}
// gpp CAS 为 pdReady
if atomic.Casuintptr(gpp, old, new) {
if old == pdReady || old == pdWait {
old = 0
}
// 将之前存入pollDesc的 g结构指针 old 转换为 *g
return (*g)(unsafe.Pointer(old))
}
}
}
我们看到Go 在netpoll()中获取触发读写事件的goroutine列表,而netpoll()会在多处被调用(runtime/proc.go):
- startTheWorldWithSema()
StartTheWorld时会调用netpoll获取gList进行调度 - findrunnable()
该方法会在Go scheduler的核心方法schedule()中被调用,从而调用netpoll获取gList - pollWork()
后台工作循环(比如idle GC)检查时会调用netpoll获取gList - sysmon()
在程序启动时调用,不需要P,使用独立的M作为监控线程,sysmon每 20us~10ms运行一次,调用netpoll获取gList
当上面这些调用获取gList后,都会调用injectglist()方法(findrunnable()中会先pop出一个g,将g状态由_GwaitingCAS为_Grunnable,然后再调用injectglist()),injectglist方法会将gList中的g的状态由_GwaitingCAS为_Grunnable,然后放入全局运行队列(globrunqput(gp)),从而被重新调度,当goroutine被重新调度时,会从g.sched中取出PC/SP信息,继续执行之前的逻辑。
// Injects the list of runnable G's into the scheduler and clears glist.
// Can run concurrently with GC.
func injectglist(glist *gList) {
if glist.empty() {
return
}
if trace.enabled {
for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() {
traceGoUnpark(gp, 0)
}
}
lock(&sched.lock)
var n int
// 从glist中循环取出gp
for n = 0; !glist.empty(); n++ {
gp := glist.pop()
// 由 _Gwaiting 变为 _Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
// 放入全局运行队列
globrunqput(gp)
}
unlock(&sched.lock)
for ; n != 0 && sched.npidle != 0; n-- {
// 循环获取空闲P,并调度 M 去运行 P
startm(nil, false)
}
*glist = gList{}
}
Go基于epoll/kqueue/iocp和自身的运行时调度机制,实现了自己的I/O多路复用netpoll网络模型,从上面源码可以看到,Accept/Read/Write等方法其底层实现均采用非阻塞方式,而我们在开发过程中调用的方式很显然是同步模式,这就大大降低了网络开发难度,从这也可以看出Go语言设计的初衷之一:简单而高效。
至此,本文关于Go net(TCP service)相关内容就介绍完了,总结一下涉及的内容:
- TCP Server的简单使用示例
- 简单介绍了一个可使用(或作为参考)的、基于Go原生net package实现的go net项目(github开源项目)
- 主要进行了
netpoll相关源码的分析(基于Go1.14)