MQTT协议之消息处理流程

前言

前面的笔记已把所有消息类型都过了一遍,这里从消息处理流程的角度尝试解读一下。

网络故障

在任何网络环境下,都会出现一方连接失败,比如离开公司大门那一刻没有了WIFI信号。但持续连接的另一端-服务器可能不能立即知道对方已断开。类似网络异常情况,都有可能在消息发送的过程中出现,消息发送出去,就丢失了。
  MQTT协议假定客户端和服务器端稳定情况一般,彼此之通信管道不可靠,一旦客户端网络断开,情况就会很严重,很难恢复原状。
  但别忘记,很多客户端会有永久性存储设备支持,比如闪存ROM、存储卡等,在通信出现异常的情况下可以用于保存关键数据或状态信息等。
  总之,异常网络情况很复杂,只能小心处理之。

消息重发策略

QoS > 0情况下,PUBLISH、PUBREL、SUBSCRIBE、UNSUBSCRIBE等类型消息在发送者发送完之后,需要等待一个响应消息,若在一个指定时间段内没有收到,发送者可能需要重试。重发的消息,要求DUP标记要设置为1.
  等待响应的超时应该在消息成功发送之后开始算起,并且等待超时应该是可以配置选项,以便在下一次重试的时候,适当加大。比如第一次重试超时10秒,下一次可能为20秒,再一次重试可能为60秒呢。当然,还要有一个重试次数限制的。
  还有一种情况,客户端重新连接,但未在可变头部中设置clean session标记,但双方(客户端和服务器端)都应该重试先前未发送的动态消息(in-flight messages)。客户端不被强制要求发送未被确认的消息,但服务器端就得需要重发那些未被去确认的消息。

QoS level决定的消息处理流程

QoS level为Quality of Service level的缩写,翻译成中文,服务质量等级。
  MQTT 3.1协议在"4.1 Quality of Service levels and flows"章节中,仅仅讨论了客户端到服务器的发布流程,不太完整。因为决定消息到达率,能够提升发送质量的,应该是服务器发布PUBLISH消息到订阅者这一消息流方向。

QoS level 0

至多发送一次,发送即丢弃。没有确认消息,也不知道对方是否收到。

| Client | Message and direction | Server |
| ------------- |:-------------:|: -----|
| QoS = 0 | PUBLISH
----------> | Action: Publish message to subscribers then Forget
**Reception: **<=1 |

  • 针对的消息不重要,丢失也无所谓。
  • 网络层面,传输压力小。

QoS level 1

所有QoS level 1都要在可变头部中附加一个16位的消息ID
  SUBSCRIBE和UNSUBSCRIBE消息使用QoS level 1
  针对消息的发布,Qos level 1,意味着消息至少被传输一次。
发送者若在一段时间内接收不到PUBACK消息,发送者需要打开DUB标记为1,然后重新发送PUBLISH消息。因此会导致接收方可能会收到两次PUBLISH消息。针对客户端发布消息到服务器的消息处理流程:

| Client | Message and direction | Server |
| ------------- |:-------------:|: ----------|
| QoS = 1
DUP = 0
Message ID = x

Action: Store message | PUBLISH
----------> | Actions:
1. Store message
2. Publish message to subscribers
3. Delete message
**Reception: **>=1 |
| Action: Discard message| PUBACK
<---------- | Message ID = x |
针对服务器发布到订阅者的消息处理流程:

| Server | Message and direction | Subscriber |
| ------------- |:-------------:|: ----------|
| QoS = 1
DUP = 0
Message ID = x
| PUBLISH
----------> | Actions:
1. Store message
2. Make message available
**Reception: **>=1 |
| | PUBACK
<---------- | Message ID = x |

发布者(客户端/服务器)若因种种异常接收不到PUBACK消息,会再次重新发送PUBLISH消息,同时设置DUP标记为1。接收者以服务器为例,这可能会导致服务器收到重复消息,按照流程,broker(服务器)发布消息到订阅者(会导致订阅者接收到重复消息),然后发送一条PUBACK确认消息到发布者。
  在业务层面,或许可以弥补MQTT协议的不足之处:重试的消息ID一定要一致接收方一定判断当前接收的消息ID是否已经接受过,但一样不能够完全确保,消息一定到达了。

QoS level 2

仅仅在PUBLISH类型消息中出现,要求在可变头部中要附加消息ID。
  级别高,通信压力稍大些,但确保了仅仅传输接收一次。
  先看协议中流程图,Client -> Server方向,会有一个总体印象:

