使用ZeroMq源测试Kuiper吞吐量

EMQ X Kuiper支持各种source(MQTT、ZeroMq、EdgeX),默认支持MQTT
1、关于Kuiper的介绍及简单使用,请参考:https://docs.emqx.io/kuiper/latest/cn/
2、关于Kuiper与EdgeX Foundry集成,请参照:https://www.jianshu.com/p/0726d41b00bf

本文实践ZeroMq作为消息总线源(实际上EdgeX内部使用的正是ZeroMq),Kuiper订阅其消息,并以此测试Kuiper的性能,kuiper官方给出性能测试结果:<u>https://docs.emqx.io/kuiper/latest/cn/</u>

操作系统: Ubuntu 18.04
测试原理描述:启动一个go应用向ZeroMq发送消息,同时Kuiper订阅来自ZeroMq的消息,经过Kuiper规则处理后输出消息
操作步骤
1、创建一个go应用,向ZeroMq发送若干条消息,每条消息类似:
{"device":"demo","readings":[{"device":"Temperature device","name":"Temperature","value":"40"},{"device":"Humidity device","name":"Humidity","value":"45"}]},大小大概为157字节。

go应用源码:https://github.com/emqx/kuiper/blob/master/fvt_scripts/edgex/benchmark/pub.go
go应用与kuiper在一台机器上
2、由于Kuiper对zmq源的支持是以插件的形式支持的,而且必须与Kuiper的版本相配套,故此处kuiper采用源码编译打包,同时编译zmq的插件

zj@zj-Z390-UD:~/文档/项目工作$ git clone https://github.com/emqx/kuiper.git
正克隆到 'kuiper'...
remote: Enumerating objects: 28, done.
remote: Counting objects: 100% (28/28), done.
remote: Compressing objects: 100% (19/19), done.
remote: Total 5326 (delta 10), reused 21 (delta 9), pack-reused 5298
接收对象中: 100% (5326/5326), 17.02 MiB | 597.00 KiB/s, 完成.
处理 delta 中: 100% (3332/3332), 完成.
zj@zj-Z390-UD:~/文档/项目工作$ cd kuiper/
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ make pkg
Build successfully
make[1]: 进入目录“/home/zj/文档/项目工作/kuiper”
Package build success
make[1]: 离开目录“/home/zj/文档/项目工作/kuiper”
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ ll -h _packages/
总用量 18M
drwxr-xr-x  2 zj zj 4.0K 6月   8 10:51 ./
drwxr-xr-x 15 zj zj 4.0K 6月   8 10:51 ../
-rw-r--r--  1 zj zj 8.8M 6月   8 10:51 kuiper-0.4.1-linux-x86_64.tar.gz
-rw-r--r--  1 zj zj 8.8M 6月   8 10:51 kuiper-0.4.1-linux-x86_64.zip
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ go build --buildmode=plugin -o plugins/sources/Zmq.so plugins/sources/zmq.go
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ ll -h  plugins/sources
总用量 5.0M
drwxr-xr-x 2 zj zj 4.0K 6月   8 10:54 ./
drwxr-xr-x 6 zj zj 4.0K 6月   8 10:50 ../
-rw-r--r-- 1 zj zj 1.5K 6月   8 10:50 random.go
-rw-r--r-- 1 zj zj 2.1K 6月   8 10:50 zmq.go
-rw-r--r-- 1 zj zj 5.0M 6月   8 10:54 Zmq.so
zj@zj-Z390-UD:~/文档/项目工作/kuiper$ cd _packages/
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages$ tar zxf kuiper-0.4.1-linux-x86_64.tar.gz 
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages$ cd kuiper-0.4.1-linux-x86_64/
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cp ../../plugins/sources/Zmq.so plugins/sources/
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ll plugins/sources/
总用量 5080
drwxr-xr-x 2 zj zj    4096 6月   8 10:57 ./
drwxr-xr-x 5 zj zj    4096 6月   8 10:51 ../
-rw-r--r-- 1 zj zj 5191448 6月   8 10:57 Zmq.so

