#coding:utf-8
import time
import json
from paho.mqtt import client as mqtt_client
import base64
broker = 'mqtt代理服务器地址' # mqtt代理服务器地址
port = 1883 # 端口号
keepalive = 60 # 与代理通信之间允许的最长时间段(以秒为单位)
topic = "ug/SN1/uc/SN2/uplink" #
client_id = '-1' # 客户端id不能重复
client_password = 'password' # 客户端密码
def to_temhumidityco2tvoc(tem,humidity,co2,tvoc):
'''将 需要发送的数据 转化为 传感器发送的数据格式'''
length_tem = 2
bytes_tem = b'\x03\x67'
value_tem = (tem*10).to_bytes(length_tem, byteorder='little', signed=True)
bytes_tem += value_tem[0:length_tem]
length_humidity = 1
bytes_humidity = b'\x04\x68'
value_humidity = (humidity*2).to_bytes(length_humidity, byteorder='little')
bytes_humidity += value_humidity[0:length_humidity]
length_CO2 = 2
bytes_CO2 = b'\x07\x7D'
value_CO2 = (co2).to_bytes(length_CO2, byteorder='little')
bytes_CO2 += value_CO2[0:length_CO2]
length_tvoc = 2
bytes_tvoc = b'\x08\x7D'
value_tvoc = (tvoc).to_bytes(length_tvoc, byteorder='little')
bytes_tvoc += value_tvoc[0:length_tvoc]
bytes_temhumidityco2tvoc = bytes_tem + bytes_humidity + bytes_CO2 + bytes_tvoc
res = base64.b64encode(bytes_temhumidityco2tvoc)
return res
def get_info(tem,CO2,TVOC,humidity):
'''传感器上报的数据内容'''
temhumidityco2tvoc_byte = to_temhumidityco2tvoc(tem,humidity,CO2,TVOC)
temhumidityco2tvoc_raw = str(temhumidityco2tvoc_byte,'UTF-8')
info = {"version":2,"event":"status","data":{"raw":temhumidityco2tvoc_raw,"rssi":-73,"snr":11.2,"fPort":85,"bandwidth":125,"spreadFactor":7,"adr":1},"ts":int(time.time())}
return json.dumps(info)
def connect_mqtt():
'''连接mqtt代理服务器'''
def on_connect(client, userdata, flags, rc):
'''连接回调函数'''
# 响应状态码为0表示连接成功
if rc == 0:
print("Connected to MQTT OK!")
else:
print("Failed to connect, return code %d\n", rc)
# 连接mqtt代理服务器,并获取连接引用
client = mqtt_client.Client(client_id)
client.username_pw_set(client_id, client_password)
client.on_connect = on_connect
client.connect(broker, port, keepalive)
return client
def publish(client):
'''发布消息'''
while True:
'''每隔2秒发布一次服务器信息'''
time.sleep(2)
msg = get_info(-17, 1490, 399, 88) # 参数含义顺序:(tem,CO2,TVOC,humidity)
result = client.publish(topic, msg)
status = result[0]
status = status
if status == 0:
print("send succ")
else:
print("send fail")
def run():
'''运行发布者'''
client = connect_mqtt()
# 运行一个线程来自动调用loop()处理网络事件, 非阻塞
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
使用python发送mqtt请求
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
推荐阅读更多精彩内容
- 简书已停更,欢迎转到个人博客查看对应教程:https://www.cnblogs.com/superhin/p/1...
- Python使用grequests并发发送请求 前言 requests是Python发送接口请求非常好用的一个三方...
- requests是Python发送接口请求非常好用的一个三方库,由K神编写,简单,方便上手快。但是requests...
- 上面我们介绍了urllib模块的使用,有一个比urllib更加“人性化”的模块,那就是requests库,使用它可...
- 1.我们使用postman进行接口测试的时候,发现POST请求方式的编码有3种,具体的编码方式如下: A:appl...