首先下载库文件包coreMQTT包,coreMQTT-main.zip,解压后添加到工程中。
根据官方说明,需要根据自己的硬件平台编写对应的硬件接口相关函数,所以新建mytransport_interface.c和mytransport_interface.h两个文件,用于硬件接口的编写。
硬件接口函数主要包含传输发送和传输接收两个函数的实现以及时间函数,具体参数资料https://www.freertos.org/Documentation/api-ref/coreMQTT/docs/doxygen/output/html/mqtt_transport_interface.html#mqtt_transport_interface_overview
主要添加代码说明如下:
1.重定义NetworkContext
struct NetworkContext
{
//int NetworkContext;
unsigned char *Recv_Buf;//接收数据数组
unsigned short int *Recv_Buf_Lenth;//接收数据长度
int Recv_Buf_Index;//接收数据数组当前位置
unsigned char NetworkRecvFlag;//获取到网络数据标志
};
这个结构体里边用户可以自定义网络传输相关的一些参数及变量
2、定义网络发送接口函数
/*定义网络发送接口函数*/
int32_t myNetworkSend( NetworkContext_t * pNetworkContext,
const void * pBuffer,
size_t bytesToSend )
{
BaseType_t err=pdFALSE;
int32_t bytesSent = 0;
bytesSent=bytesToSend;
//int8_t val=0;
// uint8_t testlenth=0;
//********清除接收*********
// memset(GPRS_ReceiveBuf,0,usGPRS_RX_Lenth);
// usGPRS_RX_Lenth=0;
// testlenth=*(pNetworkContext->Recv_Buf_Lenth);
memset(pNetworkContext->Recv_Buf,0,*(pNetworkContext->Recv_Buf_Lenth));
*(pNetworkContext->Recv_Buf_Lenth)=0;
pNetworkContext->Recv_Buf_Index=0;//接收数组当前位置清零
pNetworkContext->NetworkRecvFlag=0;//获取到网络数据标志
//保证信号量是被清除过的,AT指令存在回复两次的情况,会多触发一次信号量
err=xSemaphoreTake(BinarySemaphore_MQTT_RX,1);
HAL_UART_Transmit(&huart_gsm,(uint8_t *)pBuffer,bytesToSend,50);
return bytesSent;
}
这里特殊处理的地方是发送之前清空了接收相关参数的内容,保证后续接收到的是正确的最新数据
3、定义网络接收接口函数
/*定义网络接收接口函数*/
int32_t myNetworkRecv( NetworkContext_t * pNetworkContext,
void * pBuffer,
size_t bytesToRecv )
{
static uint8_t tempindex=0;
BaseType_t err=pdFALSE;
int32_t bytesReceived = 0;
bytesReceived=bytesToRecv;
testRevByte_buf[tempindex++]=bytesToRecv;
if(pNetworkContext->NetworkRecvFlag!=1)//阻塞式等待获取到网络数据
{
err=xSemaphoreTake(BinarySemaphore_MQTT_RX,1000); //获取信号量
if(err==pdTRUE) //获取信号量成功
{
pNetworkContext->NetworkRecvFlag=1;
}
else
{
bytesReceived=0;
}
}
if(pNetworkContext->NetworkRecvFlag==1)
{
memmove(pBuffer,pNetworkContext->Recv_Buf+pNetworkContext->Recv_Buf_Index,bytesToRecv);
pNetworkContext->Recv_Buf_Index+=bytesToRecv;
}
else
{
bytesReceived=0;
}
return bytesReceived;
}
经过实际测试发现,MQTT库在调用该接口函数获取网络数据时,并不会一次获取所有需要的数数据,而是首先获取第一个字节数据,进行判断,然后再获取后续数据依次判断,所以会连续调用该函数多次才会获取完成所有网络数据,因此要能够保证调用该函数时数据上的连续和持续性,并且再未调用完成所有数据之前不能清空接收数据buffer。
4、时间函数
/*定义时间获取函数*/
uint32_t myGetTimeStampMs(void)
{
return uiTimeCount_1ms;
}
void SysTick_Handler(void)
{
HAL_IncTick();
osSystickHandler();
uiTimeCount_1ms++;
}
时间函数只需要提供精确的ms计时参数就可以了,每次调用该函数的时候返回默认的ms计数值
5、回调函数
/*定义eventCallback函数,broker上有数据到来时会触发该回调函数
该回调函数参考https://www.freertos.org/mqtt/basic-mqtt-example.html
*/
void myEventCallback(
MQTTContext_t * pxMQTTContext,
MQTTPacketInfo_t * pxPacketInfo,
MQTTDeserializedInfo_t * pxDeserializedInfo)
{
/* The MQTT context is not used for this demo. */
( void ) pxMQTTContext;
if( ( pxPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )//接收到的是publish数据
{
prvMQTTProcessIncomingPublish( pxDeserializedInfo->pPublishInfo );
}
else
{
prvMQTTProcessResponse( pxPacketInfo, pxDeserializedInfo->packetIdentifier );
}
}
处理接收到的publish数据就在prvMQTTProcessIncomingPublish函数中去实现
以上就是硬件接口相关的移植;完成移植后就是库函数的使用了:
1、MQTT_Init
This function must be called on a MQTTContext_t before any other function.
这个函数的作用是注册硬件接口相关函数,并初始化相关变量
MQTTContext_t mqttContext;
NetworkContext_t someTransportContext;
TransportInterface_t transport;
MQTTFixedBuffer_t fixedBuffer;
uint8_t buffer[1024];
// Clear context.
memset( ( void * ) &mqttContext, 0x00, sizeof( MQTTContext_t ) );
memset( ( void * ) &someTransportContext, 0x00, sizeof( NetworkContext_t ) );
someTransportContext.Recv_Buf=GPRS_ReceiveBuf;//映射接收数组
someTransportContext.Recv_Buf_Lenth=&usGPRS_RX_Lenth;//映射接收数据长度
transport.pNetworkContext = &someTransportContext;
transport.send = myNetworkSend;//映射到自定义发送函数上
transport.recv = myNetworkRecv;//映射到自定义接收函数上
// Set buffer members.
fixedBuffer.pBuffer = buffer;
fixedBuffer.size = 1024;
status2 = MQTT_Init( &mqttContext, &transport, myGetTimeStampMs, myEventCallback, &fixedBuffer );
2、MQTT_Connect
建立MQTT会话
case MQTT_Step1://CONNECT连接服务端
// Send the connect packet. Use 100 ms as the timeout to wait for the CONNACK packet.
status = MQTT_Connect( &mqttContext, &connectInfo, &willInfo, 2000, &sessionPresent );
if( status == MQTTSuccess )
{
eMQTT_Step=MQTT_Step2;
memset(mqttContext.transportInterface.pNetworkContext->Recv_Buf,0,
*(mqttContext.transportInterface.pNetworkContext->Recv_Buf_Lenth));
*(mqttContext.transportInterface.pNetworkContext->Recv_Buf_Lenth)=0;
mqttContext.transportInterface.pNetworkContext->Recv_Buf_Index=0;//接收数组当前位置清零
mqttContext.transportInterface.pNetworkContext->NetworkRecvFlag=0;//获取到网络数据标志
}
else//连接失败
{
vTaskDelay(20);
}
break;
建立会话成功后清除了接收相关buffer及参数
3、MQTT_Subscribe
订阅主题
case MQTT_Step2://订阅主题
// Obtain a new packet id for the subscription.
packetId = MQTT_GetPacketId(&mqttContext);
status = MQTT_Subscribe(&mqttContext, &subscriptionList[0], 1, packetId );
if( status == MQTTSuccess )
{
status = MQTT_ReceiveLoop(&mqttContext, timeoutMs );
if(status == MQTTSuccess)
{
vTaskDelay(10);
eMQTT_Step=MQTT_Step3;
memset(mqttContext.transportInterface.pNetworkContext->Recv_Buf,0,
*(mqttContext.transportInterface.pNetworkContext->Recv_Buf_Lenth));
*(mqttContext.transportInterface.pNetworkContext->Recv_Buf_Lenth)=0;
mqttContext.transportInterface.pNetworkContext->Recv_Buf_Index=0;//接收数组当前位置清零
mqttContext.transportInterface.pNetworkContext->NetworkRecvFlag=0;//获取到网络数据标志
}
}
vTaskDelay(10);
break;
4、MQTT_ProcessLoop
调用该函数实时处理循环接收数据,并自定发送心跳包保持链接,当有数据到来时,会触发myEventCallback回调函数
case MQTT_Step3:
//status = MQTT_ReceiveLoop(&mqttContext,timeoutMs);//循环接收数据,需要手动发送心跳保持链接
status = MQTT_ProcessLoop(&mqttContext,timeoutMs);//循环接收数据,自动保持链接
if( status != MQTTSuccess)
{
vTaskDelay(10);
}
else
{
vTaskDelay(10);
memset(mqttContext.transportInterface.pNetworkContext->Recv_Buf,0,
*(mqttContext.transportInterface.pNetworkContext->Recv_Buf_Lenth));
*(mqttContext.transportInterface.pNetworkContext->Recv_Buf_Lenth)=0;
mqttContext.transportInterface.pNetworkContext->Recv_Buf_Index=0;//接收数组当前位置清零
mqttContext.transportInterface.pNetworkContext->NetworkRecvFlag=0;//获取到网络数据标志
}
break;
发布消息
这个比较简单,参考:
/* This demo uses QoS0. */xMQTTPublishInfo.qos = MQTTQoS0;
xMQTTPublishInfo.retain = false;
xMQTTPublishInfo.pTopicName = pcExampleTopic;
xMQTTPublishInfo.topicNameLength = ( uint16_t ) strlen( pcExampleTopic ); xMQTTPublishInfo.pPayload = mqttexampleMESSAGE;
xMQTTPublishInfo.payloadLength = strlen( mqttexampleMESSAGE );/* Send PUBLISH packet. Packet ID is not used for a QoS0 publish. */
xResult =MQTT_Publish( pxMQTTContext, &xMQTTPublishInfo, 0U ); configASSERT( xResult == MQTTSuccess );