MQTTClient.c 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704
  1. #include "transport.h"
  2. #include "MQTTFormat.h"
  3. #include "cJSON.h"
  4. #include "stdint.h"
  5. #include "stdio.h"
  6. #include "string.h"
  7. #include "MQTTClient.h"
  8. #include "sys_mqtt.h"
  9. uint8_t mqtt_sendBuf[MQTT_SENDBUF_LENGTH];
  10. uint8_t mqtt_recvbuffer[MQTT_RECVBUF_LENTH];
  11. //数据帧同步变量
  12. uint8_t mqtt_publishQos1_status;
  13. uint16_t mqtt_publishQos1_packid;
  14. uint8_t mqtt_publishQos2_status;
  15. uint16_t mqtt_publishQos2_packid;
  16. uint8_t mqtt_subscribe_status;
  17. uint16_t mqtt_subscribe_packid;
  18. uint8_t mqtt_unsubscribe_status;
  19. uint16_t mqtt_unsubscribe_packid;
  20. uint8_t mqtt_pingreq_status;
  21. uint16_t mqtt_recvPublishQos1_packid;
  22. uint16_t mqtt_recvPublishQos2_packid;
  23. //MQTT 数据包 报文id
  24. uint16_t mqtt_getPacketId(void)
  25. {
  26. static uint16_t id = 0;
  27. id ++;
  28. if(id == 0) id = 1;
  29. return id;
  30. }
  31. /**
  32. * @brief 连接到MQTT服务器
  33. * @param sock: 连接到服务器的 sock 编号
  34. * @retval 1:连接成功
  35. * -1:网络相关错误
  36. * -2: 连接参数配置错误或者服务器拒绝连接
  37. */
  38. int mqtt_connectToMqttServer(int sock)
  39. {
  40. int len;
  41. int buflen = sizeof(mqtt_sendBuf);
  42. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  43. GATEWAY_PARAMS *get;
  44. get= get_gateway_config_params();
  45. //配置连接参数
  46. data.clientID.cstring = CLIENT_ID;
  47. data.username.cstring = (char*)get->username;
  48. data.password.cstring = (char*)get->passwd;
  49. data.keepAliveInterval = KEEPLIVE_TIME;
  50. data.MQTTVersion = MQTT_VERSION;
  51. data.cleansession = 1;
  52. //组装连接消息
  53. memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf));
  54. len = MQTTSerialize_connect((unsigned char *)mqtt_sendBuf, buflen, &data);
  55. len = transport_send(sock, mqtt_sendBuf, len); //发送连接报文
  56. if(len <= 0)
  57. {
  58. transport_close(sock);
  59. return -1;
  60. }
  61. len = transport_receive(sock); //等待返回报文
  62. if(len <= 0)
  63. {
  64. transport_close(sock);
  65. return -1;
  66. }
  67. //读取响应报文
  68. memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf));
  69. if(MQTTPacket_read(mqtt_sendBuf, buflen, transport_getdata) == CONNACK)
  70. {
  71. //解析响应报文
  72. unsigned char sessionPresent, connack_rc;
  73. if((MQTTDeserialize_connack(&sessionPresent, &connack_rc, mqtt_sendBuf, buflen) != 1) || (connack_rc != 0))
  74. {
  75. transport_close(sock);
  76. return -2;
  77. }
  78. }
  79. else
  80. {
  81. transport_close(sock);
  82. return -2;
  83. }
  84. return 1;
  85. }
  86. /**
  87. * @brief 订阅服务器主题
  88. * @param sock: 连接到服务器的 sock 编号
  89. * topic: 主题名
  90. * qos: 订阅消息等级
  91. * @retval 1: 订阅成功
  92. * -1:网络相关错误
  93. * -2: 构建订阅数据错误或者未能接收到服务器的正确应答
  94. */
  95. int mqtt_subscribeTopic(int sock, char *topic, int qos)
  96. {
  97. int len;
  98. int buflen;
  99. int req_qos;
  100. uint16_t packid;
  101. MQTTString topicString = MQTTString_initializer;
  102. int timeout;
  103. //设置参数
  104. buflen = sizeof(mqtt_sendBuf);
  105. memset(mqtt_sendBuf, 0, buflen);
  106. topicString.cstring = topic;
  107. req_qos = qos;
  108. packid = mqtt_getPacketId();
  109. //组装报文
  110. len = MQTTSerialize_subscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString, &req_qos);
  111. if(len <= 0) return -2;
  112. //设置状态
  113. mqtt_subscribe_status = 1;
  114. mqtt_subscribe_packid = packid;
  115. //发送报文
  116. len = transport_send(sock, mqtt_sendBuf, len);
  117. if(len <= 0)
  118. {
  119. transport_close(sock);
  120. return -1;
  121. }
  122. //等待接收线程数据响应
  123. timeout = 0;
  124. do
  125. {
  126. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  127. timeout += 10;
  128. }while((mqtt_subscribe_status) && (timeout < 2000));
  129. if(mqtt_subscribe_status == 1) return -2;
  130. return 1;
  131. }
  132. /**
  133. * @brief 订阅服务器主题时,解析服务器返回的响应
  134. * @param pbuf: 接收到数据
  135. buflen: 数据长度
  136. * @retval 1: 解析数据成功
  137. * -2:帧类型错误
  138. * -3: packid错误
  139. */
  140. int mqtt_subscribeTopic_SUBACK(uint8_t *pbuf, int buflen)
  141. {
  142. uint16_t pkid;
  143. int count;
  144. int qos;
  145. if(MQTTDeserialize_suback(&pkid, 1, &count, &qos, pbuf, buflen) == 1)
  146. {
  147. if(pkid == mqtt_subscribe_packid)
  148. {
  149. mqtt_subscribe_status = 0;
  150. }
  151. else
  152. {
  153. return -3;
  154. }
  155. }
  156. else return -2;
  157. return 1;
  158. }
  159. /**
  160. * @brief 取消已订阅的主题
  161. * @param sock: 连接到服务器的 sock 编号
  162. * topic: 主题名
  163. * qos: 订阅消息等级
  164. * @retval 1: 订阅成功
  165. * -1:网络相关错误
  166. * -2: 构建订阅数据错误或者未能接收到服务器的正确应答
  167. */
  168. int mqtt_unSubscribeTopic(int sock, char *topic)
  169. {
  170. int len;
  171. int buflen;
  172. uint16_t packid;
  173. MQTTString topicString = MQTTString_initializer;
  174. int timeout;
  175. //设置参数
  176. buflen = sizeof(mqtt_sendBuf);
  177. memset(mqtt_sendBuf, 0, buflen);
  178. topicString.cstring = topic;
  179. packid = mqtt_getPacketId();
  180. //组装报文
  181. len = MQTTSerialize_unsubscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString);
  182. if(len <= 0) return -2;
  183. //设置状态
  184. mqtt_unsubscribe_status = 1;
  185. mqtt_unsubscribe_packid = packid;
  186. //发送报文
  187. len = transport_send(sock, mqtt_sendBuf, len);
  188. if(len <= 0)
  189. {
  190. transport_close(sock);
  191. return -1;
  192. }
  193. //等待接收线程数据响应
  194. do
  195. {
  196. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  197. timeout += 10;
  198. }while((mqtt_unsubscribe_status) && (timeout < 2000));
  199. if(mqtt_unsubscribe_status == 1) return -2;
  200. return 1;
  201. }
  202. /**
  203. * @brief 取消订阅服务器主题时,解析服务器返回的响应
  204. * @param pbuf: 接收到数据
  205. buflen: 数据长度
  206. * @retval 1: 解析数据成功
  207. * -2:帧类型错误
  208. * -3: packid错误
  209. */
  210. int mqtt_unSubscribeTopic_UNSUBACK(uint8_t *pbuf, int buflen)
  211. {
  212. uint16_t pkid;
  213. if(MQTTDeserialize_unsuback(&pkid, pbuf, buflen) == 1)
  214. {
  215. if(pkid == mqtt_unsubscribe_packid) mqtt_unsubscribe_status = 0;
  216. else return -3;
  217. }
  218. else return -2;
  219. return 1;
  220. }
  221. ////////////////////////////////////////////////////////////
  222. /**
  223. * @brief 向服务器发布消息函数
  224. * @param sock: sock 编号
  225. topic: 消息主题
  226. msg: 消息内容
  227. msg_len: 消息长度
  228. qos: 消息等级
  229. dup: 数据重发标志
  230. id: 数据包id
  231. * @retval 1: 消息帧发送成功
  232. * -1:网络错误
  233. * -2:数据组包错误
  234. */
  235. int mqtt_publishMassage(int sock, char *topic, uint8_t *msg, int msg_len, int qos, uint8_t dup, uint16_t id)
  236. {
  237. uint8_t retained = 0;
  238. MQTTString topicString = MQTTString_initializer;
  239. int len;
  240. topicString.cstring = topic;
  241. memset(mqtt_sendBuf, 0, MQTT_SENDBUF_LENGTH);
  242. len = MQTTSerialize_publish(mqtt_sendBuf, MQTT_SENDBUF_LENGTH, dup, qos, retained, id, topicString, msg, msg_len);
  243. if(len <=0) return -2;
  244. if(transport_send(sock, mqtt_sendBuf, len) <= 0)
  245. {
  246. transport_close(sock);
  247. return -1;
  248. }
  249. return 1;
  250. }
  251. /**
  252. * @brief 向服务器发布消息 QOS0 函数
  253. * @param sock: sock 编号
  254. topic: 消息主题
  255. msg: 消息内容
  256. msg_len: 消息长度
  257. * @retval 1: 消息帧发送成功
  258. * -1:网络错误
  259. * -2:数据组包错误
  260. */
  261. int mqtt_publishMessage_qos0(int sock, char *topic, uint8_t *msg, int msg_len)
  262. {
  263. return mqtt_publishMassage(sock, topic, msg, msg_len, 0, 0, 0);
  264. }
  265. /**
  266. * @brief 向服务器发布消息 QOS1 函数
  267. * @param sock: sock 编号
  268. topic: 消息主题
  269. msg: 消息内容
  270. msg_len: 消息长度
  271. * @retval 1: 消息帧发送成功
  272. * -1:网络错误
  273. * -2:未能收到服务器的响应
  274. */
  275. int mqtt_publishMessage_qos1(int sock, char *topic, uint8_t *msg, int msg_len)
  276. {
  277. int rc;
  278. uint16_t pkid;
  279. uint8_t dup;
  280. int timeout = 0;
  281. int cnt;
  282. dup = 0;
  283. cnt = 0;
  284. pkid = mqtt_getPacketId();
  285. __QOS1_PACKET_RESEND:
  286. mqtt_publishQos1_status = 1;
  287. mqtt_publishQos1_packid = pkid;
  288. rc = mqtt_publishMassage(sock, topic, msg, msg_len, 1, dup, pkid);
  289. if(rc <= 0) return rc;
  290. //等待接收线程接收到 平台响应包
  291. do
  292. {
  293. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  294. timeout += 10;
  295. }while((mqtt_publishQos1_status) && (timeout < 2000));
  296. if(mqtt_publishQos1_status) //消息发送后,没有得到服务器的回应,重新发送本条消息
  297. {
  298. dup = 1; //发送失败,设置重复消息
  299. cnt += 1;
  300. if(cnt >= 3) return -2; //重发3次均未成功,不再重发
  301. goto __QOS1_PACKET_RESEND;
  302. }
  303. return 1;
  304. }
  305. /**
  306. * @brief 向服务器发布QOS1消息时,服务器返回的响应
  307. * @param pbuf: 服务器返回的数据
  308. buflen: 数据长度
  309. * @retval 1: 消息解析成功
  310. * -2:消息解析失败或者 packid 错误
  311. */
  312. int mqtt_publishMessage_qos1_PUBACK(uint8_t *pbuf, int buflen)
  313. {
  314. uint8_t packettype, dup;
  315. uint16_t packetid;
  316. if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
  317. {
  318. if(mqtt_publishQos1_packid == packetid)
  319. {
  320. mqtt_publishQos1_status = 0;
  321. MQTT_PRINTF("receive platform qos1 response \r\n");
  322. }
  323. else return -2;
  324. }
  325. else return -2;
  326. return 1;
  327. }
  328. //////////////////////////////////////////////////////////////////向服务器发布消息////////////////////////////////////////////////////////////////////////////////
  329. /**
  330. * @brief 向服务器发布消息 QOS2 时,释放消息
  331. * @param sock: sock 编号
  332. id: 消息报文标识符
  333. * @retval 1: 消息帧发送成功
  334. * -1:网络错误
  335. */
  336. int mqtt_publishMessage_qos2_PUBREL(int sock, uint16_t id)
  337. {
  338. int len, rc;
  339. uint8_t buf[8];
  340. memset(buf, 0, sizeof(buf));
  341. len = MQTTSerialize_pubrel(buf, sizeof(buf), 0, id);
  342. rc = transport_send(sock, buf, len);
  343. if(rc <= 0)
  344. {
  345. transport_close(sock);
  346. return -1;
  347. }
  348. return 1;
  349. }
  350. /**
  351. * @brief 向服务器发布消息 QOS2 函数
  352. * @param sock: sock 编号
  353. topic: 消息主题
  354. msg: 消息内容
  355. msg_len: 消息长度
  356. * @retval 1: 消息帧发送成功
  357. * -1:网络错误
  358. * -2:未能收到服务器的响应
  359. */
  360. int mqtt_publishMessage_qos2(int sock, char *topic, uint8_t *msg, int msg_len)
  361. {
  362. int rc;
  363. uint16_t pkid;
  364. uint8_t dup;
  365. int timeout = 0;
  366. int cnt;
  367. dup = 0;
  368. cnt = 0;
  369. pkid = mqtt_getPacketId();
  370. __QOS2_PACKET_RESEND:
  371. //消息发送
  372. cnt ++;
  373. mqtt_publishQos2_status = 1;
  374. mqtt_publishQos2_packid = pkid;
  375. rc = mqtt_publishMassage(sock, topic, msg, msg_len, 2, dup, pkid);
  376. if(rc <= 0) return rc;
  377. //等待平台响应
  378. do
  379. {
  380. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  381. timeout += 10;
  382. }while((timeout < 2000) && (mqtt_publishQos2_status != 2));
  383. if(mqtt_publishQos2_status != 2)
  384. {
  385. dup = 1;
  386. if(cnt < 3) goto __QOS2_PACKET_RESEND;
  387. else return -2;
  388. }
  389. //发送 PUBREL
  390. rc = mqtt_publishMessage_qos2_PUBREL(sock, mqtt_publishQos2_packid);
  391. if(rc <= 0) return rc;
  392. mqtt_publishQos2_status = 3;
  393. //等待平台响应
  394. timeout = 0;
  395. do
  396. {
  397. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  398. timeout += 10;
  399. }while((timeout < 2000) && (mqtt_publishQos2_status != 4));
  400. if(mqtt_publishQos2_status != 4)
  401. {
  402. if(cnt < 3)
  403. {
  404. // dup = 1;
  405. goto __QOS2_PACKET_RESEND;
  406. }
  407. else return -2;
  408. }
  409. return 1;
  410. }
  411. /**
  412. * @brief 向服务器发布消息 QOS2 时,服务器返回消息
  413. * @param pbuf: 服务器返回的数据
  414. buflen: 数据的长度
  415. * @retval 1: 消息帧发送成功
  416. * -2:帧类型错误或者报文标识符错误
  417. */
  418. int mqtt_publishMessage_qos2_PUBREC(uint8_t *pbuf, int buflen)
  419. {
  420. uint8_t packettype, dup;
  421. uint16_t packetid;
  422. if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
  423. {
  424. if(mqtt_publishQos2_packid == packetid)
  425. {
  426. mqtt_publishQos2_status = 2;
  427. MQTT_PRINTF("receive platform qos2 PUBREC \r\n");
  428. }
  429. else return -2;
  430. }
  431. else return -2;
  432. return 1;
  433. }
  434. /**
  435. * @brief 向服务器发布消息 QOS2 时,服务器返回消息
  436. * @param pbuf: 服务器返回的数据
  437. buflen: 数据的长度
  438. * @retval 1: 消息帧发送成功
  439. * -2:帧类型错误或者报文标识符错误
  440. */
  441. int mqtt_publishMessage_qos2_PUBCOMP(uint8_t *pbuf, int buflen)
  442. {
  443. uint8_t packettype, dup;
  444. uint16_t packetid;
  445. if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
  446. {
  447. if(mqtt_publishQos2_packid == packetid)
  448. {
  449. mqtt_publishQos2_status = 4;
  450. MQTT_PRINTF("receive platform qos2 PUBCOMP \r\n");
  451. }
  452. else return -2;
  453. }
  454. else return -2;
  455. return 1;
  456. }
  457. ////////////////////////////////////////////////////////////////接收服务器下发的消息/////////////////////////////////////////////////////
  458. /**
  459. * @brief 对服务器做出应答
  460. * @param sock: sock编号
  461. id: 服务器发布消息时的报文标识符
  462. * @retval 1: 消息帧发送成功
  463. * -1:网络错误
  464. */
  465. int mqtt_recvPublishMessage_qos1_PUBACK(int sock, uint16_t id)
  466. {
  467. int len, rc;
  468. uint8_t buf[8];
  469. memset(buf, 0, sizeof(buf));
  470. len = MQTTSerialize_puback(buf, sizeof(buf), id);
  471. rc = transport_send(sock, buf, len);
  472. if(rc <= 0)
  473. {
  474. transport_close(sock);
  475. return -1;
  476. }
  477. MQTT_PRINTF("send to paltform qos1 PUBACK \r\n");
  478. return 1;
  479. }
  480. /**
  481. * @brief 对服务器做出应答
  482. * @param sock: sock编号
  483. id: 服务器发布消息时的报文标识符
  484. * @retval 1: 消息帧发送成功
  485. * -1:网络错误
  486. */
  487. int mqtt_recvPublishMessage_qos2_PUBREC(int sock, uint16_t id)
  488. {
  489. int len, rc;
  490. uint8_t buf[8];
  491. memset(buf, 0, sizeof(buf));
  492. len = MQTTSerialize_pubrec(buf, sizeof(buf), 0, id);
  493. rc = transport_send(sock, buf, len);
  494. if(rc <= 0)
  495. {
  496. transport_close(sock);
  497. return -1;
  498. }
  499. MQTT_PRINTF("send to platform qos2 PUBREC \r\n");
  500. return 1;
  501. }
  502. /**
  503. * @brief 对服务器做出PUBCOMP应答
  504. * @param sock: sock编号
  505. pbuf: 接收到服务器的数据缓存
  506. buflen: 数据长度
  507. * @retval 1: 消息帧发送成功
  508. * -2: 帧类型错误
  509. */
  510. int mqtt_recvPublishMessage_qos2_PUBREL(int sock, uint8_t *pbuf, int buflen)
  511. {
  512. uint8_t packettype, dup;
  513. uint16_t packetid;
  514. int msg;
  515. if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
  516. {
  517. MQTT_PRINTF("receive from platform qos2 PUBREL \r\n");
  518. mqtt_recvPublishQos2_packid = packetid;
  519. msg = MBOX_MQTT_QOS2PUBCOMP;
  520. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  521. }
  522. else return -2;
  523. return 1;
  524. }
  525. /**
  526. * @brief 发送COMP信号
  527. * @param sock: sock编号
  528. id: 服务器发布消息时的报文标识符
  529. * @retval 1: 消息帧发送成功
  530. * -1:网络错误
  531. */
  532. int mqtt_recvPublishMessage_qos2_PUBCOMP(int sock, uint16_t id)
  533. {
  534. int len, rc;
  535. uint8_t buf[8];
  536. memset(buf, 0, sizeof(buf));
  537. len = MQTTSerialize_pubcomp(buf, sizeof(buf), id);
  538. rc = transport_send(sock, buf, len);
  539. if(rc <= 0)
  540. {
  541. transport_close(sock);
  542. return -1;
  543. }
  544. MQTT_PRINTF("send to platform qos2 PUBCOMP \r\n");
  545. return 1;
  546. }
  547. /************************************************************
  548. * @brief 发送ping心跳包
  549. * @param sock: sock编号
  550. * @retval 1: 消息帧发送成功
  551. * -1:网络错误
  552. * -2: 帧组包错误或者未收到服务器响应
  553. *************************************************************/
  554. int mqtt_pingReq(int sock)
  555. {
  556. int len;
  557. int timeout = 0;
  558. //数据组包
  559. len = MQTTSerialize_pingreq(mqtt_sendBuf, MQTT_SENDBUF_LENGTH);
  560. if(len <= 0) return -2;
  561. mqtt_pingreq_status = 1;
  562. if(transport_send(sock, mqtt_sendBuf, len) <= 0) return -1;
  563. //等待接收线程 接收到响应包
  564. do
  565. {
  566. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  567. timeout += 10;
  568. }while((timeout < 2000) && (mqtt_pingreq_status));
  569. if(mqtt_pingreq_status) return -2;
  570. return 1;
  571. }
  572. /**
  573. * @brief 解析心跳响应包
  574. * @retval 1: 消息解析成功
  575. */
  576. int mqtt_pingResponse(void)
  577. {
  578. mqtt_pingreq_status = 0;
  579. MQTT_PRINTF("receive a ping response \r\n");
  580. return 1;
  581. }
  582. /**
  583. * @brief 断开与服务器的MQTT连接和TCP连接
  584. * @param sock: sock编号
  585. */
  586. void mqtt_disconnectServer(int sock)
  587. {
  588. int len;
  589. uint8_t buf[2];
  590. memset(buf, 0, sizeof(buf));
  591. len = MQTTSerialize_disconnect(buf, sizeof(buf));
  592. transport_send(sock, buf, len);
  593. transport_close(sock);
  594. }