一、需求
通过Java集成mqtt来获得设备监控到的数据,并且当设备发送mqtt的topic发生改变时,Java可以动态改变topic来继续监听设备发送的数据。
二、实现
1、新建一个demo数据库并添加几条数据来进行测试
2、创建一个springboot项目,开始编写Java代码
(1)创建springboot
具体创建过程略,可参考文章使用IDEA创建一个springboot项目 - 码出精彩人生 - 博客园 (cnblogs.com)
(2)配置配置文件
# HTTP port of the embedded web server.
server.port=13010
# JDBC connection to the local MySQL schema used by the demo.
spring.datasource.url = jdbc:mysql://localhost:3306/iot-mqtt-demo?characterEncoding=utf-8&useSSL=false&zeroDateTimeBehavior=convertToNull
# NOTE(review): demo credentials are stored in plain text; externalise them outside of a local test setup.
spring.datasource.username = root
spring.datasource.password = 123456
spring.datasource.driver-class-name = com.mysql.jdbc.Driver
spring.datasource.type = com.zaxxer.hikari.HikariDataSource
# MQTT broker URI; injected into MqttConfig.hostUrl.
spring.mqtt.url = tcp://127.0.0.1:61613
# MQTT client id used by the inbound adapter; injected into MqttConfig.clientId.
consumer.client.id = iot_mqtt
# Broker credentials; injected into MqttConfig.username / MqttConfig.password.
spring.mqtt.username = admin
spring.mqtt.password = password
# Passed to the inbound adapter's setCompletionTimeout (presumably milliseconds - confirm).
spring.mqtt.completionTimeout = 3000
(3)pom文件导入相关jar包
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.iotmqtt</groupId>
    <artifactId>iot-mqtt-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>

    <!-- Versions of dependencies declared without <version> are managed by this parent. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.0.RELEASE</version>
    </parent>

    <dependencies>
        <!-- Spring Integration MQTT support (inbound channel adapter) -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>3.0</version>
        </dependency>

        <!-- Swagger -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.7.0</version>
        </dependency>

        <!-- Test libraries are only needed on the test classpath, not in the packaged application. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>

        <!--
          MyBatis. The starter brings in the matching mybatis and mybatis-spring versions
          transitively, so mybatis itself is not pinned here: an explicit older mybatis
          version would override the one the starter was built against.
        -->
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.0.0</version>
        </dependency>

        <!-- fastjson -->
        <!-- NOTE(review): 1.2.7 is a very old fastjson release; check security advisories and upgrade. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.1</version>
        </dependency>

        <!-- Apache POI -->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.16</version>
        </dependency>

        <!-- Lombok (slf4j logging annotations) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
(3)创建MqttConfig,配置mqtt连接,编写topic更新方法
package com.iotmqtt.iotmqttdemo.config;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.iotmqtt.iotmqttdemo.bean.SiteData;
import com.iotmqtt.iotmqttdemo.service.DemoService;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.stereotype.Component;
/**
 * Spring configuration for the MQTT inbound side of the demo.
 *
 * <p>It builds the Paho client factory, the inbound channel adapter and the handler that
 * stores every received message through {@link DemoService}. It also exposes
 * {@link #addTopic(String[])} and {@link #removeTopic()} so the subscriptions of the running
 * adapter can be changed without restarting the application.
 *
 * <p>{@code @Configuration} already registers this class as a bean, so no additional
 * stereotype annotation is needed.
 */
@Configuration
public class MqttConfig {

    private static final Logger LOG = Logger.getLogger(MqttConfig.class.getName());

    /** QoS requested for every subscription made by this class. */
    private static final int SUBSCRIBE_QOS = 1;

    /** Topics most recently applied to the adapter; never {@code null}. */
    private String[] defaultListenerTopic = new String[0];

    @Value("${spring.mqtt.username}")
    private String username;

    @Value("${spring.mqtt.password}")
    private String password;

    @Value("${spring.mqtt.url}")
    private String hostUrl;

    @Value("${consumer.client.id}")
    private String clientId;

    @Value("${spring.mqtt.completionTimeout}")
    private int completionTimeout;

    @Autowired
    DemoService demoService;

