Preparation
Project structure
$ tree -L 2
.
|-- _build
|   |-- default
|-- erlang_ls.config
|-- get-rebar3
|-- include
|   |-- emqx_plugin_kafka.hrl            # header file
|-- LICENSE
|-- Makefile
|-- priv
|   |-- emqx_plugin_kafka.hocon          # configuration file
|   |-- example
|-- README.md
|-- rebar.config
|-- rebar.lock
|-- src
|   |-- emqx_plugin_kafka_app.erl        # application start entry point
|   |-- emqx_plugin_kafka.app.src        # application descriptor
|   |-- emqx_plugin_kafka.erl            # load/unload the plugin
|   |-- emqx_plugin_kafka_evt.erl        # convert data into the event format
|   |-- emqx_plugin_kafka_hook.erl       # hook handling
|   |-- emqx_plugin_kafka_producer.erl   # Kafka connection and producers (emqx_resource)
|   |-- emqx_plugin_kafka_schema.erl     # configuration schema definition
|   |-- emqx_plugin_kafka_sup.erl
|   |-- emqx_plugin_kafka_util.erl       # utilities
Startup flow
1. emqx_plugin_kafka_app:start/2 — starts the application and calls the loading entry point emqx_plugin_kafka:load/0.
2. read_config/0 — reads the configuration and fills in default values.
3. emqx_plugin_kafka_util:check_crc32cer_nif/0 — makes sure the crc32cer NIF loads successfully.
4. start_resource/0 — starts the Kafka connector.
5. hooks/0 — loads the hooks; emqx_plugin_kafka_hook:hooks/3 parses the hook configuration, mounts each hook, and adds a producer process for each mount point to the Kafka connector.
6. Once a hook callback fires, the corresponding eventmsg_* function in the emqx_plugin_kafka_evt module (for example eventmsg_publish) converts the data into event data, which is handed to the Kafka producer process for delivery.
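To make the call chain concrete, here is a minimal sketch of the application entry point. Only the load/unload calls are taken from the flow above; the supervisor wiring and return-value handling are assumptions, not the plugin's actual code.
% -----
-module(emqx_plugin_kafka_app).

-behaviour(application).

-export([start/2, stop/1]).

%% Start the supervision tree first, then run the plugin loading entry point
%% (read config -> check NIF -> start resource -> mount hooks).
start(_StartType, _StartArgs) ->
    {ok, Sup} = emqx_plugin_kafka_sup:start_link(),
    emqx_plugin_kafka:load(),
    {ok, Sup}.

%% Unmount the hooks and remove the Kafka resource when the application stops.
stop(_State) ->
    emqx_plugin_kafka:unload(),
    ok.
% -----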
Code walkthrough
emqx_plugin_kafka_schema.erl
% Schema definition for the HOCON configuration file: each field's data type, whether it is required, its default value, its description, and so on.
% -----
roots() -> [plugin_kafka].

fields(plugin_kafka) ->
    [
        {connection, ?HOCON(?R_REF(connection), #{desc => ?DESC("connection")})},
        {producer, ?HOCON(?R_REF(producer), #{desc => ?DESC("producer")})},
        {hooks,
            ?HOCON(?ARRAY(?R_REF(hook)), #{
                required => true,
                default => [],
                desc => ?DESC("hooks")
            })}
    ];
% -----
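For reference, read_config/0 from the startup flow could load and validate the HOCON file against this schema roughly as follows. The hocon/hocon_tconf calls and their options are assumptions; only the default path and the plugin_kafka root come from the plugin.
% -----
%% A minimal sketch of reading the config with the schema above (assumed APIs).
read_config() ->
    Path = "etc/emqx_plugin_kafka.hocon",                %% default relative path
    {ok, RawConf} = hocon:load(Path, #{format => map}),  %% parse HOCON into a map
    %% Validate against the schema and fill in defaults; atom_key => true yields
    %% the #{connection := _, producer := _, hooks := _} shape expected by load/1.
    Checked = hocon_tconf:check_plain(emqx_plugin_kafka_schema, RawConf, #{atom_key => true}),
    maps:get(plugin_kafka, Checked).                     %% the root defined in roots/0
% -----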
emqx_plugin_kafka.erl
% -----
load() ->
    %% Read the configuration; defaults to the relative path etc/emqx_plugin_kafka.hocon
    load(read_config()).

load(Conf = #{connection := _, producer := _, hooks := _}) ->
    %% Make sure the crc32cer NIF is available
    emqx_plugin_kafka_util:check_crc32cer_nif(),
    %% Start the resource (Kafka connector) for emqx_plugin_kafka
    {ok, _} = start_resource(Conf),
    %% Mount the hooks
    hooks(Conf);
load(_) ->
    {error, "config_error"}.
% -----
unload() ->
    %% Unmount the hooks
    emqx_plugin_kafka_hook:unhook(),
    ResId = emqx_plugin_kafka_util:resource_id(),
    %% Remove the resource
    emqx_resource:remove_local(ResId).
% -----
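start_resource/0 is not shown in the excerpt. Since unload/0 removes the connector with emqx_resource:remove_local/1, it presumably registers it through the same API; below is a sketch assuming emqx_resource:create_local/5, with emqx_plugin_kafka_producer (described later) as the callback module and an assumed group name.
% -----
%% Sketch only: register the Kafka connector as a local resource so that
%% emqx_resource_manager can run its channels and health checks.
start_resource(Conf) ->
    ResId = emqx_plugin_kafka_util:resource_id(),
    emqx_resource:create_local(
        ResId,
        <<"emqx_plugin_kafka">>,        %% resource group (assumed name)
        emqx_plugin_kafka_producer,     %% callback module described below
        Conf,
        #{}                             %% creation options, left at defaults here
    ).
% -----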
emqx_plugin_kafka_hook.erl
% -----
hooks([Hook | T], Producer, Acc) ->
    Ret = hook(emqx_plugin_kafka_util:resource_id(), Hook#{producer => Producer}),
    hooks(T, Producer, [Ret | Acc]);
hooks([], _, Acc) ->
    %% Store the channels in persistent_term so the resource manager can read them
    persistent_term:put({?EMQX_PLUGIN_KAFKA_APP, ?EMQX_PLUGIN_KAFKA_CHANNELS}, Acc).

hook(ResId, Hook = #{endpoint := Endpoint0, filter := Filter}) ->
    {ok, Endpoint} = emqx_utils:safe_to_existing_atom(Endpoint0),
    ChannelId = emqx_plugin_kafka_util:channel_id(Endpoint),
    %% Add the channel data to the resource manager
    emqx_resource_manager:add_channel(ResId, ChannelId, Hook),
    Opts = #{
        channel_id => ChannelId,
        filter => Filter
    },
    %% Mount the hook callback
    trigger_hook(Endpoint, endpoint_func(Endpoint), Opts),
    {ChannelId, Hook}.

trigger_hook(_, undefined, _) ->
    ok;
trigger_hook(Endpoint, Func, Opts) ->
    emqx_hooks:add(Endpoint, {?MODULE, Func, [Opts]}, _Property = ?HP_HIGHEST).
% -----
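endpoint_func/1 is not shown; it maps a hook point to the callback exported by this module and returns undefined for hook points the plugin does not handle, so trigger_hook/3 skips them. Only the 'message.publish' clause is confirmed by the code below; the rest of the shape is a sketch.
% -----
%% Sketch: map hook points (endpoints) to the callbacks exported by this module.
endpoint_func('message.publish') -> on_message_publish;
%% ... one clause per supported hook point ...
endpoint_func(_Unsupported) -> undefined.
% -----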
%% Message publish
on_message_publish(Message, Opts = #{filter := Filter}) ->
    %% Match the message topic against the configured filter
    case match_topic(Message, Filter) of
        true ->
            %% Hand over to the resource manager for processing
            query(?evt_mod:eventmsg_publish(Message), Opts);
        false ->
            ok
    end,
    {ok, Message}.
% -----
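match_topic/2 and query/2 are also not shown. A plausible sketch is to match the message topic against the configured filter with emqx_topic:match/2 and to hand the event to the resource layer tagged with the channel id; the emqx_resource:query/3 call and its options are assumptions.
% -----
%% Sketch: topic filtering and hand-off to the resource layer (assumed helpers).
%% The #message{} record comes from emqx/include/emqx.hrl.
match_topic(#message{topic = Topic}, Filter) when is_binary(Filter) ->
    emqx_topic:match(Topic, Filter);
match_topic(_Message, undefined) ->
    %% no filter configured: accept every message
    true.

query(EventMsg, #{channel_id := ChannelId}) ->
    ResId = emqx_plugin_kafka_util:resource_id(),
    %% Ends up in emqx_plugin_kafka_producer:on_query_async/4 as {ChannelId, Message}.
    emqx_resource:query(ResId, {ChannelId, EventMsg}, #{}).
% -----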
emqx_plugin_kafka_evt.erl
% -----
%% Message publish event
eventmsg_publish(
    Message = #message{
        id = Id,
        from = ClientId,
        qos = QoS,
        flags = Flags,
        topic = Topic,
        payload = Payload,
        timestamp = Timestamp
    }
) ->
    %% Convert the MQTT message into the Kafka event format
    with_basic_columns(
        'message.publish',
        #{
            id => emqx_guid:to_hexstr(Id),
            clientid => ClientId,
            username => emqx_message:get_header(username, Message, undefined),
            payload => Payload,
            peerhost => ntoa(emqx_message:get_header(peerhost, Message, undefined)),
            topic => Topic,
            qos => QoS,
            flags => Flags,
            pub_props => printable_maps(emqx_message:get_header(properties, Message, #{})),
            publish_received_at => Timestamp
        }
    ).
% -----
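with_basic_columns/2 is not shown; by analogy with the EMQX rule-engine event format, it presumably merges the event name and a few common columns into the map. A sketch under that assumption:
% -----
%% Sketch: attach the event name and common columns shared by all events.
with_basic_columns(EventName, Columns) when is_map(Columns) ->
    Columns#{
        event => EventName,                          %% e.g. 'message.publish'
        timestamp => erlang:system_time(millisecond),
        node => node()
    }.
% -----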
emqx_plugin_kafka_producer.erl
% -----
%% Query mode
query_mode(_) ->
    simple_async_internal_buffer.

%% Callback mode
callback_mode() ->
    async_if_possible.
%% async_if_possible means requests are dispatched asynchronously via on_query_async/4.
%% Called when the resource is started (initialized)
on_start(
    _InstId,
    #{connection := Connection}
) ->
    C = fun(Key) -> emqx_plugin_kafka_util:check_config(Key, Connection) end,
    Hosts = C(bootstrap_hosts),
    ClientId = C(client_id),
    ClientConfig = #{
        connect_timeout => C(connect_timeout),
        connection_strategy => C(connection_strategy),
        min_metadata_refresh_interval => C(min_metadata_refresh_interval),
        query_api_versions => C(query_api_versions),
        request_timeout => C(request_timeout),
        sasl => C(sasl),
        ssl => C(ssl)
    },
    %% Start the Kafka client (connector)
    ok = ensure_client(ClientId, Hosts, ClientConfig),
    %% Check client connectivity
    case check_client_connectivity(ClientId) of
        ok ->
            {ok, #{
                client_id => ClientId,
                channels => #{}
            }};
        {error, {find_client, Reason}} ->
            %% Race condition? Crash? We just checked it with `ensure_client'...
            {error, Reason};
        {error, {connectivity, Reason}} ->
            {error, Reason}
    end.
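%% ensure_client/3 and deallocate_client/1 are not shown; they wrap the Kafka
%% client library. The plugin's reliance on the crc32cer NIF suggests the wolff
%% client, so the sketch below assumes the wolff API and is not taken from the
%% plugin source.
ensure_client(ClientId, Hosts, ClientConfig) ->
    %% Start a supervised wolff client for this connector
    case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
        {ok, _Pid} -> ok;
        {error, Reason} -> throw({failed_to_start_kafka_client, Reason})
    end.

deallocate_client(ClientId) ->
    %% Counterpart used by on_stop/2 below
    wolff:stop_and_delete_supervised_client(ClientId).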
%% Resource health check
on_get_status(
    _InstId,
    #{client_id := ClientId} = State
) ->
    case check_client_connectivity(ClientId) of
        ok ->
            ?status_connected;
        {error, {find_client, _Error}} ->
            ?status_connecting;
        {error, {connectivity, Error}} ->
            {?status_connecting, State, Error}
    end.
%% Called when the resource is stopped
on_stop(_InstId, #{client_id := ClientId, channels := Channels}) ->
    ?SLOG(info, #{
        msg => "kafka_client_on_stop",
        client_id => ClientId
    }),
    %% Remove the producer processes
    maps:foreach(fun(_, ChannelState) -> remove_producers(ClientId, ChannelState) end, Channels),
    %% Close the Kafka client connection
    deallocate_client(ClientId),
    %% Erase the persistent_term entry
    persistent_term:erase({?EMQX_PLUGIN_KAFKA_APP, ?EMQX_PLUGIN_KAFKA_CHANNELS}),
    ok.
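%% remove_producers/2 is the counterpart of start_producers/4 (sketched below
%% under on_add_channel); under the same wolff assumption it simply stops the
%% supervised producer processes that belong to one channel.
remove_producers(_ClientId, #{producers := Producers}) ->
    _ = wolff:stop_and_delete_supervised_producers(Producers),
    ok.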
%% Add channel data
on_add_channel(
    InstId,
    #{
        client_id := ClientId,
        channels := Channels
    } = OldState,
    ChannelId,
    ChannelConfig
) ->
    %% Start the Kafka producers for this channel
    {ok, ChannelState} = start_producers(InstId, ChannelId, ClientId, ChannelConfig),
    NChannels = maps:put(ChannelId, ChannelState, Channels),
    NewState = OldState#{channels => NChannels},
    {ok, NewState}.
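%% start_producers/4 is where the producer processes for one mounted hook are
%% created. A sketch under the wolff assumption: the topic, template and
%% encode_payload_type fields of the channel config are assumed names, while the
%% returned keys match what on_query_async/4 reads below.
start_producers(_InstId, _ChannelId, ClientId, #{producer := ProducerConf} = ChannelConfig) ->
    %% Kafka topic for this hook (assumed field name)
    Topic = maps:get(topic, ChannelConfig),
    {ok, Producers} = wolff:ensure_supervised_producers(ClientId, Topic, ProducerConf),
    {ok, #{
        producers => Producers,
        message_template => maps:get(template, ChannelConfig, undefined),
        encode_payload_type => maps:get(encode_payload_type, ChannelConfig, plain)
    }}.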
on_get_channels(_InstId) ->
    persistent_term:get({?EMQX_PLUGIN_KAFKA_APP, ?EMQX_PLUGIN_KAFKA_CHANNELS}, []).

on_get_channel_status(
    _InstId,
    _ChannelId,
    _State
) ->
    ?status_connected.
%% Remove channel data
on_remove_channel(
    _InstId,
    #{
        client_id := ClientId,
        channels := Channels
    } = OldState,
    ChannelId
) ->
    case maps:take(ChannelId, Channels) of
        {ChannelState, NChannels} ->
            remove_producers(ClientId, ChannelState),
            NewState = OldState#{channels => NChannels},
            {ok, NewState};
        error ->
            {ok, OldState}
    end.
%% Execute the query asynchronously
on_query_async(
    InstId,
    {ChannelId, Message},
    _,
    #{channels := Channels} = _ConnectorState
) ->
    #{
        message_template := Template,
        producers := Producers,
        encode_payload_type := EncodePayloadType
    } = maps:get(ChannelId, Channels),
    try
        %% Render the message template
        KafkaMessage = render_message(Template, Message, EncodePayloadType),
        %% Send to Kafka
        do_send_msg(KafkaMessage, Producers)
    catch
        Error:Reason:Stack ->
            ?SLOG(error, #{
                msg => "emqx_plugin_kafka_producer on_query_async error",
                error => Error,
                instId => InstId,
                reason => Reason,
                stack => Stack
            }),
            {error, {Error, Reason}}
    end.
% -----
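do_send_msg/2 finally hands the rendered message to the producers. A minimal sketch on the same wolff assumption, using an asynchronous send with an ack callback; the key/value map follows wolff's message convention and the callback is only illustrative.
% -----
%% Sketch only: asynchronous send through wolff; returns as soon as the message
%% is buffered, and the callback fires when Kafka acknowledges the produce request.
do_send_msg(#{key := Key, value := Value}, Producers) ->
    Msg = #{key => Key, value => Value, ts => erlang:system_time(millisecond)},
    AckCallback = fun(Partition, Offset) ->
        ?SLOG(debug, #{msg => "kafka_produce_acked", partition => Partition, offset => Offset})
    end,
    {_Partition, _Pid} = wolff:send(Producers, [Msg], AckCallback),
    ok.
% -----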