浅析skynet底层框架下篇

这是最后一篇了,其实还有很多重要的模块要分析的,但留给以后有多余时间再去研究吧,有兴趣的可以自行下载源码分析。这部分主要是围绕第三小问题展开,并附加些其他skynet中与此有关的设计,即:当并发时,如何保证消息的正确时序,以及如何使用协程处理消息(同步/异步/超时);包括创建协程处理消息,挂起协程,切换。这块其实是针对lua上层来说的,底层框架的消息队列只是保证消息顺序入队列且出队列,如果交叉执行比如lua层的协程挂起,那么就会出现时序问题。

先简单回顾下前几篇博客的分析,包括skynet本身的设计,及C++协程。对于C++协程,比如一个请求a过来后,从协程池中pop一个协程并处理该请求a,如果需要等待,则让出协程并挂上定时器,然后再处理下一个请求b,如果此时a和b是相关联的,且b有可能依赖于a的执行结果,那么就会出现问题。这对于游戏中的业务来说,尤其涉及到金钱相关的逻辑,那是大问题。而那种独立的请求间,只是读之类的操作,那是没问题的。如果需要结合业务,那么就需要改造。

而对于skynet来说,当并发上来时,考虑到这个时序问题,底层实现相关的顺序队列,大概思路就是lua协程执行a到一半后,哪怕有b的消息被协程调度处理,此时会把这个b协程压入队列(lua中的table也可以,使用数组部分),必须等a执行完毕或超时后,再处理b的,也就是在业务上层串行化了服务的消息处理。这样保证了时序。

但这又引起了另一个问题,即可能存在后面的消息都超时了,然而上层如果无法识别继续处理,那么就白白浪费了资源,处理了无用的消息。这类的相关介绍在另一篇“谈谈缓存穿透雪崩和过载保护以及一致性等问题”中有相关的介绍及应对方案。

本节分为两个小点讨论,即:
1)如何保证消息的正确时序,以及如何使用协程处理消息(同步/异步/超时);
2)创建协程处理消息,挂起协程,切换;

第一小点,撇开语言方面的限制,考虑skynet本身的框架设计,而不掺杂业务框架的设计。对于单进程多线程,要想并发的处理同一个客户端的请求,不管是读还是写,都必须路由到同一个线程处理,这样就保证了不会导致同一个client的请求分发到不同的线程,在skynet底层抽象client为一个agent service,有自己的消息队列,并且当工作线程处理这个agent消息时,先把这个消息队列从全局队列出摘出来,从这个队列pop一条消息,处理完毕后,再把这个消息队列挂到全局队列中;而对于push消息到agent队列则没有这种过程,只要获得自旋锁即可,相关源码可以见前面的分析。

这一层就保证了消息不会乱序,但是对于业务层,使用lua协程来提高并发,那么就要好好设计。

这里举例比如在主场景中,这样可以考虑到client的所有消息都路由到场景后需要考虑到的时序问题。
当与client有关的两条有依赖关系的消息a和b被场景服务dispatch分发处理时,不考虑读还是写,都会创建一个协程,并执行相关的处理函数。比如数据安全性不是特别严重的例子,玩家在帮派中,然后点领取今日奖励b消息,此时帮主把玩家踢出帮派a消息,本来是a先执行完毕后再b执行的顺序,这时可能出现a先执行导致挂起,而b执行完毕后,接着执行a的情况,多领了一份奖励。当然这里只是为了举例,通过检查可以避免这种问题。

