2022最新MQTT整合Spring 详解!

mqtt 整合springboot具体步骤代码

  • 1. pom 引入相关包

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-configuration-processor</artifactId>
          <optional>true</optional>
      </dependency>
    
      <dependency>
          <groupId>org.springframework.integration</groupId>
          <artifactId>spring-integration-mqtt</artifactId>
          <version>5.5.14</version>
      </dependency>
    
  • 2. application.yml相关配置

server:
  port: 8080

mqtt:
  config:
    username: admin
    password: public
    hostUrl: tcp://localhost:1883
    clientId: ${random.value}
    pubTopic: pubTopic
    subTopic: subTopic
  • 3. MqttConfig配置类
package com.example.tdengine;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;


/**
 * mqtt配置类
 *
 * @author WeiWei
 * @date 2022/08/08
 */
@Data
@ConfigurationProperties(prefix = "mqtt.config")
@Component
public class MqttConfig {

    /**
     * 用户名
     */
    private String username;

    /**
     * 密码
     */
    private String password;

    /**
     * 主机url(代理端即 broker)
     */
    private String hostUrl;

    /**
     * 客户机id
     */
    private String clientId;

    /**
     * 发布的主题
     */
    private String pubTopic;

    /**
     * 订阅的主题
     */
    private String subTopic;


}

  • 4. MqttServer类
package com.example.tdengine;



import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.Objects;

/**
 * mqtt服务类
 *
 * @author WeiWei
 * @date 2022/08/08
 */
@Slf4j
@Configuration
public class MqttServer{

    /**
     * 出站通道
     */
    public static final String OUTBOUND_CHANNEL = "mqttOutboundChannel";

    /**
     * 输入通道
     */
    public static final String INPUT_CHANNEL = "mqttInputChannel";

    /**
     * mqtt配置
     */
    @Resource
    private MqttConfig mqttConfig;


    /**
     * 初始化
     */
    @PostConstruct
    public void init() {
        log.info(mqttConfig.toString());
    }

    /**
     * mqtt客户工厂
     *
     * @return {@link MqttPahoClientFactory}
     */
    @Bean
    public MqttPahoClientFactory clientFactory() {

        MqttConnectOptions options = new MqttConnectOptions();
        //配置MqttConnectOptions
        options.setServerURIs(new String[]{mqttConfig.getHostUrl()});
        options.setUserName(mqttConfig.getUsername());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(options);
        return factory;
    }

    /**
     * mqtt出站通道
     *
     * @return {@link MessageChannel}
     */
    @Bean(value = OUTBOUND_CHANNEL)
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    /**
     * mqtt出站handler
     *
     * @return {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = OUTBOUND_CHANNEL)
    public MessageHandler mqttOutboundHandler() {
        //MqttPahoMessageHandler初始化
        MqttPahoMessageHandler handler = new MqttPahoMessageHandler(mqttConfig.getClientId(), clientFactory());
        //设置默认的qos级别
        handler.setDefaultQos(1);
        //保留标志的默认值。如果没有mqtt_retained找到标题,则使用它。如果提供了自定义,则不使用它converter。这里不启用
        handler.setDefaultRetained(false);
        //设置发布的主题
        handler.setDefaultTopic(mqttConfig.getPubTopic());
        //当 时true,调用者不会阻塞。相反,它在发送消息时等待传递确认。默认值为false(在确认交付之前发送阻止)。
        handler.setAsync(false);
        //当 async 和 async-events 都为 true 时,会发出 MqttMessageSentEvent(请参阅事件)。它包含消息、主题、客户端库生成的messageId、clientId和clientInstance(每次连接客户端时递增)。当客户端库确认交付时,会发出 MqttMessageDeliveredEvent。它包含 messageId、clientId 和 clientInstance,使传递与发送相关联。任何 ApplicationListener 或事件入站通道适配器都可以接收这些事件。请注意,有可能在 MqttMessageSentEvent 之前接收到 MqttMessageDeliveredEvent。默认值为false。
        handler.setAsyncEvents(false);
        return handler;
    }


    /**
     * mqtt输入通道
     *
     * @return {@link MessageChannel}
     */
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }


    /**
     * 入站
     *
     * @return {@link MessageProducer}
     */
    @Bean
    public MessageProducer inbound() {
        //配置订阅端MqttPahoMessageDrivenChannelAdapter
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        mqttConfig.getClientId() + "_inbound", clientFactory(), mqttConfig.getSubTopic().split(","));
        //设置超时时间
        adapter.setCompletionTimeout(3000);
        //设置默认的消息转换类
        adapter.setConverter(new DefaultPahoMessageConverter());
        //设置qos级别
        adapter.setQos(1);
        //设置入站管道
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }


    /**
     * 消息处理程序
     *
     * @return {@link MessageHandler}
     */
    @Bean
    @ServiceActivator(inputChannel = INPUT_CHANNEL)
    public MessageHandler messageHandler() {
        return message -> {
            String topic = Objects.requireNonNull(message.getHeaders().get("mqtt_receivedTopic")).toString();
            log.info("订阅主题为: {}", topic);
            String[] topics = mqttConfig.getSubTopic().split(",");
            for (String t : topics) {
                if (t.equals(topic)) {
                    log.info("接收到该主题消息为: {}", message.getPayload().toString());
                }
            }
        };
    }

}

  • 5. MqttGateway接口
package com.example.tdengine;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

/**
 * mqtt网关(发布端需要用到)
 *
 * @author WeiWei
 * @date 2022/08/08
 */
@Component
@MessagingGateway(defaultRequestChannel = MqttServer.OUTBOUND_CHANNEL)
public interface MqttGateway {


    /**
     * 发送到mqtt
     *
     * @param payload 有效载荷
     */
    void sendToMqtt(String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

    /**
     * 发送到mqtt
     *
     * @param topic   主题
     * @param qos     qos
     * @param payload 消息内容
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

©著作权归作者所有,转载或内容合作请联系作者
【社区内容提示】社区部分内容疑似由AI辅助生成,浏览时请结合常识与多方信息审慎甄别。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

相关阅读更多精彩内容

友情链接更多精彩内容