前言
开源版本的EMQ服务器不支持消息的持久化,但是支持通过开发扩展插件的方式实现消息的持久化。本文记录一下EMQ通过插件将消息发往kafka的实现过程。内容写的比较详细,其实不复杂,一步步来就可以实现的。
参考文档:
1、物联网架构成长之路(3)-EMQ消息服务器了解
2、物联网架构成长之路(4)-EMQ插件创建
3、EMQ集成Kafka插件编写过程 emq_plugin_kafka
4、物联网架构成长之路(5)-EMQ插件配置
1和2帮助了解EMQ服务器,对源代码进行编译,新增扩展插件,并编译;
3和4进行实际的扩展插件开发及编译,启动测试等。
一、源代码编译
EMQ服务器是基于Erlang/OTP语言平台开发的,需要先准备OTP环境
1.1 Erlang/OTP环境安装
我自己从官网下载的otp_src_21在安装过程中出现了错误,好像是openssl的加密库有变化;后来改用otp_src_19就比较顺利,可以通过我的分享下载该版本(otp_src_19下载)。
具体的安装过程不再赘述,请参考CentOS 部署EMQ服务的2.2章节 准备Erlang环境
1.2 EMQ源代码下载及编译
命令行执行:git clone https://github.com/emqtt/emq-relx
下载EMQ源码
执行:cd emq-relx
,进入源码文件路径
执行:make
,进行编译,这个过程会比较耗时,需要下载较多的依赖。
如果没有报错,会在路径下看到_rel文件夹,里面是编译好的emqttd相关文件,进入到bin目录,执行sh emqttd console
,正常的话是可以启动EMQ服务的。
二、新增一个扩展插件
emq-relx的文件路径如图:
1、执行:cd deps
,进入deps目录
2、执行:cp -r emq_plugin_template emq_plugin_kafka
,template是自带的插件模板,我们复制一份作为kafka插件的基础
3、执行:cd emq_plugin_kafka
,进入kafka目录,执行make clean
,接着把全部文件名中的template
替换为kafka
,注意是全部文件及子目录下的文件,包括文件的内容,都要替换。具体如下:
a、etc路径下.config后缀改为.conf后缀,文件名中template
替换为kafka
,内容清空
b、Makefile中增加:
内容中的
template
替换为kafka
c、src路径下文件名及内容的
template
替换为kafka
,三个_demo.erl文件随便加个后缀,如_demo_gx.erl,同时修改内容中的template
替换为kafka
,内容中-module()括号里的模块名修改为对应的文件名。emq_plugin_kafka_app.erl文件中引用_demo模块的地方也要加上刚刚添加的后缀。src文件目录:
emq_acl_demo_gx.erl文件中
emq_plugin_kafka_app.erl文件中
d、test路径下文件名及内容的
template
替换为kafka
4、在Makefile文件的当前路径下,执行make
,编译插件emq_plugin_kafka,直到编译成功。 如果没有报错,就是成功了,如下图:
5、回到emq-relx目录下,执行vi Makefile
,增加我们新增的自定义插件emq_plugin_kafka,如下图:
执行vi relx.config
,增加如下图:
6、执行rm -rf _rel
,先删除上次编译生成的文件,执行make clean && make
,直到编译成功
7、执行cd _rel/emqttd/bin
,进入编译好的文件路径,执行sh emqttd console
,启动EMQ服务,启动成功后如下图:
浏览器访问本机ip:18083,登录账号密码为(admin、public),启动emq_plugin_kafka插件
到此我们新增的一个插件就配置好了,只是该插件还未实现具体功能
三、实现emq_plugin_kafka的具体功能
1、回到emq-relx/deps/emq_plugin_kafka
,插件目录,执行mkdir priv
,创建一个文件夹,进入priv
路径,新增一个文件emq_plugin_kafka.schema
,这个文件的作用是给最后编译出来的emqttd的配置属性中增加两个配置参数,kafka中消息的topic和server地址。文件内容如下:
%% emq.plugin.kafka.server: adds a configurable `emq.plugin.kafka.server`
%% property to emqttd ("%" starts an Erlang comment).
%% The value is mapped into the emq_plugin_kafka application env key `kafka`
%% and may be parsed as an integer, an ip, or a string; the translation
%% further down turns each form into a host/port pair.
%% Default: {"127.0.0.1", 9092}.
{
mapping,
"emq.plugin.kafka.server",
"emq_plugin_kafka.kafka",
[
{default, {"127.0.0.1", 9092}},
{datatype, [integer, ip, string]}
]
}.
%% emq.plugin.kafka.topic: adds a configurable `emq.plugin.kafka.topic`
%% property to emqttd: the Kafka topic the plugin produces to.
%% Mapped into the same emq_plugin_kafka `kafka` env key; default "test".
%% `hidden` presumably keeps it out of the generated default .conf
%% -- TODO confirm against the cuttlefish documentation.
{
mapping,
"emq.plugin.kafka.topic",
"emq_plugin_kafka.kafka",
[
{default, "test"},
{datatype, string},
hidden
]
}.
%% translation
%% Folds the two mapped settings above into one proplist stored under the
%% emq_plugin_kafka `kafka` env key: [{host, H}, {port, P}, {topic, T}].
%%
%% The server setting can arrive in three shapes (its datatype list is
%% [integer, ip, string]):
%%   {Ip, Port}           - produced by the `ip` datatype, used as-is;
%%   Port (integer)       - treated as a port on the default host 127.0.0.1
%%                          (previously this crashed in string:tokens/2);
%%   "host" | "host:port" - a plain string; the port defaults to 9092.
%% Any other string (e.g. "a:b:c") is rejected via cuttlefish:invalid/1
%% instead of failing with an opaque case_clause.
{
    translation,
    "emq_plugin_kafka.kafka",
    fun(Conf) ->
        {RHost, RPort} =
            case cuttlefish:conf_get("emq.plugin.kafka.server", Conf) of
                {Ip, Port} ->
                    {Ip, Port};
                Port when is_integer(Port) ->
                    {"127.0.0.1", Port};
                Server ->
                    case string:tokens(Server, ":") of
                        [Domain] ->
                            {Domain, 9092};
                        [Domain, PortStr] ->
                            {Domain, list_to_integer(PortStr)};
                        _ ->
                            cuttlefish:invalid("emq.plugin.kafka.server must be host or host:port")
                    end
            end,
        Topic = cuttlefish:conf_get("emq.plugin.kafka.topic", Conf),
        [
            {host, RHost},
            {port, RPort},
            {topic, Topic}
        ]
    end
}.
2、执行vi etc/emq_plugin_kafka.conf
,编辑配置文件,增加下面两行:
emq.plugin.kafka.server = 127.0.0.1:9092
emq.plugin.kafka.topic = test_emq
server是插件要连接的kafka服务器地址(kafka还不太熟悉,我是本地起的kafka服务,可以正常连接),topic是发往kafka的消息的topic
3、执行vi src/emq_plugin_kafka.erl
,编辑插件的实现代码,内容如下:
%%--------------------------------------------------------------------
%% Copyright (c) 2013-2018 EMQ Enterprise, Inc. (http://emqtt.io)
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emq_plugin_kafka).
-include_lib("emqttd/include/emqttd.hrl").
-define(APP, emq_plugin_kafka).
-export([load/1, unload/0]).
%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
%% Called when the plugin application starts.
%% Configures ekaf first, then registers every hook callback of this module
%% with [Env] as the extra hook argument. Returns ok (from io:format/2).
load(Env) ->
    ekaf_init([Env]),
    Hooks = [{'client.connected',    fun ?MODULE:on_client_connected/3},
             {'client.disconnected', fun ?MODULE:on_client_disconnected/3},
             {'client.subscribe',    fun ?MODULE:on_client_subscribe/4},
             {'client.unsubscribe',  fun ?MODULE:on_client_unsubscribe/4},
             {'session.created',     fun ?MODULE:on_session_created/3},
             {'session.subscribed',  fun ?MODULE:on_session_subscribed/4},
             {'session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4},
             {'session.terminated',  fun ?MODULE:on_session_terminated/4},
             {'message.publish',     fun ?MODULE:on_message_publish/2},
             {'message.delivered',   fun ?MODULE:on_message_delivered/4},
             {'message.acked',       fun ?MODULE:on_message_acked/4}],
    %% Registration order matches the list order above.
    lists:foreach(fun({HookPoint, Callback}) ->
                          emqttd:hook(HookPoint, Callback, [Env])
                  end, Hooks),
    io:format("load completed~n", []).
%% Hook for 'client.connected': logs the CONNACK value, forwards a
%% "connected" event for this client to Kafka, and passes the client
%% record through unchanged as {ok, Client}.
on_client_connected(ConnAck, #mqtt_client{client_id = ClientId} = Client, Env) ->
    LogArgs = [ClientId, ConnAck],
    io:format("client ~s connected, connack: ~w~n", LogArgs),
    ekaf_send(<<"connected">>, ClientId, {}, Env),
    {ok, Client}.
%% Hook for 'client.disconnected': logs the disconnect reason and forwards a
%% "disconnected" event for this client to Kafka. Returns ok.
on_client_disconnected(Reason, #mqtt_client{client_id = ClientId}, Env) ->
    LogArgs = [ClientId, Reason],
    io:format("client ~s disconnected, reason: ~w~n", LogArgs),
    ekaf_send(<<"disconnected">>, ClientId, {}, Env),
    ok.
%% Hook for 'client.subscribe': logs the requested topic table and returns it
%% unchanged as {ok, TopicTable}. Nothing is sent to Kafka here.
%% NOTE(review): the "client(~s/~s)" prefix prints Username first here but
%% ClientId first in on_client_unsubscribe/4; kept as-is.
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
    LogArgs = [Username, ClientId, TopicTable],
    io:format("client(~s/~s) will subscribe: ~p~n", LogArgs),
    {ok, TopicTable}.
%% Hook for 'client.unsubscribe': logs the topic table being unsubscribed and
%% returns it unchanged as {ok, TopicTable}. Nothing is sent to Kafka here.
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
    LogArgs = [ClientId, Username, TopicTable],
    io:format("client(~s/~s) unsubscribe ~p~n", LogArgs),
    {ok, TopicTable}.
%% Hook for 'session.created': logs the new session. Returns ok.
%% The format string now ends in ~n; without it this line ran into the
%% next console log line.
on_session_created(ClientId, Username, _Env) ->
    io:format("session(~s/~s) created.~n", [ClientId, Username]).
%% Hook for 'session.subscribed': logs the {Topic, Opts} subscription,
%% forwards a "subscribed" event to Kafka, and returns {ok, {Topic, Opts}}.
on_session_subscribed(ClientId, Username, {_Topic, _Opts} = Subscription, Env) ->
    io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, Subscription]),
    ekaf_send(<<"subscribed">>, ClientId, Subscription, Env),
    {ok, Subscription}.
%% Hook for 'session.unsubscribed': logs the {Topic, Opts} pair and forwards
%% an "unsubscribed" event to Kafka. Returns ok.
on_session_unsubscribed(ClientId, Username, {_Topic, _Opts} = Subscription, Env) ->
    io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, Subscription]),
    ekaf_send(<<"unsubscribed">>, ClientId, Subscription, Env),
    ok.
