|
- #include "transport.h"
- #include "MQTTFormat.h"
- #include "cJSON.h"
- #include "stdint.h"
- #include "stdio.h"
- #include "string.h"
- #include "MQTTClient.h"
- #include "sys_mqtt.h"
- uint8_t mqtt_sendBuf[MQTT_SENDBUF_LENGTH];
- uint8_t mqtt_recvbuffer[MQTT_RECVBUF_LENTH];
- //数据帧同步变量
- uint8_t mqtt_publishQos1_status;
- uint16_t mqtt_publishQos1_packid;
- uint8_t mqtt_publishQos2_status;
- uint16_t mqtt_publishQos2_packid;
- uint8_t mqtt_subscribe_status;
- uint16_t mqtt_subscribe_packid;
- uint8_t mqtt_unsubscribe_status;
- uint16_t mqtt_unsubscribe_packid;
- uint8_t mqtt_pingreq_status;
- uint16_t mqtt_recvPublishQos1_packid;
- uint16_t mqtt_recvPublishQos2_packid;
- //MQTT 数据包 报文id
- uint16_t mqtt_getPacketId(void)
- {
- static uint16_t id = 0;
- id ++;
- if(id == 0) id = 1;
-
- return id;
- }
- /**
- * @brief 连接到MQTT服务器
- * @param sock: 连接到服务器的 sock 编号
- * @retval 1:连接成功
- * -1:网络相关错误
- * -2: 连接参数配置错误或者服务器拒绝连接
- */
- int mqtt_connectToMqttServer(int sock)
- {
- int len;
- int buflen = sizeof(mqtt_sendBuf);
- MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
- GATEWAY_PARAMS *get;
- get= get_gateway_config_params();
-
- //配置连接参数
- data.clientID.cstring = CLIENT_ID;
- data.username.cstring = (char*)get->username;
- data.password.cstring = (char*)get->passwd;
- data.keepAliveInterval = KEEPLIVE_TIME;
- data.MQTTVersion = MQTT_VERSION;
- data.cleansession = 1;
-
- //组装连接消息
- memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf));
- len = MQTTSerialize_connect((unsigned char *)mqtt_sendBuf, buflen, &data);
-
- len = transport_send(sock, mqtt_sendBuf, len); //发送连接报文
- if(len <= 0)
- {
- transport_close(sock);
- return -1;
- }
- len = transport_receive(sock); //等待返回报文
- if(len <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- //读取响应报文
- memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf));
- if(MQTTPacket_read(mqtt_sendBuf, buflen, transport_getdata) == CONNACK)
- {
- //解析响应报文
- unsigned char sessionPresent, connack_rc;
-
- if((MQTTDeserialize_connack(&sessionPresent, &connack_rc, mqtt_sendBuf, buflen) != 1) || (connack_rc != 0))
- {
- transport_close(sock);
- return -2;
- }
- }
- else
- {
- transport_close(sock);
- return -2;
- }
-
- return 1;
- }
- /**
- * @brief 订阅服务器主题
- * @param sock: 连接到服务器的 sock 编号
- * topic: 主题名
- * qos: 订阅消息等级
- * @retval 1: 订阅成功
- * -1:网络相关错误
- * -2: 构建订阅数据错误或者未能接收到服务器的正确应答
- */
- int mqtt_subscribeTopic(int sock, char *topic, int qos)
- {
- int len;
- int buflen;
- int req_qos;
- uint16_t packid;
- MQTTString topicString = MQTTString_initializer;
- int timeout;
-
- //设置参数
- buflen = sizeof(mqtt_sendBuf);
- memset(mqtt_sendBuf, 0, buflen);
- topicString.cstring = topic;
- req_qos = qos;
- packid = mqtt_getPacketId();
-
- //组装报文
- len = MQTTSerialize_subscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString, &req_qos);
- if(len <= 0) return -2;
-
- //设置状态
- mqtt_subscribe_status = 1;
- mqtt_subscribe_packid = packid;
-
- //发送报文
- len = transport_send(sock, mqtt_sendBuf, len);
- if(len <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- //等待接收线程数据响应
- timeout = 0;
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((mqtt_subscribe_status) && (timeout < 2000));
- if(mqtt_subscribe_status == 1) return -2;
-
- return 1;
- }
- /**
- * @brief 订阅服务器主题时,解析服务器返回的响应
- * @param pbuf: 接收到数据
- buflen: 数据长度
- * @retval 1: 解析数据成功
- * -2:帧类型错误
- * -3: packid错误
- */
- int mqtt_subscribeTopic_SUBACK(uint8_t *pbuf, int buflen)
- {
- uint16_t pkid;
- int count;
- int qos;
-
- if(MQTTDeserialize_suback(&pkid, 1, &count, &qos, pbuf, buflen) == 1)
- {
- if(pkid == mqtt_subscribe_packid)
- {
- mqtt_subscribe_status = 0;
- }
- else
- {
- return -3;
- }
- }
- else return -2;
-
- return 1;
- }
- /**
- * @brief 取消已订阅的主题
- * @param sock: 连接到服务器的 sock 编号
- * topic: 主题名
- * qos: 订阅消息等级
- * @retval 1: 订阅成功
- * -1:网络相关错误
- * -2: 构建订阅数据错误或者未能接收到服务器的正确应答
- */
- int mqtt_unSubscribeTopic(int sock, char *topic)
- {
- int len;
- int buflen;
- uint16_t packid;
- MQTTString topicString = MQTTString_initializer;
- int timeout;
-
- //设置参数
- buflen = sizeof(mqtt_sendBuf);
- memset(mqtt_sendBuf, 0, buflen);
- topicString.cstring = topic;
- packid = mqtt_getPacketId();
-
- //组装报文
- len = MQTTSerialize_unsubscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString);
- if(len <= 0) return -2;
-
- //设置状态
- mqtt_unsubscribe_status = 1;
- mqtt_unsubscribe_packid = packid;
-
- //发送报文
- len = transport_send(sock, mqtt_sendBuf, len);
- if(len <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- //等待接收线程数据响应
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((mqtt_unsubscribe_status) && (timeout < 2000));
- if(mqtt_unsubscribe_status == 1) return -2;
-
- return 1;
- }
- /**
- * @brief 取消订阅服务器主题时,解析服务器返回的响应
- * @param pbuf: 接收到数据
- buflen: 数据长度
- * @retval 1: 解析数据成功
- * -2:帧类型错误
- * -3: packid错误
- */
- int mqtt_unSubscribeTopic_UNSUBACK(uint8_t *pbuf, int buflen)
- {
- uint16_t pkid;
-
- if(MQTTDeserialize_unsuback(&pkid, pbuf, buflen) == 1)
- {
- if(pkid == mqtt_unsubscribe_packid) mqtt_unsubscribe_status = 0;
- else return -3;
- }
- else return -2;
-
- return 1;
- }
- ////////////////////////////////////////////////////////////
- /**
- * @brief 向服务器发布消息函数
- * @param sock: sock 编号
- topic: 消息主题
- msg: 消息内容
- msg_len: 消息长度
- qos: 消息等级
- dup: 数据重发标志
- id: 数据包id
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2:数据组包错误
- */
- int mqtt_publishMassage(int sock, char *topic, uint8_t *msg, int msg_len, int qos, uint8_t dup, uint16_t id)
- {
- uint8_t retained = 0;
- MQTTString topicString = MQTTString_initializer;
- int len;
-
- topicString.cstring = topic;
- memset(mqtt_sendBuf, 0, MQTT_SENDBUF_LENGTH);
- len = MQTTSerialize_publish(mqtt_sendBuf, MQTT_SENDBUF_LENGTH, dup, qos, retained, id, topicString, msg, msg_len);
- if(len <=0) return -2;
-
- if(transport_send(sock, mqtt_sendBuf, len) <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- return 1;
- }
- /**
- * @brief 向服务器发布消息 QOS0 函数
- * @param sock: sock 编号
- topic: 消息主题
- msg: 消息内容
- msg_len: 消息长度
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2:数据组包错误
- */
- int mqtt_publishMessage_qos0(int sock, char *topic, uint8_t *msg, int msg_len)
- {
- return mqtt_publishMassage(sock, topic, msg, msg_len, 0, 0, 0);
- }
- /**
- * @brief 向服务器发布消息 QOS1 函数
- * @param sock: sock 编号
- topic: 消息主题
- msg: 消息内容
- msg_len: 消息长度
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2:未能收到服务器的响应
- */
- int mqtt_publishMessage_qos1(int sock, char *topic, uint8_t *msg, int msg_len)
- {
- int rc;
- uint16_t pkid;
- uint8_t dup;
- int timeout = 0;
- int cnt;
-
- dup = 0;
- cnt = 0;
-
- pkid = mqtt_getPacketId();
- __QOS1_PACKET_RESEND:
-
- mqtt_publishQos1_status = 1;
- mqtt_publishQos1_packid = pkid;
- rc = mqtt_publishMassage(sock, topic, msg, msg_len, 1, dup, pkid);
- if(rc <= 0) return rc;
-
- //等待接收线程接收到 平台响应包
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((mqtt_publishQos1_status) && (timeout < 2000));
-
- if(mqtt_publishQos1_status) //消息发送后,没有得到服务器的回应,重新发送本条消息
- {
- dup = 1; //发送失败,设置重复消息
- cnt += 1;
- if(cnt >= 3) return -2; //重发3次均未成功,不再重发
- goto __QOS1_PACKET_RESEND;
- }
-
- return 1;
- }
- /**
- * @brief 向服务器发布QOS1消息时,服务器返回的响应
- * @param pbuf: 服务器返回的数据
- buflen: 数据长度
- * @retval 1: 消息解析成功
- * -2:消息解析失败或者 packid 错误
- */
- int mqtt_publishMessage_qos1_PUBACK(uint8_t *pbuf, int buflen)
- {
- uint8_t packettype, dup;
- uint16_t packetid;
-
- if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
- {
- if(mqtt_publishQos1_packid == packetid)
- {
- mqtt_publishQos1_status = 0;
- MQTT_PRINTF("receive platform qos1 response \r\n");
- }
- else return -2;
- }
- else return -2;
-
- return 1;
- }
- //////////////////////////////////////////////////////////////////向服务器发布消息////////////////////////////////////////////////////////////////////////////////
- /**
- * @brief 向服务器发布消息 QOS2 时,释放消息
- * @param sock: sock 编号
- id: 消息报文标识符
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- */
- int mqtt_publishMessage_qos2_PUBREL(int sock, uint16_t id)
- {
- int len, rc;
- uint8_t buf[8];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_pubrel(buf, sizeof(buf), 0, id);
- rc = transport_send(sock, buf, len);
-
- if(rc <= 0)
- {
- transport_close(sock);
- return -1;
- }
- return 1;
- }
- /**
- * @brief 向服务器发布消息 QOS2 函数
- * @param sock: sock 编号
- topic: 消息主题
- msg: 消息内容
- msg_len: 消息长度
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2:未能收到服务器的响应
- */
- int mqtt_publishMessage_qos2(int sock, char *topic, uint8_t *msg, int msg_len)
- {
- int rc;
- uint16_t pkid;
- uint8_t dup;
- int timeout = 0;
- int cnt;
-
- dup = 0;
- cnt = 0;
-
- pkid = mqtt_getPacketId();
- __QOS2_PACKET_RESEND:
- //消息发送
-
- cnt ++;
- mqtt_publishQos2_status = 1;
- mqtt_publishQos2_packid = pkid;
- rc = mqtt_publishMassage(sock, topic, msg, msg_len, 2, dup, pkid);
- if(rc <= 0) return rc;
-
- //等待平台响应
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((timeout < 2000) && (mqtt_publishQos2_status != 2));
- if(mqtt_publishQos2_status != 2)
- {
- dup = 1;
- if(cnt < 3) goto __QOS2_PACKET_RESEND;
- else return -2;
- }
-
- //发送 PUBREL
- rc = mqtt_publishMessage_qos2_PUBREL(sock, mqtt_publishQos2_packid);
- if(rc <= 0) return rc;
-
- mqtt_publishQos2_status = 3;
-
- //等待平台响应
- timeout = 0;
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((timeout < 2000) && (mqtt_publishQos2_status != 4));
- if(mqtt_publishQos2_status != 4)
- {
- if(cnt < 3)
- {
- // dup = 1;
- goto __QOS2_PACKET_RESEND;
- }
- else return -2;
- }
-
- return 1;
- }
- /**
- * @brief 向服务器发布消息 QOS2 时,服务器返回消息
- * @param pbuf: 服务器返回的数据
- buflen: 数据的长度
- * @retval 1: 消息帧发送成功
- * -2:帧类型错误或者报文标识符错误
- */
- int mqtt_publishMessage_qos2_PUBREC(uint8_t *pbuf, int buflen)
- {
- uint8_t packettype, dup;
- uint16_t packetid;
-
- if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
- {
- if(mqtt_publishQos2_packid == packetid)
- {
- mqtt_publishQos2_status = 2;
- MQTT_PRINTF("receive platform qos2 PUBREC \r\n");
- }
- else return -2;
- }
- else return -2;
-
- return 1;
- }
- /**
- * @brief 向服务器发布消息 QOS2 时,服务器返回消息
- * @param pbuf: 服务器返回的数据
- buflen: 数据的长度
- * @retval 1: 消息帧发送成功
- * -2:帧类型错误或者报文标识符错误
- */
- int mqtt_publishMessage_qos2_PUBCOMP(uint8_t *pbuf, int buflen)
- {
- uint8_t packettype, dup;
- uint16_t packetid;
-
- if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
- {
- if(mqtt_publishQos2_packid == packetid)
- {
- mqtt_publishQos2_status = 4;
- MQTT_PRINTF("receive platform qos2 PUBCOMP \r\n");
- }
- else return -2;
- }
- else return -2;
-
- return 1;
- }
- ////////////////////////////////////////////////////////////////接收服务器下发的消息/////////////////////////////////////////////////////
- /**
- * @brief 对服务器做出应答
- * @param sock: sock编号
- id: 服务器发布消息时的报文标识符
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- */
- int mqtt_recvPublishMessage_qos1_PUBACK(int sock, uint16_t id)
- {
- int len, rc;
- uint8_t buf[8];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_puback(buf, sizeof(buf), id);
- rc = transport_send(sock, buf, len);
-
- if(rc <= 0)
- {
- transport_close(sock);
- return -1;
- }
- MQTT_PRINTF("send to paltform qos1 PUBACK \r\n");
- return 1;
- }
- /**
- * @brief 对服务器做出应答
- * @param sock: sock编号
- id: 服务器发布消息时的报文标识符
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- */
- int mqtt_recvPublishMessage_qos2_PUBREC(int sock, uint16_t id)
- {
- int len, rc;
- uint8_t buf[8];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_pubrec(buf, sizeof(buf), 0, id);
- rc = transport_send(sock, buf, len);
-
- if(rc <= 0)
- {
- transport_close(sock);
- return -1;
- }
- MQTT_PRINTF("send to platform qos2 PUBREC \r\n");
- return 1;
- }
- /**
- * @brief 对服务器做出PUBCOMP应答
- * @param sock: sock编号
- pbuf: 接收到服务器的数据缓存
- buflen: 数据长度
- * @retval 1: 消息帧发送成功
- * -2: 帧类型错误
- */
- int mqtt_recvPublishMessage_qos2_PUBREL(int sock, uint8_t *pbuf, int buflen)
- {
- uint8_t packettype, dup;
- uint16_t packetid;
- int msg;
-
- if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
- {
- MQTT_PRINTF("receive from platform qos2 PUBREL \r\n");
-
- mqtt_recvPublishQos2_packid = packetid;
- msg = MBOX_MQTT_QOS2PUBCOMP;
- OSMboxPost(mqtt_sendMseeageMbox, &msg);
- }
- else return -2;
-
- return 1;
- }
- /**
- * @brief 发送COMP信号
- * @param sock: sock编号
- id: 服务器发布消息时的报文标识符
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- */
- int mqtt_recvPublishMessage_qos2_PUBCOMP(int sock, uint16_t id)
- {
- int len, rc;
- uint8_t buf[8];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_pubcomp(buf, sizeof(buf), id);
- rc = transport_send(sock, buf, len);
- if(rc <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- MQTT_PRINTF("send to platform qos2 PUBCOMP \r\n");
-
- return 1;
- }
- /************************************************************
- * @brief 发送ping心跳包
- * @param sock: sock编号
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2: 帧组包错误或者未收到服务器响应
- *************************************************************/
- int mqtt_pingReq(int sock)
- {
- int len;
- int timeout = 0;
-
- //数据组包
- len = MQTTSerialize_pingreq(mqtt_sendBuf, MQTT_SENDBUF_LENGTH);
- if(len <= 0) return -2;
-
- mqtt_pingreq_status = 1;
- if(transport_send(sock, mqtt_sendBuf, len) <= 0) return -1;
-
- //等待接收线程 接收到响应包
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((timeout < 2000) && (mqtt_pingreq_status));
-
- if(mqtt_pingreq_status) return -2;
-
- return 1;
- }
- /**
- * @brief 解析心跳响应包
- * @retval 1: 消息解析成功
- */
- int mqtt_pingResponse(void)
- {
- mqtt_pingreq_status = 0;
- MQTT_PRINTF("receive a ping response \r\n");
-
- return 1;
- }
- /**
- * @brief 断开与服务器的MQTT连接和TCP连接
- * @param sock: sock编号
- */
- void mqtt_disconnectServer(int sock)
- {
- int len;
- uint8_t buf[2];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_disconnect(buf, sizeof(buf));
- transport_send(sock, buf, len);
-
- transport_close(sock);
- }
|