【JS tool】MQTT消息模块

简述

基于Paho-mqtt,实现mqtt连接与消息处理

主要功能

  1. MQTT 连接、重连、断开、消息发送
  2. 消息的订阅与发布

模块

/*
    消息列表
    const MESSAGE = {
        device_id:{
            message_code:message_body
        }
    }
*/
const MESSAGE = {}


/*
    消息订阅列表
    const SUBSCRIBER = {
        device_id:{
            message_code:[
                {
                    page,
                    callback
                }
            ]
        }
    }
*/
const SUBSCRIBER = {}

/* 消息到达后,存入MESSAGE*/
function onMqttMessageArrived({topic, payloadString}: { topic: string, payloadString }) {

    // 格式化消息
    let device_id = topic.split('/')[1]
    let {f: message_code, p: message_body} = JSON.parse(payloadString)
    console.log('---------------------------------------- message Arrived:', device_id, message_code, message_body)

    // 存放消息
    if (!(device_id in MESSAGE))
        MESSAGE[device_id] = {}
    MESSAGE[device_id][message_code] = message_body

    // 发布订阅
    if ((device_id in SUBSCRIBER) && (message_code in SUBSCRIBER[device_id])) {
        let subscribers = SUBSCRIBER[device_id][message_code]
        subscribers.forEach(subscriber => {
            subscriber.callback({f: message_code, p: message_body}, MESSAGE);
        })
    }
    /*
    // 可以考虑将发布订阅放到模块外,通过defineProperty/proxy来监听数据变化
    Object.defineProperty(MESSAGE[device_id], message_code, {
        get: function () {

        },
        set: function (value) {
            if (!(device_id in SUBSCRIBER)) {
                return;
            }
            if (!(message_code in SUBSCRIBER[device_id])) {
                return;
            }
            let subscribers = SUBSCRIBER[device_id][message_code]
            subscribers.forEach(subscriber => {
                subscriber.callback({f: message_code, p: value}, MESSAGE);
            })


        }
    })
    */

}

// MQTT类的接口,用来校验public属性与方法
interface MQTT_interface {
    mqtt;
    url: string;
    username: string;
    password: string;
    useSSL: boolean;
    keepAliveInterval: number;

    // 设置连接mqtt所需的参数,所有参数可选
    setConnectOptions({}: { url?: string, username?: string, password?: string, useSSL?: boolean, keepAliveInterval?: number });

    // 建立mqtt连接
    connect();

    // mqtt重连
    reconnect();

    // 断开mqtt连接
    disconnect();

    // 发送mqtt消息
    send();

    /*
    * 添加监听
    * device_id: 需要监听的设备
    * message_code: 需要监听的消息code
    * page: 在哪个页面监听
    * callback: 监听到数据时的回调函数
    * */
    subscribe(device_id: string, message_code: string | number, page: string, callback: () => any);

    // 解除监听,参数参考subscribe
    unsubscribe(device_id: string, message_code: string | number, page: string);
}

// 重连次数
let reconnect_count = 0;

// MQTT处理
class MQTT implements MQTT_interface {
    public mqtt
    public url: string = ''
    public username: string = ''
    public password: string = ''
    public useSSL: boolean = true
    public keepAliveInterval: number = 10;

    constructor() {
    }

    setConnectOptions({url, username, password, useSSL, keepAliveInterval}
                          : { url?: string, username?: string, password?: string, useSSL?: boolean, keepAliveInterval?: number }
    ) {
        if (url)
            this.url = url;
        if (username)
            this.username = username;
        if (password)
            this.password = password;
        if (useSSL)
            this.useSSL = useSSL;
        if (keepAliveInterval)
            this.keepAliveInterval = keepAliveInterval
    }

