pulsar mop mqtt二次开发贡献开源项目代码

pulsar mop mqtt二次开发贡献开源项目代码

Linux MacBook单机部署Pulsar并开启认证功能

pulsar集群搭建_亲测成功

pulsar开启mqtt和认证

pulsar自定义认证插件开发

pulsar自定义创建发布和订阅主题权限插件开发

Fork代码

[图片上传失败...(image-9737ff-1653401111321)]
[图片上传失败...(image-4f1e4e-1653401111322)]

源代码地址
https://github.com/streamnative/mop

访问原始仓库,点击fork,将原始仓库代码fork到自己的GitHub账号下,成为副本仓库。

点击Fork到自己的仓库

Clone副本仓库到本地

把fork后的副本仓库 clone 到本地。

git clone https://github.com/tw-iot/mop.git   

配置上游项目地址

配置上游项目地址的目的将来如果原来那个项目streamnative/mop有代码更新时,我们需要把它最新代码合并到我自己的Fork的项目中,这样才能保持代码同步,否则你的项目永远停留在Fork时候的版本。

cd mop

git remote add upstream https://github.com/streamnative/mop.git

获取上游项目更新
git fetch upstream

合并到本地分支
git merge upstream/master

提交推送
git push origin master

这样你的代码就和原项目的代码保持同步了。

这里的 upstream 就是我们上游项目地址的别名,待会儿就是从这个项目中去拉最新的代码。

创建本地分支

进入仓库目录,使用如下命令创建并切换到authorization分支(自己的本地分支)。

git checkout -b authorization # 创建并切换到authorization分支

本地仓库修改提交

基于本地分支authorization进行代码修改,然后进行本地提交。

git add -A

git commit -m "first commit"

git push

创建一个 Pull Request

现在假设你在本地项目中修改了代码,新增了文件,当我们把代码push到Github之后,你就可以在GitHub发起一个Pull Request了,告知原项目,我修复了一些Bug,更新了某些特性,请把我的代码合并过去吧。
[图片上传失败...(image-3430b9-1653401111322)]

新建一个 Pull Request,如果GitHub发现你的代码和原项目差异,那么就可以成功 Create Pull Request。这样原项目的负责人就可以收到你的Pull Reuqest了。然后就等着他审核、合并代码,审核通过之后,你的代码将被正式合并到他的原项目中去。

[图片上传失败...(image-3f1ab4-1653401111322)]
[图片上传失败...(image-79c036-1653401111322)]

修改mop mqtt插件源代码

[图片上传失败...(image-126f20-1653401111322)]

MQTT代理5682端口

切换对应的分支
git checkout branch-2.8.1 

修改类
io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyProtocolMethodProcessor


    processConnect 方法
    
    MqttConnectPayload payload = msg.payload();
    String clientId = payload.clientIdentifier();
    
    //截取clientId
    String clientIdentifier = payload.clientIdentifier();
    if (clientIdentifier != null && clientIdentifier.length() != 0) {
        clientId = clientIdentifier.split("\\|")[0];
    }

    String userRole = null;
    // Authenticate the client
    
    userRole = authResult.getUserRole();

    //连接成功上后,马上发一条上线消息,与1883建立连接
    NettyUtils.setUserRole(channel, userRole);
    pulsarMsg("login", channel);
    
    //心跳方法,每订阅一个topic,多一个心跳(这里是个bug,一个连接一个心跳,已解决)
    @Override
    public void processPingReq(Channel channel) {
        //channel.writeAndFlush(pingResp());
        //topicBrokers.forEach((k, v) -> v.whenComplete((exchanger, error) -> {
        //    exchanger.writeAndFlush(pingReq());
        //}));
        
        channel.writeAndFlush(pingResp());
        brokerPool.forEach((k, v) -> v.writeAndFlush(pingReq()));
    }
            
    //发送pusar消息方法 proxy代理
    //此方法会根据topic,lookup找到broker,然后和broker的mqtt1883建立连接,
    //这样,客户端给5682发心跳, 5682回一个心跳包,然后给brokerPool发心跳,就是给1883发心跳
    //代理5682发送心跳的方法processPingReq(),会给1883发送心跳包
    //********所以, 连接代理5682端口时,只需要连接成功后,发送一次在线消息,就和1883建立连接,会维持心跳,断开连接也会有事件
    public void pulsarMsg(String type, Channel channel) {
        String userRole = NettyUtils.getUserRole(channel);
        if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {
            return;
        }
        String[] devArr = userRole.split("$");
        String name;
        String key;
        if (devArr.length == 1) {
            //用户连接
            key = "user";
            name = userRole;
        } else {
            name = devArr[0];
            key = devArr[1];
        }

        try {
            //设备上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login
            //设备离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout
            //用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login
            String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;
            Map<String, Object> map = new HashMap();
            map.put("reqid", UUID.randomUUID().toString().replace("-", ""));
            map.put("v", "1.0");
            map.put("t", System.currentTimeMillis());
            map.put("method", "sys." + type);
            CompletableFuture<InetSocketAddress> lookupResult = lookupHandler.findBroker(
                    TopicName.get(pulsarTopic));
            lookupResult.whenComplete((brokerAddress, throwable) -> {
                if (null != throwable) {
                    log.error("pulsarMsg========11111+++++++topic:{}, error:{}", pulsarTopic, throwable);
                    return;
                }
                MqttFixedHeader pingHeader = new MqttFixedHeader(MqttMessageType.PUBLISH, false, AT_MOST_ONCE, false, 0);
                MqttMessage msg = new MqttMessage(pingHeader, null , JSON.toString(map));
                //这样发送收不到消息,只是与mqtt1883端口建立了连接,就算用mqtt订阅这个topic也收不到数据
                writeToMqttBroker(channel, msg, pulsarTopic, brokerAddress);
            });
        } catch (Exception e) {
            e.printStackTrace();
            log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());
        }
    }
                    
                    

