作者:shihuaping0918@163.com,转载请注明作者
第5篇讲到了消息的处理,消息的处理实际上就是对工作队列里的消息不停地调回调函数。那么消息是怎么放进消息队列的呢。带着这个疑问,让我们从lua层开始追根溯源。
在lua层有两个api,一个是skynet.send,这个是非阻塞发消息。另一个是skynet.call,这个是阻塞式发完消息等回应。skynet.call使用一个session来实现等待,这个session实际就是一个自增的数字,溢出了以后又从1开始。
以skynet.send为例进行分析。skynet.send位于skynet/lualib/skynet.lua文件中。
function skynet.send(addr, typename, ...)
local p = proto[typename]
return c.send(addr, p.id, 0 , p.pack(...)) -- c就是skynet.core
end
可以看出skynet.send实际上是调了skynet.core里的send函数。而skynet.core的定义在skynet/lualib-src/lua-skynet.c这个文件中。因为下面有一部分代码涉及到lua c api,这部分我不打算说明了。
LUAMOD_API int
luaopen_skynet_core(lua_State *L) { //这个就是skynet.core
luaL_checkversion(L);
luaL_Reg l[] = {
{ "send" , lsend }, //这个就是send对应的函数lsend
{ "genid", lgenid },
省略.....
{ NULL, NULL },
};
}
从上面的代码可以知道,skynet.core.send实际对应的是lsend函数。
static int
lsend(lua_State *L) {
return send_message(L, 0, 2);
}
而lsend又调的是send_message函数。
static int
send_message(lua_State *L, int source, int idx_type) {
略...
switch (mtype) {
case LUA_TSTRING: {
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type, session , msg, len);
} else {
session = skynet_send(context, source, dest, type, session , msg, len);
}
break;
}
case LUA_TLIGHTUSERDATA: {
void * msg = lua_touserdata(L,idx_type+2);
int size = luaL_checkinteger(L,idx_type+3);
if (dest_string) {
session = skynet_sendname(context, source, dest_string, type | PTYPE_TAG_DONTCOPY, session, msg, size);
} else {
session = skynet_send(context, source, dest, type | PTYPE_TAG_DONTCOPY, session, msg, size);
}
break;
}
default:
luaL_error(L, "invalid param %s", lua_typename(L, lua_type(L,idx_type+2)));
}
略...
}
从上面代码看出,最终调的是skynet_sendname和skynet_send,而skynet_sendname实际上是调用的skynet_send。
int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
if ((sz & MESSAGE_TYPE_MASK) != sz) {
skynet_error(context, "The message to %x is too large", destination);
if (type & PTYPE_TAG_DONTCOPY) {
skynet_free(data);
}
return -1;
}
_filter_args(context, type, &session, (void **)&data, &sz);
if (source == 0) {
source = context->handle;
}
if (destination == 0) {
return session;
}
if (skynet_harbor_message_isremote(destination)) { //远程消息
struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
rmsg->destination.handle = destination;
rmsg->message = data;
rmsg->sz = sz;
skynet_harbor_send(rmsg, source, session); //分布式消息发送
} else { //本地消息发送
struct skynet_message smsg;
smsg.source = source;
smsg.session = session;
smsg.data = data;
smsg.sz = sz;
//消息队列加一条消息
if (skynet_context_push(destination, &smsg)) {
skynet_free(data);
return -1;
}
}
return session;
}
看到这里终于看到消息结构体了。
int
skynet_context_push(uint32_t handle, struct skynet_message *message) {
struct skynet_context * ctx = skynet_handle_grab(handle); //增加ctx引用计数
if (ctx == NULL) {
return -1;
}
skynet_mq_push(ctx->queue, message); //消息入队完成
skynet_context_release(ctx); //减少ctx引用计数
return 0;
}
从上面的代码分析可以很清楚地看到,skynet.send实际上就是往目标服务的消息队列里增加一条消息。
在这篇文章里,消息处理涉及到了lua c api,这个东西不是一两篇文章能说得清楚的。所以直接略过了。
从代码跟踪的过程来看,发送一个消息实际上要进行6层函数调用。消息最终才能投到目标服务的消息队列中。而消息队列是怎么被处理的,在第5篇中已经讲过了。剩下的就是,消息的回调到底是怎么被注册的。这个将在下一篇中讲到。