背景:
- emq开源版集成了许多插件,但没有提供kafka的。但官方提供了插件的模板,我们只需要照葫芦画瓢即可;
- 由于emq是基于Erlang编程语言,所以我们需要搭建Erlang环境(Erlang这语言,不到万不得已,千万别碰!!);
- 选择v2.3.11是因为截止写这篇博客的时候,v2版本才是真正的稳定版。
Ok,开始吧!
一、Erlang环境
- 安装依赖
yum install fop
yum install gcc gcc-c++ glibc-devel kernel-devel make openssl-devel autoconf
yum -y install ncurses ncurses-devel
yum install m4 openssl-devel unixODBC unixODBC-devel
- 下载安装包
wget http://erlang.org/download/otp_src_20.3.tar.gz
- 解压、安装
tar -zxvf otp_src_20.3.tar.gz
./configure --prefix=/usr/local/erlang
make
make install
- 添加环境变量
vi /etc/profile
#set erlang environment
export PATH=$PATH:/usr/local/erlang/bin
source /etc/profile
-
验证
二、测试安装Emq
因为我们后续要加上自己的kafka插件代码,所以这步只是测试是否能正常编译
- 下载源码
wget https://codeload.github.com/emqx/emqx-rel/tar.gz/v2.3.11
mv v2.3.11 emq-v2.3.11.tar.gz
- 解压
tar -zxvf emq-v2.3.11.tar.gz
cd emqx-rel-2.3.11
- 编译
make
编译需要花点时间,如果编译成功,就可以进入下一步了。
如果有错误,那么一定是linux环境的问题,因为这个Erlang版本和Emq版本是我测试过的。我之前就因为一直没通过,所以重新在本地建了一个centos7虚拟机,再按照步骤来,就成功了。
三、编写插件
非常感谢写这篇博客《EMQ集成Kafka插件编写过程 emq_plugin_kafka》的作者啊,帮了大忙!
除了有一些小问题~
-
第一次编译之后,文件结构如下:
- 复制一份插件模板,并更名:
cd deps
cp -r emq_plugin_template emq_plugin_kafka
cd emq_plugin_kafka
-
插件目录结构
- 修改关键字“template”为“kafka”
将插件目录emq_plugin_kafka中所有文件名和里面的内容中的“template”改为“kafka”
(你总不希望别人看你的代码代码时,上面的“template”几个大字赫然在目吧?),包括src目录下、Makefile文件、etc下等 - 初步处理src下的文件
- 因为没用到权限验证和登录授权验证,所以删除acl和auth代码
rm -rf emq_acl_demo.erl
rm -rf emq_auth_demo.erl
- 修改emq_plugin_kafka_app.erl文件,把acl和auth的模块注册代码去掉,并加些打印语句
vim emq_plugin_kafka_app.erl
-module(emq_plugin_kafka_app).
-behaviour(application).
%% Application callbacks
-export([start/2, stop/1]).
start(_StartType, _StartArgs) ->
{ok, Sup} = emq_plugin_kafka_sup:start_link(),
%% ok = emqttd_access_control:register_mod(auth, emq_auth_demo, []),
%% ok = emqttd_access_control:register_mod(acl, emq_acl_demo, []),
emq_plugin_kafka:load(application:get_all_env()),
io:format("emq_plugin_kafka start.~n", []),
{ok, Sup}.
stop(_State) ->
%% ok = emqttd_access_control:unregister_mod(auth, emq_auth_demo),
%% ok = emqttd_access_control:unregister_mod(acl, emq_acl_demo),
emq_plugin_kafka:unload(),
io:format("emq_plugin_kafka stop.~n", []).
- 暂时测试一下
①修改配置文件relx.config:vim relx.config
添加一行:{emq_plugin_kafka, load},
②修改Makefile:vim Makefile
添加emq_plugin_kafka:
DEPS += emqttd emq_modules emq_dashboard emq_retainer emq_recon emq_reloader \
emq_auth_clientid emq_auth_username emq_auth_ldap emq_auth_http \
emq_auth_mysql emq_auth_pgsql emq_auth_redis emq_auth_mongo \
emq_sn emq_coap emq_stomp emq_plugin_template emq_web_hook \
emq_lua_hook emq_auth_jwt emq_plugin_kafka
③设置emq启动时加载插件:vim data/loaded_plugins
emq_recon.
emq_modules.
emq_retainer.
emq_dashboard.
emq_plugin_kafka.
④在emq根目录make,如果不成功,可能是文件/内容修改不完全
⑤控制台启动emq
./_rel/emqttd/bin/emqttd console
启动日志:
6.1. 连接单节点的kafka的代码——ekaf工具
如何搭建kafka,可参考我的另一篇博客《三台虚拟机搭建kafka集群(和zookeeper集群)》
假设你已经搭建好了kafka。
为了连接上kafka,需要用到基于Erlang写的工具ekaf。
- 修改配置文件
① 配置文件变更历史,官方介绍
通过我的后期实践的理解,conf后缀对应key=value这种配置格式,config后缀对应Erlang原始配置格式
Makefile里会有对应配置文件路径:
app.config::
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.conf -i priv/emq_plugin_kafka.schema -d data
② 进入emq_plugin_kafka/etc,修改文件名后缀为conf,因为Makefile里写的是conf
mv emq_plugin_kafka.config emq_plugin_kafka.conf
③ 编辑该文件:vim emq_plugin_kafka.conf
emq.plugin.kafka.server = 192.168.220.129:9092
emq.plugin.kafka.topic = kafka-topic
④ 在deps/emq_plugin_kafka目录下,新建priv文件夹,然后新建schema文件
mkdir priv
vim priv/emq_plugin_kafka.schema
我复制上面提到的博客《EMQ集成Kafka插件编写过程 emq_plugin_kafka》里的代码,老是解析这个文件出错,不得已,自己一点一点敲,最后变成了这个样子:
{
mapping,
"emq.plugin.kafka.server",
"emq_plugin_kafka.server",
[
{
mapping,
"emq.plugin.kafka.server",
"emq_plugin_kafka.server",
[
{default, {"127.0.0.1", 9092}},
{datatype, [integer, ip, string]}
]
}.
{
mapping,
"emq.plugin.kafka.topic",
"emq_plugin_kafka.server",
[
{default, "test"},
{datatype, string},
hidden
]
}.
{
translation,
"emq_plugin_kafka.server",
fun(Conf) ->
{RHost, RPort} = case cuttlefish:conf_get("emq.plugin.kafka.server", Conf) of
{Ip, Port} -> {Ip, Port};
S -> case string:tokens(S, ":") of
[Domain] -> {Domain, 9092};
[Domain, Port] -> {Domain, list_to_integer(Port)}
end
end,
Topic = cuttlefish:conf_get("emq.plugin.kafka.topic", Conf),
[
{host, RHost},
{port, RPort},
{topic, Topic}
]
end
}.
⑤ 修改Makefile文件,增加ekaf依赖
PROJECT = emq_plugin_kafka
PROJECT_DESCRIPTION = EMQ Plugin Kafka
PROJECT_VERSION = 2.3.11
BUILD_DEPS = emqttd cuttlefish ekaf
dep_emqttd = git https://github.com/emqtt/emqttd v2.3.11
dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11
dep_ekaf = git https://github.com/helpshift/ekaf master
ERLC_OPTS += +debug_info
ERLC_OPTS += +'{parse_transform, lager_transform}'
NO_AUTOPATCH = cuttlefish
COVER = true
include erlang.mk
app:: rebar.config
app.config::
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.conf -i priv/emq_plugin_kafka.schema -d data
⑥ 编写逻辑代码:vim src/emq_plugin_kafka.erl
主要是写了个 ekaf_send(Message, _Env) 方法,然后再消息到来的时候调用
-module(emq_plugin_kafka).
-include_lib("emqttd/include/emqttd.hrl").
-define(APP, emq_plugin_kafka).
-export([load/1, unload/0]).
%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
%% Called when the plugin application start
load(Env) ->
ekaf_init(Env),
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
{ok, Client}.
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
ok.
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
{ok, TopicTable}.
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
{ok, TopicTable}.
on_session_created(ClientId, Username, _Env) ->
io:format("session(~s/~s) created.", [ClientId, Username]).
on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
{ok, {Topic, Opts}}.
on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
ok.
on_session_terminated(ClientId, Username, Reason, _Env) ->
io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
%% transform message and return
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
{ok, Message};
on_message_publish(Message, _Env) ->
io:format("publish ~s~n", [emqttd_message:format(Message)]),
ekaf_send(Message, _Env),
{ok, Message}.
on_message_delivered(ClientId, Username, Message, _Env) ->
io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
on_message_acked(ClientId, Username, Message, _Env) ->
io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
%% Called when the plugin application stop
unload() ->
emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
ekaf_init(_Env) ->
{ok, Kafka_Env} = application:get_env(?APP, server),
Host = proplists:get_value(host, Kafka_Env),
Port = proplists:get_value(port, Kafka_Env),
Broker = {Host, Port},
%Broker = {"192.168.52.130", 9092},
Topic = proplists:get_value(topic, Kafka_Env),
%Topic = "test-topic",
application:set_env(ekaf, ekaf_partition_strategy, strict_round_robin),
application:set_env(ekaf, ekaf_bootstrap_broker, Broker),
application:set_env(ekaf, ekaf_bootstrap_topics, list_to_binary(Topic)),
%%设置数据上报间隔,ekaf默认是数据达到1000条或者5秒,触发上报
application:set_env(ekaf, ekaf_buffer_ttl, 100),
{ok, _} = application:ensure_all_started(ekaf).
%io:format("Init ekaf with ~p~n", [Broker]),
%Json = mochijson2:encode([
% {type, <<"connected">>},
% {client_id, <<"test-client_id">>},
% {cluster_node, <<"node">>}
%]),
%io:format("send : ~w.~n",[ekaf:produce_async_batched(list_to_binary(Topic), list_to_binary(Json))]).
ekaf_send(Message, _Env) ->
From = Message#mqtt_message.from,
Topic = Message#mqtt_message.topic,
Payload = Message#mqtt_message.payload,
Qos = Message#mqtt_message.qos,
Dup = Message#mqtt_message.dup,
Retain = Message#mqtt_message.retain,
ClientId = get_form_clientid(From),
Username = get_form_username(From),
io:format("publish ~s~n", [emqttd_message:format(Message)]),
Str = [
{client_id, ClientId},
{message, [
{username, Username},
{topic, Topic},
{payload, Payload},
{qos, Qos},
{dup, Dup},
{retain, Retain}
]},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
],
%io:format("Str : ~s~n", [Str]),
Json = mochijson2:encode(Str),
KafkaTopic = get_topic(),
ekaf:produce_sync_batched(KafkaTopic, list_to_binary(Json)).
get_form_clientid({ClientId, Username}) -> ClientId;
get_form_clientid(From) -> From.
get_form_username({ClientId, Username}) -> Username;
get_form_username(From) -> From.
get_topic() ->
{ok, Topic} = application:get_env(ekaf, ekaf_bootstrap_topics),
Topic.
6.2 连接集群的kafka的代码——brod工具
ekaf工具不支持集群,brod支持,当然,它也支持单节点~
因为我用了git工具,所以大家可以看看,在6.1的基础上,做了哪些改动
[root@localhost emq_plugin_kafka]# git status
# 位于分支 based-on-brod
# 尚未暂存以备提交的变更:
# (使用 "git add/rm <file>..." 更新要提交的内容)
# (使用 "git checkout -- <file>..." 丢弃工作区的改动)
#
# 修改: Makefile
# 删除: etc/emq_plugin_kafka.conf
# 修改: src/emq_plugin_kafka.erl
#
# 未跟踪的文件:
# (使用 "git add <file>..." 以包含要提交的内容)
#
# etc/emq_plugin_kafka.config
修改尚未加入提交(使用 "git add" 和/或 "git commit -a")
所以你们只需要:
① 修改Makefile
PROJECT = emq_plugin_kafka
PROJECT_DESCRIPTION = EMQ Plugin Kafka
PROJECT_VERSION = 2.3.11
BUILD_DEPS = emqttd cuttlefish ekaf brod
dep_emqttd = git https://github.com/emqtt/emqttd v2.3.11
dep_cuttlefish = git https://github.com/emqtt/cuttlefish v2.0.11
dep_ekaf = git https://github.com/helpshift/ekaf master
dep_brod = git https://github.com/klarna/brod.git 3.7.3
ERLC_OPTS += +debug_info
ERLC_OPTS += +'{parse_transform, lager_transform}'
NO_AUTOPATCH = cuttlefish
COVER = true
include erlang.mk
app:: rebar.config
app.config::
./deps/cuttlefish/cuttlefish -l info -e etc/ -c etc/emq_plugin_kafka.config -i priv/emq_plugin_kafka.schema -d data
② 删除etc/emq_plugin_kafka.conf
rm -f etc/emq_plugin_kafka.conf
③ 修改src/emq_plugin_kafka.erl
-module(emq_plugin_kafka).
-include_lib("emqttd/include/emqttd.hrl").
-include_lib("brod/include/brod_int.hrl").
-define(TEST_TOPIC, <<"test-topic">>).
-export([load/1, unload/0]).
%% Hooks functions
-export([on_client_connected/3, on_client_disconnected/3]).
-export([on_client_subscribe/4, on_client_unsubscribe/4]).
-export([on_session_created/3, on_session_subscribed/4, on_session_unsubscribed/4, on_session_terminated/4]).
-export([on_message_publish/2, on_message_delivered/4, on_message_acked/4]).
%% Called when the plugin application start
load(Env) ->
brod_init([Env]),
emqttd:hook('client.connected', fun ?MODULE:on_client_connected/3, [Env]),
emqttd:hook('client.disconnected', fun ?MODULE:on_client_disconnected/3, [Env]),
emqttd:hook('client.subscribe', fun ?MODULE:on_client_subscribe/4, [Env]),
emqttd:hook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4, [Env]),
emqttd:hook('session.created', fun ?MODULE:on_session_created/3, [Env]),
emqttd:hook('session.subscribed', fun ?MODULE:on_session_subscribed/4, [Env]),
emqttd:hook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4, [Env]),
emqttd:hook('session.terminated', fun ?MODULE:on_session_terminated/4, [Env]),
emqttd:hook('message.publish', fun ?MODULE:on_message_publish/2, [Env]),
emqttd:hook('message.delivered', fun ?MODULE:on_message_delivered/4, [Env]),
emqttd:hook('message.acked', fun ?MODULE:on_message_acked/4, [Env]).
on_client_connected(ConnAck, Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s connected, connack: ~w~n", [ClientId, ConnAck]),
Json = mochijson2:encode([
{type, <<"connected">>},
{client_id, ClientId},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
]),
%%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_1">>, list_to_binary(Json)),
{ok, Kafka} = application:get_env(?MODULE, kafka),
KafkaTopic = proplists:get_value(kafka_topic, Kafka),
{ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, 0, <<"mykey_1">>, list_to_binary(Json)),
receive
#brod_produce_reply{ call_ref = CallRef
, result = brod_produce_req_acked
} ->
io:format("brod_produce_reply:ok ~n"),
ok
after 5000 ->
io:format("brod_produce_reply:exit ~n"),
erlang:exit(timeout)
%%ct:fail({?MODULE, ?LINE, timeout})
end,
{ok, Client}.
on_client_disconnected(Reason, _Client = #mqtt_client{client_id = ClientId}, _Env) ->
io:format("client ~s disconnected, reason: ~w~n", [ClientId, Reason]),
Json = mochijson2:encode([
{type, <<"disconnected">>},
{client_id, ClientId},
{reason, Reason},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
]),
%%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_2">>, list_to_binary(Json)),
{ok, Kafka} = application:get_env(?MODULE, kafka),
KafkaTopic = proplists:get_value(kafka_topic, Kafka),
{ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, 0, <<"mykey_2">>, list_to_binary(Json)),
receive
#brod_produce_reply{ call_ref = CallRef
, result = brod_produce_req_acked
} ->
ok
after 5000 ->
ct:fail({?MODULE, ?LINE, timeout})
end,
ok.
on_client_subscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) will subscribe: ~p~n", [Username, ClientId, TopicTable]),
{ok, TopicTable}.
on_client_unsubscribe(ClientId, Username, TopicTable, _Env) ->
io:format("client(~s/~s) unsubscribe ~p~n", [ClientId, Username, TopicTable]),
{ok, TopicTable}.
on_session_created(ClientId, Username, _Env) ->
io:format("session(~s/~s) created.", [ClientId, Username]).
on_session_subscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) subscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
{ok, {Topic, Opts}}.
on_session_unsubscribed(ClientId, Username, {Topic, Opts}, _Env) ->
io:format("session(~s/~s) unsubscribed: ~p~n", [Username, ClientId, {Topic, Opts}]),
ok.
on_session_terminated(ClientId, Username, Reason, _Env) ->
io:format("session(~s/~s) terminated: ~p.", [ClientId, Username, Reason]).
%% transform message and return
%%根据topic前缀来分发到对应方法:以$SYS/开头
on_message_publish(Message = #mqtt_message{topic = <<"$SYS/", _/binary>>}, _Env) ->
{ok, Message};
on_message_publish(Message, _Env) ->
io:format("publish ~s~n", [emqttd_message:format(Message)]),
Id = Message#mqtt_message.id,
From = Message#mqtt_message.from, %需要登录和不需要登录这里的返回值是不一样的
Topic = Message#mqtt_message.topic,
Payload = Message#mqtt_message.payload,
Qos = Message#mqtt_message.qos,
Dup = Message#mqtt_message.dup,
Retain = Message#mqtt_message.retain,
Timestamp = Message#mqtt_message.timestamp,
ClientId = c(From),
Username = u(From),
%%ClientId作为Key
Key = iolist_to_binary(ClientId),
%%获得分区数
Partition = getPartition(Key),
%%读取配置文件
{ok, Kafka} = application:get_env(?MODULE, kafka),
KafkaTopic = proplists:get_value(kafka_topic, Kafka),
Json = mochijson2:encode([
{type, <<"publish">>},
{partition_num, Partition},
{client_id, ClientId},
{message, [
{username, Username},
{topic, Topic},
{payload, Payload},
{qos, i(Qos)},
{dup, i(Dup)},
{retain, i(Retain)}
]},
{cluster_node, node()},
{ts, emqttd_time:now_ms()}
]),
%%ok = brod:produce_sync(brod_client_1, ?TEST_TOPIC, 0, <<"mykey_3">>, list_to_binary(Json)),
{ok, CallRef} = brod:produce(brod_client_1, KafkaTopic, Partition, ClientId, list_to_binary(Json)),
receive
#brod_produce_reply{ call_ref = CallRef
, result = brod_produce_req_acked
} ->
ok
after 5000 ->
ct:fail({?MODULE, ?LINE, timeout})
end,
{ok, Message}.
%%key的md5的最后一位进行取模,获取分区数
getPartition(Key) ->
{ok, Kafka} = application:get_env(?MODULE, kafka),
PartitionNum = proplists:get_value(kafka_producer_partition, Kafka),
<<Fix:120, Match:8>> = crypto:hash(md5, Key),
abs(Match) rem PartitionNum.
on_message_delivered(ClientId, Username, Message, _Env) ->
io:format("delivered to client(~s/~s): ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
on_message_acked(ClientId, Username, Message, _Env) ->
io:format("client(~s/~s) acked: ~s~n", [Username, ClientId, emqttd_message:format(Message)]),
{ok, Message}.
%% Called when the plugin application stop
unload() ->
%%application:stop(brod),
emqttd:unhook('client.connected', fun ?MODULE:on_client_connected/3),
emqttd:unhook('client.disconnected', fun ?MODULE:on_client_disconnected/3),
emqttd:unhook('client.subscribe', fun ?MODULE:on_client_subscribe/4),
emqttd:unhook('client.unsubscribe', fun ?MODULE:on_client_unsubscribe/4),
emqttd:unhook('session.created', fun ?MODULE:on_session_created/3),
emqttd:unhook('session.subscribed', fun ?MODULE:on_session_subscribed/4),
emqttd:unhook('session.unsubscribed', fun ?MODULE:on_session_unsubscribed/4),
emqttd:unhook('session.terminated', fun ?MODULE:on_session_terminated/4),
emqttd:unhook('message.publish', fun ?MODULE:on_message_publish/2),
emqttd:unhook('message.delivered', fun ?MODULE:on_message_delivered/4),
emqttd:unhook('message.acked', fun ?MODULE:on_message_acked/4).
%% ===================================================================
%% brod_init https://github.com/klarna/brod
%% ===================================================================
brod_init(_Env) ->
{ok, _} = application:ensure_all_started(brod),
{ok, Kafka} = application:get_env(?MODULE, kafka),
KafkaBootstrapEndpoints = proplists:get_value(bootstrap_broker, Kafka),
%%KafkaBootstrapEndpoints = [{"127.0.0.1", 9092}], %%localhost,172.16.6.161
%%KafkaBootstrapEndpoints = [{"localhost", 9092}], %%localhost,172.16.6.161
%%ClientConfig = [{reconnect_cool_down_seconds, 10}],%% socket error recovery
ClientConfig = [],%% socket error recovery
{ok, Kafka} = application:get_env(?MODULE, kafka),
Topic = proplists:get_value(kafka_topic, Kafka),
Partition = 0,
%%下面两行是初始化client,一个client只能发送到一个topic,如果要多个topic,则创建多个client
ok = brod:start_client(KafkaBootstrapEndpoints, brod_client_1, ClientConfig),
ok = brod:start_producer(brod_client_1, Topic, _ProducerConfig = []),
%%ok = brod:produce_sync(brod_client_1, Topic, Partition, <<"key1">>, <<"value1">>),
%%{ok, CallRef} = brod:produce(brod_client_1, Topic, Partition, <<"key1">>, <<"value2">>),
io:format("Init brod with ~p~n", [KafkaBootstrapEndpoints]).
i(true) -> 1;
i(false) -> 0;
i(I) when is_integer(I) -> I.
c({ClientId, Username}) -> ClientId;
c(From) -> From.
u({ClientId, Username}) -> Username;
u(From) -> From.
④ 新增etc/emq_plugin_kafka.config
[
{emq_plugin_kafka, [
{kafka, [
{ bootstrap_broker, [{"192.168.220.129", 9092},{"192.168.220.130", 9092},{"192.168.220.131", 9092}] },
{ query_api_versions, false },
{ reconnect_cool_down_seconds, 10},
{kafka_producer_partition, 3},
{kafka_topic, <<"kafka-new-topic">>}
]}
]}
].
- 再修改一下emq的配置文件:
vim relx.config
添加:{ekaf, load},
四、大功告成
- 先删除_rel文件夹
make
- 控制台启动:
./_rel/emqttd/bin/emqttd console
- 打开kafka控制台查看日志,这里的“kafka-topic”就是emq_plugin_kafka.conf文件里的配置项:
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic kafka-topic
-
网页测试: