MQTT调研及设计部分开发内容
1. 背景
MQTT自研版本架构依赖组件太多架构复杂,运行有不稳定问题(订阅树bug等),功能不完善(5.0协议不支持)
调研了开源的MQTT broker组件,结果如下:
名称 | BifroMQ | EMQX | rocketmq-mqtt |
---|---|---|---|
开源时间 | 2023 | 2012 | 2022 |
语言 | java | erlang | java |
Star | 630 | 14.3k | 184 |
Folk | 63 | 2.3k | 66 |
Issue | 5 | 171 | 39 |
总结:BifroMQ、rocketmq-mqtt 这两款开源MQTT时间较晚,社区不活跃,潜在问题、bug多,但是语言java上手快,EMQX语言erlang,但是比较成熟,总体还是EMQX更有优势。
EMQX跟自研对比:
名称 | 自研 | EMQX |
---|---|---|
优点 | java语言熟悉易改造,查找定位问题难度小,,console 鉴权、order、operator、云监控等都不需要开发 | 底层稳定,底层功能实现全、社区很活跃,有问题能参与讨论,问题发现多修复多、高性能和低延迟 |
缺点 | 底层不稳定,5.0协议不支持,如果需要稳定底层,订阅树优化,组件太多,需要优化,去redis等,部分代码逻辑复杂需要优化(比如离线消息推送过程、鉴权流程复杂) | erlang语言、配置较为复杂、插件生态有限,难以开发扩展、理解日志具有一定难度、不支持横向扩容只有三个节点、不支持数据持久化、console 鉴权、order、operator、云监控等都需要开发 |
个人比较倾向EMQX基础上开发上线,初版不支持数据持久化,机器宕机时可能导致持久化订阅关系丢失,需要客户端设置重连重新订阅解决,保存这台机器的离线消息会丢失,可暂不支持离线消息解决
2. 基于EMQX开源版组件设计
2.1 MQTT broker
直接采用开源
2.2 operator
emqx有开源版本(https://github.com/emqx/emqx-operator/),但是我们产品部署场景太多,需要对接kosmos,对接vpc-native等等,这块最好自研,基于kube-builder开发新的operator
2.3 console
实现方式是调用http接口访问emqx集群,接口文档:https://docs.emqx.com/zh/emqx/v5.8/admin/api-docs.html
具体功能如下:
1、 获取topic列表
2、 获取客户端详情
3、 订阅管理 查看订阅关系
4、 发送消息
5、 消息连接订阅统计报表:消息流入、消息流出、消息丢弃、连接数、主题数、订阅数
6、 集群详情:规格详情、接入点、用户名密码等鉴权信息、基础信息(订单信息)
7、 订单的退订改续
这块内容需要前端出新的控制台页面。控制台调用api示例:
import okhttp3.*;
import java.io.IOException;
public class EMQXNodesAPIExample {
public static void main(String[] args) {
try {
String username = "4f33d24d7b8e448d";
String password = "gwtbmFJZrnzUu8mPK1BxUkBA66PygETiDEegkf1q8dD";
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder()
.url("http://localhost:18083/api/v5/nodes")
.header("Content-Type", "application/json")
.header("Authorization", Credentials.basic(username, password))
.build();
Response response = client.newCall(request).execute();
System.out.println(response.body().string());
} catch (IOException e) {
e.printStackTrace();
}
}
}
2.4 exclusive工程
集群相关操作,创建清理集群,功能类似rocketmq exclusive-console
2.4 order订单工程
与mop交互的工程,复用之前的稍微改动即可
2.5 collect
这些功能优先级低
1、性能/sla数据上报deepwatch相关配置
2、上报稽核数据kafka配置
3、容量上报
3、 鉴权
采用 sha256 密码加密方式 suffix加盐方式 采用username+password加密
示例:
public static void main(String[] args) {
String username= "test";
String salt = "901e9a70436";
String password= generateHashWithSuffix(password, salt);
System.out.println("SHA-256 Hash with Salt: " + hashWithSuffix);
}
public static String generateHashWithSuffix(String password, String salt) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
digest.update(salt.getBytes());
byte[] hashBytes = digest.digest(password.getBytes());
// 将字节转换为十六进制字符串
StringBuilder hexString = new StringBuilder();
for (byte b : hashBytes) {
String hex = Integer.toHexString(0xff & b);
if (hex.length() == 1)
hexString.append('0');
hexString.append(hex);
}
// 将盐添加到哈希字符串的末尾
return hexString.toString() + salt;
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
鉴权信息的username跟password展示到控制台
username集群唯一,随机生成8位字母跟数字混合
salt 随机生成10位字母跟数字混合,控制台不展示,数据库保存
password 控制台展示
username跟salt在创建集群的时候生成跟resourceId保存到一个表里,底层保存这些鉴权信息需要调用接口(https://docs.emqx.com/zh/emqx/v5.8/admin/api-docs.html#tag/Authorization)选择外置数据库存储
4、 公网访问
1、公网访问对接serverless以后能够直接支持公网
2、serverless之前新资源池建设需要申请公网跟独享集群端口
3、复用之前的,没有用户使用的机器回收给kosmos纳管