MMQ broker
MMQ broker 是一款完全开源,高度可伸缩,高可用的分布式 MQTT 消息服务器,适用于 IoT、M2M 和移动应用程序。
MMQ broker 完整支持MQTT V3.1 和 V3.1.1。
安装
MMQ broker 是跨平台的,支持 Linux、Unix、macOS 以及 Windows。这意味着 MMQ broker 可以部署在 x86_64 架构的服务器上。由于使用raft一致性算法,集群部署三个节点以上。
从 Github 上下载源码方式
git clone https://github.com/MrHKing/mmq.git
cd mmq
mvn -Prelease-mmq -Dmaven.test.skip=true clean install -U
直接安装
您可以从 最新稳定版本 下载 mmq-server-$version.zip 包。
unzip mmq-server-$version.zip 或者 tar -xvf mmq-server-$version.tar.gz
cd mmq/bin
Docker安装
docker run -d --name mmq -p 2883:2883 -p 1883:3883 -p 8888:8888 paperman/mmq:v1.0.8
Kubernetes安装
快速入门
单机版启动
cd mmq\bin
#windows start
startup.cmd -m standalone
cd mmq/bin
#linux start
sh startup.sh -m standalone
#linux shutdown
sh shutdown.sh
集群版启动
cd mmq/config
#配置集群文件
cp cluster.conf.example cluster.conf
#每个节点都需要配置其他节点的地址,如下:
#example
192.168.31.9:7777
192.168.31.9:8848
192.168.31.9:8888
cd mmq\bin
#windows start
startup.cmd
cd mmq/bin
#linux start
sh startup.sh
#linux shutdown
sh shutdown.sh
配置文件
mqtt tcp端口默认:3883
mqtt websocket端口:2883
#*************** Spring Boot Related Configurations ***************#
### Default web context path:
server.servlet.contextPath=/
### Default web server port:
server.port=8888
#*************** mqtt broker Configurations ***************#
mmq.broker.websocketPort=2883
mmq.broker.port=3883
mmq.broker.default.user=admin
mmq.broker.default.password=admin@mmq
mmq.broker.default.anonymous=true
10万客户端压力测试
10万连接
两节点集群
测试工具xmeter
服务器资源
服务端资源:两台4核8G服务器
客户端资源:三台8核16G模拟测试服务器
测试结果
10万连接,一分钟发送一次数据。
平均吞吐量:1302。成功率:100%
服务器使用资源:CPU 10%, 内存6%
平均响应时间:0.0158秒
规则引擎
通过SQL进行规则转发
选择发布到topic/#的消息,然后选择所有字段:
SELECT * FROM "topic/#"
查询专门字段SQL
SELECT this.payload.value, this.payload.deviceName FROM "topic/#"
条件查询SQL
SELECT this.payload.value, this.payload.deviceName FROM "topic/#" WHERE this.payload.deviceName like 'abc%'
滚动窗口
滑动窗口
MySql\Sqlserver\Postgresql\Tdengine 插入数据库模板通用字段
属性说明
INSERT INTO test (uuid, date, datetime, utc, timestamp, topic, name) VALUES ('#{[uuid]}',#{[date]},#{[datetime]},#{[utc]},#{[timestamp]},#{[topic]},#{[topic1]})
kafka资源桥接
SELECT * FROM "topic/#"
测试
Topic: topic/test
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
插入结果
{"address":"","qos":0,"payload":{"msg":{"name":"test","value":"12321"}},"topic":"topic/test"}
Mysql资源桥接
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
规则SQL Demo:
SELECT * FROM "topic/#"
Mysql SQL Demo如下:
INSERT INTO history (name, value) VALUES ('#{[msg][name]}',#{[msg][value]})
SqlServer资源桥接
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
规则SQL Demo:
SELECT * FROM "topic/#"
Mysql SQL Demo如下:
INSERT INTO history (name, value) VALUES ('#{[msg][name]}',#{[msg][value]})
postgresql资源桥接
MQTT 客户端上传 Json Demo 如下:
{
"msg": {
"name": "test",
"value": "12321"
}
}
规则SQL Demo:
SELECT * FROM "topic/#"
Mysql SQL Demo如下:
INSERT INTO history (name, value) VALUES ('#{[msg][name]}',#{[msg][value]})
认证方式
对外API接口,HTTP API 使用 Basic 认证 (opens new window)方式,id 和 password 须分别填写 AppID 和 AppSecret。 默认的 AppID 和 AppSecret 是:mmq/aaaaaa。使用需要在【模块】菜单中找到HTTP API模块,启用即可,如需修改 AppID 和 AppSecret,点击编辑修改即可。
返回响应码
响应码说明
获得在线客户端接口
GET /v1/api/clients
返回集群下所有客户端的信息,支持分页查询参数
名称类型是否必填默认值说明
例子
$ curl -i --basic -u mmq:aaaaaa -X GET "http://localhost:8888/v1/api/clients?pageNo=1&pageSize=10"
{
"code": 200,
"message": null,
"data": {
"pageSize": 10,
"pageNo": 1,
"totalCount": 1,
"totalPage": 0,
"data": [
{
"clientId": "paho1640921025845000001",
"user": "e",
"connectTiem": "2022-01-20T01:02:39.903+00:00",
"address": "127.0.0.1",
"nodeIp": "192.168.0.113",
"nodePort": 8888
}
]
}
}
踢出客户端接口
GET /v1/api/rejectClient
通过客户端id,踢出客户端查询参数
名称类型是否必填默认值说明
$ curl -i --basic -u mmq:aaaaaa -X GET "http://localhost:8888/v1/api/rejectClient?clientId=paho1640921025845000001"
{
"code": 200,
"message": null,
"data": null
}
Dashboard --单机演示
启动后访问 http://101.43.4.211:8888/
默认账户:mmq
默认密码:aaaaaa
mqtt tcp端口默认:1883
mqtt websocket端口:2883
MQTT 规范
你可以通过以下链接了解与查阅 MQTT 协议:
开源许可
Apache License 2.0, 详见 LICENSE。
群号
QQ群: 1016132679