skynet的消息发送:send和call

skynet是一个轻量级的游戏服务器框架。

skynet的核心是服务,服务之间通过消息来通信,消息的来源主要有:

  • 定时器
  • 网络
  • 服务之间的调用(skynet.sendskynet.call)

skynet.send和skynet.call

假设我们有两个服务A和B,A发了两条消息给B:


skynet_send_call.png

这里skynet.sendskynet.call的主要区别,在于call会阻塞,等待消息的返回值,而send将消息发送出去之后,就继续执行后续的指令。那么这里skynet.call之后,是怎么获取这个返回值的呢?我们来看看代码。

skynet.send的代码比较简单:

function skynet.send(addr, typename, ...)
    local p = proto[typename]
    return c.send(addr, p.id, 0, p.pack(...))
end

根据类型,将数据打包,然后调用底层的c.send将消息发送给目标地址。

再来看看skynet.call的代码:

function skynet.call(addr, typename, ...)
    --调试相关代码
    --...

    local p = proto[typename]
    local session = c.send(addr, p.id , nil , p.pack(...))
    if session == nil then
        error("call to invalid address " .. skynet.address(addr))
    end
    return p.unpack(yield_call(addr, session))
end

这里我们看到,skynet.callskynet.send在发送数据时,调用的底层函数是一样的,都是c.send,区别在于参数不同:

  • skynet.send调用c.send时,第三个参数是0,表示不用分配会话(session)
  • skynet.call调用c.send时,第三个参数是nil,表示需要分配会话ID(session)

这里的session,是系统一个自增的ID,每次分配时增加1,相当于给这一次的call分配一个唯一ID。

最后,skynet.call的返回是p.unpack(yield_call(addr, session))
p.unpack是解包数据,而yield_call,看名字就知道,是一个挂起的调用:
<span id="yield_call"></span>

local function yield_call(service, session)
    watching_session[session] = service
    session_id_coroutine[session] = running_thread
    local succ, msg, sz = coroutine_yield "SUSPEND" 
    watching_session[session] = nil
    if not succ then
        error "call failed"
    end
    return msg,sz
end

这里,用到了刚刚分配的session,记录了session对应的服务地址执行协程,然后,调用coroutine_yield将线程挂起,参数是"SUSPEND",等到目标服务返回结果后,才重新回到这个协程。

处理消息并返回

服务A调用skynet.call发送消息给服务B之后,A的协程挂起了,收到消息的服务B,是怎么处理这个消息,并返回给服务A的呢?
skynet的体系中,每个服务都有一个消息处理函数。对于skynet的lua服务,在启动时,skynet.start的第一行代码,就是设置lua层面的回调函数:

function skynet.start(start_func)
    c.callback(skynet.dispatch_message)
    --...其他代码
end

skynet.dispatch_message中的第一句,则是以pcall的方式调用raw_dispatch_message,这个函数一共有5个参数:

  • ptototype: 消息类型
  • msg: 消息体
  • sz:消息长度
  • session:会话ID,使用send的话,则是0
  • source:消息来源的服务地址
local function raw_dispatch_message(prototype, msg, sz, session, source)
    -- skynet.PTYPE_RESPONSE = 1, read skynet.h
    if prototype == 1 then
        --...处理响应消息
    else
        local p = proto[prototype]
        if p == nil then
            --...错误处理
        end

        local f = p.dispatch
        if f then
            local co = co_create(f)                 -- 取得一个协程
            session_coroutine_id[co] = session      -- 并关联协程和会话
            session_coroutine_address[co] = source  -- 以及来源
            
                    --... trace调试相关代码
          
            suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
        else
            --...错误处理
        end
    end
end

关键看这一句:suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz)))
从里向外,有三个函数调用:

  • p.unpack(msg, sz):根据消息类型预设好的unpack函数,来解析消息,返回解析后的参数。
  • coroutine_resume(co, session, source, ...):执行协程,协程参数为session,source,以及解析后的参数。这里实际上就是执行到skynet.dispatch中设置的消息处理函数(上面示例代码中,serverB的函数f)。
  • suspend(co, ...):处理完一条消息,挂起后的一些处理。

skynet.call的返回

从上面的消息处理来看,并没有对skynet.call做特别的处理,实际上,对于skynet.call的消息,我们必须手动调用skynet.retpack来返回数据。
通常,在消息处理函数中,我们可以通过session,来判断要不要使用skynet.retpack:

if session > 0 then
    skynet.retpack(func(...))