    /** The adapter created by {@link #inbound()}; {@code null} until that bean method has run. */
    private MqttPahoMessageDrivenChannelAdapter adapter;

    /** Loads the topics from the service and stores them without empty or duplicate entries. */
    private void setDefaultListenerTopic() {
        Set<String> topics = normalizeTopics(demoService.getTopics());
        this.defaultListenerTopic = topics.toArray(new String[0]);
    }

    /**
     * Replaces the set of listened topics with the given one.
     *
     * <p>Only the difference is applied: topics that are no longer wanted are removed and
     * new ones are added, so subscriptions that stay the same are not interrupted. Empty,
     * {@code null} and duplicate entries are ignored (adding a topic twice makes the adapter
     * throw).
     *
     * @param topicArr the topics to listen to from now on; {@code null} is treated as empty
     * @throws IllegalStateException if the inbound adapter has not been created yet
     */
    public synchronized void addTopic(String[] topicArr) {
        // synchronized: this is reachable from concurrent HTTP requests and the
        // read-compare-modify sequence below must not interleave.
        MqttPahoMessageDrivenChannelAdapter target = requireAdapter();
        Set<String> wanted = normalizeTopics(topicArr);
        Set<String> subscribed = new LinkedHashSet<>(Arrays.asList(target.getTopic()));

        for (String topic : subscribed) {
            if (!wanted.contains(topic)) {
                target.removeTopic(topic);
            }
        }
        for (String topic : wanted) {
            if (!subscribed.contains(topic)) {
                target.addTopic(topic, SUBSCRIBE_QOS);
            }
        }
        this.defaultListenerTopic = wanted.toArray(new String[0]);
    }

    /**
     * Removes every topic the adapter currently listens to.
     *
     * @throws IllegalStateException if the inbound adapter has not been created yet
     */
    public synchronized void removeTopic() {
        MqttPahoMessageDrivenChannelAdapter target = requireAdapter();
        // Ask the adapter itself so the removal matches what is really subscribed.
        String[] subscribed = target.getTopic();
        if (subscribed.length > 0) {
            target.removeTopic(subscribed);
        }
        this.defaultListenerTopic = new String[0];
    }

    /**
     * Returns the adapter built by {@link #inbound()}.
     *
     * <p>Creating a fresh adapter here would be useless: it would have no output channel and
     * would not be managed by Spring, so it would never deliver a message.
     */
    private MqttPahoMessageDrivenChannelAdapter requireAdapter() {
        if (adapter == null) {
            throw new IllegalStateException("MQTT inbound adapter has not been initialised yet");
        }
        return adapter;
    }

    /** Returns the non-empty entries of {@code raw} in their original order, without duplicates. */
    private static Set<String> normalizeTopics(String[] raw) {
        Set<String> topics = new LinkedHashSet<>();
        if (raw != null) {
            for (String topic : raw) {
                if (StringUtils.isNotEmpty(topic)) {
                    topics.add(topic);
                }
            }
        }
        return topics;
    }

