mqtt简介
一种数据传输协议,不怎么耗资源,适合物联网远程传数据。比如一个传感器要发数据给电脑,那么需要开一个mqtt服务器(Broker),然后传感器作为客户端(client)通过mqtt服务器发布(publish)某个主题(topic)的消息(message),电脑也作为客户端,连接(connect)到mqtt服务器,且订阅(subscribe)该主题,就能收到消息。
搭建一个mqtt服务器
推荐EMQX,直接选择合适的版本下载压缩包,这里选择了emqx-windows7-v3.2.2.zip
,解压后,进入bin
目录,启动服务器:
D:>\emqx\bin>emqx.cmd start
如果要关闭服务器,只需如此:
D:>\emqx\bin>emqx.cmd stop
启动服务器后,浏览器打开http://127.0.0.1:18083
,使用默认管理账号admin/public
即可登录查看,在设置里还能改成中文界面,很方便。
编写Python例子
首先安装mqtt
模块:
pip install paho-mqtt
写一个发布客户端pub.py
:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code: " + str(rc))
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect('127.0.0.1', 1883, 600) # 600为keepalive的时间间隔
client.publish('fifa', payload='amazing', qos=0)
再写一个接受客户端sub.py
:
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
print("Connected with result code: " + str(rc))
def on_message(client, userdata, msg):
print(msg.topic + " " + str(msg.payload))
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect('127.0.0.1', 1883, 600) # 600为keepalive的时间间隔
client.subscribe('fifa', qos=0)
client.loop_forever() # 保持连接
可以先运行sub.py
,然后就开始持续监听,别关。另开一个窗口运行pub.py
,然后之前的客户端就能接收到消息了,发一次收一次。
保持连接
这里我的理解也不够深,不过大体能用。
-
loop()
感觉是一个心跳函数,用来保持客户端与服务器的连接。比如keepalive
参数为60秒,那么60秒内必须loop()
一下或者发布一下消息,不然连接会断,就无法继续发布或者接受消息。 -
loop_start()
是启用一个进程保持loop()
的重复调用,就不需要定期心跳了,对应的有loop_stop()
。 -
loop_forever()
用来保持无穷阻塞调用loop()
,原文如下:
"""This function call loop() for you in an infinite blocking loop.
It is useful for the case where you only want to run the MQTT client loop in your program."""
举例来说明几种用法,对于发布客户端,第一种保持连接的方式是在keeplive
的间隔内,发布消息或者调用loop()
。
client.connect('127.0.0.1', 1883, 5) # keeplive仅为5秒
for i in range(100):
client.publish('fifa', payload=f'amazing{i}', qos=0)
# client.loop() # 或者loop()
time.sleep(4) # 不能超过5秒
第二种方式是使用loop_start()
:
client.connect('127.0.0.1', 1883, 5)
client.loop_start()
for i in range(100):
client.publish('fifa', payload=f'amazing{i}', qos=0)
time.sleep(6) # 可以超过5秒了
对于订阅客户端,一种方法是使用loop_start()
保持连接,然后写个死循环阻塞程序,保持监听。
client.connect('127.0.0.1', 1883, 5)
client.subscribe('fifa', qos=0)
client.loop_start()
while True:
pass
第二种方法直接使用loop_forever()
,也能阻塞运行:
client.connect('127.0.0.1', 1883, 5)
client.subscribe('fifa', qos=0)
client.loop_forever()
都能持续接受消息。