else
    func(...)
end

skynet.retpack实际上是对skynet.ret的调用:

skynet_ret.png
  1. 前面收到消息时,记录了当前协程对应的session,这里取出session。
  2. 如果session等于0,表示是send的消息,不需要返回。
  3. 前面收到消息时,还记录当前协程对应的消息来源,这里,给来源地址source发送一个PTYPE_RESPONSE类型的消息,成功将数据返回。

上面这些返回的操作,是在服务B中,而在服务A中,就收到了一个PTYPE_RESPONSE消息。此时前面发送skynet.call时的协程co还处于挂起的状态。

前面讲到raw_dispatch_message的时候,略过了PTYPE_RESPONSE的处理,现在再来看一下:

skynet_dispatch.png
  1. 通过session取得处理协程,在skynet.call => yield_call中,挂起之前,记录的session对应哪个协程,这里取回挂起的协程。
  2. RESPONSE并不只是skynet.ret才会用到,还有可能是skynet.timeout的定时时间到了,也会发送RESPONSE,这时co是一个字符串"BREAK"
  3. 收到一个未知的response的处理。
  4. 正常的skynet.call在这里获得返回值,这里的coroutine_resume,执行co协程,就是回到前面的yield_call
skynet_yield_call_ret.png
  1. 挂起的协程co恢复执行后,接收succ,msg,sz参数,最终yield_call返回的是msgsz
  2. yield_call的返回值,通过unpack解析之后,最终返回给调用者。至此,skynet.call终于取到了返回值。

Maybe forgot response session ... from ...

假设消息B在收到一个skynet.call的消息后,没有调用skynet.ret返回,那么会输出一个报错:Maybe forgot response session ... from ...skynet系统是怎么知道没有返回的呢?
前面在讲到消息处理raw_dispatch_message函数中,有一个步骤是从协程池中获取一个协程,并调用设置好的dispatch函数(示例中serviceB的函数f),实际上,这里并不是直接调用f,而是加了一层封装,我们来看看co_create的代码:

skynet_co_create.png

  1. 从池子里取出一条协程。
  2. 池子里没有协程时,创建协程。
  3. 协程的主函数,首先执行f(即传入的dispatch函数)。
  4. 执行完成之后,判断当前协程是否记录着session,当调用skynet.ret时,会清掉这个session。如果此时的session不等于0,就表示收到一个call之后没有使用skynet.ret返回,就在这里报个错。
  5. 清理数据。
  6. 将当前协程放入池子里,等待循环使用。
  7. 将协程挂起。
  8. 下一将调用co_create时,如果能从池子里找到co,则在这里开始执行协程,传入f,继续执行。

延迟返回

一般情况下,在处理call消息的协程中,我们必须调用skynet.retpack来返回数据,否则的话,会报错误Maybe forgot response
但有些情况下,我们希望在其他协程中返回数据(例如skynet.newservice 简介:服务的启动讲到的launch),这时候,我们可以使用skynet.response来生成一个响应函数。

function skynet.response(pack)
    pack = pack or skynet.pack

    local co_session = assert(session_coroutine_id[running_thread], "no session")
    session_coroutine_id[running_thread] = nil
    local co_address = session_coroutine_address[running_thread]
    if co_session == 0 then
        --  do not response when session == 0 (send)
        return function() end
    end
    local function response(ok, ...)
        if ok == "TEST" then
            return unresponse[response] ~= nil
        end
        if not pack then
            error "Can't response more than once"
        end

        local ret
        if unresponse[response] then
            if ok then
                ret = c.send(co_address, skynet.PTYPE_RESPONSE, co_session, pack(...))
                if ret == false then
                    -- If the package is too large, returns false. so we should report error back
                    c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
                end
            else
                ret = c.send(co_address, skynet.PTYPE_ERROR, co_session, "")
            end
            unresponse[response] = nil
            ret = ret ~= nil
        else
            ret = false
        end
        pack = nil
        return ret
    end
    unresponse[response] = co_address

    return response
end

这里实际上就是把返回需要用到的sessionsource用作一个函数的upValue,并返回这个函数,同时,清除session_coroutine_id中当前co对应的session,这样就不会触发到Maybe forgot response的警告了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,222评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,455评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,720评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,568评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,696评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,879评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,028评论 3 409
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,773评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,220评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,550评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,697评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,360评论 4 332
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,002评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,782评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,010评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,433评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,587评论 2 350

推荐阅读更多精彩内容