%% Hook for 'session.terminated': logs the termination reason. Returns ok.
%% The format string now ends in ~n; without it this line ran into the
%% next console log line.
on_session_terminated(ClientId, Username, Reason, _Env) ->
    io:format("session(~s/~s) terminated: ~p.~n", [ClientId, Username, Reason]).
%% Hook for 'message.publish' (transform message and return).
%% Broker-internal $SYS/... messages pass through untouched. Every other
%% message is logged, forwarded to Kafka, and returned unchanged as
%% {ok, Message}.
on_message_publish(#mqtt_message{topic = <<"$SYS/", _/binary>>} = Message, _Env) ->
    {ok, Message};
on_message_publish(Message, Env) ->
    io:format("publish ~s~n", [emqttd_message:format(Message)]),
    %% NOTE(review): the event type <<"public">> looks like a typo for
    %% "publish"; left unchanged because Kafka consumers may match on it.
    ekaf_send(<<"public">>, {}, Message, Env),
    {ok, Message}.
%% Hook for 'message.delivered': logs the delivery and returns the message
%% unchanged as {ok, Message}.
on_message_delivered(ClientId, Username, Message, _Env) ->
    Formatted = emqttd_message:format(Message),
    io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, Formatted]),
    {ok, Message}.
%% Hook for 'message.acked': logs the acknowledgement and returns the message
%% unchanged as {ok, Message}.
on_message_acked(ClientId, Username, Message, _Env) ->
    Formatted = emqttd_message:format(Message),
    io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, Formatted]),
    {ok, Message}.
