mqtt消息推送

mqtt是什么?

MQTT 是一种基于发布/订阅模式的 轻量级物联网消息传输协议 ,可在严重受限的硬件设备和低带宽、高延迟的网络上实现稳定传输。

后端集成mqtt客户端-生产者

  • pom
<!--        mqtt-->
        <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
        </dependency>

        <!--mqtt相关依赖-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>
  • 发布者

    public class PublishSample {
    
    
    
        public static void main(String[] args) {
            //为了让前端js 接受到消息只能先websocket 协议
            String broker = "ws://broker.emqx.io:8083/mqtt";
    //        String broker = "tcp://broker.emqx.io:1883";
            String topic = "topic-123";
            String clientid = "publish_clien";
            String content = "各位好!";
            int qos = 0;
    
            try {
                MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence());
                // 连接参数
                MqttConnectOptions options = new MqttConnectOptions();
                options.setConnectionTimeout(60);
                options.setKeepAliveInterval(60);
                // 连接
                client.connect(options);
                // 创建消息并设置 QoS
    //            MqttMessage message = new MqttMessage(content.getBytes());
    //            message.setQos(qos);
                // 发布消息
    //            client.publish(topic, message);
    //            System.out.println("Message published");
    //            System.out.println("topic: " + topic);
    //            System.out.println("message content: " + content);
    
                //模拟产生日志
                for (int i = 0; i < 50; i++) {
                    try {
                        TimeUnit.MILLISECONDS.sleep(100L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    String msg = "发送【" + i + "】条日志信息-" + UUID.randomUUID().toString();
                    MqttMessage message = new MqttMessage(msg.getBytes());
                    message.setQos(qos);
                    client.publish(topic, message);
                    System.out.println("Message published");
                    System.out.println("topic: " + topic);
                    System.out.println("message content: " + content);
                }
    
    
                // 关闭连接
                client.disconnect();
                // 关闭客户端
                client.close();
            } catch (MqttException e) {
                throw new RuntimeException(e);
            }
        }
    }
    

前端集成mqtt客户端-消费者

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>mqtt</title>
    <style>
        #txt {
            border: 1px solid;
            width: 800px;
            height: 500px;
            overflow-y: scroll;

        }

    </style>
</head>
<body>
    <h5>日志实时展示</h5>
    <div id="txt">

    </div>

</body>
<script src="https://cdn.bootcdn.net/ajax/libs/mqtt/4.1.0/mqtt.min.js"></script>
<script>



    // Broker: broker.emqx.io
    // TCP Port: 1883
    // Websocket Port: 8083
    const connectUrl = `ws://broker.emqx.io:8083/mqtt`;
    // const connectUrl = `ws://broker.emqx.io/mqtt`;

    client = mqtt.connect(connectUrl, {
        clean: true,
        connectTimeout: 4000,
        reconnectPeriod: 1000,
        clientId: 'emqx_test',
        username: 'emqx_test',
        password: 'emqx_test'
    })

    // 需要订阅的主题
    const topic = 'topic-123';

    //成功连接后触发的回调
    client.on('connect', () => {
        console.log('已经连接成功');
        // 这里可以订阅多个主题
        client.subscribe([topic], () => {
            console.log(`订阅了主题 ${topic}`)
        })
    });

    // 当客户端收到一个发布过来的消息时触发回调
    client.on('message', function (topic, message, packet) {
        // 这里有可能拿到的数据格式是Uint8Array格式,所以可以直接用toString转成字符串
        // let data = JSON.parse(message.toString);
        // var s = JSON.stringify(message.toString());
        console.log("返回的数据:", message.toString())
        // console.log("返回的数据2:", s)


        //将字节数组转换 成 普通 字符串 utf-8编码
        var blob = new Blob([message]);
        var fileReader = new FileReader();
        fileReader.onload = function (event) {
            var result = event.target.result;
            console.log("解析收到消息:" + result)

            //渲染到页面上
            var txtDiv = document.querySelector("#txt");
            var p = document.createElement("p");
            p.textContent = `${result}`;
            txtDiv.appendChild(p);

        }
        fileReader.readAsText(blob)

    });

    // 连接断开后触发的回调
    client.on("close", function () {
        console.log("已断开连接")
    });



</script>

</html>

运行效果

image.png
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容