「Android基于MQTT实现消息通知」

「Android基于MQTT实现消息通知」

一、写在前面

在对接项目中IoT时,发现目前有对MQTT做了接入,这里记录一下,官方的资料比较详细,这里主要从实现细节出发;对具体的需求以及配套的技术方案进行整理,以供参考。

一、IoT与MQTT

提到 IoT(Internet of Things)、IIoT(Industrial IoT ) 不得不说 MQTT,其被广泛的应用在物联网以及工业物联网之中,是一种消息传递协议。不同于我们所认识的平时常见的一些智能设备,如手机、电脑、平板等;这些设备一般都有着很好的计算能力,所依赖的网络环境很优质。但是一般的硬件设备性能较差,网络环境不稳定,而MQTT则是专门针对于硬件性能,网络状态不稳定场景下而生的。有着天然的优势。

二、什么是MQTT

MQTT是用于物联网的最常用的消息传递协议 (IoT)。MQTT代表MQ遥测运输。该协议是一组规则,它定义了IoT设备如何通过Internet发布订阅数据。用于IoT和工业IoT(IIT)设备(例如嵌入式设备,传感器,工业PLC等)之间的消息传递和数据交换。协议是事件驱动的,并使用发布/订阅(PUB / SUB)模式连接设备。发布者和接收器(订阅者)通过主题通信,并彼此分离。它们之间的连接由MQTT代理处理。MQTT代理过滤所有传入消息并将其正确分发给订阅者。

三、与传统Http的区别
  • MQTT以数据为中心,底层基于TCP链接,直接操作轻量级的二进制数据,并且数据包很小(可以小到一个字符,两个字节)。划重点,由于这个特性,其对于网络环境状态要求没有HTTP那么高,这也是为什么广泛应用于IoT设备的原因之一。
  • MQTT基于发布/订阅模式,区别于HTTP的请求/回调模式,这就决定了一个同一个设备即可以是客户端(Client)同时也可以是服务端(Server),回想发布订阅模式,消息的发布可以是1toN(N>=0),而HTTP则是1to1。
  • MQTT的发布/订阅架构决定了其无法基于UDP(面向无链接),而HTPP底层可以是基于TCP或者UDP。
  • 消息体量的区别,MQTT数据包很小,而HTTP数据量一般较大。
四、MQTT构成部分
1.Publish&Subscribe

MQTT对于发布订阅做了自身的解耦处理,主要是从三个维度出发,1.空间解耦:发布者和订阅者不需要相互了解(例如,没有IP地址和端口的交换)。2.时间解耦:发布者和订阅者不需要同时运行。3.同步解耦:在发布或接收期间,两个组件的操作不需要中断。详细信息

2.Client、Broker

详细信息

3.Topics&Best Practices

主要需要注意Topics的匹配规则,分为单项通配符,与多项通配符。单项以 + 连接:this/is/+/single,其中仅仅 + 部分可以被替换为单个路径(以 / 分割)。多项通配符仅支持在尾端支持:this/is/multi/#,并且是多级的。

详细信息

4.Keep Alive

保活时效,包括其他的字段,官方文档都给出了很详细的解释,认真了解一项技术实现,官方的文档还是最好的选择文档。这里主要基本认识MQTT是个什么东西,具体的实现细节与规范也不是一两句话可以说的清楚的,且可能存在误导的风险。MQTT

五、MQTT实际项目中的使用
1.实现什么需求?

以实际的项目为例,现需要实现的功能有:

  • 服务端下发消息通知到IoT设备,消息以Type区分,不同的消息需对应不同的处理措施。
  • 根据消息的不同,有语音播放、叫号、本地数据更新、服务端配置下发。

功能相对很简单,总结就是服务端推送消息,设备根据消息做出响应。

2.具体实现方案

导入依赖

implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'

主要分为几个类:a.主体请求Client,b.数据返回的回调dataCallback,c.链接状态回调connectCallback,d.具体消息处理策略IHandler。方案主要就包括这几个大类,逐步实现各个细节。

3.数据接口回调IDataCallback
interface IDataCallback {
  fun connectionError(cause: Throwable?)
  fun dataMessage(dataMessage: String)
}
4.连接状态回调IConnectionCallback
interface IConnectionCallback {
  fun connectSuccess()
  fun connectFail(reason: Throwable?)
}
5.CMqttClient链接类

在实现之前,列举几个关键的参数,参数配置在MqttConnectOptions

val options = MqttConnectOptions()
//默认为true,表示非持久订阅,无论服务端或者客户端重启,不会保持状态,重启后指定消息也无法送达
//设置为false,表示持久订阅,服务端与客户端重启或重链,指定消息可以送达
options.isCleanSession = false
//链接的用户名(账号名)
options.userName = user
//链接的用户密码(账户密码)
options.password = password.toCharArray()
//链接超时值s,默认30s,为0时,等待网络状态,即成功或失败
options.connectionTimeout = connectTimeout
//默认60s,检测服务端是否可用,为0时则禁止客户端保活,保活间隔内,没有消息的情况下客户端会通过ping来检测链接是否保持
options.keepAliveInterval = keepAliveInterval
//是否开启自动重连接,初始尝试重连是等待1s,失败情况下,延迟加倍,直到2分钟。
options.isAutomaticReconnect = true

