EMQX - Kafka 插件(二)

准备

项目结构

$ tree -L 2
.
|-- _build
|   |-- default
|-- erlang_ls.config
|-- get-rebar3
|-- include
|   |-- emqx_plugin_kafka.hrl           #头文件
|-- LICENSE
|-- Makefile
|-- priv
|   |-- emqx_plugin_kafka.hocon         #配置文件
|   |-- example
|-- README.md
|-- rebar.config
|-- rebar.lock
|-- src
    |-- emqx_plugin_kafka_app.erl       #app启动入口
    |-- emqx_plugin_kafka.app.src       #app描述
    |-- emqx_plugin_kafka.erl           #加载/卸载应用
    |-- emqx_plugin_kafka_evt.erl       #数据转换为事件格式
    |-- emqx_plugin_kafka_hook.erl      #钩子处理
    |-- emqx_plugin_kafka_producer.erl  #kafka连接、生产者 emqx_resource
    |-- emqx_plugin_kafka_schema.erl    #配置格式定义
    |-- emqx_plugin_kafka_sup.erl
    |-- emqx_plugin_kafka_util.erl      #工具集

启动流程

  • emqx_plugin_kafka_app:start/2 启动应用

  • emqx_plugin_kafka:load/0 为加载入口:read_config/0 读取配置并填入默认值,emqx_plugin_kafka_util:check_crc32cer_nif/0 确保 crc32cer NIF 加载成功,start_resource/1 启动 kafka 连接器,hooks/1 加载钩子

  • emqx_plugin_kafka_hook:hooks/3 解析钩子配置并挂载,同时在 kafka 连接器中为每个挂载点添加对应的生产者进程

  • 钩子回调被触发后,调用 emqx_plugin_kafka_evt 模块中对应的 eventmsg_* 函数(例如 eventmsg_publish/1)将数据转换为事件数据,再交给 kafka 生产者进程投递

代码说明

emqx_plugin_kafka_schema.erl

