Kuiper与EdgeX Foundry集成实践

Kuiper是什么? EdgeX Foundry又是什么?

Kuiper

EMQ X Kuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。Kuiper 设计的一个主要目标就是将在云端运行的实时流式计算框架(比如 Apache SparkApache StormApache Flink 等)迁移到边缘端。Kuiper 参考了上述云端流式处理项目的架构与实现,结合边缘流式数据处理的特点,采用了编写基于源 (Source)SQL (业务逻辑处理), 目标 (Sink) 的规则引擎来实现边缘端的流式数据处理
其架构如下:

image.png

  • 源 (Sources) :内置支持 MQTT 数据的接入,扩展支持与EdgeX Foundry集成
  • SQL:流式数据逻辑处理,具备完整的数据分析处理能力
    • 支持丰富的数据类型
    • 支持4种时间窗口(滚动窗口、跳跃窗口、滑动窗口、会话窗口)
    • 内置60+处理函数
    • 提供类SQL语句对数据进行抽取、过滤、转换
  • 目标(Sinks):内置支持 MQTT、HTTP等

EMQ公司的相关产品,可以登陆其官网查询了解


image.png

EdgeX Foundry

EdgeX Foundry是一个Linux 基金会运营的开源的,基于与硬件和操作系统完全无关的边缘计算物联网软件框架项目。其是一系列松耦合、开源的微服务集合,位于网络的边缘,可以与设备、传感器、执行器和其他物联网对象的物理世界进行交互。EdgeX Foundry 旨在创造一个互操作性、即插即用、模块化的物联网边缘计算的生态系统。
其架构如下:

image.png

从架构图可以看出:
南侧(SouthBound):在物理领域内的所有物联网对象,以及与这些设备、传感器、执行器和其他物联网对象直接通信并从中收集数据的网络边缘,统称为“南侧”。
北侧(NorthBound):将数据收集、存储、聚合、分析并转换为信息的云(或企业系统),以及与云通信的网络部分称为网络的“北侧”。
因此,EdgeX使数据可以向北移动到云,也可以横向移动到其他网关,或返回到设备、传感器和执行器。
EdgeX的重要服务层及微服务:
image.png

1、安装edgex

参照官网文档:https://fuji-docs.edgexfoundry.org/Ch-GettingStartedUsers.html
docker-compose启动

image.png

相关服务正常
image.png

2、安装并启动kuiper

sudo docker run -d --name kuiper --restart always -e EDGEX_SERVER=10.0.105.143 -e EDGEX_PORT=5563 -e EDGEX_SERVICE_SERVER=http://10.0.105.143:48080 emqx/kuiper:0.2.1

环境变量具体参考:https://hub.docker.com/r/emqx/kuiper 中的说明
EDGEX_SERVER:edgex中zeromq的地址(zeromq集成到core data服务中了,可以看到core data服务暴露了两个端口一个5563,一个48080)
EDGEX_PORT:edgex中zeromq的端口
EDGEX_SERVICE_SERVER:edgex中core data的地址及端口
这里我使用的kuiper镜像为 emqx/kuiper:0.2.1,为目前最新版本

3、进入kuiper容器

sudo docker exec -it kuiper /bin/sh

4、查看日志

/kuiper # cat log/stream.log

5、创建流,订阅来自edgex的消息流

/kuiper # bin/cli create stream demo'() WITH (FORMAT="JSON", TYPE="edgex")'

6、创建规则文件,内容如下

/kuiper # cat rule.txt

{
  "sql": "SELECT * from demo GROUP BY TUMBLINGWINDOW(ss, 10)",
  "actions": [
    {
      "mqtt": {
        "server": "tcp://broker.emqx.io:1883",
        "topic": "result",
        "clientId": "demo_001"
      }
}
  ]
}

上述规则:Kuiper将接受edgex的数据,执行select操作(每10s钟),然后将处理后的数据发布到tcp://broker.emqx.io:1883(也可以换成其他的,比如broker.hivemq.com或者自己搭建的EMQ X edge)
注意:Kuiper SQL相关的参考,见https://docs.emqx.io/kuiper/latest/cn/sqls/overview.html

7、创建规则,命名为rule1

/kuiper # bin/cli create rule rule1 -f rule.txt

