springboot+mqtt+apache apollo,监听信息并可以动态更改topic

一、需求

        通过Java集成mqtt来获得设备监控到的数据,并且当设备发送mqtt的topic发生改变时,Java可以动态改变topic来继续监听设备发送的数据。

二、实现

1、新建一个demo数据库并添加几条数据来进行测试


站点设备信息表


存放监听数据表

2、创建一个springboot项目,开始编写Java代码

(1)创建springboot

            具体创建过程略,可参考文章使用IDEA创建一个springboot项目 - 码出精彩人生 - 博客园 (cnblogs.com)

(2)配置配置文件

server.port=13010

spring.datasource.url = jdbc:mysql://localhost:3306/iot-mqtt-demo?characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull

spring.datasource.username = root

spring.datasource.password = 123456

spring.datasource.driver-class-name = com.mysql.jdbc.Driver

spring.datasource.type = com.zaxxer.hikari.HikariDataSource

spring.mqtt.url = tcp://127.0.0.1:61613

consumer.client.id = iot_mqtt

spring.mqtt.username = admin

spring.mqtt.password = password

spring.mqtt.completionTimeout = 3000

(3)pom文件导入相关jar包

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <modelVersion>4.0.0</modelVersion>

    <groupId>com.iotmqtt</groupId>

    <artifactId>iot-mqtt-demo</artifactId>

    <version>0.0.1-SNAPSHOT</version>

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.0.0.RELEASE</version>

    </parent>

    <dependencies>

        <!--spring-mqtt-->

        <dependency>

            <groupId>org.springframework.integration</groupId>

            <artifactId>spring-integration-mqtt</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.integration</groupId>

            <artifactId>spring-integration-stream</artifactId>

        </dependency>

        <dependency>

            <groupId>org.apache.commons</groupId>

            <artifactId>commons-lang3</artifactId>

            <version>3.0</version>

        </dependency>

        <!-- swagger -->

        <dependency>

            <groupId>io.springfox</groupId>

            <artifactId>springfox-swagger2</artifactId>

            <version>2.7.0</version>

        </dependency>

        <dependency>

            <groupId>io.springfox</groupId>

            <artifactId>springfox-swagger-ui</artifactId>

            <version>2.7.0</version>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-test</artifactId>

        </dependency>

        <dependency>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <!-- MySQL -->

        <dependency>

            <groupId>mysql</groupId>

            <artifactId>mysql-connector-java</artifactId>

        </dependency>

        <!-- mybatis -->

        <dependency>

            <groupId>org.mybatis.spring.boot</groupId>

            <artifactId>mybatis-spring-boot-starter</artifactId>

            <version>2.0.0</version>

        </dependency>

        <dependency>

            <groupId>org.mybatis</groupId>

            <artifactId>mybatis</artifactId>

            <version>3.4.1</version>

        </dependency>

        <!-- fastjson -->

        <dependency>

            <groupId>com.alibaba</groupId>

            <artifactId>fastjson</artifactId>

            <version>1.2.7</version>

        </dependency>

        <dependency>

            <groupId>org.apache.httpcomponents</groupId>

            <artifactId>httpclient</artifactId>

            <version>4.5.1</version>

        </dependency>

        <!--poi-->

        <dependency>

            <groupId>org.apache.poi</groupId>

            <artifactId>poi</artifactId>

            <version>3.16</version>

        </dependency>

        <dependency>

            <groupId>org.apache.poi</groupId>

            <artifactId>poi-ooxml</artifactId>

            <version>3.16</version>

        </dependency>

        <!-- lombok slf4j日志-->

        <dependency>

            <groupId>org.projectlombok</groupId>

            <artifactId>lombok</artifactId>

            <optional>true</optional>

        </dependency>

    </dependencies>

    <build>

        <plugins>

            <plugin>

                <groupId>org.springframework.boot</groupId>

                <artifactId>spring-boot-maven-plugin</artifactId>

            </plugin>

            <plugin>

                <groupId>org.apache.maven.plugins</groupId>

                <artifactId>maven-compiler-plugin</artifactId>

                <version>3.1</version>

                <configuration>

                    <source>1.8</source>

                    <target>1.8</target>

                </configuration>

            </plugin>

        </plugins>

    </build>

</project>

(3)创建MqttConfig,配置mqtt连接,编写topic更新方法

package com.iotmqtt.iotmqttdemo.config;

import com.alibaba.fastjson.JSON;

import com.alibaba.fastjson.JSONObject;

import com.iotmqtt.iotmqttdemo.bean.SiteData;

import com.iotmqtt.iotmqttdemo.service.DemoService;

import org.apache.commons.lang3.StringUtils;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.integration.annotation.ServiceActivator;

import org.springframework.integration.channel.DirectChannel;

import org.springframework.integration.core.MessageProducer;

import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;

import org.springframework.integration.mqtt.core.MqttPahoClientFactory;

import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;

import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;

import org.springframework.messaging.Message;

import org.springframework.messaging.MessageChannel;

import org.springframework.messaging.MessageHandler;

import org.springframework.messaging.MessagingException;

import org.springframework.stereotype.Component;

@Component

@Configuration

public class MqttConfig {

    private String[] defaultListenerTopic;

    @Value("${spring.mqtt.username}")

    private String username;

    @Value("${spring.mqtt.password}")

    private String password;

    @Value("${spring.mqtt.url}")

    private String hostUrl;

    @Value("${consumer.client.id}")

    private String clientId;