% hocon配置文件的格式定义,包含此字段数据类型、是否必须、默认值、描述等
% -----
%% Returns the list of root field names declared by this schema module.
%% Only one root, `plugin_kafka', is declared.
roots() ->
    [plugin_kafka].

%% Field definitions for the `plugin_kafka' root:
%%   - `connection': reference to the `connection' struct
%%   - `producer':   reference to the `producer' struct
%%   - `hooks':      array of `hook' structs, required, defaulting to []
%%
%% NOTE(review): `connection' and `producer' both use the "connect_timeout"
%% description id. This looks like a copy/paste leftover; confirm the intended
%% ids before changing them.
fields(plugin_kafka) ->
    ConnectionField =
        {connection, ?HOCON(?R_REF(connection), #{desc => ?DESC("connect_timeout")})},
    ProducerField =
        {producer, ?HOCON(?R_REF(producer), #{desc => ?DESC("connect_timeout")})},
    HooksMeta = #{
        required => true,
        default => [],
        desc => ?DESC("hooks")
    },
    HooksField = {hooks, ?HOCON(?ARRAY(?R_REF(hook)), HooksMeta)},
    [ConnectionField, ProducerField, HooksField];
% -----

emqx_plugin_kafka.erl

% -----
%% Plugin load entry point: reads the plugin configuration via read_config/0
%% and passes it to load/1.
%% NOTE(review): the original author states the configuration is read from the
%% relative path etc/emqx_plugin_kafka.hocon by default; read_config/0 is not
%% visible here, so confirm against its implementation.
load() ->
    Conf = read_config(),
    load(Conf).

%% Loads the plugin from an already-read configuration.
%%
%% When Conf is a map containing the `connection', `producer' and `hooks'
%% keys, this runs the steps below in order and returns the result of
%% hooks/1. Any other argument yields {error, "config_error"}.
load(Conf) when
    is_map_key(connection, Conf),
    is_map_key(producer, Conf),
    is_map_key(hooks, Conf)
->
    %% Check the crc32cer NIF before anything else.
    emqx_plugin_kafka_util:check_crc32cer_nif(),
    %% Start the plugin's resource; crash (badmatch) unless it returns {ok, _}.
    {ok, _} = start_resource(Conf),
    %% Mount the configured hooks.
    hooks(Conf);
load(_InvalidConf) ->
    {error, "config_error"}.
% -----
%% Unloads the plugin: unmounts the hooks first, then removes the local
%% resource identified by emqx_plugin_kafka_util:resource_id/0.
%% Returns whatever emqx_resource:remove_local/1 returns.
unload() ->
    %% Unmount hooks.
    emqx_plugin_kafka_hook:unhook(),
    %% Remove the resource.
    emqx_resource:remove_local(emqx_plugin_kafka_util:resource_id()).
% -----

emqx_plugin_kafka_hook.erl

% -----
%% Mounts every hook in Hooks and records the resulting channels.
%%
%% Each hook config is extended with `producer => Producer' and passed to
%% hook/2 together with the resource id. The {ChannelId, Hook} results are
%% prepended to Acc0 (so they end up in reverse input order), and the final
%% list is stored in persistent_term, where on the original author's account
%% the resource manager reads it back.
hooks(Hooks, Producer, Acc0) ->
    Mount = fun(Hook, Acc) ->
        ResId = emqx_plugin_kafka_util:resource_id(),
        Channel = hook(ResId, Hook#{producer => Producer}),
        [Channel | Acc]
    end,
    Channels = lists:foldl(Mount, Acc0, Hooks),
    persistent_term:put({?EMQX_PLUGIN_KAFKA_APP, ?EMQX_PLUGIN_KAFKA_CHANNELS}, Channels).

%% Mounts a single hook and registers its channel on the resource.
%%
%% The configured endpoint is converted to an already-existing atom (crashing
%% with badmatch otherwise), a channel id is derived from it, and the hook
%% config is added as a channel on resource ResId. The callback for the
%% endpoint (if any) is then registered through trigger_hook/3.
%% Returns {ChannelId, Hook}.
hook(ResId, #{endpoint := RawEndpoint, filter := Filter} = Hook) ->
    {ok, Endpoint} = emqx_utils:safe_to_existing_atom(RawEndpoint),
    ChannelId = emqx_plugin_kafka_util:channel_id(Endpoint),
    %% Add the channel to the resource manager; its result is not inspected.
    emqx_resource_manager:add_channel(ResId, ChannelId, Hook),
    %% Register the hook callback with the options it receives on each call.
    Callback = endpoint_func(Endpoint),
    trigger_hook(Endpoint, Callback, #{channel_id => ChannelId, filter => Filter}),
    {ChannelId, Hook}.

%% Registers {?MODULE, Func, [Opts]} as a callback on hook point Endpoint with
%% priority ?HP_HIGHEST. When Func is `undefined' nothing is registered and
%% `ok' is returned.
trigger_hook(Endpoint, Func, Opts) when Func =/= undefined ->
    Callback = {?MODULE, Func, [Opts]},
    emqx_hooks:add(Endpoint, Callback, ?HP_HIGHEST);
trigger_hook(_Endpoint, undefined, _Opts) ->
    ok.
% -----
%% Hook callback for published messages.
%%
%% When the message matches the configured topic filter, it is converted to a
%% publish event and handed to query/2 (per the original author's comment,
%% this forwards it to the resource manager). The result of query/2 is
%% discarded; the callback always returns {ok, Message} unchanged.
on_message_publish(Message, #{filter := Filter} = Opts) ->
    _ =
        case match_topic(Message, Filter) of
            false ->
                ok;
            true ->
                Event = ?evt_mod:eventmsg_publish(Message),
                query(Event, Opts)
        end,
    {ok, Message}.
% -----

emqx_plugin_kafka_evt.erl

% -----
%% Builds the 'message.publish' event from a #message{} record.
%%
%% The record fields and the `username', `peerhost' and `properties' headers
%% are collected into a map, which is passed to with_basic_columns/2 together
%% with the event name.
eventmsg_publish(
    #message{
        id = Id,
        from = ClientId,
        qos = QoS,
        flags = Flags,
        topic = Topic,
        payload = Payload,
        timestamp = Timestamp
    } = Message
) ->
    HexId = emqx_guid:to_hexstr(Id),
    Username = emqx_message:get_header(username, Message, undefined),
    PeerHost = ntoa(emqx_message:get_header(peerhost, Message, undefined)),
    PubProps = printable_maps(emqx_message:get_header(properties, Message, #{})),
    %% Event columns specific to a publish.
    Columns = #{
        id => HexId,
        clientid => ClientId,
        username => Username,
        payload => Payload,
        peerhost => PeerHost,
        topic => Topic,
        qos => QoS,
        flags => Flags,
        pub_props => PubProps,
        publish_received_at => Timestamp
    },
    with_basic_columns('message.publish', Columns).
% -----

emqx_plugin_kafka_producer.erl

% -----
%% Query mode of this resource. The argument is ignored; the mode is always
%% `simple_async_internal_buffer'.
query_mode(_Config) ->
    simple_async_internal_buffer.

%% Callback mode of this resource: always `async_if_possible'.
%% Per the original author's note, this mode means queries are handled
%% asynchronously by on_query_async/4.
callback_mode() ->
    async_if_possible.

%% Resource start callback.
%%
%% Reads the Kafka connection settings from the `connection' section through
%% emqx_plugin_kafka_util:check_config/2, ensures the Kafka client exists, and
%% checks its connectivity. Returns {ok, State} with the client id and an
%% empty channel map on success, or {error, Reason} when the connectivity
%% check fails.
on_start(_InstId, #{connection := Connection}) ->
    Lookup = fun(Name) -> emqx_plugin_kafka_util:check_config(Name, Connection) end,
    Hosts = Lookup(bootstrap_hosts),
    ClientId = Lookup(client_id),
    %% Client options are looked up in the order listed here.
    ClientConfigKeys = [
        connect_timeout,
        connection_strategy,
        min_metadata_refresh_interval,
        query_api_versions,
        request_timeout,
        sasl,
        ssl
    ],
    ClientConfig = maps:from_list([{Opt, Lookup(Opt)} || Opt <- ClientConfigKeys]),
    %% Start the Kafka client; crash (badmatch) unless this returns ok.
    ok = ensure_client(ClientId, Hosts, ClientConfig),
    %% Verify the client can actually reach the cluster.
    case check_client_connectivity(ClientId) of
        ok ->
            {ok, #{client_id => ClientId, channels => #{}}};
        %% `find_client' right after `ensure_client' would indicate a race or a
        %% crash; either way both failure kinds surface the bare reason.
        {error, {Kind, Reason}} when Kind =:= find_client; Kind =:= connectivity ->
            {error, Reason}
    end.

 %%资源状态检查
%% Resource health-check callback.
%%
%% Maps the result of check_client_connectivity/1 to a resource status:
%%   ok                             -> ?status_connected
%%   {error, {find_client, _}}      -> ?status_connecting
%%   {error, {connectivity, Error}} -> {?status_connecting, State, Error}
on_get_status(_InstId, State = #{client_id := ClientId}) ->
    Connectivity = check_client_connectivity(ClientId),
    case Connectivity of
        {error, {connectivity, Reason}} ->
            {?status_connecting, State, Reason};
        {error, {find_client, _Reason}} ->
            ?status_connecting;
        ok ->
            ?status_connected
    end.

 %%资源停止
%% Resource stop callback.
%%
%% Logs the stop, removes the producers of every channel, deallocates the
%% Kafka client and erases the channel list kept in persistent_term.
%% Always returns ok.
on_stop(_InstId, #{client_id := ClientId, channels := Channels}) ->
    ?SLOG(info, #{msg => "kafka_client_on_stop", client_id => ClientId}),
    %% Remove the producers belonging to each channel.
    StopProducers = fun(_ChannelId, ChannelState) ->
        remove_producers(ClientId, ChannelState)
    end,
    maps:foreach(StopProducers, Channels),
    %% Close the client connection.
    deallocate_client(ClientId),
    %% Drop the persisted channel list.
    ChannelsKey = {?EMQX_PLUGIN_KAFKA_APP, ?EMQX_PLUGIN_KAFKA_CHANNELS},
    persistent_term:erase(ChannelsKey),
    ok.

%% Adds a channel to the resource state.
%%
%% Starts the producers for ChannelId (crashing with badmatch unless
%% start_producers/4 returns {ok, ChannelState}) and stores the resulting
%% channel state under ChannelId in the `channels' map.
%% Returns {ok, NewState}.
on_add_channel(
    InstId,
    State = #{client_id := ClientId, channels := Channels},
    ChannelId,
    ChannelConfig
) ->
    %% Start the Kafka producers for this channel.
    {ok, ChannelState} = start_producers(InstId, ChannelId, ClientId, ChannelConfig),
    {ok, State#{channels => Channels#{ChannelId => ChannelState}}}.

%% Returns the channel list stored in persistent_term under
%% {?EMQX_PLUGIN_KAFKA_APP, ?EMQX_PLUGIN_KAFKA_CHANNELS}, or [] when nothing
%% has been stored.
on_get_channels(_InstId) ->
    ChannelsKey = {?EMQX_PLUGIN_KAFKA_APP, ?EMQX_PLUGIN_KAFKA_CHANNELS},
    persistent_term:get(ChannelsKey, []).

%% Channel health-check callback. No per-channel check is performed: every
%% channel is reported as ?status_connected regardless of the arguments.
on_get_channel_status(_InstId, _ChannelId, _State) ->
    ?status_connected.

%% Removes a channel from the resource state.
%%
%% When ChannelId is known, its producers are removed and the channel is
%% dropped from the `channels' map. An unknown ChannelId leaves the state
%% untouched. Returns {ok, State} in both cases.
on_remove_channel(
    _InstId,
    State = #{client_id := ClientId, channels := Channels},
    ChannelId
) ->
    case maps:find(ChannelId, Channels) of
        {ok, ChannelState} ->
            remove_producers(ClientId, ChannelState),
            {ok, State#{channels => maps:remove(ChannelId, Channels)}};
        error ->
            {ok, State}
    end.

%% Asynchronous query callback.
%%
%% Looks up the state of ChannelId (crashing if the channel is unknown or its
%% state lacks the expected keys; this lookup is outside the try), renders
%% the Kafka message from the channel's template and sends it through the
%% channel's producers. Any exception raised while rendering or sending is
%% logged and returned as {error, {Class, Reason}}.
on_query_async(
    InstId,
    {ChannelId, Message},
    _,
    #{channels := Channels}
) ->
    ChannelState = maps:get(ChannelId, Channels),
    #{
        message_template := Template,
        producers := Producers,
        encode_payload_type := EncodePayloadType
    } = ChannelState,
    try
        %% Render the template variables, then send the result to Kafka.
        do_send_msg(render_message(Template, Message, EncodePayloadType), Producers)
    catch
        Class:Reason:Stacktrace ->
            ?SLOG(error, #{
                msg => "emqx_plugin_kafka_producer on_query_async error",
                error => Class,
                instId => InstId,
                reason => Reason,
                stack => Stacktrace
            }),
            {error, {Class, Reason}}
    end.
% -----

©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容