    connect() {

        // 模拟发送消息
        setInterval(() => {
            let message: { topic: string, payloadString: string } = {
                topic: 'topic/device_id_1',
                payloadString: JSON.stringify({f: Math.floor(Math.random() * 10), p: Math.floor(Math.random() * 100)})
            }
            onMqttMessageArrived(message)
        }, 1.5 * 1000)
        /*
        this.mqtt = new Paho.Client(this.url, this.password)
        this.mqtt.onConnectionLost = this.reconnect();
        this.mqtt.onMessageArrived = onMqttMessageArrived;

        return new Promise(resolve => {
            this.mqtt.connect({
                userName:this.username,
                password:this.password,
                useSSL:this.useSSL,
                keepAliveInterval:this.keepAliveInterval,
                onSuccess(){
                    reconnect_count = 0;
                    resolve(true);
                },
                onFailure(){
                    this.reconnect();
                }
            })
        })
        */

    }

    reconnect() {
        reconnect_count++;
        setTimeout(() => {
            this.connect();
        }, reconnect_count * .5)
    }

    disconnect(): boolean {
        return true
    }

    send() {
    }

    subscribe(device_id: string, message_code: string | number, page: string, callback: (message) => any) {
        if (!(device_id in SUBSCRIBER))
            SUBSCRIBER[device_id] = {}

        if (!(message_code in SUBSCRIBER[device_id]))
            SUBSCRIBER[device_id][message_code] = []

        let is_existed = SUBSCRIBER[device_id][message_code].some(item => item.page === page)
        if (is_existed)
            SUBSCRIBER[device_id][message_code] = SUBSCRIBER[device_id][message_code].map(item => item.page === page ? {
                ...item,
                callback
            } : item)
        else
            SUBSCRIBER[device_id][message_code].push({page, callback})

        return SUBSCRIBER
    }

    unsubscribe(device_id: string, message_code: string | number, page: string) {

        if ((device_id in SUBSCRIBER) && (message_code in SUBSCRIBER[device_id])) {
            SUBSCRIBER[device_id][message_code] = SUBSCRIBER[device_id][message_code].filter(item => item.page !== page)
            if (SUBSCRIBER[device_id][message_code].length === 0) {
                delete SUBSCRIBER[device_id][message_code]
            }
        }

        return SUBSCRIBER
    }
}

建立连接

// 建立mqtt连接
let mqtt = new MQTT();
mqtt.setConnectOptions({url: '', username: '', password:''})
mqtt.connect()

消息订阅与取消

// 在不同页面/组件内,订阅mqtt消息
mqtt.subscribe('device_id_1', 1, 'page_1', message => {
    console.log('callback', 'device_id_1-page_1', message)
})
mqtt.subscribe('device_id_1', 1, 'page_2', message => {
    console.log('callback', 'device_id_1-page_2', message)
})
mqtt.subscribe('device_id_1', 2, 'page_1', message => {
    console.log('callback', 'device_id_1-page_1', message)
})
mqtt.subscribe('device_id_1', 3, 'page_1', message => {
    console.log('callback', 'device_id_1-page_1', message)
})

// 取消订阅某条消息(退出页面,取消不需要的订阅)
mqtt.unsubscribe('device_id_1', 2, 'page_1')

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,014评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,796评论 3 386
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,484评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,830评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,946评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,114评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,182评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,927评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,369评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,678评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,832评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,533评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,166评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,885评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,128评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,659评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,738评论 2 351

推荐阅读更多精彩内容

  • 随着物联网(Internet of Things,IoT)的兴起,机器之间(Machine-to-Machine,...
    登高且赋阅读 12,491评论 0 18
  • 一、简述 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)...
    高广超阅读 24,819评论 2 5
  • 简介 MQTT 全称为 Message Queuing Telemetry Transport(消息队列遥测传输)...
    殖民_FE阅读 4,401评论 1 6
  • 网络编程 1. 概论 建立连接:通过IP或者域名来连接两台设备,通过端口号找到对应的通信程序 通信协议:要传输的数...
    陵无山阅读 3,923评论 0 12
  • 2018.2.26.登西甄山 咬定青山不放松 立根原在破岩中 千磨万击还坚劲 任尔东西南北风
    一葉一豆阅读 179评论 0 1