EMQX - 源码阅读(六)

准备

钩子

emqx_hooks 模块负责实现钩子(hooks)机制,这是一种允许开发人员在 MQTT 消息代理的特定操作点插入自定义逻辑的功能。钩子可以被视为插件或回调,它们在特定的事件发生时被触发,例如当一个客户端成功连接到代理服务器或当一个消息被发布时。

钩子允许开发者监听和干预 EMQX 运行时的关键事件,从而实现额外的业务逻辑,如自定义认证、权限检查、统计信息收集或数据的后处理等。

通过使用钩子,EMQX 提供了一种强大而灵活的方式来扩展和定制 MQTT 消息代理的行为。这使得 EMQX 不仅是一个高性能的 MQTT 代理,也是一个可高度定制和可扩展的消息处理平台。

%%===== emqx_hooks.erl
% -----
init([]) ->
    ok = emqx_utils_ets:new(?TAB, [{keypos, #hook.name}, {read_concurrency, true}]),
    %预定义的钩子点
    ok = emqx_hookpoints:register_hookpoints(),
    {ok, #{}}.

handle_call({add, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) ->
    Reply =
        case
            lists:any(
                fun(#callback{action = {M0, F0, _}}) ->
                    M0 =:= M andalso F0 =:= F
                end,
                Callbacks = lookup(HookPoint)
            )
        of
            true -> {error, already_exists};%同个钩子下的action中回调函数中,无论参数几个函数名必须唯一
            false -> insert_hook(HookPoint, add_callback(Callback, Callbacks))
        end,
    {reply, Reply, State};
handle_call({put, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) ->
    Callbacks = del_callback({M, F}, lookup(HookPoint)),
    Reply = update_hook(HookPoint, add_callback(Callback, Callbacks)),
    {reply, Reply, State};
handle_call(Req, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
    {reply, ignored, State}.

handle_cast({del, HookPoint, Action}, State) ->
    case del_callback(Action, lookup(HookPoint)) of
        [] ->
            ets:delete(?TAB, HookPoint);
        Callbacks ->
            insert_hook(HookPoint, Callbacks)
    end,
    {noreply, State};
% -----

钩子常用的函数

%%===== emqx_channel.erl
% -----
%% @doc `put/3,4` updates the existing hook, add it if not exists.
%钩子不存在时add,存在时删除旧的钩子,再添加
-spec put(hookpoint(), action(), integer()) -> ok.
put(HookPoint, Action, Priority) when is_integer(Priority) ->
    do_put(HookPoint, #callback{action = Action, priority = Priority}).

-spec put(hookpoint(), action(), integer(), filter()) -> ok.
put(HookPoint, Action, Priority, Filter) when is_integer(Priority) ->
    do_put(HookPoint, #callback{action = Action, filter = Filter, priority = Priority}).

do_put(HookPoint, Callback) ->
    ok = emqx_hookpoints:verify_hookpoint(HookPoint),
    case do_add(HookPoint, Callback) of
        ok -> ok;
        {error, already_exists} -> gen_server:call(?SERVER, {put, HookPoint, Callback}, infinity)
    end.

%% @doc Unregister a callback.
-spec del(hookpoint(), action() | {module(), atom()}) -> ok.
del(HookPoint, Action) ->
    gen_server:cast(?SERVER, {del, HookPoint, Action}).

%% @doc Run hooks.
%执行回调函数
-spec run(hookpoint(), list(Arg :: term())) -> ok.
run(HookPoint, Args) ->
    ok = emqx_hookpoints:verify_hookpoint(HookPoint),
    do_run(lookup(HookPoint), Args).

%% @doc Run hooks with Accumulator.
%执行回调函数附带累加器
-spec run_fold(hookpoint(), list(Arg :: term()), Acc :: term()) -> Acc :: term().
run_fold(HookPoint, Args, Acc) ->
    ok = emqx_hookpoints:verify_hookpoint(HookPoint),
    do_run_fold(lookup(HookPoint), Args, Acc).

do_run([#callback{action = Action, filter = Filter} | Callbacks], Args) ->
    case filter_passed(Filter, Args) andalso safe_execute(Action, Args) of
        %% stop the hook chain and return
        stop -> ok;
        %% continue the hook chain, in following cases:
        %%   - the filter validation failed with 'false'
        %%   - the callback returns any term other than 'stop'
        _ -> do_run(Callbacks, Args)
    end;
do_run([], _Args) ->
    ok.

do_run_fold([#callback{action = Action, filter = Filter} | Callbacks], Args, Acc) ->
    Args1 = Args ++ [Acc],
    case filter_passed(Filter, Args1) andalso safe_execute(Action, Args1) of
        %% stop the hook chain
        stop -> Acc;
        %% stop the hook chain with NewAcc
        {stop, NewAcc} -> NewAcc;
        %% continue the hook chain with NewAcc
        {ok, NewAcc} -> do_run_fold(Callbacks, Args, NewAcc);
        %% continue the hook chain, in following cases:
        %%   - the filter validation failed with 'false'
        %%   - the callback returns any term other than 'stop' or {'stop', NewAcc}
        _ -> do_run_fold(Callbacks, Args, Acc)
    end;
do_run_fold([], _Args, Acc) ->
    Acc.
% -----

可以在一个钩子上注册多个回调,执行顺序如下

  • 优先级高的回调先执行,比如 priority = 2priority = 1 之前执行
  • 同样优先级的情况下,回调函数组成的结构值小先执行,比如 action1 = {foo1, bar, undefined}action1 = {foo2, bar, undefined}action1<action2action1先执行
%%===== emqx_hooks.hrl

%% Definitions for Hook Priorities

%% Highest Priority = 1000, don't change this value as the plugins may depend on it.
-define(HP_HIGHEST, 1000).

%% hooks used by the emqx core app
-define(HP_PSK, 990).
-define(HP_REWRITE, 980).
-define(HP_AUTHN, 970).
-define(HP_AUTHZ, 960).
-define(HP_SYS_MSGS, 950).
-define(HP_TOPIC_METRICS, 940).
-define(HP_RETAINER, 930).
-define(HP_AUTO_SUB, 920).
-define(HP_RULE_ENGINE, 900).
%% apps that can work with the republish action
-define(HP_SLOW_SUB, 880).
-define(HP_BRIDGE, 870).
-define(HP_DELAY_PUB, 860).
%% apps that can stop the hooks chain from continuing
-define(HP_NODE_REBALANCE, 110).
-define(HP_EXHOOK, 100).

%% == Lowest Priority = 0, don't change this value as the plugins may depend on it.
-define(HP_LOWEST, 0).

插件(erlang)

初识 文章中介绍使用官方的插件模板,创建一个应用 my_emqx_plugin。应用启动模块如下

%%===== my_emqx_plugin_app.erl
% -----
start(_StartType, _StartArgs) ->
    {ok, Sup} = my_emqx_plugin_sup:start_link(),
    %加载插件
    my_emqx_plugin:load(application:get_all_env()),
    %注册控制台命令 my_emqx_plugin
    emqx_ctl:register_command(my_emqx_plugin, {my_emqx_plugin_cli, cmd}),
    {ok, Sup}.

stop(_State) ->
    emqx_ctl:unregister_command(my_emqx_plugin),
    my_emqx_plugin:unload().
% -----

my_emqx_plugin 模块

%%===== my_emqx_plugin.erl
% -----

%% Called when the plugin application start
load(Env) ->
    %挂载钩子
    hook('client.connect',      {?MODULE, on_client_connect, [Env]}),
% -----
%% Called when the plugin application stop
unload() ->
    %卸载钩子
    unhook('client.connect',      {?MODULE, on_client_connect}),
% -----   
hook(HookPoint, MFA) ->
    %% use highest hook priority so this module's callbacks
    %% are evaluated before the default hooks in EMQX
    emqx_hooks:add(HookPoint, MFA, _Property = ?HP_HIGHEST).

unhook(HookPoint, MFA) ->
    emqx_hooks:del(HookPoint, MFA).
% -----
%钩子回调
on_client_connect(ConnInfo, Props, _Env) ->
    %% this is to demo the usage of EMQX's structured-logging macro
    %% * Recommended to always have a `msg` field,
    %% * Use underscore instead of space to help log indexers,
    %% * Try to use static fields
    ?SLOG(debug, #{msg => "demo_log_msg_on_client_connect",
                   conninfo => ConnInfo,
                   props => Props}),
    %% If you want to refuse this connection, you should return with:
    %% {stop, {error, ReasonCode}}
    %% the ReasonCode can be found in the emqx_reason_codes.erl
    {ok, Props}.
% -----

my_emqx_plugin_cli 模块

%%===== emqx_channel.erl
% -----
%控制台调用 my_emqx_plugin 命令时回调
cmd(["arg1", "arg2"]) ->
    emqx_ctl:print("ok");

cmd(_) ->
    emqx_ctl:usage([{"cmd arg1 arg2", "cmd demo"}]).
% -----
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容