    @Value("${spring.mqtt.completionTimeout}")

    private int completionTimeout ;

    @Autowired

    DemoService demoService;

    private void setDefaultListenerTopic(){

        this.defaultListenerTopic = demoService.getTopics();

    }

    private MqttPahoMessageDrivenChannelAdapter  adapter;

    /**

    * 添加监听主题

    * @param topicArr

    */

    public void addTopic(String[] topicArr){

        //先清除历史

        removeTopic();

        //重新给defaultListenerTopic值

        this.defaultListenerTopic = topicArr;

        if(adapter == null){

            adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,mqttClientFactory(),"");

        }

        for(String topic:topicArr){

            if(StringUtils.isNotEmpty(topic)){

                adapter.addTopic(topic,1);

            }

        }

        adapter.removeTopic();

    }

    /**

    * 清除监听主题

    */

    public void removeTopic(){

        if(adapter == null){

            adapter = new MqttPahoMessageDrivenChannelAdapter(clientId,mqttClientFactory(),"");

        }

        for(String topic:defaultListenerTopic){

            if(StringUtils.isNotEmpty(topic)){

                adapter.removeTopic(topic);

            }

        }

    }

    @Bean

    public MqttPahoClientFactory mqttClientFactory() {

        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();

        factory.setUserName(username);

        factory.setPassword(password);

        factory.setServerURIs(new String[]{hostUrl});

        factory.setKeepAliveInterval(2);

        return factory;

    }

    //接收通道

    @Bean

    public MessageChannel mqttInputChannel() {

        return new DirectChannel();

    }

    //配置client,监听的topic

    @Bean

    public MessageProducer inbound() {

        adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),"");

        setDefaultListenerTopic();

        for(String topic:defaultListenerTopic){

            if(StringUtils.isNotEmpty(topic)){

                adapter.addTopic(topic,1);

            }

        }

        adapter.setCompletionTimeout(completionTimeout);

        adapter.setConverter(new DefaultPahoMessageConverter());

        adapter.setOutputChannel(mqttInputChannel());

        return adapter;

    }

    //通过通道获取数据

    @Bean

    @ServiceActivator(inputChannel = "mqttInputChannel")

    public MessageHandler handler() {

        return new MessageHandler() {

            @Override

            public void handleMessage(Message<?> message) throws MessagingException {

                insert(message.getPayload().toString());

            }

        };

    }

    /**

    * 插入数据

    * @param message

    */

    private void insert(String message){

        SiteData siteData = new SiteData();

        JSONObject mesageObj = JSON.parseObject(message);

        siteData.setData(mesageObj.getString("data"));

        siteData.setSiteId(mesageObj.getString("siteId"));

        demoService.insert(siteData);

    }

}

(4)编写获得站点/设备topic的方法和保存数据的方法

mapper

package com.iotmqtt.iotmqttdemo.mapper;

import com.iotmqtt.iotmqttdemo.bean.SiteData;

import org.apache.ibatis.annotations.Insert;

import org.apache.ibatis.annotations.Param;

import org.apache.ibatis.annotations.Select;

public interface DemoMapper {

    @Select("select topics from site_info")

    String[] getTopics();

    @Insert("INSERT INTO site_data (siteId,data) VALUES (#{siteId},#{data})")

    void insert(SiteData siteData);

}

server

package com.iotmqtt.iotmqttdemo.service;

import com.iotmqtt.iotmqttdemo.bean.SiteData;

public interface DemoService {

    public String[] getTopics();

    public void insert(SiteData siteData);

}

serviceImpl

package com.iotmqtt.iotmqttdemo.service.impl;

import com.iotmqtt.iotmqttdemo.bean.SiteData;

import com.iotmqtt.iotmqttdemo.mapper.DemoMapper;

import com.iotmqtt.iotmqttdemo.service.DemoService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class DemoServiceImplimplements DemoService {

@Autowired

    private DemoMapperdemoMapper;

    @Override

    public String[]getTopics(){

        return demoMapper.getTopics();

    }


    @Override

    public void insert(SiteData siteData){

        demoMapper.insert(siteData);

    }

}

(5)创建controller编写更新topic接口

package com.iotmqtt.iotmqttdemo.controller;

import com.iotmqtt.iotmqttdemo.config.MqttConfig;

import com.iotmqtt.iotmqttdemo.service.DemoService;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.web.bind.annotation.RequestMapping;

import org.springframework.web.bind.annotation.RequestMethod;

import org.springframework.web.bind.annotation.RestController;

@RestController

@RequestMapping({"/demo"})

public class DemoController {

    @Autowired

    DemoService demoService;

    @Autowired

    MqttConfig mqttConfig;

    @RequestMapping(value = "/updateTopic", method = RequestMethod.GET)

    public String updateTopic(){

        mqttConfig.addTopic(demoService.getTopics());

        return "更新topic成功";

    }

}

三、测试代码

1、服务启动后,使用mqtt连接工具发送数据测试功能

(1)使用谷歌插件 MQTTlens 连接mqtt并发送数据


发送数据测试

(2)修改一个站点的topic,调用更新接口再此发送数据测试


修改1站点topic


执行更新topic接口
修改后重新发送数据

(4)检查数据更新情况


可以看到我们数据已经可以正常上来了

最后,项目源码我放到了百度网盘上,感兴趣的可以下载下来看看

地址:https://pan.baidu.com/s/1XRQKP1dYIQ6RVqvwkMyTqQ

提取码:mqtt

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容