MQTT1883端口

修改类
io.streamnative.pulsar.handlers.mqtt.support.DefaultProtocolMethodProcessorImpl


    processConnect 方法
     
    //截取clientId
    String clientIdentifier = payload.clientIdentifier();
    if (clientIdentifier != null && clientIdentifier.length() != 0) {
        clientId = clientIdentifier.split("\\|")[0];
    }
    
    //连接成功上后,马上发一条上线消息
    pulsarMsg("login", channel);
    
   processDisconnect方法
   //断开连接后,发送离线消息
   pulsarMsg("logout", channel);
   
    processConnectionLost方法
    pulsarMsg("logout", channel);
    
    @Override
    public void processPingReq(Channel channel) {
        channel.writeAndFlush(pingResp());

        //心跳,==========注意看默认实现有没有变动
        pulsarMsg("login", channel);
    }
    
    /**
     * 保存最后在线的topic和时间,防止5682端口订阅多个topic,有多个心跳包
     */
    public static Map<String, Long> onlineTopicMap = new ConcurrentHashMap<>();
    
    //发送mqtt消息给pusar方法
    public void pulsarMsg(String type, Channel channel) {
        String userRole = NettyUtils.getUserRole(channel);
        if (userRole == null || userRole.length() == 0 || "admin".equals(userRole)) {
            return;
        }
        String[] devArr = userRole.split("$");
        String ame;
        String key;
        if (devArr.length == 1) {
            //用户连接
            key = "user";
            name = userRole;
        } else {
            name = devArr[0];
            key = devArr[1];
        }

        try {
            //设备上线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.login
            //设备离线 persistent://${tenantid}/${user}/${Key}.${Name}.sys.logout
            //用户上线 persistent://${tenantid}/${user}/user.${username}.sys.login
            String pulsarTopic = "persistent://public/default/" + key + "." + name + ".sys." + type;

            //是心跳包,且map里存在topic
            /*long currentTime = System.currentTimeMillis();
            if ("login".equals(type) && onlineTopicMap.containsKey(pulsarTopic)) {
                long time = onlineTopicMap.get(pulsarTopic);
                //小于5秒不重复发送
                if (currentTime - time < 5000) {
                    return;
                }
            }
            onlineTopicMap.put(pulsarTopic, currentTime);*/

            Map<String, Object> map = new HashMap();
            map.put("reqid", UUID.randomUUID().toString().replace("-", ""));
            map.put("v", "1.0");
            map.put("t", System.currentTimeMillis());
            map.put("method", "sys." + type);
            ByteBuf buf = Unpooled.copiedBuffer(JSON.toString(map), CharsetUtil.UTF_8);
            MqttPublishMessage msg = MessageBuilder.publish()
                    .payload(buf)
                    .topicName(pulsarTopic)
                    .qos(MqttQoS.AT_MOST_ONCE)
                    .retained(false)
                    .build();
            this.qosPublishHandlers.qos0().publish(channel, msg);
        } catch (Exception e) {
            e.printStackTrace();
            log.error("pulsarMsg========2222222222+++++++{}", e.getMessage());
        }
    }

增加配置

mqttAuthorizationEnabled=true
public static final String AUTH_MYSQL = "mysql";

public static final List<String> SUPPORTED_AUTH_METHODS = ImmutableList.of(AUTH_BASIC, AUTH_TOKEN, AUTH_MYSQL);

 case AUTH_MYSQL:
        return new AuthenticationDataCommand(payload.password() + ":" + payload.userName() + ":" + payload.clientIdentifier());

io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyHandler
io.streamnative.pulsar.handlers.mqtt.MQTTInboundHandler

exceptionCaught 方法
//异常断开连接时调用
processor.processConnectionLost(ctx.channel());

打包好后,复制到pulsar的protocols目录,没有就创建一个protocols目录

mvn install -DskipTests  

cp mqtt-impl/target/pulsar-protocol-handler-mqtt-2.9.0-SNAPSHOT.nar /Users/liang/software/apache-pulsar-2.8.0.8/protocols/

参考链接:
https://blog.csdn.net/baidu_28196435/article/details/90257719
https://blog.csdn.net/wo541075754/article/details/120235657

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,444评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,421评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,036评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,363评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,460评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,502评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,511评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,280评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,736评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,014评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,190评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,848评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,531评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,159评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,411评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,067评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,078评论 2 352

推荐阅读更多精彩内容