EMQ X Kuiper支持各种source(MQTT、ZeroMq、EdgeX),默认支持MQTT
1、关于Kuiper的介绍及简单使用,请参考:https://docs.emqx.io/kuiper/latest/cn/
2、关于Kuiper与EdgeX Foundry集成,请参照:https://www.jianshu.com/p/0726d41b00bf
本文实践ZeroMq作为消息总线源(实际上EdgeX内部使用的正是ZeroMq),Kuiper订阅其消息,并以此测试Kuiper的性能,kuiper官方给出性能测试结果:<u>https://docs.emqx.io/kuiper/latest/cn/</u>
操作系统: Ubuntu 18.04
测试原理描述:启动一个go应用向ZeroMq发送消息,同时Kuiper订阅来自ZeroMq的消息,经过Kuiper规则处理后输出消息
操作步骤:
1、创建一个go应用,向ZeroMq发送若干条消息,每条消息类似:
{"device":"demo","readings":[{"device":"Temperature device","name":"Temperature","value":"40"},{"device":"Humidity device","name":"Humidity","value":"45"}]},大小大概为157字节。
go应用源码:https://github.com/emqx/kuiper/blob/master/fvt_scripts/edgex/benchmark/pub.go
go应用与kuiper在一台机器上
2、由于Kuiper对zmq源的支持是以插件的形式支持的,而且必须与Kuiper的版本相配套,故此处kuiper采用源码编译打包,同时编译zmq的插件
zj@zj-Z390-UD:~/文档/项目工作$ git clone https://github.com/emqx/kuiper.git
正克隆到 'kuiper'...
remote: Enumerating objects: 28, done.
remote: Counting objects: 100% (28/28), done.
remote: Compressing objects: 100% (19/19), done.
remote: Total 5326 (delta 10), reused 21 (delta 9), pack-reused 5298
接收对象中: 100% (5326/5326), 17.02 MiB | 597.00 KiB/s, 完成.
处理 delta 中: 100% (3332/3332), 完成.
zj@zj-Z390-UD:~/文档/项目工作$ cd kuiper/
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ make pkg
Build successfully
make[1]: 进入目录“/home/zj/文档/项目工作/kuiper”
Package build success
make[1]: 离开目录“/home/zj/文档/项目工作/kuiper”
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ ll -h _packages/
总用量 18M
drwxr-xr-x 2 zj zj 4.0K 6月 8 10:51 ./
drwxr-xr-x 15 zj zj 4.0K 6月 8 10:51 ../
-rw-r--r-- 1 zj zj 8.8M 6月 8 10:51 kuiper-0.4.1-linux-x86_64.tar.gz
-rw-r--r-- 1 zj zj 8.8M 6月 8 10:51 kuiper-0.4.1-linux-x86_64.zip
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq.go
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ ll -h plugins/sources
总用量 5.0M
drwxr-xr-x 2 zj zj 4.0K 6月 8 10:54 ./
drwxr-xr-x 6 zj zj 4.0K 6月 8 10:50 ../
-rw-r--r-- 1 zj zj 1.5K 6月 8 10:50 random.go
-rw-r--r-- 1 zj zj 2.1K 6月 8 10:50 zmq.go
-rw-r--r-- 1 zj zj 5.0M 6月 8 10:54 Zmq.so
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ cd _packages/
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages$ tar zxf kuiper-0.4.1-linux-x86_64.tar.gz
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages$ cd kuiper-0.4.1-linux-x86_64/
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cp ../../plugins/sources/Zmq.so plugins/sources/
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ll plugins/sources/
总用量 5080
drwxr-xr-x 2 zj zj 4096 6月 8 10:57 ./
drwxr-xr-x 5 zj zj 4096 6月 8 10:51 ../
-rw-r--r-- 1 zj zj 5191448 6月 8 10:57 Zmq.so
3、修改source配置
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat etc/sources/zmq.yaml
#Global Zmq configurations
default:
server: tcp://10.0.105.143:5563
topic: events
10.0.105.143是我的go应用所在的主机IP
4、启动Kuiper server进程
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ./bin/server
Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081.
5、设置Kuiper订阅的数据流及过滤规则
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create stream demo '() WITH (FORMAT="JSON", TYPE="zmq", DATASOURCE="events")'
Connecting to 127.0.0.1:20498...
Stream demo is created.
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ vi rule.txt
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat rule.txt
{
"sql": "SELECT * from demo",
"actions": [
{
"log": {
"concurrency": 50,
"bufferLength": 10240,
"cacheLength": 102400,
"runAsync": true
}
}
],
"options": {
"concurrency": 30,
"bufferLength": 10240
}
}
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create rule rule1 -f rule.txt
Connecting to 127.0.0.1:20498...
Creating a new rule from file rule.txt.
Rule rule1 was created successfully, please use 'bin/cli getstatus rule rule1' command to get rule status.
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat log/stream.log
time="2020-06-08T11:00:26+08:00" level=info msg="db location is /home/zj/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64/data/" file="server.go:31"
time="2020-06-08T11:00:26+08:00" level=info msg="Starting rules" file="server.go:48"
time="2020-06-08T11:00:26+08:00" level=info msg="Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081. \n" file="server.go:101"
time="2020-06-08T11:02:36+08:00" level=info msg="Stream demo is created." file="xsql_processor.go:75"
time="2020-06-08T11:05:52+08:00" level=info msg="Rule rule1 is created." file="xsql_processor.go:226"
time="2020-06-08T11:05:52+08:00" level=info msg="Init rule with options {isEventTime: false, lateTolerance: 0, concurrency: 30, bufferLength: 10240" file="xsql_processor.go:399"
time="2020-06-08T11:05:52+08:00" level=info msg="Opening stream" file="streams.go:89" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open source node demo with option map[DATASOURCE:events FORMAT:JSON TYPE:zmq]" file="source_node.go:59" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open source node 1 instances" file="source_node.go:78" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open sink node 50 instances" file="sink_node.go:143" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Start source demo instance 0 successfully" file="source_node.go:115" rule=rule1
Kuiper规则配置如下:
{
"sql": "SELECT * from demo",
"actions": [
{
"log": {
"concurrency": 50,
"bufferLength": 10240,
"cacheLength": 102400,
"runAsync": true
}
}
],
"options": {
"concurrency": 30,
"bufferLength": 10240
}
}
以上参数含义:https://github.com/emqx/kuiper/blob/master/docs/zh_CN/rules/overview.md
6、go应用发送消息耗时:0.344352s
同时kuiper侧订阅zeroMq主题为events的消息,过滤了所有消息
说明,kuiper在0.344352s处理了9676条数据
经测试:
注:测试发现当数据量大时,明显丢包严重