Golang 游戏leaf系列(二) 网络消息流程概述

最开始接触到Leaf,就是被它的网络消息功能吸引的。那么先看看这部分功能吧。从文档中得知:

Leaf 可以单独使用 TCP 协议或 WebSocket 协议,也可以同时使用两者,换而言之,服务器可以同时接受 TCP 连接和 WebSocket 连接,对开发者而言消息来自 TCP 还是 WebSocket 是完全透明的。

一、network和gate

这个功能在源码中是如何实现的呢,看看network目录下tcp开头的,和ws开头的,有xx_conn,xx_msg,xx_server,正好各有3个文件。在conn.go里有个Conn interface,所以xx_conn肯定是实现这个接口的两个不同类型。按照这个思路,顺便看一下processor.go里的解析器接口,也是有json.go和protobuf.go两种实现。

type Conn interface {
    ReadMsg() ([]byte, error)
    WriteMsg(args ...[]byte) error
    LocalAddr() net.Addr
    RemoteAddr() net.Addr
    Close()
    Destroy()
}
1.gate目录

然后xx_conn这两种连接方式,要对外透明,是封装在gate包下面,一起使用的。先看一下agent.go:

type Agent interface {
    WriteMsg(msg interface{})
    LocalAddr() net.Addr
    RemoteAddr() net.Addr
    Close()
    Destroy()
    UserData() interface{}
    SetUserData(data interface{})
}

在gate.go里,会有一个agent 结构体来实现Agent接口。除了Agent接口中的方法,agent还实现了Run方法和OnClose方法。

type agent struct {
    conn     network.Conn
    gate     *Gate
    userData interface{}
}

这个结构体又引入了一个Gate,这是啥?在gate.go里也能找到:

type Gate struct {
    MaxConnNum      int
    PendingWriteNum int
    MaxMsgLen       uint32
    Processor       network.Processor
    AgentChanRPC    *chanrpc.Server

    // websocket
    WSAddr      string
    HTTPTimeout time.Duration
    CertFile    string
    KeyFile     string

    // tcp
    TCPAddr      string
    LenMsgLen    int
    LittleEndian bool
}

看起来有一些配置参数,还有一个数据解析器Processor,和AgentChanRPC *chanrpc.Server,看一下怎么用的吧。
Gate只有两个方法,OnDestroy目前是空的,还有一个是Run,不出意外的话,应该是解析那些配置参数,启动服务:

func (gate *Gate) Run(closeSig chan bool) {
    var wsServer *network.WSServer
    if gate.WSAddr != "" {
        wsServer = new(network.WSServer)
        wsServer.Addr = gate.WSAddr
        wsServer.MaxConnNum = gate.MaxConnNum
        wsServer.PendingWriteNum = gate.PendingWriteNum
        wsServer.MaxMsgLen = gate.MaxMsgLen
        wsServer.HTTPTimeout = gate.HTTPTimeout
        wsServer.CertFile = gate.CertFile
        wsServer.KeyFile = gate.KeyFile
        wsServer.NewAgent = func(conn *network.WSConn) network.Agent {
            a := &agent{conn: conn, gate: gate}
            if gate.AgentChanRPC != nil {
                gate.AgentChanRPC.Go("NewAgent", a)
            }
            return a
        }
    }

    var tcpServer *network.TCPServer
    if gate.TCPAddr != "" {
        tcpServer = new(network.TCPServer)
        tcpServer.Addr = gate.TCPAddr
        tcpServer.MaxConnNum = gate.MaxConnNum
        tcpServer.PendingWriteNum = gate.PendingWriteNum
        tcpServer.LenMsgLen = gate.LenMsgLen
        tcpServer.MaxMsgLen = gate.MaxMsgLen
        tcpServer.LittleEndian = gate.LittleEndian
        tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
            a := &agent{conn: conn, gate: gate}
            if gate.AgentChanRPC != nil {
                gate.AgentChanRPC.Go("NewAgent", a)
            }
            return a
        }
    }

    if wsServer != nil {
        wsServer.Start()
    }
    if tcpServer != nil {
        tcpServer.Start()
    }
    <-closeSig
    if wsServer != nil {
        wsServer.Close()
    }
    if tcpServer != nil {
        tcpServer.Close()
    }
}

这里启动了两种不同类型的Server,closeSig那个暂时忽略不说。Server使用NewAgent回调,把gate传走了,呃,有点懵逼。还是回到官方例子中看看整个使用流程吧

二、 官方例子LeafServer中的Module
1.Module初始化

首先在main.go中

    leaf.Run(
        game.Module,
        gate.Module,
        login.Module,
    )

这里gate.Module实际上是由gate包里的external.go暴露出来的(这也是leaf的使用习惯,所有module都这样暴露)。

//src/server/gate/external.go
type Module struct {
    *gate.Gate
}

func (m *Module) OnInit() {
    m.Gate = &gate.Gate{
        MaxConnNum:      conf.Server.MaxConnNum,
        PendingWriteNum: conf.PendingWriteNum,
        MaxMsgLen:       conf.MaxMsgLen,
        WSAddr:          conf.Server.WSAddr,
        HTTPTimeout:     conf.HTTPTimeout,
        CertFile:        conf.Server.CertFile,
        KeyFile:         conf.Server.KeyFile,
        TCPAddr:         conf.Server.TCPAddr,
        LenMsgLen:       conf.LenMsgLen,
        LittleEndian:    conf.LittleEndian,
        Processor:       msg.Processor,
        AgentChanRPC:    game.ChanRPC,
    }
}

匿名结构体Gate了,又额外实现一个OnInit方法,感觉像是有一个IModule这样的接口呢,找一找:
在源码的module.go中,确实找到了:

type Module interface {
    OnInit()
    OnDestroy()
    Run(closeSig chan bool)
}

结合上面第一部分说的Gate实现了OnDestroy和Run方法,官方例子中的gate/external.go确是实现了Module接口。注意其OnInit中,除了一堆属性从conf配置中读取,还引入了msg.Processor,这明显是个网络消息解析器。然后game.ChanRPC,这看起来是转到game模块去了,所以在一开始main.go中的leaf.Run中,也是先传入的game.Module,然后才是gate.Module。

//leaf.go
func Run(mods ...module.Module) {
    ...

    log.Release("Leaf %v starting up", version)

    // module
    for i := 0; i < len(mods); i++ {
        module.Register(mods[i])
    }
    module.Init()
    ...
2.module是怎么运行起来的

再次回到源码module.go,节选一部分代码过来

type module struct {
    mi       Module
    closeSig chan bool
    wg       sync.WaitGroup
}

var mods []*module

func Register(mi Module) {
    m := new(module)
    m.mi = mi
    m.closeSig = make(chan bool, 1)

    mods = append(mods, m)
}

func Init() {
    for i := 0; i < len(mods); i++ {
        mods[i].mi.OnInit()
    }

    for i := 0; i < len(mods); i++ {
        m := mods[i]
        m.wg.Add(1)
        go run(m)
    }
}

func run(m *module) {
    m.mi.Run(m.closeSig)
    m.wg.Done()
}

看到这些,是不是想起来官方文档说的这段话:

Leaf 首先会在同一个 goroutine 中按模块注册顺序执行模块的 OnInit 方法,等到所有模块 OnInit 方法执行完成后则为每一个模块启动一个 goroutine 并执行模块的 Run 方法。最后,游戏服务器关闭时(Ctrl + C 关闭游戏服务器)将按模块注册相反顺序在同一个 goroutine 中执行模块的 OnDestroy 方法。

三、综述
1.流程

现在来理一理思路。从main.go里开始,leaf.Run注册并运行了game,gate,login三个module。重点关注gate这个module,这个module通过组合方式实现了Module接口,即自己项目里实现OnInit方法,通过匿名结构体gate.Gate在源码里实现OnDestroy和Run方法。其中,OnInit方法里把gate.Gate制造出来了,部分属性读取conf的配置,Processor指定成自己项目的消息解析,AgentChanRPC指定了自己项目里的game模块。

...
Processor:       msg.Processor,
AgentChanRPC:    game.ChanRPC,
...

然后按照流程继续走,gate模块的OnInit执行完,就要去执行Run了。这个方法在本文第一部分就看过了,当时卡在一个懵逼的地方:

tcpServer.NewAgent = func(conn *network.TCPConn) network.Agent {
    a := &agent{conn: conn, gate: gate}
    if gate.AgentChanRPC != nil {
        gate.AgentChanRPC.Go("NewAgent", a)
    }
    return a
}

现在有点感觉了吧,也就是说tcpServer执行NewAgent时,看单词名字意思是一个新连接事件发生时,实际会转交给gate.AgentChanRPC去执行,也就是例子中的game.ChanRPC。转交方式是.Go("NewAgent", a),就像抛出一个事件一样,有一个名称,有一个参数。可以去game模块的chanrpc.go验证一下

//game.internal.chanrpc.go
func init() {
    skeleton.RegisterChanRPC("NewAgent", rpcNewAgent)
    skeleton.RegisterChanRPC("CloseAgent", rpcCloseAgent)
}

func rpcNewAgent(args []interface{}) {
    a := args[0].(gate.Agent)
    _ = a
}

func rpcCloseAgent(args []interface{}) {
    a := args[0].(gate.Agent)
    _ = a
}
2.tcpServer什么时候执行NewAgent

在gate的Run方法中,提到了tcpServer会根据参数生成并运行

...
if tcpServer != nil {
    tcpServer.Start()
}
...

然后去tcp_server.go看一下

func (server *TCPServer) Start() {
    server.init()
    go server.run()
}

init和run细节有点多,先忽略掉吧。我们是来找NewAgent的,终于在run中找到了:

...
tcpConn := newTCPConn(conn, server.PendingWriteNum, server.msgParser)
agent := server.NewAgent(tcpConn)
go func() {
    agent.Run()

    // cleanup
    tcpConn.Close()
    server.mutexConns.Lock()
    delete(server.conns, conn)
    server.mutexConns.Unlock()
    agent.OnClose()

    server.wgConns.Done()
}()

首先这段代码是在一个for循环中的,也就是收到tcp消息时,才会执行。具体基础知识参考Golang socket websocket。agent在拿到具体的tcpConn,会执行自己的Run方法,回到源码gate.go的agent结构体可以看到:

type agent struct {
    conn     network.Conn
    gate     *Gate
    userData interface{}
}

func (a *agent) Run() {
    for {
        data, err := a.conn.ReadMsg()
        if err != nil {
            log.Debug("read message: %v", err)
            break
        }

        if a.gate.Processor != nil {
            msg, err := a.gate.Processor.Unmarshal(data)
    ...
}

开始使用相应的Processor去读取数据了。

本篇暂时先到这里,还有许多细节,留待后续系列慢慢深究。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,658评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,482评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,213评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,395评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,487评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,523评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,525评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,300评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,753评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,048评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,223评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,905评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,541评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,168评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,417评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,094评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,088评论 2 352