8、查看日志,可以看到已经连通edgex,相关的规则也已经创建

image.png

9、查看规则状态

image.png

10、使用mosquitto订阅broker.emqx.io中主题为result的消息

image.png

分析结果,发布到文件(https://github.com/emqx/kuiper/blob/master/docs/zh_CN/plugins/sinks/file.md

{
  "sql": "SELECT * from demo",
  "actions": [
   {
      "file": {
        "path": "/tmp/result.txt",
        "interval": 5000
      }
   }
  ]
}

分析结果,发布到zmq(https://github.com/emqx/kuiper/blob/master/docs/zh_CN/plugins/sinks/zmq.md

{
  "sql": "SELECT * from demo",
  "actions": [
  {
    "zmq": {
       "server": "tcp://127.0.0.1:5563",
       "topic": "temp"
      }
   }
  ]
}

分析结果,通过调用rest(https://github.com/emqx/kuiper/blob/master/docs/en_US/rules/sinks/rest.md

{
  "sql": "SELECT * from demo",
  "actions": [
  {
    "rest": {
      "url": "http://127.0.0.1:48082/api/v1/device/cc622d99-f835-4e94-b5cb-b1eff8699dc4/command/51fce08a-ae19-4bce-b431-b9f363bba705",       
      "method": "post",
      "dataTemplate": "\"newKey\":\"{{.key}}\"",
      "sendSingle": true
      }
    }
  ]
}

这个类似于EdgeX中core services中的command服务

分析结果,发布到edgex(https://github.com/emqx/kuiper/blob/master/docs/en_US/rules/sinks/edgex.md

{
  "sql": "SELECT * from demo",
  "actions": [
    {
      "edgex": {
        "protocol": "tcp",
        "host": "*",
        "port": 5571,
        "topic": "application",
        "deviceName": "kuiper",
        "contentType": "application/json"
      }
    }
  ]
}

分析结果,发布到日志文件,默认在log/stream.log

{
  "sql": "SELECT * from demo",
  "actions": [
    {
      "log": {}
    }
  ]
}

经验证,有些插件不完整

/kuiper # ./bin/cli getstatus rule rule1
Connecting to 127.0.0.1:20498... 
Stopped: cannot open /kuiper/plugins/sinks/File.so: plugin.Open("/kuiper/plugins/sinks/File.so"): Error relocating /kuiper/plugins/sinks/File.so: __fprintf_chk: symbol not found.

参考:

1、kuiper官方文档及github地址

https://docs.emqx.io/kuiper/latest/cn/

https://github.com/emqx/kuiper

2、kuiper集成edgex文档https://github.com/emqx/kuiper/blob/master/docs/en_US/edgex/edgex_rule_engine_tutorial.md

3、Edgex Foundry官方文档

https://fuji-docs.edgexfoundry.org/Ch-QuickStart.html

https://fuji-docs.edgexfoundry.org/Ch-GettingStartedUsers.html

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,542评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,596评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,021评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,682评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,792评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,985评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,107评论 3 410
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,845评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,299评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,612评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,747评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,441评论 4 333
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,072评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,828评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,069评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,545评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,658评论 2 350

推荐阅读更多精彩内容

  • 小米业务线众多,从信息流,电商,广告到金融等覆盖了众多领域,小米流式平台为小米集团各业务提供一体化的流式数据解决方...
    程序员66阅读 523评论 0 0
  • Flink总结 Flink简介 Apache Flink作为一款高吞吐量、低延迟的针对流数据和批数据的分布式实时处...
    bigdata_er阅读 10,592评论 0 10
  • 2017.3.13 诗性化体验即给到我之融合的多样性真诚善待的必要之抉择的文本化表达。 是引起之于构成的意象化可认...
    宙六十三阅读 73评论 0 0
  • 傍晚吃饭的时候我哥打电话来,说他在家,老爸的头歪向一边转不过来了,听到这个消息差点忍不住哭了出来,心情一下子就沉重...
    阿金319阅读 41评论 0 0
  • 小朋友都知道一年一度在北京举行的国际马拉松比赛吧。我 们从电视屏幕上看到参赛的运动员在跑到体育馆终点时,已经很 累...
    简什么阅读 544评论 3 6