123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704 |
- #include "transport.h"
- #include "MQTTFormat.h"
- #include "cJSON.h"
- #include "stdint.h"
- #include "stdio.h"
- #include "string.h"
- #include "MQTTClient.h"
- #include "sys_mqtt.h"
- uint8_t mqtt_sendBuf[MQTT_SENDBUF_LENGTH];
- uint8_t mqtt_recvbuffer[MQTT_RECVBUF_LENTH];
- //数据帧同步变量
- uint8_t mqtt_publishQos1_status;
- uint16_t mqtt_publishQos1_packid;
- uint8_t mqtt_publishQos2_status;
- uint16_t mqtt_publishQos2_packid;
- uint8_t mqtt_subscribe_status;
- uint16_t mqtt_subscribe_packid;
- uint8_t mqtt_unsubscribe_status;
- uint16_t mqtt_unsubscribe_packid;
- uint8_t mqtt_pingreq_status;
- uint16_t mqtt_recvPublishQos1_packid;
- uint16_t mqtt_recvPublishQos2_packid;
- //MQTT 数据包 报文id
- uint16_t mqtt_getPacketId(void)
- {
- static uint16_t id = 0;
- id ++;
- if(id == 0) id = 1;
-
- return id;
- }
- /**
- * @brief 连接到MQTT服务器
- * @param sock: 连接到服务器的 sock 编号
- * @retval 1:连接成功
- * -1:网络相关错误
- * -2: 连接参数配置错误或者服务器拒绝连接
- */
- int mqtt_connectToMqttServer(int sock)
- {
- int len;
- int buflen = sizeof(mqtt_sendBuf);
- MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
- GATEWAY_PARAMS *get;
- get= get_gateway_config_params();
-
- //配置连接参数
- data.clientID.cstring = CLIENT_ID;
- data.username.cstring = (char*)get->username;
- data.password.cstring = (char*)get->passwd;
- data.keepAliveInterval = KEEPLIVE_TIME;
- data.MQTTVersion = MQTT_VERSION;
- data.cleansession = 1;
-
- //组装连接消息
- memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf));
- len = MQTTSerialize_connect((unsigned char *)mqtt_sendBuf, buflen, &data);
-
- len = transport_send(sock, mqtt_sendBuf, len); //发送连接报文
- if(len <= 0)
- {
- transport_close(sock);
- return -1;
- }
- len = transport_receive(sock); //等待返回报文
- if(len <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- //读取响应报文
- memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf));
- if(MQTTPacket_read(mqtt_sendBuf, buflen, transport_getdata) == CONNACK)
- {
- //解析响应报文
- unsigned char sessionPresent, connack_rc;
-
- if((MQTTDeserialize_connack(&sessionPresent, &connack_rc, mqtt_sendBuf, buflen) != 1) || (connack_rc != 0))
- {
- transport_close(sock);
- return -2;
- }
- }
- else
- {
- transport_close(sock);
- return -2;
- }
-
- return 1;
- }
- /**
- * @brief 订阅服务器主题
- * @param sock: 连接到服务器的 sock 编号
- * topic: 主题名
- * qos: 订阅消息等级
- * @retval 1: 订阅成功
- * -1:网络相关错误
- * -2: 构建订阅数据错误或者未能接收到服务器的正确应答
- */
- int mqtt_subscribeTopic(int sock, char *topic, int qos)
- {
- int len;
- int buflen;
- int req_qos;
- uint16_t packid;
- MQTTString topicString = MQTTString_initializer;
- int timeout;
-
- //设置参数
- buflen = sizeof(mqtt_sendBuf);
- memset(mqtt_sendBuf, 0, buflen);
- topicString.cstring = topic;
- req_qos = qos;
- packid = mqtt_getPacketId();
-
- //组装报文
- len = MQTTSerialize_subscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString, &req_qos);
- if(len <= 0) return -2;
-
- //设置状态
- mqtt_subscribe_status = 1;
- mqtt_subscribe_packid = packid;
-
- //发送报文
- len = transport_send(sock, mqtt_sendBuf, len);
- if(len <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- //等待接收线程数据响应
- timeout = 0;
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((mqtt_subscribe_status) && (timeout < 2000));
- if(mqtt_subscribe_status == 1) return -2;
-
- return 1;
- }
- /**
- * @brief 订阅服务器主题时,解析服务器返回的响应
- * @param pbuf: 接收到数据
- buflen: 数据长度
- * @retval 1: 解析数据成功
- * -2:帧类型错误
- * -3: packid错误
- */
- int mqtt_subscribeTopic_SUBACK(uint8_t *pbuf, int buflen)
- {
- uint16_t pkid;
- int count;
- int qos;
-
- if(MQTTDeserialize_suback(&pkid, 1, &count, &qos, pbuf, buflen) == 1)
- {
- if(pkid == mqtt_subscribe_packid)
- {
- mqtt_subscribe_status = 0;
- }
- else
- {
- return -3;
- }
- }
- else return -2;
-
- return 1;
- }
- /**
- * @brief 取消已订阅的主题
- * @param sock: 连接到服务器的 sock 编号
- * topic: 主题名
- * qos: 订阅消息等级
- * @retval 1: 订阅成功
- * -1:网络相关错误
- * -2: 构建订阅数据错误或者未能接收到服务器的正确应答
- */
- int mqtt_unSubscribeTopic(int sock, char *topic)
- {
- int len;
- int buflen;
- uint16_t packid;
- MQTTString topicString = MQTTString_initializer;
- int timeout;
-
- //设置参数
- buflen = sizeof(mqtt_sendBuf);
- memset(mqtt_sendBuf, 0, buflen);
- topicString.cstring = topic;
- packid = mqtt_getPacketId();
-
- //组装报文
- len = MQTTSerialize_unsubscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString);
- if(len <= 0) return -2;
-
- //设置状态
- mqtt_unsubscribe_status = 1;
- mqtt_unsubscribe_packid = packid;
-
- //发送报文
- len = transport_send(sock, mqtt_sendBuf, len);
- if(len <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- //等待接收线程数据响应
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((mqtt_unsubscribe_status) && (timeout < 2000));
- if(mqtt_unsubscribe_status == 1) return -2;
-
- return 1;
- }
- /**
- * @brief 取消订阅服务器主题时,解析服务器返回的响应
- * @param pbuf: 接收到数据
- buflen: 数据长度
- * @retval 1: 解析数据成功
- * -2:帧类型错误
- * -3: packid错误
- */
- int mqtt_unSubscribeTopic_UNSUBACK(uint8_t *pbuf, int buflen)
- {
- uint16_t pkid;
-
- if(MQTTDeserialize_unsuback(&pkid, pbuf, buflen) == 1)
- {
- if(pkid == mqtt_unsubscribe_packid) mqtt_unsubscribe_status = 0;
- else return -3;
- }
- else return -2;
-
- return 1;
- }
- ////////////////////////////////////////////////////////////
- /**
- * @brief 向服务器发布消息函数
- * @param sock: sock 编号
- topic: 消息主题
- msg: 消息内容
- msg_len: 消息长度
- qos: 消息等级
- dup: 数据重发标志
- id: 数据包id
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2:数据组包错误
- */
- int mqtt_publishMassage(int sock, char *topic, uint8_t *msg, int msg_len, int qos, uint8_t dup, uint16_t id)
- {
- uint8_t retained = 0;
- MQTTString topicString = MQTTString_initializer;
- int len;
-
- topicString.cstring = topic;
- memset(mqtt_sendBuf, 0, MQTT_SENDBUF_LENGTH);
- len = MQTTSerialize_publish(mqtt_sendBuf, MQTT_SENDBUF_LENGTH, dup, qos, retained, id, topicString, msg, msg_len);
- if(len <=0) return -2;
-
- if(transport_send(sock, mqtt_sendBuf, len) <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- return 1;
- }
- /**
- * @brief 向服务器发布消息 QOS0 函数
- * @param sock: sock 编号
- topic: 消息主题
- msg: 消息内容
- msg_len: 消息长度
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2:数据组包错误
- */
- int mqtt_publishMessage_qos0(int sock, char *topic, uint8_t *msg, int msg_len)
- {
- return mqtt_publishMassage(sock, topic, msg, msg_len, 0, 0, 0);
- }
- /**
- * @brief 向服务器发布消息 QOS1 函数
- * @param sock: sock 编号
- topic: 消息主题
- msg: 消息内容
- msg_len: 消息长度
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2:未能收到服务器的响应
- */
- int mqtt_publishMessage_qos1(int sock, char *topic, uint8_t *msg, int msg_len)
- {
- int rc;
- uint16_t pkid;
- uint8_t dup;
- int timeout = 0;
- int cnt;
-
- dup = 0;
- cnt = 0;
-
- pkid = mqtt_getPacketId();
- __QOS1_PACKET_RESEND:
-
- mqtt_publishQos1_status = 1;
- mqtt_publishQos1_packid = pkid;
- rc = mqtt_publishMassage(sock, topic, msg, msg_len, 1, dup, pkid);
- if(rc <= 0) return rc;
-
- //等待接收线程接收到 平台响应包
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((mqtt_publishQos1_status) && (timeout < 2000));
-
- if(mqtt_publishQos1_status) //消息发送后,没有得到服务器的回应,重新发送本条消息
- {
- dup = 1; //发送失败,设置重复消息
- cnt += 1;
- if(cnt >= 3) return -2; //重发3次均未成功,不再重发
- goto __QOS1_PACKET_RESEND;
- }
-
- return 1;
- }
- /**
- * @brief 向服务器发布QOS1消息时,服务器返回的响应
- * @param pbuf: 服务器返回的数据
- buflen: 数据长度
- * @retval 1: 消息解析成功
- * -2:消息解析失败或者 packid 错误
- */
- int mqtt_publishMessage_qos1_PUBACK(uint8_t *pbuf, int buflen)
- {
- uint8_t packettype, dup;
- uint16_t packetid;
-
- if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
- {
- if(mqtt_publishQos1_packid == packetid)
- {
- mqtt_publishQos1_status = 0;
- MQTT_PRINTF("receive platform qos1 response \r\n");
- }
- else return -2;
- }
- else return -2;
-
- return 1;
- }
- //////////////////////////////////////////////////////////////////向服务器发布消息////////////////////////////////////////////////////////////////////////////////
- /**
- * @brief 向服务器发布消息 QOS2 时,释放消息
- * @param sock: sock 编号
- id: 消息报文标识符
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- */
- int mqtt_publishMessage_qos2_PUBREL(int sock, uint16_t id)
- {
- int len, rc;
- uint8_t buf[8];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_pubrel(buf, sizeof(buf), 0, id);
- rc = transport_send(sock, buf, len);
-
- if(rc <= 0)
- {
- transport_close(sock);
- return -1;
- }
- return 1;
- }
- /**
- * @brief 向服务器发布消息 QOS2 函数
- * @param sock: sock 编号
- topic: 消息主题
- msg: 消息内容
- msg_len: 消息长度
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2:未能收到服务器的响应
- */
- int mqtt_publishMessage_qos2(int sock, char *topic, uint8_t *msg, int msg_len)
- {
- int rc;
- uint16_t pkid;
- uint8_t dup;
- int timeout = 0;
- int cnt;
-
- dup = 0;
- cnt = 0;
-
- pkid = mqtt_getPacketId();
- __QOS2_PACKET_RESEND:
- //消息发送
-
- cnt ++;
- mqtt_publishQos2_status = 1;
- mqtt_publishQos2_packid = pkid;
- rc = mqtt_publishMassage(sock, topic, msg, msg_len, 2, dup, pkid);
- if(rc <= 0) return rc;
-
- //等待平台响应
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((timeout < 2000) && (mqtt_publishQos2_status != 2));
- if(mqtt_publishQos2_status != 2)
- {
- dup = 1;
- if(cnt < 3) goto __QOS2_PACKET_RESEND;
- else return -2;
- }
-
- //发送 PUBREL
- rc = mqtt_publishMessage_qos2_PUBREL(sock, mqtt_publishQos2_packid);
- if(rc <= 0) return rc;
-
- mqtt_publishQos2_status = 3;
-
- //等待平台响应
- timeout = 0;
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((timeout < 2000) && (mqtt_publishQos2_status != 4));
- if(mqtt_publishQos2_status != 4)
- {
- if(cnt < 3)
- {
- // dup = 1;
- goto __QOS2_PACKET_RESEND;
- }
- else return -2;
- }
-
- return 1;
- }
- /**
- * @brief 向服务器发布消息 QOS2 时,服务器返回消息
- * @param pbuf: 服务器返回的数据
- buflen: 数据的长度
- * @retval 1: 消息帧发送成功
- * -2:帧类型错误或者报文标识符错误
- */
- int mqtt_publishMessage_qos2_PUBREC(uint8_t *pbuf, int buflen)
- {
- uint8_t packettype, dup;
- uint16_t packetid;
-
- if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
- {
- if(mqtt_publishQos2_packid == packetid)
- {
- mqtt_publishQos2_status = 2;
- MQTT_PRINTF("receive platform qos2 PUBREC \r\n");
- }
- else return -2;
- }
- else return -2;
-
- return 1;
- }
- /**
- * @brief 向服务器发布消息 QOS2 时,服务器返回消息
- * @param pbuf: 服务器返回的数据
- buflen: 数据的长度
- * @retval 1: 消息帧发送成功
- * -2:帧类型错误或者报文标识符错误
- */
- int mqtt_publishMessage_qos2_PUBCOMP(uint8_t *pbuf, int buflen)
- {
- uint8_t packettype, dup;
- uint16_t packetid;
-
- if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
- {
- if(mqtt_publishQos2_packid == packetid)
- {
- mqtt_publishQos2_status = 4;
- MQTT_PRINTF("receive platform qos2 PUBCOMP \r\n");
- }
- else return -2;
- }
- else return -2;
-
- return 1;
- }
- ////////////////////////////////////////////////////////////////接收服务器下发的消息/////////////////////////////////////////////////////
- /**
- * @brief 对服务器做出应答
- * @param sock: sock编号
- id: 服务器发布消息时的报文标识符
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- */
- int mqtt_recvPublishMessage_qos1_PUBACK(int sock, uint16_t id)
- {
- int len, rc;
- uint8_t buf[8];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_puback(buf, sizeof(buf), id);
- rc = transport_send(sock, buf, len);
-
- if(rc <= 0)
- {
- transport_close(sock);
- return -1;
- }
- MQTT_PRINTF("send to paltform qos1 PUBACK \r\n");
- return 1;
- }
- /**
- * @brief 对服务器做出应答
- * @param sock: sock编号
- id: 服务器发布消息时的报文标识符
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- */
- int mqtt_recvPublishMessage_qos2_PUBREC(int sock, uint16_t id)
- {
- int len, rc;
- uint8_t buf[8];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_pubrec(buf, sizeof(buf), 0, id);
- rc = transport_send(sock, buf, len);
-
- if(rc <= 0)
- {
- transport_close(sock);
- return -1;
- }
- MQTT_PRINTF("send to platform qos2 PUBREC \r\n");
- return 1;
- }
- /**
- * @brief 对服务器做出PUBCOMP应答
- * @param sock: sock编号
- pbuf: 接收到服务器的数据缓存
- buflen: 数据长度
- * @retval 1: 消息帧发送成功
- * -2: 帧类型错误
- */
- int mqtt_recvPublishMessage_qos2_PUBREL(int sock, uint8_t *pbuf, int buflen)
- {
- uint8_t packettype, dup;
- uint16_t packetid;
- int msg;
-
- if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
- {
- MQTT_PRINTF("receive from platform qos2 PUBREL \r\n");
-
- mqtt_recvPublishQos2_packid = packetid;
- msg = MBOX_MQTT_QOS2PUBCOMP;
- OSMboxPost(mqtt_sendMseeageMbox, &msg);
- }
- else return -2;
-
- return 1;
- }
- /**
- * @brief 发送COMP信号
- * @param sock: sock编号
- id: 服务器发布消息时的报文标识符
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- */
- int mqtt_recvPublishMessage_qos2_PUBCOMP(int sock, uint16_t id)
- {
- int len, rc;
- uint8_t buf[8];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_pubcomp(buf, sizeof(buf), id);
- rc = transport_send(sock, buf, len);
- if(rc <= 0)
- {
- transport_close(sock);
- return -1;
- }
-
- MQTT_PRINTF("send to platform qos2 PUBCOMP \r\n");
-
- return 1;
- }
- /************************************************************
- * @brief 发送ping心跳包
- * @param sock: sock编号
- * @retval 1: 消息帧发送成功
- * -1:网络错误
- * -2: 帧组包错误或者未收到服务器响应
- *************************************************************/
- int mqtt_pingReq(int sock)
- {
- int len;
- int timeout = 0;
-
- //数据组包
- len = MQTTSerialize_pingreq(mqtt_sendBuf, MQTT_SENDBUF_LENGTH);
- if(len <= 0) return -2;
-
- mqtt_pingreq_status = 1;
- if(transport_send(sock, mqtt_sendBuf, len) <= 0) return -1;
-
- //等待接收线程 接收到响应包
- do
- {
- OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
- timeout += 10;
- }while((timeout < 2000) && (mqtt_pingreq_status));
-
- if(mqtt_pingreq_status) return -2;
-
- return 1;
- }
- /**
- * @brief 解析心跳响应包
- * @retval 1: 消息解析成功
- */
- int mqtt_pingResponse(void)
- {
- mqtt_pingreq_status = 0;
- MQTT_PRINTF("receive a ping response \r\n");
-
- return 1;
- }
- /**
- * @brief 断开与服务器的MQTT连接和TCP连接
- * @param sock: sock编号
- */
- void mqtt_disconnectServer(int sock)
- {
- int len;
- uint8_t buf[2];
-
- memset(buf, 0, sizeof(buf));
- len = MQTTSerialize_disconnect(buf, sizeof(buf));
- transport_send(sock, buf, len);
-
- transport_close(sock);
- }
|