pulsar mop mqtt二次开发贡献开源项目代码
Linux MacBook单机部署Pulsar并开启认证功能
Fork代码
[图片上传失败...(image-9737ff-1653401111321)]
[图片上传失败...(image-4f1e4e-1653401111322)]
源代码地址
https://github.com/streamnative/mop
访问原始仓库,点击fork,将原始仓库代码fork到自己的GitHub账号下,成为副本仓库。
点击Fork到自己的仓库
Clone副本仓库到本地
把fork后的副本仓库 clone 到本地。
git clone https://github.com/tw-iot/mop.git
配置上游项目地址
配置上游项目地址的目的将来如果原来那个项目streamnative/mop有代码更新时,我们需要把它最新代码合并到我自己的Fork的项目中,这样才能保持代码同步,否则你的项目永远停留在Fork时候的版本。
cd mop
git remote add upstream https://github.com/streamnative/mop.git
获取上游项目更新
git fetch upstream
合并到本地分支
git merge upstream/master
提交推送
git push origin master
这样你的代码就和原项目的代码保持同步了。
这里的 upstream 就是我们上游项目地址的别名,待会儿就是从这个项目中去拉最新的代码。
创建本地分支
进入仓库目录,使用如下命令创建并切换到authorization分支(自己的本地分支)。
git checkout -b authorization # 创建并切换到authorization分支
本地仓库修改提交
基于本地分支authorization进行代码修改,然后进行本地提交。
git add -A
git commit -m "first commit"
git push
创建一个 Pull Request
现在假设你在本地项目中修改了代码,新增了文件,当我们把代码push到Github之后,你就可以在GitHub发起一个Pull Request了,告知原项目,我修复了一些Bug,更新了某些特性,请把我的代码合并过去吧。
[图片上传失败...(image-3430b9-1653401111322)]
新建一个 Pull Request,如果GitHub发现你的代码和原项目差异,那么就可以成功 Create Pull Request。这样原项目的负责人就可以收到你的Pull Reuqest了。然后就等着他审核、合并代码,审核通过之后,你的代码将被正式合并到他的原项目中去。
[图片上传失败...(image-3f1ab4-1653401111322)]
[图片上传失败...(image-79c036-1653401111322)]
修改mop mqtt插件源代码
[图片上传失败...(image-126f20-1653401111322)]
MQTT代理5682端口
切换对应的分支
git checkout branch-2.8.1
修改类
io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyProtocolMethodProcessor
processConnect 方法
MqttConnectPayload payload = msg.payload();
String clientId = payload.clientIdentifier();
//截取clientId
String clientIdentifier = payload.clientIdentifier();
if (clientIdentifier != null && clientIdentifier.length() != 0) {
clientId = clientIdentifier.split("\\|")[0];
}
String userRole = null;
// Authenticate the client
userRole = authResult.getUserRole();
//连接成功上后,马上发一条上线消息,与1883建立连接
NettyUtils.setUserRole(channel, userRole);
pulsarMsg("login", channel);
//心跳方法,每订阅一个topic,多一个心跳(这里是个bug,一个连接一个心跳,已解决)
@Override
public void processPingReq(Channel channel) {
//channel.writeAndFlush(pingResp());
//topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> {
// exchanger.writeAndFlush(pingReq());
//}));
channel.writeAndFlush(pingResp());
brokerPool.forEach((k, v) -> v.writeAndFlush(pingReq()));
}
//发送pusar消息方法 proxy代理
//此方法会根据topic,lookup找到broker,然后和broker的mqtt1883建立连接,
//这样,客户端给5682发心跳, 5682回一个心跳包,然后给brokerPool发心跳,就是给1883发心跳
//代理5682发送心跳的方法processPingReq(),会给1883发送心跳包
//********所以, 连接代理5682端口时,只需要连接成功后,发送一次在线消息,就和1883建立连接,会维持心跳,断开连接也会有事件
public void pulsarMsg(String type, Channel channel) {
String userRole = NettyUtils.getUserRole(channel);
if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {
return;
}
String[] devArr = userRole.split("$");
String name;
String key;
if (devArr.length == 1) {
//用户连接
key = "user";
name = userRole;
} else {
name = devArr[0];
key = devArr[1];
}
try {
//设备上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login
//设备离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout
//用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login
String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;
Map<String, Object> map = new HashMap();
map.put("reqid", UUID.randomUUID().toString().replace("-", ""));
map.put("v", "1.0");
map.put("t", System.currentTimeMillis());
map.put("method", "sys." + type);
CompletableFuture<InetSocketAddress> lookupResult = lookupHandler.findBroker(
TopicName.get(pulsarTopic));
lookupResult.whenComplete((brokerAddress, throwable) -> {
if (null != throwable) {
log.error("pulsarMsg========11111+++++++topic:{}, error:{}", pulsarTopic, throwable);
return;
}
MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, AT_MOST_ONCE, false, 0);
MqttMessage msg = new MqttMessage(pingHeader, null , JSON.toString(map));
//这样发送收不到消息,只是与mqtt1883端口建立了连接,就算用mqtt订阅这个topic也收不到数据
writeToMqttBroker(channel, msg, pulsarTopic, brokerAddress);
});
} catch (Exception e) {
e.printStackTrace();
log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());
}
}
MQTT1883端口
修改类
io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl
processConnect 方法
//截取clientId
String clientIdentifier = payload.clientIdentifier();
if (clientIdentifier != null && clientIdentifier.length() != 0) {
clientId = clientIdentifier.split("\\|")[0];
}
//连接成功上后,马上发一条上线消息
pulsarMsg("login", channel);
processDisconnect方法
//断开连接后,发送离线消息
pulsarMsg("logout", channel);
processConnectionLost方法
pulsarMsg("logout", channel);
@Override
public void processPingReq(Channel channel) {
channel.writeAndFlush(pingResp());
//心跳,==========注意看默认实现有没有变动
pulsarMsg("login", channel);
}
/**
* 保存最后在线的topic和时间,防止5682端口订阅多个topic,有多个心跳包
*/
public static Map<String, Long> onlineTopicMap = new ConcurrentHashMap<>();
//发送mqtt消息给pusar方法
public void pulsarMsg(String type, Channel channel) {
String userRole = NettyUtils.getUserRole(channel);
if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {
return;
}
String[] devArr = userRole.split("$");
String ame;
String key;
if (devArr.length == 1) {
//用户连接
key = "user";
name = userRole;
} else {
name = devArr[0];
key = devArr[1];
}
try {
//设备上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login
//设备离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout
//用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login
String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;
//是心跳包,且map里存在topic
/*long currentTime = System.currentTimeMillis();
if ("login".equals(type) && onlineTopicMap.containsKey(pulsarTopic)) {
long time = onlineTopicMap.get(pulsarTopic);
//小于5秒不重复发送
if (currentTime - time < 5000) {
return;
}
}
onlineTopicMap.put(pulsarTopic, currentTime);*/
Map<String, Object> map = new HashMap();
map.put("reqid", UUID.randomUUID().toString().replace("-", ""));
map.put("v", "1.0");
map.put("t", System.currentTimeMillis());
map.put("method", "sys." + type);
ByteBuf buf = Unpooled.copiedBuffer(JSON.toString(map), CharsetUtil.UTF_8);
MqttPublishMessage msg = MessageBuilder.publish()
.payload(buf)
.topicName(pulsarTopic)
.qos(MqttQoS.AT_MOST_ONCE)
.retained(false)
.build();
this.qosPublishHandlers.qos0().publish(channel, msg);
} catch (Exception e) {
e.printStackTrace();
log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());
}
}
增加配置
mqttAuthorizationEnabled=true
public static final String AUTH_MYSQL = "mysql";
public static final List<String> SUPPORTED_AUTH_METHODS = ImmutableList.of(AUTH_BASIC, AUTH_TOKEN, AUTH_MYSQL);
case AUTH_MYSQL:
return new AuthenticationDataCommand(payload.password() + ":" + payload.userName() + ":" + payload.clientIdentifier());
io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyHandler
io.streamnative.pulsar.handlers.mqtt.MQTTInboundHandler
exceptionCaught 方法
//异常断开连接时调用
processor.processConnectionLost(ctx.channel());
打包好后,复制到pulsar的protocols目录,没有就创建一个protocols目录
mvn install -DskipTests
cp mqtt-impl/target/pulsar-protocol-handler-mqtt-2.9.0-SNAPSHOT.nar /Users/liang/software/apache-pulsar-2.8.0.8/protocols/
参考链接:
https://blog.csdn.net/baidu_28196435/article/details/90257719
https://blog.csdn.net/wo541075754/article/details/120235657