简单分析下,在skynet的做法中,为每个服务加个lua层的消息队列,进入该队列的消息会被依次处理完毕,不管中间是否挂起,这样带来的问题是,并发度降底了且引入了一定的复杂度。

 17     dispatch = function(session, from, ...)
 18         table.insert(message_queue, {session = session, addr = from, ... })
 19         if thread_id then //有消息,如果有等待则wakeup
 20             skynet.wakeup(thread_id)
 21             thread_id = nil
 22         end
 23     end

 26 local function do_func(f, msg)
 27     return pcall(f, table.unpack(msg))
 28 end
 29 
 30 local function message_dispatch(f)
 31     while true do
 32         if #message_queue==0 then  //没消息则挂起
 33             thread_id = coroutine.running()
 34             skynet.wait()
 35         else
 36             local msg = table.remove(message_queue,1)  //依次处理消息
 37             local session = msg.session
 38             if session == 0 then  //不需要响应
 39                 local ok, msg = do_func(f, msg)
 40                 if ok then
 41                     if msg then
 42                         skynet.fork(message_dispatch,f)
 44                     end
 45                 else
 46                     skynet.fork(message_dispatch,f)
 48                 end
 49             else
 50                 local data, size = skynet.pack(do_func(f,msg))
 51                 -- 1 means response
 52                 c.send(msg.addr, 1, session, data, size) //需要响应
 53             end
 54         end
 55     end
 56 end

上面代码实现细节不作过多分析,简单注释了下,大致就是从table数组中remove前面的消息并处理之,如果会挂起则等响应结果或超时,再处理下一条。

如上面的实现,新消息来了fork一个协程处理:

533 function skynet.fork(func,...)
534     local args = table.pack(...)  //打包参数
535     local co = co_create(function()
536         func(table.unpack(args,1,args.n)) //设置协程执行函数和参数
537     end)
538     table.insert(fork_queue, co) //回收协程资源
539     return co
540 end

104 local function co_create(f)
105     local co = table.remove(coroutine_pool)
106     if co == nil then
107         co = coroutine.create(function(...)
108             f(...)
109             while true do
110                 local session = session_coroutine_id[co]
111                 if session and session ~= 0 then
112                     local source = debug.getinfo(f,"S")
                        //log error
117                 end
118                 f = nil
119                 coroutine_pool[#coroutine_pool+1] = co
120                 f = coroutine_yield "EXIT"
121                 f(coroutine_yield())
122             end
123         end)
124     else
125         coroutine_resume(co, f)
126     end
127     return co
128 end

上面co_create就从协程池中取一个协程对象处理消息,如果没有协程对象则创建。你一定会好奇执行完后,返回结果在哪?

对于lua的协程api,当create协程时它的状态还没开始,处于挂起suspended状态,然后resume后会处理running状态,执行完后为dead状态,引用下面的:
a)coroutine.create(arg):根据一个函数创建一个协同程序,参数为一个函数;
b)coroutine.resume(co):使协同从挂起变为运行(1)激活coroutine,也就是让协程函数开始运行;(2)唤醒yield,使挂起的协同接着上次的地方继续运行。该函数可以传入参数;
c)coroutine.yield():使正在运行的协同挂起,可以传入参数;

而真正强大之处在于当第二次resume时,resume和yield相关交换数据,具体怎么交互的建议看下lua协程基础。

在skynet中进行了对lua原始协程api进行封装并管理,下面说明第二个小点,当然会把第一小点也部分说明下,毕竟是个整体,从创建到处理到回收,以及中间的注意点。通过几个常用的接口来说明这套工作流程。

以下实现是wakeup相关:

493 function skynet.wakeup(token)
494     if sleep_session[token] then
495         table.insert(wakeup_queue, token) //在下一次suspend时被处理
496         return true
497     end
498 end

339 function skynet.wait(token)
340     local session = c.genid()
341     token = token or coroutine.running()
342     local ret, msg = coroutine_yield("SLEEP", session, token)//切出协程(A)
343     sleep_session[token] = nil  //协程切回来重置相关数据
344     session_id_coroutine[session] = nil
345 end