    /** Client factory carrying the broker URI and credentials from the configuration. */
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setUserName(username);
        factory.setPassword(password);
        factory.setServerURIs(new String[]{hostUrl});
        // NOTE(review): a keep-alive interval of 2 looks very aggressive; confirm it is intended.
        factory.setKeepAliveInterval(2);
        return factory;
    }

    /** Channel on which the inbound adapter publishes received messages. */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    /**
     * Creates the inbound adapter and subscribes it to the topics returned by the service.
     *
     * <p>The adapter is created without a placeholder topic; only real, non-empty topics are
     * registered.
     */
    @Bean
    public MessageProducer inbound() {
        adapter = new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory());
        setDefaultListenerTopic();
        for (String topic : defaultListenerTopic) {
            adapter.addTopic(topic, SUBSCRIBE_QOS);
        }
        adapter.setCompletionTimeout(completionTimeout);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    /** Handler bound to {@code mqttInputChannel}; stores each message payload. */
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                insert(message.getPayload().toString());
            }
        };
    }

    /**
     * Parses a message payload and stores its {@code data} and {@code siteId} fields.
     *
     * <p>Payloads that are not a JSON object (malformed, empty, an array, a scalar) are
     * logged and skipped instead of letting a parse error, {@code NullPointerException} or
     * {@code ClassCastException} escape from the message handler.
     *
     * @param message the raw payload text
     */
    private void insert(String message) {
        final Object parsed;
        try {
            parsed = JSON.parse(message);
        } catch (JSONException e) {
            LOG.log(Level.WARNING, "Discarding MQTT payload that is not valid JSON: " + message, e);
            return;
        }
        if (!(parsed instanceof JSONObject)) {
            LOG.log(Level.WARNING, "Discarding MQTT payload that is not a JSON object: {0}", message);
            return;
        }
        JSONObject payload = (JSONObject) parsed;
        SiteData siteData = new SiteData();
        siteData.setData(payload.getString("data"));
        siteData.setSiteId(payload.getString("siteId"));
        demoService.insert(siteData);
    }
}
(4)编写获得站点/设备topic的方法和保存数据的方法
mapper
package com.iotmqtt.iotmqttdemo.mapper;
import com.iotmqtt.iotmqttdemo.bean.SiteData;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
/**
 * MyBatis mapper with the two statements the demo needs: reading the configured topics
 * and storing a received data record.
 */
public interface DemoMapper {

    /**
     * Selects the {@code topics} column of every row in {@code site_info}.
     *
     * @return one array element per selected row
     */
    @Select("select topics from site_info")
    String[] getTopics();

    /**
     * Inserts one row into {@code site_data}.
     *
     * @param siteData supplies the {@code siteId} and {@code data} values
     */
    @Insert("INSERT INTO site_data (siteId,data) VALUES (#{siteId},#{data})")
    void insert(SiteData siteData);
}
service
package com.iotmqtt.iotmqttdemo.service;
import com.iotmqtt.iotmqttdemo.bean.SiteData;
/**
 * Service contract used by the MQTT configuration and the controller.
 */
public interface DemoService {

    /**
     * Returns the topics to listen to.
     *
     * @return the topic names
     */
    String[] getTopics();

    /**
     * Stores one received data record.
     *
     * @param siteData the record to store
     */
    void insert(SiteData siteData);
}
serviceImpl
package com.iotmqtt.iotmqttdemo.service.impl;
import com.iotmqtt.iotmqttdemo.bean.SiteData;
import com.iotmqtt.iotmqttdemo.mapper.DemoMapper;
import com.iotmqtt.iotmqttdemo.service.DemoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class DemoServiceImplimplements DemoService {
@Autowired
private DemoMapperdemoMapper;
@Override
public String[]getTopics(){
return demoMapper.getTopics();
}
@Override
public void insert(SiteData siteData){
demoMapper.insert(siteData);
}
}
(5)创建controller编写更新topic接口
package com.iotmqtt.iotmqttdemo.controller;
import com.iotmqtt.iotmqttdemo.config.MqttConfig;
import com.iotmqtt.iotmqttdemo.service.DemoService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
/**
 * REST endpoint under {@code /demo} for refreshing the MQTT subscriptions at runtime.
 */
@RestController
@RequestMapping("/demo")
public class DemoController {

    @Autowired
    DemoService demoService;

    @Autowired
    MqttConfig mqttConfig;

    /**
     * Reads the current topics from the service and passes them to the MQTT configuration.
     *
     * @return a fixed confirmation message
     */
    @RequestMapping(value = "/updateTopic", method = RequestMethod.GET)
    public String updateTopic() {
        String[] latestTopics = demoService.getTopics();
        mqttConfig.addTopic(latestTopics);
        return "更新topic成功";
    }
}
三、测试代码
1、服务启动后,使用mqtt连接工具发送数据测试功能
(1)使用谷歌插件 MQTTlens 连接mqtt并发送数据
(2)修改一个站点的topic,调用更新接口再次发送数据测试
(3)检查数据更新情况
可以看到我们数据已经可以正常上来了
最后,项目源码我放到了百度网盘上,感兴趣的可以下载下来看看
地址:https://pan.baidu.com/s/1XRQKP1dYIQ6RVqvwkMyTqQ
提取码:mqtt