3、修改source配置

zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat etc/sources/zmq.yaml 
#Global Zmq configurations
default:
  server: tcp://10.0.105.143:5563
  topic: events

10.0.105.143是我的go应用所在的主机IP

4、启动Kuiper server进程

zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ ./bin/server
Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081.

5、设置Kuiper订阅的数据流及过滤规则

zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create stream demo '() WITH (FORMAT="JSON", TYPE="zmq", DATASOURCE="events")'
Connecting to 127.0.0.1:20498... 
Stream demo is created.

zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ vi rule.txt
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat rule.txt
{
  "sql": "SELECT * from demo",
  "actions": [
    {
      "log": {
          "concurrency": 50,
          "bufferLength": 10240,
          "cacheLength": 102400,
          "runAsync": true
       }
    }
  ],
  "options": {
    "concurrency": 30,
    "bufferLength": 10240
  }
}
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ bin/cli create rule rule1 -f rule.txt
Connecting to 127.0.0.1:20498... 
Creating a new rule from file rule.txt.
Rule rule1 was created successfully, please use 'bin/cli getstatus rule rule1' command to get rule status.
zj@zj-Z390-UD:~/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64$ cat log/stream.log 
time="2020-06-08T11:00:26+08:00" level=info msg="db location is /home/zj/文档/项目工作/kuiper/_packages/kuiper-0.4.1-linux-x86_64/data/" file="server.go:31"
time="2020-06-08T11:00:26+08:00" level=info msg="Starting rules" file="server.go:48"
time="2020-06-08T11:00:26+08:00" level=info msg="Serving kuiper (version - 0.4.1) on port 20498, and restful api on port 9081. \n" file="server.go:101"
time="2020-06-08T11:02:36+08:00" level=info msg="Stream demo is created." file="xsql_processor.go:75"
time="2020-06-08T11:05:52+08:00" level=info msg="Rule rule1 is created." file="xsql_processor.go:226"
time="2020-06-08T11:05:52+08:00" level=info msg="Init rule with options {isEventTime: false, lateTolerance: 0, concurrency: 30, bufferLength: 10240" file="xsql_processor.go:399"
time="2020-06-08T11:05:52+08:00" level=info msg="Opening stream" file="streams.go:89" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open source node demo with option map[DATASOURCE:events FORMAT:JSON TYPE:zmq]" file="source_node.go:59" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open source node 1 instances" file="source_node.go:78" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="open sink node 50 instances" file="sink_node.go:143" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Opening func collector" file="func.go:36" rule=rule1
time="2020-06-08T11:05:52+08:00" level=info msg="Start source demo instance 0 successfully" file="source_node.go:115" rule=rule1

Kuiper规则配置如下:

{
  "sql": "SELECT * from demo",
  "actions": [
    {
      "log": {
          "concurrency": 50,
          "bufferLength": 10240,
          "cacheLength": 102400,
          "runAsync": true
       }
    }
  ],
  "options": {
    "concurrency": 30,
    "bufferLength": 10240
  }
}

以上参数含义:https://github.com/emqx/kuiper/blob/master/docs/zh_CN/rules/overview.md

6、go应用发送消息耗时:0.344352s


image.png

同时kuiper侧订阅zeroMq主题为events的消息,过滤了所有消息


image.png

说明,kuiper在0.344352s处理了9676条数据

经测试:


image.png

注:测试发现当数据量大时,明显丢包严重

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,142评论 6 498
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,298评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,068评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,081评论 1 291
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,099评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,071评论 1 295
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,990评论 3 417
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,832评论 0 273
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,274评论 1 310
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,488评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,649评论 1 347
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,378评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,979评论 3 325
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,625评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,796评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,643评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,545评论 2 352