准备
钩子
`emqx_hooks` 模块负责实现钩子(hooks)机制,这是一种允许开发人员在 MQTT 消息代理的特定操作点插入自定义逻辑的功能。钩子可以被视为插件或回调,它们在特定的事件发生时被触发,例如当一个客户端成功连接到代理服务器或当一个消息被发布时。
钩子允许开发者监听和干预 EMQX 运行时的关键事件,从而实现额外的业务逻辑,如自定义认证、权限检查、统计信息收集或数据的后处理等。
通过使用钩子,EMQX 提供了一种强大而灵活的方式来扩展和定制 MQTT 消息代理的行为。这使得 EMQX 不仅是一个高性能的 MQTT 代理,也是一个可高度定制和可扩展的消息处理平台。
%%===== emqx_hooks.erl
% -----
%% gen_server `init/1' callback of the hooks server.
%% Creates the ETS table ?TAB (keyed on the `name' field of the `#hook{}'
%% record, tuned for concurrent reads) and registers the predefined
%% hookpoints. The server state is an empty map.
init([]) ->
    TableOpts = [{keypos, #hook.name}, {read_concurrency, true}],
    ok = emqx_utils_ets:new(?TAB, TableOpts),
    %% Register the predefined hookpoints.
    ok = emqx_hookpoints:register_hookpoints(),
    {ok, #{}}.
%% gen_server `handle_call/3' callback.
%%
%% `{add, HookPoint, Callback}': if a callback with the same module and
%% function is already present on HookPoint the reply is
%% `{error, already_exists}'; otherwise the callback is added and the result
%% of insert_hook/2 is returned.
%%
%% `{put, HookPoint, Callback}': passes `{M, F}' to del_callback/2 on the
%% current callbacks, adds the new Callback and replies with the result of
%% update_hook/2.
%%
%% Any other request is logged as an error and answered with `ignored'.
handle_call({add, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) ->
    Existing = lookup(HookPoint),
    %% Within one hookpoint the {Module, Function} pair of an action must be
    %% unique, no matter what the third element of the action is.
    IsSameMF = fun(#callback{action = {M0, F0, _}}) -> {M0, F0} =:= {M, F} end,
    Reply =
        case lists:any(IsSameMF, Existing) of
            false -> insert_hook(HookPoint, add_callback(Callback, Existing));
            true -> {error, already_exists}
        end,
    {reply, Reply, State};
handle_call({put, HookPoint, Callback = #callback{action = {M, F, _}}}, _From, State) ->
    Remaining = del_callback({M, F}, lookup(HookPoint)),
    Updated = add_callback(Callback, Remaining),
    {reply, update_hook(HookPoint, Updated), State};
handle_call(Req, _From, State) ->
    ?SLOG(error, #{msg => "unexpected_call", req => Req}),
    {reply, ignored, State}.
%% gen_server `handle_cast/2' clause for `{del, HookPoint, Action}'.
%% Runs del_callback/2 on the callbacks currently looked up for HookPoint.
%% When nothing is left, the HookPoint entry is deleted from the ETS table;
%% otherwise the remaining callbacks are written back via insert_hook/2.
%% NOTE(review): this clause ends with `;' - the following clauses of
%% handle_cast/2 are not part of this excerpt.
handle_cast({del, HookPoint, Action}, State) ->
case del_callback(Action, lookup(HookPoint)) of
[] ->
ets:delete(?TAB, HookPoint);
Callbacks ->
insert_hook(HookPoint, Callbacks)
end,
{noreply, State};
% -----
钩子常用的函数
%%===== emqx_hooks.erl
% -----
%% @doc `put/3,4` updates the existing hook, or adds it if it does not exist.
%% When the hook is missing it is simply added; when it already exists the
%% old callback is removed first and the new one is added.
-spec put(hookpoint(), action(), integer()) -> ok.
put(HookPoint, Action, Priority) when is_integer(Priority) ->
    Callback = #callback{priority = Priority, action = Action},
    do_put(HookPoint, Callback).
%% Variant of put/3 that additionally stores Filter in the callback record.
-spec put(hookpoint(), action(), integer(), filter()) -> ok.
put(HookPoint, Action, Priority, Filter) when is_integer(Priority) ->
    Callback = #callback{priority = Priority, filter = Filter, action = Action},
    do_put(HookPoint, Callback).
%% Verifies the hookpoint, then tries a plain add first. Only when the add
%% reports `already_exists' is a synchronous `put' request (no timeout) sent
%% to the hooks server to replace the existing callback.
do_put(HookPoint, Callback) ->
    ok = emqx_hookpoints:verify_hookpoint(HookPoint),
    AddResult = do_add(HookPoint, Callback),
    case AddResult of
        {error, already_exists} ->
            PutRequest = {put, HookPoint, Callback},
            gen_server:call(?SERVER, PutRequest, infinity);
        ok ->
            ok
    end.
%% @doc Unregister a callback.
%% The request is sent to the hooks server with gen_server:cast/2, so this
%% call returns `ok' without waiting for the removal to be carried out.
-spec del(hookpoint(), action() | {module(), atom()}) -> ok.
del(HookPoint, Action) ->
    DelRequest = {del, HookPoint, Action},
    gen_server:cast(?SERVER, DelRequest).
%% @doc Run hooks.
%% Verifies the hookpoint, looks up its callbacks and executes them with Args.
-spec run(hookpoint(), list(Arg :: term())) -> ok.
run(HookPoint, Args) ->
    ok = emqx_hookpoints:verify_hookpoint(HookPoint),
    Callbacks = lookup(HookPoint),
    do_run(Callbacks, Args).
%% @doc Run hooks with Accumulator.
%% Verifies the hookpoint, looks up its callbacks and executes them with Args,
%% threading Acc through the chain; the final accumulator is returned.
-spec run_fold(hookpoint(), list(Arg :: term()), Acc :: term()) -> Acc :: term().
run_fold(HookPoint, Args, Acc) ->
    ok = emqx_hookpoints:verify_hookpoint(HookPoint),
    Callbacks = lookup(HookPoint),
    do_run_fold(Callbacks, Args, Acc).
%% Executes the callbacks in list order and always returns `ok'.
%% The chain ends when the list is exhausted or a callback yields `stop'.
do_run([], _Args) ->
    ok;
do_run([#callback{action = Action, filter = Filter} | Rest], Args) ->
    %% `false' when the filter rejects Args, otherwise whatever
    %% safe_execute/2 returns for the action.
    Outcome = filter_passed(Filter, Args) andalso safe_execute(Action, Args),
    case Outcome of
        %% stop the hook chain and return
        stop ->
            ok;
        %% continue the hook chain, in following cases:
        %% - the filter validation failed with 'false'
        %% - the callback returns any term other than 'stop'
        _ ->
            do_run(Rest, Args)
    end.
%% Executes the callbacks in list order while threading an accumulator.
%% Returns the accumulator that is current when the chain ends.
do_run_fold([], _Args, Acc) ->
    Acc;
do_run_fold([#callback{action = Action, filter = Filter} | Rest], Args, Acc) ->
    %% Filter and action both receive the accumulator as the last argument.
    ArgsWithAcc = Args ++ [Acc],
    Outcome =
        filter_passed(Filter, ArgsWithAcc) andalso safe_execute(Action, ArgsWithAcc),
    case Outcome of
        %% stop the hook chain
        stop ->
            Acc;
        %% stop the hook chain with NewAcc
        {stop, NewAcc} ->
            NewAcc;
        %% continue the hook chain with NewAcc
        {ok, NewAcc} ->
            do_run_fold(Rest, Args, NewAcc);
        %% continue the hook chain with the unchanged Acc, in following cases:
        %% - the filter validation failed with 'false'
        %% - the callback returns any other term
        _ ->
            do_run_fold(Rest, Args, Acc)
    end.
% -----
可以在一个钩子上注册多个回调,执行顺序如下:
- 优先级高的回调先执行,比如 priority = 2 在 priority = 1 之前执行
- 同样优先级的情况下,比较回调 action 元组的值,值小的先执行,比如 action1 = {foo1, bar, undefined}、action2 = {foo2, bar, undefined},action1 < action2,action1 先执行
%%===== emqx_hooks.hrl
%% Definitions for Hook Priorities
%% NOTE(review): per the accompanying article, a callback with a larger
%% priority value runs earlier - confirm against emqx_hooks.
%% Highest Priority = 1000, don't change this value as the plugins may depend on it.
-define(HP_HIGHEST, 1000).
%% hooks used by the emqx core app
-define(HP_PSK, 990).
-define(HP_REWRITE, 980).
-define(HP_AUTHN, 970).
-define(HP_AUTHZ, 960).
-define(HP_SYS_MSGS, 950).
-define(HP_TOPIC_METRICS, 940).
-define(HP_RETAINER, 930).
-define(HP_AUTO_SUB, 920).
-define(HP_RULE_ENGINE, 900).
%% apps that can work with the republish action
-define(HP_SLOW_SUB, 880).
-define(HP_BRIDGE, 870).
-define(HP_DELAY_PUB, 860).
%% apps that can stop the hooks chain from continuing
-define(HP_NODE_REBALANCE, 110).
-define(HP_EXHOOK, 100).
%% Lowest Priority = 0, don't change this value as the plugins may depend on it.
-define(HP_LOWEST, 0).
插件(erlang)
在「初识」一文中介绍了如何使用官方的插件模板创建一个应用 my_emqx_plugin。应用启动模块如下:
%%===== my_emqx_plugin_app.erl
% -----
%% Application start callback: starts the plugin supervisor, loads the plugin
%% with the application environment and registers the console command.
%% Returns `{ok, Sup}' with the supervisor returned by start_link/0.
start(_StartType, _StartArgs) ->
    {ok, Sup} = my_emqx_plugin_sup:start_link(),
    %% Load the plugin.
    Env = application:get_all_env(),
    my_emqx_plugin:load(Env),
    %% Register the console command `my_emqx_plugin'.
    CommandHandler = {my_emqx_plugin_cli, cmd},
    emqx_ctl:register_command(my_emqx_plugin, CommandHandler),
    {ok, Sup}.
%% Application stop callback: unregisters the console command registered in
%% start/2, then unloads the plugin. The result of my_emqx_plugin:unload/0
%% is returned.
stop(_State) ->
    CommandName = my_emqx_plugin,
    emqx_ctl:unregister_command(CommandName),
    my_emqx_plugin:unload().
% -----
my_emqx_plugin 模块
%%===== my_emqx_plugin.erl
% -----
%% Called when the plugin application start
%% NOTE(review): the body ends with `,' - the rest of load/1 is not part of
%% this excerpt.
load(Env) ->
%% Mount the hook: register on_client_connect on the 'client.connect'
%% hookpoint, with [Env] as the third element of the action tuple.
hook('client.connect', {?MODULE, on_client_connect, [Env]}),
% -----
%% Called when the plugin application stop
%% NOTE(review): the body ends with `,' - the rest of unload/0 is not part of
%% this excerpt.
unload() ->
%% Unmount the hook: remove on_client_connect from the 'client.connect'
%% hookpoint, identified by its {Module, Function} pair.
unhook('client.connect', {?MODULE, on_client_connect}),
% -----
%% Registers MFA as a callback on HookPoint via emqx_hooks:add/3.
hook(HookPoint, MFA) ->
    %% use highest hook priority so this module's callbacks
    %% are evaluated before the default hooks in EMQX
    Priority = ?HP_HIGHEST,
    emqx_hooks:add(HookPoint, MFA, Priority).
%% Removes the callback identified by MFA from HookPoint via emqx_hooks:del/2.
unhook(HookPoint, MFA) ->
    emqx_hooks:del(
        HookPoint,
        MFA
    ).
% -----
%% Hook callback for the 'client.connect' hookpoint.
%% Logs the connection info and properties at debug level and lets the hook
%% chain continue with Props unchanged.
on_client_connect(ConnInfo, Props, _Env) ->
    %% Demo of EMQX's structured-logging macro:
    %%  * always include a `msg` field,
    %%  * use underscores instead of spaces to help log indexers,
    %%  * prefer static field names.
    ?SLOG(debug, #{
        msg => "demo_log_msg_on_client_connect",
        conninfo => ConnInfo,
        props => Props
    }),
    %% To refuse this connection, return instead:
    %%   {stop, {error, ReasonCode}}
    %% The ReasonCode values can be found in emqx_reason_codes.erl.
    {ok, Props}.
% -----
my_emqx_plugin_cli 模块
%%===== my_emqx_plugin_cli.erl
% -----
%% Callback invoked when the `my_emqx_plugin' command is run from the console.
%% Prints "ok" for exactly the arguments `arg1 arg2'; any other argument list
%% prints the usage text.
cmd(["arg1", "arg2"]) ->
    emqx_ctl:print("ok");
cmd(_) ->
    Usage = [{"cmd arg1 arg2", "cmd demo"}],
    emqx_ctl:usage(Usage).
% -----