#include "transport.h" #include "MQTTFormat.h" #include "cJSON.h" #include "stdint.h" #include "stdio.h" #include "string.h" #include "MQTTClient.h" #include "sys_mqtt.h" uint8_t mqtt_sendBuf[MQTT_SENDBUF_LENGTH]; uint8_t mqtt_recvbuffer[MQTT_RECVBUF_LENTH]; //数据帧同步变量 uint8_t mqtt_publishQos1_status; uint16_t mqtt_publishQos1_packid; uint8_t mqtt_publishQos2_status; uint16_t mqtt_publishQos2_packid; uint8_t mqtt_subscribe_status; uint16_t mqtt_subscribe_packid; uint8_t mqtt_unsubscribe_status; uint16_t mqtt_unsubscribe_packid; uint8_t mqtt_pingreq_status; uint16_t mqtt_recvPublishQos1_packid; uint16_t mqtt_recvPublishQos2_packid; //MQTT 数据包 报文id uint16_t mqtt_getPacketId(void) { static uint16_t id = 0; id ++; if(id == 0) id = 1; return id; } /** * @brief 连接到MQTT服务器 * @param sock: 连接到服务器的 sock 编号 * @retval 1:连接成功 * -1:网络相关错误 * -2: 连接参数配置错误或者服务器拒绝连接 */ int mqtt_connectToMqttServer(int sock) { int len; int buflen = sizeof(mqtt_sendBuf); MQTTPacket_connectData data = MQTTPacket_connectData_initializer; //配置连接参数 data.clientID.cstring = CLIENT_ID; data.username.cstring = USER_NAME; data.password.cstring = PASSWORD; data.keepAliveInterval = KEEPLIVE_TIME; data.MQTTVersion = MQTT_VERSION; data.cleansession = 1; //组装连接消息 memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf)); len = MQTTSerialize_connect((unsigned char *)mqtt_sendBuf, buflen, &data); len = transport_send(sock, mqtt_sendBuf, len); //发送连接报文 if(len <= 0) { transport_close(sock); return -1; } len = transport_receive(sock); //等待返回报文 if(len <= 0) { transport_close(sock); return -1; } //读取响应报文 memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf)); if(MQTTPacket_read(mqtt_sendBuf, buflen, transport_getdata) == CONNACK) { //解析响应报文 unsigned char sessionPresent, connack_rc; if((MQTTDeserialize_connack(&sessionPresent, &connack_rc, mqtt_sendBuf, buflen) != 1) || (connack_rc != 0)) { transport_close(sock); return -2; } } else { transport_close(sock); return -2; } return 1; } /** * @brief 订阅服务器主题 * @param sock: 连接到服务器的 sock 编号 * topic: 主题名 * qos: 订阅消息等级 * @retval 1: 订阅成功 * -1:网络相关错误 * -2: 构建订阅数据错误或者未能接收到服务器的正确应答 */ int mqtt_subscribeTopic(int sock, char *topic, int qos) { int len; int buflen; int req_qos; uint16_t packid; MQTTString topicString = MQTTString_initializer; int timeout; //设置参数 buflen = sizeof(mqtt_sendBuf); memset(mqtt_sendBuf, 0, buflen); topicString.cstring = topic; req_qos = qos; packid = mqtt_getPacketId(); //组装报文 len = MQTTSerialize_subscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString, &req_qos); if(len <= 0) return -2; //设置状态 mqtt_subscribe_status = 1; mqtt_subscribe_packid = packid; //发送报文 len = transport_send(sock, mqtt_sendBuf, len); if(len <= 0) { transport_close(sock); return -1; } //等待接收线程数据响应 timeout = 0; do { OSTimeDly(10/(1000/OS_TICKS_PER_SEC)); timeout += 10; }while((mqtt_subscribe_status) && (timeout < 2000)); if(mqtt_subscribe_status == 1) return -2; return 1; } /** * @brief 订阅服务器主题时,解析服务器返回的响应 * @param pbuf: 接收到数据 buflen: 数据长度 * @retval 1: 解析数据成功 * -2:帧类型错误 * -3: packid错误 */ int mqtt_subscribeTopic_SUBACK(uint8_t *pbuf, int buflen) { uint16_t pkid; int count; int qos; if(MQTTDeserialize_suback(&pkid, 1, &count, &qos, pbuf, buflen) == 1) { if(pkid == mqtt_subscribe_packid) { mqtt_subscribe_status = 0; } else { return -3; } } else return -2; return 1; } /** * @brief 取消已订阅的主题 * @param sock: 连接到服务器的 sock 编号 * topic: 主题名 * qos: 订阅消息等级 * @retval 1: 订阅成功 * -1:网络相关错误 * -2: 构建订阅数据错误或者未能接收到服务器的正确应答 */ int mqtt_unSubscribeTopic(int sock, char *topic) { int len; int buflen; uint16_t packid; MQTTString topicString = MQTTString_initializer; int timeout; //设置参数 buflen = sizeof(mqtt_sendBuf); memset(mqtt_sendBuf, 0, buflen); topicString.cstring = topic; packid = mqtt_getPacketId(); //组装报文 len = MQTTSerialize_unsubscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString); if(len <= 0) return -2; //设置状态 mqtt_unsubscribe_status = 1; mqtt_unsubscribe_packid = packid; //发送报文 len = transport_send(sock, mqtt_sendBuf, len); if(len <= 0) { transport_close(sock); return -1; } //等待接收线程数据响应 do { OSTimeDly(10/(1000/OS_TICKS_PER_SEC)); timeout += 10; }while((mqtt_unsubscribe_status) && (timeout < 2000)); if(mqtt_unsubscribe_status == 1) return -2; return 1; } /** * @brief 取消订阅服务器主题时,解析服务器返回的响应 * @param pbuf: 接收到数据 buflen: 数据长度 * @retval 1: 解析数据成功 * -2:帧类型错误 * -3: packid错误 */ int mqtt_unSubscribeTopic_UNSUBACK(uint8_t *pbuf, int buflen) { uint16_t pkid; if(MQTTDeserialize_unsuback(&pkid, pbuf, buflen) == 1) { if(pkid == mqtt_unsubscribe_packid) mqtt_unsubscribe_status = 0; else return -3; } else return -2; return 1; } //////////////////////////////////////////////////////////// /** * @brief 向服务器发布消息函数 * @param sock: sock 编号 topic: 消息主题 msg: 消息内容 msg_len: 消息长度 qos: 消息等级 dup: 数据重发标志 id: 数据包id * @retval 1: 消息帧发送成功 * -1:网络错误 * -2:数据组包错误 */ int mqtt_publishMassage(int sock, char *topic, uint8_t *msg, int msg_len, int qos, uint8_t dup, uint16_t id) { uint8_t retained = 0; MQTTString topicString = MQTTString_initializer; int len; topicString.cstring = topic; memset(mqtt_sendBuf, 0, MQTT_SENDBUF_LENGTH); len = MQTTSerialize_publish(mqtt_sendBuf, MQTT_SENDBUF_LENGTH, dup, qos, retained, id, topicString, msg, msg_len); if(len <=0) return -2; if(transport_send(sock, mqtt_sendBuf, len) <= 0) { transport_close(sock); return -1; } return 1; } /** * @brief 向服务器发布消息 QOS0 函数 * @param sock: sock 编号 topic: 消息主题 msg: 消息内容 msg_len: 消息长度 * @retval 1: 消息帧发送成功 * -1:网络错误 * -2:数据组包错误 */ int mqtt_publishMessage_qos0(int sock, char *topic, uint8_t *msg, int msg_len) { return mqtt_publishMassage(sock, topic, msg, msg_len, 0, 0, 0); } /** * @brief 向服务器发布消息 QOS1 函数 * @param sock: sock 编号 topic: 消息主题 msg: 消息内容 msg_len: 消息长度 * @retval 1: 消息帧发送成功 * -1:网络错误 * -2:未能收到服务器的响应 */ int mqtt_publishMessage_qos1(int sock, char *topic, uint8_t *msg, int msg_len) { int rc; uint16_t pkid; uint8_t dup; int timeout = 0; int cnt; dup = 0; cnt = 0; pkid = mqtt_getPacketId(); __QOS1_PACKET_RESEND: mqtt_publishQos1_status = 1; mqtt_publishQos1_packid = pkid; rc = mqtt_publishMassage(sock, topic, msg, msg_len, 1, dup, pkid); if(rc <= 0) return rc; //等待接收线程接收到 平台响应包 do { OSTimeDly(10/(1000/OS_TICKS_PER_SEC)); timeout += 10; }while((mqtt_publishQos1_status) && (timeout < 2000)); if(mqtt_publishQos1_status) //消息发送后,没有得到服务器的回应,重新发送本条消息 { dup = 1; //发送失败,设置重复消息 cnt += 1; if(cnt >= 3) return -2; //重发3次均未成功,不再重发 goto __QOS1_PACKET_RESEND; } return 1; } /** * @brief 向服务器发布QOS1消息时,服务器返回的响应 * @param pbuf: 服务器返回的数据 buflen: 数据长度 * @retval 1: 消息解析成功 * -2:消息解析失败或者 packid 错误 */ int mqtt_publishMessage_qos1_PUBACK(uint8_t *pbuf, int buflen) { uint8_t packettype, dup; uint16_t packetid; if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1) { if(mqtt_publishQos1_packid == packetid) { mqtt_publishQos1_status = 0; MQTT_PRINTF("receive platform qos1 response \r\n"); } else return -2; } else return -2; return 1; } //////////////////////////////////////////////////////////////////向服务器发布消息//////////////////////////////////////////////////////////////////////////////// /** * @brief 向服务器发布消息 QOS2 时,释放消息 * @param sock: sock 编号 id: 消息报文标识符 * @retval 1: 消息帧发送成功 * -1:网络错误 */ int mqtt_publishMessage_qos2_PUBREL(int sock, uint16_t id) { int len, rc; uint8_t buf[8]; memset(buf, 0, sizeof(buf)); len = MQTTSerialize_pubrel(buf, sizeof(buf), 0, id); rc = transport_send(sock, buf, len); if(rc <= 0) { transport_close(sock); return -1; } return 1; } /** * @brief 向服务器发布消息 QOS2 函数 * @param sock: sock 编号 topic: 消息主题 msg: 消息内容 msg_len: 消息长度 * @retval 1: 消息帧发送成功 * -1:网络错误 * -2:未能收到服务器的响应 */ int mqtt_publishMessage_qos2(int sock, char *topic, uint8_t *msg, int msg_len) { int rc; uint16_t pkid; uint8_t dup; int timeout = 0; int cnt; dup = 0; cnt = 0; pkid = mqtt_getPacketId(); __QOS2_PACKET_RESEND: //消息发送 cnt ++; mqtt_publishQos2_status = 1; mqtt_publishQos2_packid = pkid; rc = mqtt_publishMassage(sock, topic, msg, msg_len, 2, dup, pkid); if(rc <= 0) return rc; //等待平台响应 do { OSTimeDly(10/(1000/OS_TICKS_PER_SEC)); timeout += 10; }while((timeout < 2000) && (mqtt_publishQos2_status != 2)); if(mqtt_publishQos2_status != 2) { dup = 1; if(cnt < 3) goto __QOS2_PACKET_RESEND; else return -2; } //发送 PUBREL rc = mqtt_publishMessage_qos2_PUBREL(sock, mqtt_publishQos2_packid); if(rc <= 0) return rc; mqtt_publishQos2_status = 3; //等待平台响应 timeout = 0; do { OSTimeDly(10/(1000/OS_TICKS_PER_SEC)); timeout += 10; }while((timeout < 2000) && (mqtt_publishQos2_status != 4)); if(mqtt_publishQos2_status != 4) { if(cnt < 3) { // dup = 1; goto __QOS2_PACKET_RESEND; } else return -2; } return 1; } /** * @brief 向服务器发布消息 QOS2 时,服务器返回消息 * @param pbuf: 服务器返回的数据 buflen: 数据的长度 * @retval 1: 消息帧发送成功 * -2:帧类型错误或者报文标识符错误 */ int mqtt_publishMessage_qos2_PUBREC(uint8_t *pbuf, int buflen) { uint8_t packettype, dup; uint16_t packetid; if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1) { if(mqtt_publishQos2_packid == packetid) { mqtt_publishQos2_status = 2; MQTT_PRINTF("receive platform qos2 PUBREC \r\n"); } else return -2; } else return -2; return 1; } /** * @brief 向服务器发布消息 QOS2 时,服务器返回消息 * @param pbuf: 服务器返回的数据 buflen: 数据的长度 * @retval 1: 消息帧发送成功 * -2:帧类型错误或者报文标识符错误 */ int mqtt_publishMessage_qos2_PUBCOMP(uint8_t *pbuf, int buflen) { uint8_t packettype, dup; uint16_t packetid; if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1) { if(mqtt_publishQos2_packid == packetid) { mqtt_publishQos2_status = 4; MQTT_PRINTF("receive platform qos2 PUBCOMP \r\n"); } else return -2; } else return -2; return 1; } ////////////////////////////////////////////////////////////////接收服务器下发的消息///////////////////////////////////////////////////// /** * @brief 对服务器做出应答 * @param sock: sock编号 id: 服务器发布消息时的报文标识符 * @retval 1: 消息帧发送成功 * -1:网络错误 */ int mqtt_recvPublishMessage_qos1_PUBACK(int sock, uint16_t id) { int len, rc; uint8_t buf[8]; memset(buf, 0, sizeof(buf)); len = MQTTSerialize_puback(buf, sizeof(buf), id); rc = transport_send(sock, buf, len); if(rc <= 0) { transport_close(sock); return -1; } MQTT_PRINTF("send to paltform qos1 PUBACK \r\n"); return 1; } /** * @brief 对服务器做出应答 * @param sock: sock编号 id: 服务器发布消息时的报文标识符 * @retval 1: 消息帧发送成功 * -1:网络错误 */ int mqtt_recvPublishMessage_qos2_PUBREC(int sock, uint16_t id) { int len, rc; uint8_t buf[8]; memset(buf, 0, sizeof(buf)); len = MQTTSerialize_pubrec(buf, sizeof(buf), 0, id); rc = transport_send(sock, buf, len); if(rc <= 0) { transport_close(sock); return -1; } MQTT_PRINTF("send to platform qos2 PUBREC \r\n"); return 1; } /** * @brief 对服务器做出PUBCOMP应答 * @param sock: sock编号 pbuf: 接收到服务器的数据缓存 buflen: 数据长度 * @retval 1: 消息帧发送成功 * -2: 帧类型错误 */ int mqtt_recvPublishMessage_qos2_PUBREL(int sock, uint8_t *pbuf, int buflen) { uint8_t packettype, dup; uint16_t packetid; int msg; if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1) { MQTT_PRINTF("receive from platform qos2 PUBREL \r\n"); mqtt_recvPublishQos2_packid = packetid; msg = MBOX_MQTT_QOS2PUBCOMP; OSMboxPost(mqtt_sendMseeageMbox, &msg); } else return -2; return 1; } /** * @brief 发送COMP信号 * @param sock: sock编号 id: 服务器发布消息时的报文标识符 * @retval 1: 消息帧发送成功 * -1:网络错误 */ int mqtt_recvPublishMessage_qos2_PUBCOMP(int sock, uint16_t id) { int len, rc; uint8_t buf[8]; memset(buf, 0, sizeof(buf)); len = MQTTSerialize_pubcomp(buf, sizeof(buf), id); rc = transport_send(sock, buf, len); if(rc <= 0) { transport_close(sock); return -1; } MQTT_PRINTF("send to platform qos2 PUBCOMP \r\n"); return 1; } /************************************************************ * @brief 发送ping心跳包 * @param sock: sock编号 * @retval 1: 消息帧发送成功 * -1:网络错误 * -2: 帧组包错误或者未收到服务器响应 *************************************************************/ int mqtt_pingReq(int sock) { int len; int timeout = 0; //数据组包 len = MQTTSerialize_pingreq(mqtt_sendBuf, MQTT_SENDBUF_LENGTH); if(len <= 0) return -2; mqtt_pingreq_status = 1; if(transport_send(sock, mqtt_sendBuf, len) <= 0) return -1; //等待接收线程 接收到响应包 do { OSTimeDly(10/(1000/OS_TICKS_PER_SEC)); timeout += 10; }while((timeout < 2000) && (mqtt_pingreq_status)); if(mqtt_pingreq_status) return -2; return 1; } /** * @brief 解析心跳响应包 * @retval 1: 消息解析成功 */ int mqtt_pingResponse(void) { mqtt_pingreq_status = 0; MQTT_PRINTF("receive a ping response \r\n"); return 1; } /** * @brief 断开与服务器的MQTT连接和TCP连接 * @param sock: sock编号 */ void mqtt_disconnectServer(int sock) { int len; uint8_t buf[2]; memset(buf, 0, sizeof(buf)); len = MQTTSerialize_disconnect(buf, sizeof(buf)); transport_send(sock, buf, len); transport_close(sock); }