作者:shihuaping0918@163.com,转载请注明作者
在第5篇和第6篇已经分析过消息的发送和消息的处理,但是没有谈到消息回调函数的注册,还有消息回调的详细过程。第9篇已经讲了一部分消息的回调处理。
skynet中的回调对C服务和对LUA服务的注册机制是不同的,C服务的回调可以直接挂载。但是lua服务不行,它必须经过一次中转。这个在第9篇中谈到过,但是第9篇主要是介绍lua c api的协议的。
本篇分为两部分,第一部分介绍C服务的回调。第二部分介绍lua服务的注册与回调。第一部分比较简短,第二部分会比较长。
第一部分:C服务的回调
C服务的回调非常简单,直接把函数挂上去就可以。我们以skynet中的日志服务为例说明一下这个过程。
skynet中的日志服务在skynet/service-src/service-logger.c中。在第一篇介绍模块(服务)的时候就提到过,每个服务有create/init/release/signal四类函数。而logger服务也不例外,回调的挂载就是在init函数中进行的。
int
logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) {
if (parm) {
inst->handle = fopen(parm,"w");
if (inst->handle == NULL) {
return 1;
}
inst->filename = skynet_malloc(strlen(parm)+1);
strcpy(inst->filename, parm);
inst->close = 1;
} else {
inst->handle = stdout;
}
if (inst->handle) {
skynet_callback(ctx, inst, logger_cb); //注册回调
skynet_command(ctx, "REG", ".logger"); //注册服务
return 0;
}
return 1;
}
C服务注册回调用的是skynet_callback函数,这个函数只有2行。
void
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
context->cb = cb; //回调函数挂载
context->cb_ud = ud; //这个是个辅助指针
}
C服务的回调挂载/注册就是这么简单。最后再来看一下这个回调是在哪里被调用的,是被怎么调用的。以便形成一个比较系统的概念,而不是盲人摸象。
回调的调用是在dispatch_message函数中进行的,这个函数前面已经分析过了,只是没有讲回调的注册过程。
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
assert(ctx->init);
CHECKCALLING_BEGIN(ctx)
pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
int type = msg->sz >> MESSAGE_TYPE_SHIFT;
size_t sz = msg->sz & MESSAGE_TYPE_MASK;
if (ctx->logfile) {
skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
}
++ctx->message_count;
int reserve_msg;
if (ctx->profile) {
ctx->cpu_start = skynet_thread_time();
//这里回调了,看到cb_ud了没有,它会在回调时传进去
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
ctx->cpu_cost += cost_time;
} else {
//这里回调了,看到cb_ud了没有,它会在回调时传进去
reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
}
if (!reserve_msg) {
skynet_free(msg->data);
}
CHECKCALLING_END(ctx)
}
第一部分C服务回调的注册和调用到此就完全清楚了。
第二部分:LUA服务的回调
LUA服务的回调注册和回调的调用层次比较多,也不直观,所以理解起来难度会比较大,我尽可能地把这个过程简化描述。
在写服务的时候,消息注册的惯用法为:
local CMD = {}
skynet.dispatch("lua", function(session, source, cmd, ...)
local f = assert(CMD[cmd])
f(...)
end)
这个skynet.dispatch会把消息的回调函数注册到服务。当然这不是唯一的途径,但是这篇文章只讲这个途径。
依然从skynet/lualib/skynet.lua中找dispatch,找到了的话会是这样的:
function skynet.dispatch(typename, func)
local p = proto[typename]
if func then
local ret = p.dispatch
p.dispatch = func
return ret
else
return p and p.dispatch
end
end
这个proto是什么呢?好像到了这就分析不下去了,这个函数和c底层的ctx->cb有什么关系?还是看个全一点的东西比较好,找个能正经干活的例子来看选一个skynet/example/simpledb.lua试试。
skynet.start(function()
skynet.dispatch("lua", function(session, address, cmd, ...)
cmd = cmd:upper()
if cmd == "PING" then
assert(session == 0)
local str = (...)
if #str > 20 then
str = str:sub(1,20) .. "...(" .. #str .. ")"
end
skynet.error(string.format("%s ping %s", skynet.address(address), str))
return
end
local f = command[cmd]
if f then
skynet.ret(skynet.pack(f(...)))
else
error(string.format("Unknown command %s", tostring(cmd)))
end
end)
skynet.register "SIMPLEDB"
end)
好,出现新花样了,skynet.start这个函数
function skynet.start(start_func)
c.callback(skynet.dispatch_message) -- skynet.core.callback
skynet.timeout(0, function()
skynet.init_service(start_func) --服务初始化
end)
end
这里出现了前面说的,分析a的时候涉及到b.c.d的问题。一个一个来吧,先看c.callback这个东西是干什么用的。
1.c.callback
c.callback最终定位到是在skynet/lualib-src/lua-skynet.c这个文件中,具体跟踪过程和第6篇中一样。
static int
lcallback(lua_State *L) {
struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
int forward = lua_toboolean(L, 2);
luaL_checktype(L,1,LUA_TFUNCTION); //检查是不是数据类型是不是函数
lua_settop(L,1);
lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); //把_cb保存到用户表里,详见lua参考手册
lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
lua_State *gL = lua_tothread(L,-1);
if (forward) {
skynet_callback(context, gL, forward_cb);
} else {
skynet_callback(context, gL, _cb); //这个地方调用了C函数
}
return 0;
}
//设置回调
void
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
context->cb = cb; //看这里
context->cb_ud = ud;
}
很明显,c.callback就是设置回调函数到服务(模块)的上下文中,而且设置的是skynet.dispatch_message这个lua方法为回调函数。
2.从代码中看到,最终调用了skynet_callback这个C函数,这个C函数的第三个参数,是一个中转函数。所以lua服务的回调它不是被直接调的,首先要在_cb这个函数处理一下数据,在_cb里面去调lua的回调函数。_cb这个函数主要就是按照Lua api的协议,将参数准备好,然后调lua的函数。在第9篇分析过,这里简短地介绍一下:
static int
_cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
lua_State *L = ud;
int trace = 1;
int r;
int top = lua_gettop(L);
if (top == 0) {
lua_pushcfunction(L, traceback); //错误处理的函数
lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); //把表里的回调函数取出来
} else {
assert(top == 2);
}
lua_pushvalue(L,2); //回调函数入栈
lua_pushinteger(L, type); //参数type入栈
lua_pushlightuserdata(L, (void *)msg); //参数msg入栈
lua_pushinteger(L,sz); //参数sz,消息长度,入栈
lua_pushinteger(L, session); //参数session入栈
lua_pushinteger(L, source); //参数session入栈
r = lua_pcall(L, 5, 0 , trace); //调用lua的回调函数,也就是skynet.dispatch_message
if (r == LUA_OK) {
return 0;
}
const char * self = skynet_command(context, "REG", NULL);
switch (r) {
case LUA_ERRRUN:
skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1));
break;
case LUA_ERRMEM:
skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
break;
case LUA_ERRERR:
skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
break;
case LUA_ERRGCMM:
skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
break;
};
lua_pop(L,1);
return 0;
}
3.skynet.dispatch_message,这个函数呢又涉及到lua的协程,这个协程暂时不讲,我把函数精简一下。dispatch_message实际调的是raw_dispatch_message,这是个lua函数。
local function raw_dispatch_message(prototype, msg, sz, session, source)
-- skynet.PTYPE_RESPONSE = 1, read skynet.h
if prototype == 1 then
local co = session_id_coroutine[session]
if co == "BREAK" then
session_id_coroutine[session] = nil
elseif co == nil then
unknown_response(session, source, msg, sz)
else
session_id_coroutine[session] = nil
suspend(co, coroutine_resume(co, true, msg, sz))
end
else
local p = proto[prototype] --skynet.dispatch对应的proto
if p == nil then
if session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, prototype)
end
return
end
local f = p.dispatch --取真正的回调函数,也就是skynet.dispath设的那个函数
if f then
local ref = watching_service[source]
if ref then
watching_service[source] = ref + 1
else
watching_service[source] = 1
end
local co = co_create(f) --这里创建协程
session_coroutine_id[co] = session
session_coroutine_address[co] = source
suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) //这里唤醒协程
elseif session ~= 0 then
c.send(source, skynet.PTYPE_ERROR, session, "")
else
unknown_request(session, source, msg, sz, proto[prototype].name)
end
end
end
第二部分到了这里,基本上流程就清楚了。第一步,skynet.dispatch把回调注册到proto表中,并在表中设置服务的回调函数。第二步,skynet.start会调用C层的lcallback函数,把lua函数skynet.dispatch_message设为lua层伪回调,这个伪回调被存在用户表里,这个lua层的伪回调会被C层的伪回调所调用。这个lua层的伪回调从proto表中取到dispatch注册的真正的服务的回调函数,然后调用它。第三步,lcallback函数会设置一个C层的伪回调,这个伪回调的作用是做c到lua层的协议转换。
语言描述可能还是不能为所有人理解,画个简单的关系图吧
skynet.dispatch(callback) ---------------------------> proto[typename].dispach = callback
|
skynet.core.callback(skynet.dispatch_message) -----------tbl[k] = skynet.dispatch_message
|
|
C dispatch_message->_cb ------------------------------------------------|
也就是说C层弄了一个函数叫_cb,它在lua注册服务时被注册。回调时被调用,回调时做lua api协议适配,然后取用户表里的一个lua回调函数,这个回调函数叫做skynet.dispatch_message。这个dispatch_message又会去一个叫proto的表里找到服务真正的回调函数,而这个真正的回调函数是通过skynet.dispatch注册到proto[typename].dispatch上的。
如果还是有人不理解我就无能为力了。