关于自动重新连接有三个必要条件,cleanSession需要设置为falseisAutomaticReconnect需要设置为true,并且初始已经连接过。划重点,这里就要求,MQTT虽然可以自动重试连接当时必须有这三个前提,那么首次由于网络等其他原因未能连接的,这层的重试机制是需要我们自身去实现的,也就是需要保证首次能够连接到服务端。源码以及注释:

//Reconnect Only appropriate if cleanSession is false and we were connected. Declare as synchronized to avoid multiple calls to this method to send connect multiple times
synchronized void reconnect() {
  //....
}
//注释说的很明确三个条件,1.cleanSession需设置为false,2.isAutomaticReconnect需设置为true,并且是之前是已经连接过了。
class CMqttClient {
  //正在连接服务器
  private var connecting = false
  private var mqttAndroidClient: MqttAndroidClient? = null
  //连接到服务器
  fun connectToServer(
    context: Context,
    host: String,
    clientId: String,
    accountName: String,
    accountPsw: String,
    connectTimeout: Int = 20,
    keepAliveTime: Int = 60,
    connectionCallback: IConnectionCallback? = null,
    dataCallback: IDataCallback? = null
  ) {
    if(null == mqttAndroidClient) {
      mqttAndroidClient = MqttAndroidClient(context, host, clientId)
      mqttAndroidClient?.setCallback(object: MqttCallback {
        override fun connectionLost(cause: Throwable?) {
          dataCallback?.connectionError(cause)
        }
        override fun messageArrived(topic: String?, message: MqttMessage?) {
          message?.let {
            val payLoad = String(it.payload)
            Log.d(xxxxxxxxx)
            dataCallback?.dataMessage(payLoad)
          }
        }
        override fun deliveryComplete(token: IMqttDeliveryToken?) {
          //do something
        }
      })
    }
    connecting = true
    val options = MqttConnectOptions()
    options.isCleanSession = false
    options.userName = user
    options.password = password.toCharArray()
    options.connectionTimeout = connectTimeout
    options.keepAliveInterval = keepAliveInterval
    options.isAutomaticReconnect = true
    mqttAndroidClient?.connect(options, null, object: IMqttActionListener{
      override fun onSuccess(asyncActionToken: IMqttToken?) {
        connecting = false
        connectionCallback?.connectSuccess()
      }
      override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
        connecting = false
        connectionCallback?.connectFail(exception)
      }
    })
  }
  //是否已经建立链接
  fun hasConnected(): Boolean {
    return try {
      mqttAndroidClient?.isConnected == true
    } catch {
      false
    }
  }
  //断开与服务端连接
  fun disConnectFromServer() {
    if(mqttAndroidClient?.isConnected == true) {
      mqttAndroidClient?.disconnect()
    }
    connecting = false
  }
  //释放资源
  fun relaseClient() {
    mqttAndroidClient?.close()
    mqttAndroidClient = null
    connecting = false
  }
  //订阅消息
  fun subscribe(topics: Array<String>, qos: IntArray, timeOut: Long = 2000): Boolean {
    return mqttAndroidClient?.let {
      try{
        val mqttToken = it.subscribe(topics, qos)
        mqttToken.waitForCompletion(timeout)
        mqttToken.isComplete
        true
      } catch {
        Log.d(xxxx)
        false
      }
    }?: false
  }
  //取消订阅
  fun unSubscribe(): Boolean {
    //省略
  }
}

需要注意的是这里的ClientId,是唯一性的,像IoT设备以设备deviceId作为ClientId,如果换成用户userId,当在多设备登录的情况下,那么重试等其他一些机制会影响预期结果,给排查问题带来一定的难度。

5.消息处理接口IHandler
interface IHandler {
  fun handlerMessage(message: String)
}

消息体中会包含不同的type,根据不同的type实现不同的处理器,当然为了灵活还要借助注解机制

6.注解模版
@Target(AnnotationTarget.CLASS)
@Retention(AnnotationRetention.SOURCE)
annotation class MQTTHandler {
  val groupName: String
  val type: String
}

通过反射的方式加载对应的IHandler实现类,核心代码

interface IHandlerProvide {
  fun provideHandler(handlerType: String): IHandler?
  fun release()
}

class XXXHandlerProvider: IHandlerProvide {
  override fun provideHandler(handlerType: String): IHandler? {
    //find the imp IHandler
  }
  override fun release() {
    //release the source
  }
}

override fun provideHandler(handlerType: String): IHandler? {
  val handlerClass = Maneger.instance.findIHandlersByGroup(GroupName)?.get(handlerType)
}

使用时,直接加上注解:

@MQTTHandler(groupName = groupxxxx, type = typeNamexxxx)
class TestHandler: IHandler {
  override fun handlerMessage(message: String) {
    //do something what you want
  }
}

整个流程的主要部分已经给出,核心是通过不同的消息type查找出对应的处理器;当然这部分主要是由注解完成的,对于处理器的查找则是通过反射的方式来进行匹配的。

六、文档

MQTT官网

作者:快乐二狗
链接:https://juejin.cn/post/7070081254548307999

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,843评论 6 502
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,538评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 163,187评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,264评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,289评论 6 390
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,231评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,116评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,945评论 0 275
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,367评论 1 313
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,581评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,754评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,458评论 5 344
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,068评论 3 327
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,692评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,842评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,797评论 2 369
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,654评论 2 354

推荐阅读更多精彩内容