Skynet是多线程框架,其中对应了一些服务(Service),每个服务对应一个Lua虚拟机,一个虚拟机上可以跑多个协程,但同一时刻只能有一个协程,每条消息处理由协程来完成,且运行在保护模式下。Lua层实现的协议池和时序相关的队列,可以类比C++协程相关实现。
Skynet中的协程
Skynet本质上只是一个消息分发器,以服务为单位并给每个服务分配一个独立的ID,可以从任意服务向另一个服务发送消息。在此基础上,在服务中接入Lua虚拟机,并将消息收发的API封装成了Lua模块。
目前使用Lua编写的服务在最底层只有一个入口,就是接收和处理一条Skynet框架转发过来的消息。可以通过skynet.core.callback
这个内部用C编写的API(通常由skynet.start
调用),把一个Lua函数设置到所属的服务模块中。
每个模块必须设置且只能设置一个回调函数,这个回调函数在每次收到一条消息时,会接收5个参数:消息类型、消息指针、消息长度、消息Session、消息来源。
消息分为两类:
- 别人对你发起的请求
- 你过去对外的请求所收到的回应
无论是哪一类,都是通过同一个回调函数进入。在实际使用Skynet时可以直接使用rpc
的语法,向外部服务发起一个远程调用,等待对方发送了回应消息后,逻辑接着向下走。那么,框架如何把回调函数的模式转换为阻塞API调用的形式的呢?这多亏了Lua支持协程coroutine
,使一段代码运行了一半时挂起,在之后合适的时候再继续运行。
为了实现这点,需要在收到每条请求消息时先创建一个协程,在协程中去运行该类消息的dispatch
函数,可使用框架中skynet.dispath
函数设置消息的处理函数。之所以必须先创建协程而不能直接调用消息处理函数,是因为无法预知在消息处理的过程中会不会因为阻塞API而需要挂起执行流程。等到第一次需要挂起时才把执行流程绑定到协程上是做不到的。
接着,所有的阻塞都通过coroutine.yield
函数挂起当前协程,并把挂起类型以及可能用到的数据传出来。框架会捕获这些参数也就进一步知道去做什么,也也就解释了阻塞API为什么必须在消息处理函数中调用,而不能直接卸载服务的主体代码中的原因。因为初始化部分的代码并不运行在框架创建出来的协程中。
例如:对于skynet.call
其实是生成一个对当前服务来说唯一的session
号,调用yield
给框架发送CALL
指令。框架中的resume
捕获到CALL
之后,会把Session和Coroutine对象记录在表中,然后挂起协程,并结束当前的回调函数。等待Skynet底层框架后续消息进来时再处理。实际上,这里还会处理skynet.fork
创建的额外线程。
服务调度API
local skynet = require "skynet"
skynet.sleep(time)
设置当前任务休眠等待的微秒数
skynet.fork(func, ...)
fork
用于创建并启动新任务
fork
用于启动一个新的任务去执行函数func
,实质上它是开了一个协程,函数调用完成后会返回线程句柄。虽然可以使用原生的coroutine.create
来创建协程,但这样做会打乱Skynet的工作流程。
skynet.yield()
yield
让出当前任务执行流程
yield
会让出当前任务执行流程,使本服务内其它任务有机会执行,随后会继续运行。
skynet.wait()
wait
让出当前任务执行流程直到使用wakeup
唤醒它
skynet.wakeup(co)
wakeup
用于唤醒使用wait
或sleep
后处于等待状态的任务
skynet.timeout(time, func)
timeout
用于设定一个定时触发函数func
,在time * 0.01s
后触发。
skynet.starttime()
starttime
用于返回当前进程的启动UTC时间(秒)
skynet.now()
now
用于返回当前进程启动后经过的时间(微秒)
skynet.time()
time
用于通过starttime
和now
计算出当前UTC时间秒数
sleep 休眠 定时器
skynet.sleep(ti)
函数是将当前协程挂起ti
个单位时间,一个单位时间是1/100秒。sleep
向框架注册注册了一个定时器的实现,框架会在ti
时间后发送一个定时器消息来唤醒这个协程。sleep
函数是一个阻塞API,返回值nil
会告诉你时间到了,返回值BREAK
则表示被skynet.wakeup
给唤醒了。
$ cd skynet
$ vim demo/service_sleep.lua
local skynet = require "skynet"
skynet.start(function()
skynet.error("sleep begin")
skynet.sleep(300)
skynet.error("sleep end")
end)
$ cp example/config demo/config
$ cp example/config.path demo/config.path
$ vim demo/config.path
# 将example替换为demo
$ vim demo/config
# 将main替换为service_sleep
$ ./skynet demo/config
[:01000009] sleep begin
service_sleep # 手工输入
[:01000009] sleep end
[:01000002] KILL self
注意:在console
服务中输入service_sleep
后会发现,新服务不会立即启动,因为console
服务正忙于第一个服务的初始化,需要等待3秒后新服务才会被console
处理。这种做法实际上是错误的,在skynet.start
中服务初始化中是不允许有阻塞的存在,服务初始化要求尽量快的执行完成,所以业务逻辑代码一般不应该写在skynet.start
函数中。
fork 在服务中开启新线程
在Skynet的服务中,可以开启一个新的线程用来处理业务,注意这里的线程并非传统意义上的线程,而更像是虚拟线程,其实是通过协程来模拟的。
在Skynet中所有的Lua层函数都是以协程的方式被执行的,包括skynet.fork
产生的函数。除非在skynet.start
之外调用函数,由于start
函数调用timeout
产生协程,而fork
则产生的是协程列表
Lua层设置的回调函数skynet.dispatch_message
主要调用了raw_dispatch_message
,这里才是驱动协程函数执行的位置。一个协程结束或挂起后将由suspend
函数来接管。
如果入口函数start
中没有调用fork
、sleep
、wait
之类的函数,那么驱动start
执行的消息将结束。
$ vim demo/service_fork.lua
local skynet = require "skynet"
function task(timeout)
skynet.error("coroutine fork: ", coroutine.running())
skynet.error("sleep begin ", timeout)
skynet.sleep(timeout)
skynet.error("sleep end")
end
skynet.start(function()
skynet.error("coroutine start: ", coroutine.running())
-- 开启新线程来执行task任务
skynet.fork(task, 500)
end)
$ vim demo/config
start = "service_fork"
$ ./skynet demo/service_fork
[:01000009] coroutine start: thread: 0x7f173b4f1068 false
[:01000009] coroutine fork: thread: 0x7f173b4f1148 false
[:01000009] sleep begin 500
[:01000002] KILL self
[:01000009] sleep end
注意:可以看到当service_fork
启动后,console
服务仍然可以接收终端输入的服务并启动。若是需要长时间运行并且出现阻塞的情况,可使用skynet.fork
创建新的协程来运行。
查看skynet/lualib/skynet.lua
文件可知道skynet.fork()
函数其实是使用的coroutine.create()
函数来实现的。
$ vim lublib/skynet.lua
local coroutine_pool = setmetable({}, {__mode = "kv"})
local function co_create(f)
local co = table.remove(coroutine_pool)
if co==nil then
co = coroutine.create(function(...)
-- 函数执行完毕
f(...)
while true do
f = nil
coroutine_pool[#coroutine_pool + 1] = co
-- 协程挂起,将由suspend函数接管,执行cmd=="TEXT"分支
f = coroutine_yield "EXIT"
f(coroutine_yield())
end
end)
else
-- 回到前一次消息挂起的位置返回的f就是要执行的
coroutine_resume(co, f)
end
return co
end
每次使用skynet.fork()
其实都是从协程池中获取未被使用的协程,并把该协程加入到fork
队列中,等待一个消息调度,然会会一次把fork
队列中的协程拿出来执行一遍。执行结束后会把协程重新丢入协程池中,这样可避免重复开启和关闭协程带来的额外开销。
案例:长时间占用执行权限的任务