%% Called when the plugin application stops: removes every hook that load/1
%% registered, in the same order. Returns the result of the last
%% emqttd:unhook/2 call ('message.acked').
unload() ->
    Hooks = [{'client.connected',    fun ?MODULE:on_client_connected/3},
             {'client.disconnected', fun ?MODULE:on_client_disconnected/3},
             {'client.subscribe',    fun ?MODULE:on_client_subscribe/4},
             {'client.unsubscribe',  fun ?MODULE:on_client_unsubscribe/4},
             {'session.created',     fun ?MODULE:on_session_created/3},
             {'session.subscribed',  fun ?MODULE:on_session_subscribed/4},
             {'session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4},
             {'session.terminated',  fun ?MODULE:on_session_terminated/4},
             {'message.publish',     fun ?MODULE:on_message_publish/2},
             {'message.delivered',   fun ?MODULE:on_message_delivered/4},
             {'message.acked',       fun ?MODULE:on_message_acked/4}],
    %% foldl (rather than foreach) so the final unhook result is returned.
    lists:foldl(fun({HookPoint, Callback}, _Previous) ->
                        emqttd:unhook(HookPoint, Callback)
                end, ok, Hooks).
%% ==================== ekaf_init STA.===============================%%
%% Configures the ekaf Kafka producer from this plugin's `kafka` application
%% env (a proplist with host, port and topic keys, as produced by the
%% cuttlefish translation for "emq_plugin_kafka.kafka") and starts ekaf.
%%
%% The argument is ignored; configuration is read from the application env.
%% Returns ok. Crashes with badmatch if the `kafka` env key is missing or if
%% ekaf (or one of its dependencies) cannot be started.
ekaf_init(_Env) ->
    {ok, Kafka} = application:get_env(?APP, kafka),
    Host = proplists:get_value(host, Kafka),
    Port = proplists:get_value(port, Kafka),
    Topic = proplists:get_value(topic, Kafka),
    application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
    application:set_env(ekaf, ekaf_bootstrap_broker, {Host, Port}),
    application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
    %% Start ekaf only after its env is set. Previously nothing in this module
    %% started it. NOTE(review): presumably no .app.src dependency starts it
    %% either -- confirm. ensure_all_started/1 returns {ok, []} when ekaf is
    %% already running, so this is safe either way.
    {ok, _Started} = application:ensure_all_started(ekaf),
    %% ~s/~b instead of the old "~w ~w ~w", which printed strings as integer lists.
    io:format("Init ekaf with ~s:~b, topic: ~s~n", [Host, Port, Topic]),
    ok.
%% ==================== ekaf_init END.===============================%%
%% ==================== ekaf_send STA.===============================%%
%% Encodes one plugin event as JSON (mochijson2) and produces it synchronously
%% to the configured Kafka topic via ekaf_send_sync/1.
%%
%% The clause is chosen by the shape of the third argument:
%%   {}            - client lifecycle event; "message" is an empty object;
%%   {Reason}      - event carrying a single reason term (no visible caller);
%%   {Topic, Opts} - session (un)subscribe event;
%%   Message       - an #mqtt_message{} record. The ClientId argument is ignored;
%%                   client id and username are derived from the record's
%%                   `from` field via c/1 and u/1.
%% Every document carries type, client_id, message, cluster_node (this node)
%% and ts (emqttd_time:now_ms()).
ekaf_send(Type, ClientId, {}, _Env) ->
    Json = mochijson2:encode([
        {type, Type},
        {client_id, ClientId},
        {message, {}},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    ekaf_send_sync(Json);
ekaf_send(Type, ClientId, {Reason}, _Env) ->
    Json = mochijson2:encode([
        {type, Type},
        {client_id, ClientId},
        {cluster_node, node()},
        {message, Reason},
        {ts, emqttd_time:now_ms()}
    ]),
    ekaf_send_sync(Json);
ekaf_send(Type, ClientId, {Topic, Opts}, _Env) ->
    Json = mochijson2:encode([
        {type, Type},
        {client_id, ClientId},
        {cluster_node, node()},
        {message, [
            {topic, Topic},
            {opts, Opts}
        ]},
        {ts, emqttd_time:now_ms()}
    ]),
    ekaf_send_sync(Json);
ekaf_send(Type, _ClientId, Message, _Env) ->
    %% Only the fields that are actually encoded are extracted; the old version
    %% also bound `id` and `timestamp` and never used them (compiler warnings).
    %% The shape of `from` differs depending on whether client login is
    %% required; c/1 and u/1 handle both forms.
    #mqtt_message{from = From, topic = Topic, payload = Payload,
                  qos = Qos, dup = Dup, retain = Retain} = Message,
    %% NOTE(review): Payload is encoded as a JSON string; non-UTF-8 binary
    %% payloads may fail inside mochijson2 -- confirm if binary payloads occur.
    Json = mochijson2:encode([
        {type, Type},
        {client_id, c(From)},
        {message, [
            {username, u(From)},
            {topic, Topic},
            {payload, Payload},
            {qos, i(Qos)},
            {dup, i(Dup)},
            {retain, i(Retain)}
        ]},
        {cluster_node, node()},
        {ts, emqttd_time:now_ms()}
    ]),
    ekaf_send_sync(Json).
%% Produces Msg synchronously to the topic currently configured in this
%% plugin's `kafka` application env (looked up on every call).
ekaf_send_sync(Msg) ->
    ekaf_send_sync(ekaf_get_topic(), Msg).
%% Produces Msg to Kafka topic Topic with ekaf:produce_sync_batched/2 and
%% returns ekaf's result.
%% Topic and Msg may each be a string/iolist (e.g. mochijson2:encode/1 output)
%% or a binary. iolist_to_binary/1 gives the same result as list_to_binary/1
%% for lists, but also accepts binaries, which list_to_binary/1 rejects with
%% badarg.
ekaf_send_sync(Topic, Msg) ->
    ekaf:produce_sync_batched(iolist_to_binary(Topic), iolist_to_binary(Msg)).
%% Converts an MQTT flag or number for JSON output: true -> 1, false -> 0,
%% integers pass through unchanged. Any other term raises function_clause.
i(Flag) when is_boolean(Flag) ->
    case Flag of
        true -> 1;
        false -> 0
    end;
i(N) when is_integer(N) ->
    N.
%% Extracts the client id from a message's `from` field: either
%% {ClientId, Username} or a bare value, which is returned as-is.
%% `_Username` silences the unused-variable warning of the original.
c({ClientId, _Username}) -> ClientId;
c(From) -> From.
%% Extracts the username from a message's `from` field: either
%% {ClientId, Username} or a bare value, which is returned as-is.
%% `_ClientId` silences the unused-variable warning of the original.
u({_ClientId, Username}) -> Username;
u(From) -> From.
%% ==================== ekaf_send END.===============================%%
%% ==================== ekaf_set_topic STA.===============================%%
%% Stores Topic (a string) as ekaf's bootstrap topic binary. Returns ok.
%% NOTE(review): not called by any code shown, and ekaf_send_sync/1 reads
%% the topic from this plugin's own env, so this does not change where
%% events are sent.
ekaf_set_topic(Topic) ->
    TopicBin = list_to_binary(Topic),
    application:set_env(ekaf, ekaf_bootstrap_topics, TopicBin),
    ok.
%% Returns the `topic` entry of this plugin's `kafka` application env
%% (undefined if the entry is absent). Crashes with badmatch if the
%% `kafka` key itself is unset.
ekaf_get_topic() ->
    {ok, KafkaOpts} = application:get_env(?APP, kafka),
    proplists:get_value(topic, KafkaOpts).
%% ==================== ekaf_set_topic END.===============================%%
插件启动后,在load(Env)
方法中调用ekaf_init([Env])
,初始化ekaf,ekaf是Erlang语言编写的kafka生产者工具,用于发送消息到kafka。
在EMQ的各种方法钩子,如on_client_connected、on_message_publish中调用ekaf_send()方法即可。
4、执行 make clean && make
,重新编译emq_plugin_kafka插件,编译成功如下图:
5、回到emq-relx
目录下,执行vi relx.config
,增加如图:
6、执行vi data/loaded_plugins
,这个文件是emq启动时自动加载并启动的插件,添加一行emq_plugin_kafka.
,这样emq启动时,会自动加载并启动我们的emq_plugin_kafka插件
7、执行rm -rf _rel
,执行make clean && make
,执行cd _rel/emqttd/bin
,进入编译好的emqttd的bin目录,执行sh emqttd console
,启动emqttd服务。
8、下载kafka压缩包,解压缩后进入bin目录,分别启动自带的zookeeper、启动kafka服务、创建一个上文定义的topic、启动一个kafka消费者,正常的话kafka消费者会接收到插件发来的消息。参见kafka quick start
下面是kafka消费者接收到的消息:
四、总结
上文简单实现了EMQ服务器端监听客户端连接/断开、话题订阅/取消订阅、消息发布过程中发消息到kafka中,采用的是同步发消息的方式(保证消息顺序),可自行调整消息格式或采用异步发消息等。