130 local function dispatch_wakeup()
131     local token = table.remove(wakeup_queue,1)
132     if token then
133         local session = sleep_session[token]
134         if session then
135             local co = session_id_coroutine[session]
136             local tag = session_coroutine_tracetag[co]
137             if tag then c.trace(tag, "resume") end
138             session_id_coroutine[session] = "BREAK"
139             return suspend(co, coroutine_resume(co, false, "BREAK"))(B)  调度被挂起的协程
140         end
141     end
142 end

157 function suspend(co, result, command, param, param2)
        //more code
183     elseif command == "SLEEP" then
184         local tag = session_coroutine_tracetag[co]
185         if tag then c.trace(tag, "sleep", co, 2) end
186         session_id_coroutine[param] = co
187         if sleep_session[param2] then
188             error(debug.traceback(co, "token duplicative"))
189         end
190         sleep_session[param2] = param
307     dispatch_wakeup()
308     dispatch_error_queue()
309 end

把要唤醒的协程通过token插入到wakeup_queue数组中(注意下,很多实现逻辑是使用table的数组部分,因为有序但带来的问题是从索引x处删除元素后,涉及到移动)

然后dispatch_wakeup会处理wakeup_queue,重点是这一句return suspend(co, coroutine_resume(co, false, "BREAK")),这部分在后面分析。
(A)处把当前协程切出去后,那三个参数作为主协程的返回值,即coroutine_resume的返回值,再加一个本身返回的true or false,然后调用suspend,同理coroutine_resume的后两个参数作为coroutine_yield的返回值。

以上部分还是比较容易理解,这里可以结合c++协程中的实现,有专门的协程调度器,要么超时要么有数据过来(响应)进而切回相应的协程处理。

不过经历过的项目貌似没有那种加限时的请求,如果call长时间收不到响应,可能会出问题,这个需要多研究下。不过,结合skynet基础实现也好办;另外底层框架也是skynet,lua层的源码部分都有返回,不管正确还是失败都会返回,除非这条call请求消息根本没有被目标服务的消息队列收到(可能出错),或者没有被工作线程调度,再或者没有被上层服务处理;前者可能基本为零,第一种可能性不大,因为框架已经保证消息一定会被发送到消息队列中(消息队列目前是无界的),而后面两种可能确实存在,比如一个死循环或者处理耗时的功能等,这些只能靠开发人员注意及必要code review了。

311 function skynet.timeout(ti, func)
312     local session = c.intcommand("TIMEOUT",ti)
313     assert(session)
314     local co = co_create(func)
315     assert(session_id_coroutine[session] == nil)
316     session_id_coroutine[session] = co
317 end
318 
319 function skynet.sleep(ti, token)
320     local session = c.intcommand("TIMEOUT",ti)
321     assert(session)
322     token = token or coroutine.running()
323     local succ, ret = coroutine_yield("SLEEP", session, token)
324     sleep_session[token] = nil
325     if succ then
326         return
327     end
328     if ret == "BREAK" then
329         return "BREAK"
330     else
331         error(ret)
332     end
333 end

上面就是超时的实现,也即弄个协程,向skynet框架注册个定时器,当超时时,发条消息到上层,上层创建协程处理。这个跟c++协程一样,实现中不能有sleep这种调用,只能用超时,然后挂到事件列表中,超时resume协程回调,不然阻塞其他。

剩下的不过多分析,这三篇只是简单分析了个大概,还有蛮多值得学习,关键在于思考为什么要这么做,可以根据自己的经验,去尝试改进或在github上提pr,分析别人的设计,可能并不像作者一路踩坑过来,并持续重构那样,恰到好处的设计。

接下来的一篇准备研究下锁的性能,主要是对前几天学习的一个总结。

skynet 中 Lua 服务的消息处理
Lua中的协同程序 coroutine
Lua Coroutine详解
skynet 里的 coroutine
skynet coroutine 运行笔记

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,504评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,434评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,089评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,378评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,472评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,506评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,519评论 3 413
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,292评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,738评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,022评论 2 329
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,194评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,873评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,536评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,162评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,413评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,075评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,080评论 2 352

推荐阅读更多精彩内容