| Client | Message and direction | Server |
| ------------- |:-------------:|: ----------|
| QoS = 2
DUP = 0
Message ID = x

Action: Store message | PUBLISH
----------> | Actions(a):
Store message
or
Actions(b):
1. Store message
2. Publish message to subscribers |
| Message ID = x| PUBREL
----------> | Actions(a):
1. Publish message to subscribers
2. Delete message
or
Actions(b):
Delete message ID |
| | PUBREC
<---------- | Message ID = x |
| Action: Discard message| PUBCOMP
<---------- | Message ID = x |
针对服务器发布到订阅者的消息处理流程:

| Server | Message and direction | Subscriber |
| ------------- |:-------------:|: ----------|
| QoS = 2
DUP = 0
Message ID = x

Action: Store message | PUBLISH
----------> | Actions:
Store message |
| | PUBREL
----------> | Message ID = x|
| Message ID = x| PUBREC
<---------- | Actions:
Make message available |
| | PUBCOMP
<---------- | Message ID = x |
  Server端采取的方案a和b,都包含了何时消息有效,何时处理消息。两个方案二选一,Server端自己决定。但无论死采取哪一种方式,都是在QoS level 2协议范畴下,不受影响。若一方没有接收到对应的确认消息,会从最近一次需要确认的消息重试,以便整个(QoS level 2)流程打通。

消息顺序

消息顺序会受许多因素的影响,但对于服务器程序,必须保证消息传递流程的每个阶段要和开始的顺序一致。例如,在QoS level 2定义的消息流中,PUBREL流必须和PUBLISH流具有相同的顺序发送:

| Client | Message and direction | Server |
| ------------- |:-------------:|: ----------|
||PUBLISH 1
---------->
PUBLISH 2
---------->
PUBLISH 3
---------->||
||PUBREC 1
<----------
PUBREC 2
<----------||
||PUBREL 1
---------->||
||PUBREC 3
<----------||
||PUBREL 2
---------->||
||PUBCOMP 1
<----------||
||PUBREL 3
---------->||
||PUBCOMP 2
<----------
PUBCOMP 3
<----------||

流动消息(in-flight messages)数量允许有一个可保证的效果:

  • 在流动消息(in-flight)窗口1中,每个传递流在下一个流开始之前完成。这保证消息以提交的顺序传递
  • 在流动消息(in-flight)大于1的窗口,只能在QoS level内被保证消息的顺序

消息的持久化

在MQTT协议中,PUBLISH消息固定头部RETAIN标记,只有为1才要求服务器需要持久保存此消息,除非新的PUBLISH覆盖。
  对于持久的、最新一条PUBLISH消息,服务器不但要发送给当前的订阅者,并且新的订阅者(new subscriber,同样需要订阅了此消息对应的Topic name)会马上得到推送。

Tip:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送,不是所有。

消息流的编码/解码

MQTT协议中,由目前定义的14种类型消息在客户端和服务器端之间数据进行交互。若以JAVA语言构建MQTT服务器,可选择Netty作为基础。
  在Netty中,数据的进入和流出,代表了一次完整的交互。无论是要进入的还是要流出的数据(单独以服务器为例),都可看做字节流。若把每种类型消息抽象为一个具体对象,那么处理起来就不难了。
  客户端->服务器,进入的字节流,逐个字节/单位读取,可还原成一个具体的消息对象(解码的过程)。
  要发送到客户端的消息对象,转换(编码)成字节流,然后由TCP通道流转到接收者。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,752评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,100评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,244评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,099评论 1 286
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,210评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,307评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,346评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,133评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,546评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,849评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,019评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,702评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,331评论 3 319
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,030评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,260评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,871评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,898评论 2 351

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,644评论 18 139
  • 前言 这次要讲到客户端/服务器的发布消息行为,与PUBLISH相关的消息类型,会在这里提到。 PUBLISH 客户...
    技术学习阅读 5,022评论 0 2
  • 序 本篇会把连接(CONNECT)、心跳(PINGREQ/PINGRESP)、确认(CONNACK)、断开连接(D...
    技术学习阅读 9,753评论 0 8
  • MQTT Protocol MQTT协议特性 一句话总结:MQTT是一个简单,轻量的消息发布/订阅协议。 MQTT...
    福克斯记阅读 7,245评论 0 9
  • 前言 接到任务项目需要用MQTT来写消息推送,经过一段时间在网上查看资料后写下这篇文章,文章内容大都来自互联网,在...
    Hank_Zhong阅读 16,523评论 69 51