skynet源码分析(10)--消息机制之消息注册和回调

作者:shihuaping0918@163.com,转载请注明作者

在第5篇和第6篇已经分析过消息的发送和消息的处理,但是没有谈到消息回调函数的注册,还有消息回调的详细过程。第9篇已经讲了一部分消息的回调处理。

skynet中的回调对C服务和对LUA服务的注册机制是不同的,C服务的回调可以直接挂载。但是lua服务不行,它必须经过一次中转。这个在第9篇中谈到过,但是第9篇主要是介绍lua c api的协议的。

本篇分为两部分,第一部分介绍C服务的回调。第二部分介绍lua服务的注册与回调。第一部分比较简短,第二部分会比较长。

第一部分:C服务的回调
C服务的回调非常简单,直接把函数挂上去就可以。我们以skynet中的日志服务为例说明一下这个过程。

skynet中的日志服务在skynet/service-src/service-logger.c中。在第一篇介绍模块(服务)的时候就提到过,每个服务有create/init/release/signal四类函数。而logger服务也不例外,回调的挂载就是在init函数中进行的。

int
logger_init(struct logger * inst, struct skynet_context *ctx, const char * parm) {
    if (parm) {
        inst->handle = fopen(parm,"w");
        if (inst->handle == NULL) {
            return 1;
        }
        inst->filename = skynet_malloc(strlen(parm)+1);
        strcpy(inst->filename, parm);
        inst->close = 1;
    } else {
        inst->handle = stdout;
    }
    if (inst->handle) {
        skynet_callback(ctx, inst, logger_cb);  //注册回调
        skynet_command(ctx, "REG", ".logger"); //注册服务
        return 0;
    }
    return 1;
}

C服务注册回调用的是skynet_callback函数,这个函数只有2行。

void 
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
    context->cb = cb;  //回调函数挂载
    context->cb_ud = ud; //这个是个辅助指针
}

C服务的回调挂载/注册就是这么简单。最后再来看一下这个回调是在哪里被调用的,是被怎么调用的。以便形成一个比较系统的概念,而不是盲人摸象。

回调的调用是在dispatch_message函数中进行的,这个函数前面已经分析过了,只是没有讲回调的注册过程。

static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
    assert(ctx->init);
    CHECKCALLING_BEGIN(ctx)
    pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
    int type = msg->sz >> MESSAGE_TYPE_SHIFT;
    size_t sz = msg->sz & MESSAGE_TYPE_MASK;
    if (ctx->logfile) {
        skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
    }
    ++ctx->message_count;
    int reserve_msg;
    if (ctx->profile) {
        ctx->cpu_start = skynet_thread_time();
//这里回调了,看到cb_ud了没有,它会在回调时传进去
        reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
        uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
        ctx->cpu_cost += cost_time;
    } else {
//这里回调了,看到cb_ud了没有,它会在回调时传进去
        reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz); 
    }
    if (!reserve_msg) {
        skynet_free(msg->data);
    }
    CHECKCALLING_END(ctx)
}

第一部分C服务回调的注册和调用到此就完全清楚了。

第二部分:LUA服务的回调

LUA服务的回调注册和回调的调用层次比较多,也不直观,所以理解起来难度会比较大,我尽可能地把这个过程简化描述。

在写服务的时候,消息注册的惯用法为:

local CMD = {}

skynet.dispatch("lua", function(session, source, cmd, ...)
  local f = assert(CMD[cmd])
  f(...)
end)

这个skynet.dispatch会把消息的回调函数注册到服务。当然这不是唯一的途径,但是这篇文章只讲这个途径。

依然从skynet/lualib/skynet.lua中找dispatch,找到了的话会是这样的:

function skynet.dispatch(typename, func)
    local p = proto[typename]
    if func then
        local ret = p.dispatch
        p.dispatch = func
        return ret
    else
        return p and p.dispatch
    end
end

这个proto是什么呢?好像到了这就分析不下去了,这个函数和c底层的ctx->cb有什么关系?还是看个全一点的东西比较好,找个能正经干活的例子来看选一个skynet/example/simpledb.lua试试。

skynet.start(function()
        skynet.dispatch("lua", function(session, address, cmd, ...)
                cmd = cmd:upper()
                if cmd == "PING" then
                        assert(session == 0)
                        local str = (...)
                        if #str > 20 then
                                str = str:sub(1,20) .. "...(" .. #str .. ")"
                        end
                        skynet.error(string.format("%s ping %s", skynet.address(address), str))
                        return
                end
                local f = command[cmd]
                if f then
                        skynet.ret(skynet.pack(f(...)))
                else
                        error(string.format("Unknown command %s", tostring(cmd)))
                end
        end)
        skynet.register "SIMPLEDB"
end)

好,出现新花样了,skynet.start这个函数

function skynet.start(start_func)
    c.callback(skynet.dispatch_message) -- skynet.core.callback
    skynet.timeout(0, function()
        skynet.init_service(start_func) --服务初始化
    end)
end

这里出现了前面说的,分析a的时候涉及到b.c.d的问题。一个一个来吧,先看c.callback这个东西是干什么用的。
1.c.callback
c.callback最终定位到是在skynet/lualib-src/lua-skynet.c这个文件中,具体跟踪过程和第6篇中一样。

static int
lcallback(lua_State *L) {
    struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
    int forward = lua_toboolean(L, 2);
    luaL_checktype(L,1,LUA_TFUNCTION); //检查是不是数据类型是不是函数
    lua_settop(L,1);
    lua_rawsetp(L, LUA_REGISTRYINDEX, _cb); //把_cb保存到用户表里,详见lua参考手册

    lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);
    lua_State *gL = lua_tothread(L,-1);

    if (forward) {
        skynet_callback(context, gL, forward_cb);
    } else {
        skynet_callback(context, gL, _cb); //这个地方调用了C函数
    }

    return 0;
}
//设置回调
void 
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
    context->cb = cb; //看这里
    context->cb_ud = ud;
}

很明显,c.callback就是设置回调函数到服务(模块)的上下文中,而且设置的是skynet.dispatch_message这个lua方法为回调函数。

2.从代码中看到,最终调用了skynet_callback这个C函数,这个C函数的第三个参数,是一个中转函数。所以lua服务的回调它不是被直接调的,首先要在_cb这个函数处理一下数据,在_cb里面去调lua的回调函数。_cb这个函数主要就是按照Lua api的协议,将参数准备好,然后调lua的函数。在第9篇分析过,这里简短地介绍一下:

static int
_cb(struct skynet_context * context, void * ud, int type, int session, uint32_t source, const void * msg, size_t sz) {
    lua_State *L = ud;
    int trace = 1;
    int r;
    int top = lua_gettop(L);
    if (top == 0) {
        lua_pushcfunction(L, traceback); //错误处理的函数
        lua_rawgetp(L, LUA_REGISTRYINDEX, _cb); //把表里的回调函数取出来
    } else {
        assert(top == 2);
    }
    lua_pushvalue(L,2); //回调函数入栈

    lua_pushinteger(L, type);  //参数type入栈
    lua_pushlightuserdata(L, (void *)msg); //参数msg入栈
    lua_pushinteger(L,sz); //参数sz,消息长度,入栈
    lua_pushinteger(L, session); //参数session入栈
    lua_pushinteger(L, source); //参数session入栈

    r = lua_pcall(L, 5, 0 , trace); //调用lua的回调函数,也就是skynet.dispatch_message

    if (r == LUA_OK) {
        return 0;
    }
    const char * self = skynet_command(context, "REG", NULL);
    switch (r) {
    case LUA_ERRRUN:
        skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : " KRED "%s" KNRM, source , self, session, sz, lua_tostring(L,-1));
        break;
    case LUA_ERRMEM:
        skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
        break;
    case LUA_ERRERR:
        skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
        break;
    case LUA_ERRGCMM:
        skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
        break;
    };

    lua_pop(L,1);

    return 0;
}

3.skynet.dispatch_message,这个函数呢又涉及到lua的协程,这个协程暂时不讲,我把函数精简一下。dispatch_message实际调的是raw_dispatch_message,这是个lua函数。


local function raw_dispatch_message(prototype, msg, sz, session, source)
    -- skynet.PTYPE_RESPONSE = 1, read skynet.h
    if prototype == 1 then
        local co = session_id_coroutine[session]
        if co == "BREAK" then
            session_id_coroutine[session] = nil
        elseif co == nil then
            unknown_response(session, source, msg, sz)
        else
            session_id_coroutine[session] = nil
            suspend(co, coroutine_resume(co, true, msg, sz))
        end
    else
        local p = proto[prototype] --skynet.dispatch对应的proto
        if p == nil then
            if session ~= 0 then
                c.send(source, skynet.PTYPE_ERROR, session, "")
            else
                unknown_request(session, source, msg, sz, prototype)
            end
            return
        end
        local f = p.dispatch  --取真正的回调函数,也就是skynet.dispath设的那个函数
        if f then
            local ref = watching_service[source]
            if ref then
                watching_service[source] = ref + 1
            else
                watching_service[source] = 1
            end
            local co = co_create(f)  --这里创建协程
            session_coroutine_id[co] = session
            session_coroutine_address[co] = source
            suspend(co, coroutine_resume(co, session,source, p.unpack(msg,sz))) //这里唤醒协程
        elseif session ~= 0 then
            c.send(source, skynet.PTYPE_ERROR, session, "")
        else
            unknown_request(session, source, msg, sz, proto[prototype].name)
        end
    end
end

第二部分到了这里,基本上流程就清楚了。第一步,skynet.dispatch把回调注册到proto表中,并在表中设置服务的回调函数。第二步,skynet.start会调用C层的lcallback函数,把lua函数skynet.dispatch_message设为lua层伪回调,这个伪回调被存在用户表里,这个lua层的伪回调会被C层的伪回调所调用。这个lua层的伪回调从proto表中取到dispatch注册的真正的服务的回调函数,然后调用它。第三步,lcallback函数会设置一个C层的伪回调,这个伪回调的作用是做c到lua层的协议转换。

语言描述可能还是不能为所有人理解,画个简单的关系图吧

skynet.dispatch(callback) ---------------------------> proto[typename].dispach = callback
                                                                        |
skynet.core.callback(skynet.dispatch_message) -----------tbl[k] = skynet.dispatch_message
                                                                        |          
                                                                        |
C dispatch_message->_cb ------------------------------------------------|

也就是说C层弄了一个函数叫_cb,它在lua注册服务时被注册。回调时被调用,回调时做lua api协议适配,然后取用户表里的一个lua回调函数,这个回调函数叫做skynet.dispatch_message。这个dispatch_message又会去一个叫proto的表里找到服务真正的回调函数,而这个真正的回调函数是通过skynet.dispatch注册到proto[typename].dispatch上的。

如果还是有人不理解我就无能为力了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容