MQTTClient.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730
  1. #include "transport.h"
  2. #include "MQTTFormat.h"
  3. #include "cJSON.h"
  4. #include "stdint.h"
  5. #include "stdio.h"
  6. #include "string.h"
  7. #include "MQTTClient.h"
  8. #include "sys_mqtt.h"
  9. uint8_t mqtt_sendBuf[MQTT_SENDBUF_LENGTH];
  10. uint8_t mqtt_recvbuffer[MQTT_RECVBUF_LENTH];
  11. //数据帧同步变量
  12. uint8_t mqtt_publishQos1_status;
  13. uint16_t mqtt_publishQos1_packid;
  14. uint8_t mqtt_publishQos2_status;
  15. uint16_t mqtt_publishQos2_packid;
  16. uint8_t mqtt_subscribe_status;
  17. uint16_t mqtt_subscribe_packid;
  18. uint8_t mqtt_unsubscribe_status;
  19. uint16_t mqtt_unsubscribe_packid;
  20. uint8_t mqtt_pingreq_status;
  21. uint16_t mqtt_recvPublishQos1_packid;
  22. uint16_t mqtt_recvPublishQos2_packid;
  23. //MQTT 数据包 报文id
  24. uint16_t mqtt_getPacketId(void)
  25. {
  26. static uint16_t id = 0;
  27. id ++;
  28. if(id == 0) id = 1;
  29. return id;
  30. }
  31. /**
  32. * @brief 连接到MQTT服务器
  33. * @param sock: 连接到服务器的 sock 编号
  34. * @retval 1:连接成功
  35. * -1:网络相关错误
  36. * -2: 连接参数配置错误或者服务器拒绝连接
  37. */
  38. int mqtt_connectToMqttServer(int sock)
  39. {
  40. int len;
  41. int buflen = sizeof(mqtt_sendBuf);
  42. MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
  43. //配置连接参数
  44. data.clientID.cstring = CLIENT_ID;
  45. data.username.cstring = USER_NAME;
  46. data.password.cstring = PASSWORD;
  47. data.keepAliveInterval = KEEPLIVE_TIME;
  48. data.MQTTVersion = MQTT_VERSION;
  49. data.cleansession = 1;
  50. //组装连接消息
  51. memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf));
  52. len = MQTTSerialize_connect((unsigned char *)mqtt_sendBuf, buflen, &data);
  53. len = transport_send(sock, mqtt_sendBuf, len); //发送连接报文
  54. if(len <= 0)
  55. {
  56. transport_close(sock);
  57. return -1;
  58. }
  59. len = transport_receive(sock); //等待返回报文
  60. if(len <= 0)
  61. {
  62. transport_close(sock);
  63. return -1;
  64. }
  65. //读取响应报文
  66. memset(mqtt_sendBuf, 0, sizeof(mqtt_sendBuf));
  67. if(MQTTPacket_read(mqtt_sendBuf, buflen, transport_getdata) == CONNACK)
  68. {
  69. //解析响应报文
  70. unsigned char sessionPresent, connack_rc;
  71. if((MQTTDeserialize_connack(&sessionPresent, &connack_rc, mqtt_sendBuf, buflen) != 1) || (connack_rc != 0))
  72. {
  73. transport_close(sock);
  74. return -2;
  75. }
  76. }
  77. else
  78. {
  79. transport_close(sock);
  80. return -2;
  81. }
  82. return 1;
  83. }
  84. /**
  85. * @brief 订阅服务器主题
  86. * @param sock: 连接到服务器的 sock 编号
  87. * topic: 主题名
  88. * qos: 订阅消息等级
  89. * @retval 1: 订阅成功
  90. * -1:网络相关错误
  91. * -2: 构建订阅数据错误或者未能接收到服务器的正确应答
  92. */
  93. int mqtt_subscribeTopic(int sock, char *topic, int qos)
  94. {
  95. int len;
  96. int buflen;
  97. int req_qos;
  98. uint16_t packid;
  99. MQTTString topicString = MQTTString_initializer;
  100. int timeout;
  101. //设置参数
  102. buflen = sizeof(mqtt_sendBuf);
  103. memset(mqtt_sendBuf, 0, buflen);
  104. topicString.cstring = topic;
  105. req_qos = qos;
  106. packid = mqtt_getPacketId();
  107. //组装报文
  108. len = MQTTSerialize_subscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString, &req_qos);
  109. if(len <= 0) return -2;
  110. //设置状态
  111. mqtt_subscribe_status = 1;
  112. mqtt_subscribe_packid = packid;
  113. //发送报文
  114. len = transport_send(sock, mqtt_sendBuf, len);
  115. if(len <= 0)
  116. {
  117. transport_close(sock);
  118. return -1;
  119. }
  120. //等待接收线程数据响应
  121. timeout = 0;
  122. do
  123. {
  124. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  125. timeout += 10;
  126. }while((mqtt_subscribe_status) && (timeout < 2000));
  127. if(mqtt_subscribe_status == 1) return -2;
  128. return 1;
  129. }
  130. /**
  131. * @brief 订阅服务器主题时,解析服务器返回的响应
  132. * @param pbuf: 接收到数据
  133. buflen: 数据长度
  134. * @retval 1: 解析数据成功
  135. * -2:帧类型错误
  136. * -3: packid错误
  137. */
  138. int mqtt_subscribeTopic_SUBACK(uint8_t *pbuf, int buflen)
  139. {
  140. uint16_t pkid;
  141. int count;
  142. int qos;
  143. if(MQTTDeserialize_suback(&pkid, 1, &count, &qos, pbuf, buflen) == 1)
  144. {
  145. if(pkid == mqtt_subscribe_packid)
  146. {
  147. mqtt_subscribe_status = 0;
  148. }
  149. else
  150. {
  151. return -3;
  152. }
  153. }
  154. else return -2;
  155. return 1;
  156. }
  157. /**
  158. * @brief 取消已订阅的主题
  159. * @param sock: 连接到服务器的 sock 编号
  160. * topic: 主题名
  161. * qos: 订阅消息等级
  162. * @retval 1: 订阅成功
  163. * -1:网络相关错误
  164. * -2: 构建订阅数据错误或者未能接收到服务器的正确应答
  165. */
  166. int mqtt_unSubscribeTopic(int sock, char *topic)
  167. {
  168. int len;
  169. int buflen;
  170. uint16_t packid;
  171. MQTTString topicString = MQTTString_initializer;
  172. int timeout;
  173. //设置参数
  174. buflen = sizeof(mqtt_sendBuf);
  175. memset(mqtt_sendBuf, 0, buflen);
  176. topicString.cstring = topic;
  177. packid = mqtt_getPacketId();
  178. //组装报文
  179. len = MQTTSerialize_unsubscribe(mqtt_sendBuf, buflen, 0, packid, 1, &topicString);
  180. if(len <= 0) return -2;
  181. //设置状态
  182. mqtt_unsubscribe_status = 1;
  183. mqtt_unsubscribe_packid = packid;
  184. //发送报文
  185. len = transport_send(sock, mqtt_sendBuf, len);
  186. if(len <= 0)
  187. {
  188. transport_close(sock);
  189. return -1;
  190. }
  191. //等待接收线程数据响应
  192. do
  193. {
  194. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  195. timeout += 10;
  196. }while((mqtt_unsubscribe_status) && (timeout < 2000));
  197. if(mqtt_unsubscribe_status == 1) return -2;
  198. return 1;
  199. }
  200. /**
  201. * @brief 取消订阅服务器主题时,解析服务器返回的响应
  202. * @param pbuf: 接收到数据
  203. buflen: 数据长度
  204. * @retval 1: 解析数据成功
  205. * -2:帧类型错误
  206. * -3: packid错误
  207. */
  208. int mqtt_unSubscribeTopic_UNSUBACK(uint8_t *pbuf, int buflen)
  209. {
  210. uint16_t pkid;
  211. if(MQTTDeserialize_unsuback(&pkid, pbuf, buflen) == 1)
  212. {
  213. if(pkid == mqtt_unsubscribe_packid) mqtt_unsubscribe_status = 0;
  214. else return -3;
  215. }
  216. else return -2;
  217. return 1;
  218. }
  219. ////////////////////////////////////////////////////////////
  220. /**
  221. * @brief 向服务器发布消息函数
  222. * @param sock: sock 编号
  223. topic: 消息主题
  224. msg: 消息内容
  225. msg_len: 消息长度
  226. qos: 消息等级
  227. dup: 数据重发标志
  228. id: 数据包id
  229. * @retval 1: 消息帧发送成功
  230. * -1:网络错误
  231. * -2:数据组包错误
  232. */
  233. int mqtt_publishMassage(int sock, char *topic, uint8_t *msg, int msg_len, int qos, uint8_t dup, uint16_t id)
  234. {
  235. uint8_t retained = 0;
  236. MQTTString topicString = MQTTString_initializer;
  237. int len;
  238. topicString.cstring = topic;
  239. memset(mqtt_sendBuf, 0, MQTT_SENDBUF_LENGTH);
  240. len = MQTTSerialize_publish(mqtt_sendBuf, MQTT_SENDBUF_LENGTH, dup, qos, retained, id, topicString, msg, msg_len);
  241. if(len <=0) return -2;
  242. if(transport_send(sock, mqtt_sendBuf, len) <= 0)
  243. {
  244. transport_close(sock);
  245. return -1;
  246. }
  247. return 1;
  248. }
  249. /**
  250. * @brief 向服务器发布消息 QOS0 函数
  251. * @param sock: sock 编号
  252. topic: 消息主题
  253. msg: 消息内容
  254. msg_len: 消息长度
  255. * @retval 1: 消息帧发送成功
  256. * -1:网络错误
  257. * -2:数据组包错误
  258. */
  259. int mqtt_publishMessage_qos0(int sock, char *topic, uint8_t *msg, int msg_len)
  260. {
  261. return mqtt_publishMassage(sock, topic, msg, msg_len, 0, 0, 0);
  262. }
  263. /**
  264. * @brief 向服务器发布消息 QOS1 函数
  265. * @param sock: sock 编号
  266. topic: 消息主题
  267. msg: 消息内容
  268. msg_len: 消息长度
  269. * @retval 1: 消息帧发送成功
  270. * -1:网络错误
  271. * -2:未能收到服务器的响应
  272. */
  273. int mqtt_publishMessage_qos1(int sock, char *topic, uint8_t *msg, int msg_len)
  274. {
  275. int rc;
  276. uint16_t pkid;
  277. uint8_t dup;
  278. int timeout = 0;
  279. int cnt;
  280. dup = 0;
  281. cnt = 0;
  282. pkid = mqtt_getPacketId();
  283. __QOS1_PACKET_RESEND:
  284. mqtt_publishQos1_status = 1;
  285. mqtt_publishQos1_packid = pkid;
  286. rc = mqtt_publishMassage(sock, topic, msg, msg_len, 1, dup, pkid);
  287. if(rc <= 0) return rc;
  288. //等待接收线程接收到 平台响应包
  289. do
  290. {
  291. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  292. timeout += 10;
  293. }while((mqtt_publishQos1_status) && (timeout < 2000));
  294. if(mqtt_publishQos1_status) //消息发送后,没有得到服务器的回应,重新发送本条消息
  295. {
  296. dup = 1; //发送失败,设置重复消息
  297. cnt += 1;
  298. if(cnt >= 3) return -2; //重发3次均未成功,不再重发
  299. goto __QOS1_PACKET_RESEND;
  300. }
  301. return 1;
  302. }
  303. /**
  304. * @brief 向服务器发布QOS1消息时,服务器返回的响应
  305. * @param pbuf: 服务器返回的数据
  306. buflen: 数据长度
  307. * @retval 1: 消息解析成功
  308. * -2:消息解析失败或者 packid 错误
  309. */
  310. int mqtt_publishMessage_qos1_PUBACK(uint8_t *pbuf, int buflen)
  311. {
  312. uint8_t packettype, dup;
  313. uint16_t packetid;
  314. if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
  315. {
  316. if(mqtt_publishQos1_packid == packetid)
  317. {
  318. mqtt_publishQos1_status = 0;
  319. MQTT_PRINTF("receive platform qos1 response \r\n");
  320. }
  321. else return -2;
  322. }
  323. else return -2;
  324. return 1;
  325. }
  326. //////////////////////////////////////////////////////////////////向服务器发布消息////////////////////////////////////////////////////////////////////////////////
  327. /**
  328. * @brief 向服务器发布消息 QOS2 时,释放消息
  329. * @param sock: sock 编号
  330. id: 消息报文标识符
  331. * @retval 1: 消息帧发送成功
  332. * -1:网络错误
  333. */
  334. int mqtt_publishMessage_qos2_PUBREL(int sock, uint16_t id)
  335. {
  336. int len, rc;
  337. uint8_t buf[8];
  338. memset(buf, 0, sizeof(buf));
  339. len = MQTTSerialize_pubrel(buf, sizeof(buf), 0, id);
  340. rc = transport_send(sock, buf, len);
  341. if(rc <= 0)
  342. {
  343. transport_close(sock);
  344. return -1;
  345. }
  346. return 1;
  347. }
  348. /**
  349. * @brief 向服务器发布消息 QOS2 函数
  350. * @param sock: sock 编号
  351. topic: 消息主题
  352. msg: 消息内容
  353. msg_len: 消息长度
  354. * @retval 1: 消息帧发送成功
  355. * -1:网络错误
  356. * -2:未能收到服务器的响应
  357. */
  358. int mqtt_publishMessage_qos2(int sock, char *topic, uint8_t *msg, int msg_len)
  359. {
  360. int rc;
  361. uint16_t pkid;
  362. uint8_t dup;
  363. int timeout = 0;
  364. int cnt;
  365. dup = 0;
  366. cnt = 0;
  367. pkid = mqtt_getPacketId();
  368. __QOS2_PACKET_RESEND:
  369. //消息发送
  370. cnt ++;
  371. mqtt_publishQos2_status = 1;
  372. mqtt_publishQos2_packid = pkid;
  373. rc = mqtt_publishMassage(sock, topic, msg, msg_len, 2, dup, pkid);
  374. if(rc <= 0) return rc;
  375. //等待平台响应
  376. do
  377. {
  378. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  379. timeout += 10;
  380. }while((timeout < 2000) && (mqtt_publishQos2_status != 2));
  381. if(mqtt_publishQos2_status != 2)
  382. {
  383. dup = 1;
  384. if(cnt < 3) goto __QOS2_PACKET_RESEND;
  385. else return -2;
  386. }
  387. //发送 PUBREL
  388. rc = mqtt_publishMessage_qos2_PUBREL(sock, mqtt_publishQos2_packid);
  389. if(rc <= 0) return rc;
  390. mqtt_publishQos2_status = 3;
  391. //等待平台响应
  392. timeout = 0;
  393. do
  394. {
  395. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  396. timeout += 10;
  397. }while((timeout < 2000) && (mqtt_publishQos2_status != 4));
  398. if(mqtt_publishQos2_status != 4)
  399. {
  400. if(cnt < 3)
  401. {
  402. // dup = 1;
  403. goto __QOS2_PACKET_RESEND;
  404. }
  405. else return -2;
  406. }
  407. return 1;
  408. }
  409. /**
  410. * @brief 向服务器发布消息 QOS2 时,服务器返回消息
  411. * @param pbuf: 服务器返回的数据
  412. buflen: 数据的长度
  413. * @retval 1: 消息帧发送成功
  414. * -2:帧类型错误或者报文标识符错误
  415. */
  416. int mqtt_publishMessage_qos2_PUBREC(uint8_t *pbuf, int buflen)
  417. {
  418. uint8_t packettype, dup;
  419. uint16_t packetid;
  420. if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
  421. {
  422. if(mqtt_publishQos2_packid == packetid)
  423. {
  424. mqtt_publishQos2_status = 2;
  425. MQTT_PRINTF("receive platform qos2 PUBREC \r\n");
  426. }
  427. else return -2;
  428. }
  429. else return -2;
  430. return 1;
  431. }
  432. /**
  433. * @brief 向服务器发布消息 QOS2 时,服务器返回消息
  434. * @param pbuf: 服务器返回的数据
  435. buflen: 数据的长度
  436. * @retval 1: 消息帧发送成功
  437. * -2:帧类型错误或者报文标识符错误
  438. */
  439. int mqtt_publishMessage_qos2_PUBCOMP(uint8_t *pbuf, int buflen)
  440. {
  441. uint8_t packettype, dup;
  442. uint16_t packetid;
  443. if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
  444. {
  445. if(mqtt_publishQos2_packid == packetid)
  446. {
  447. mqtt_publishQos2_status = 4;
  448. MQTT_PRINTF("receive platform qos2 PUBCOMP \r\n");
  449. }
  450. else return -2;
  451. }
  452. else return -2;
  453. return 1;
  454. }
  455. ////////////////////////////////////////////////////////////////接收服务器下发的消息/////////////////////////////////////////////////////
  456. /**
  457. * @brief 对服务器做出应答
  458. * @param sock: sock编号
  459. id: 服务器发布消息时的报文标识符
  460. * @retval 1: 消息帧发送成功
  461. * -1:网络错误
  462. */
  463. int mqtt_recvPublishMessage_qos1_PUBACK(int sock, uint16_t id)
  464. {
  465. int len, rc;
  466. uint8_t buf[8];
  467. memset(buf, 0, sizeof(buf));
  468. len = MQTTSerialize_puback(buf, sizeof(buf), id);
  469. rc = transport_send(sock, buf, len);
  470. if(rc <= 0)
  471. {
  472. transport_close(sock);
  473. return -1;
  474. }
  475. MQTT_PRINTF("send to paltform qos1 PUBACK \r\n");
  476. return 1;
  477. }
  478. /**
  479. * @brief 对服务器做出应答
  480. * @param sock: sock编号
  481. id: 服务器发布消息时的报文标识符
  482. * @retval 1: 消息帧发送成功
  483. * -1:网络错误
  484. */
  485. int mqtt_recvPublishMessage_qos2_PUBREC(int sock, uint16_t id)
  486. {
  487. int len, rc;
  488. uint8_t buf[8];
  489. memset(buf, 0, sizeof(buf));
  490. len = MQTTSerialize_pubrec(buf, sizeof(buf), 0, id);
  491. rc = transport_send(sock, buf, len);
  492. if(rc <= 0)
  493. {
  494. transport_close(sock);
  495. return -1;
  496. }
  497. MQTT_PRINTF("send to platform qos2 PUBREC \r\n");
  498. return 1;
  499. }
  500. /**
  501. * @brief 对服务器做出PUBCOMP应答
  502. * @param sock: sock编号
  503. pbuf: 接收到服务器的数据缓存
  504. buflen: 数据长度
  505. * @retval 1: 消息帧发送成功
  506. * -2: 帧类型错误
  507. */
  508. int mqtt_recvPublishMessage_qos2_PUBREL(int sock, uint8_t *pbuf, int buflen)
  509. {
  510. uint8_t packettype, dup;
  511. uint16_t packetid;
  512. int msg;
  513. if(MQTTDeserialize_ack(&packettype, &dup, &packetid, pbuf, buflen) == 1)
  514. {
  515. MQTT_PRINTF("receive from platform qos2 PUBREL \r\n");
  516. mqtt_recvPublishQos2_packid = packetid;
  517. msg = MBOX_MQTT_QOS2PUBCOMP;
  518. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  519. }
  520. else return -2;
  521. return 1;
  522. }
  523. /**
  524. * @brief 发送COMP信号
  525. * @param sock: sock编号
  526. id: 服务器发布消息时的报文标识符
  527. * @retval 1: 消息帧发送成功
  528. * -1:网络错误
  529. */
  530. int mqtt_recvPublishMessage_qos2_PUBCOMP(int sock, uint16_t id)
  531. {
  532. int len, rc;
  533. uint8_t buf[8];
  534. memset(buf, 0, sizeof(buf));
  535. len = MQTTSerialize_pubcomp(buf, sizeof(buf), id);
  536. rc = transport_send(sock, buf, len);
  537. if(rc <= 0)
  538. {
  539. transport_close(sock);
  540. return -1;
  541. }
  542. MQTT_PRINTF("send to platform qos2 PUBCOMP \r\n");
  543. return 1;
  544. }
  545. /************************************************************
  546. * @brief 发送ping心跳包
  547. * @param sock: sock编号
  548. * @retval 1: 消息帧发送成功
  549. * -1:网络错误
  550. * -2: 帧组包错误或者未收到服务器响应
  551. *************************************************************/
  552. int mqtt_pingReq(int sock)
  553. {
  554. int len;
  555. int timeout = 0;
  556. //数据组包
  557. len = MQTTSerialize_pingreq(mqtt_sendBuf, MQTT_SENDBUF_LENGTH);
  558. if(len <= 0) return -2;
  559. mqtt_pingreq_status = 1;
  560. if(transport_send(sock, mqtt_sendBuf, len) <= 0) return -1;
  561. //等待接收线程 接收到响应包
  562. do
  563. {
  564. OSTimeDly(10/(1000/OS_TICKS_PER_SEC));
  565. timeout += 10;
  566. }while((timeout < 2000) && (mqtt_pingreq_status));
  567. if(mqtt_pingreq_status) return -2;
  568. return 1;
  569. }
  570. /**
  571. * @brief 解析心跳响应包
  572. * @retval 1: 消息解析成功
  573. */
  574. int mqtt_pingResponse(void)
  575. {
  576. mqtt_pingreq_status = 0;
  577. MQTT_PRINTF("receive a ping response \r\n");
  578. return 1;
  579. }
  580. /**
  581. * @brief 断开与服务器的MQTT连接和TCP连接
  582. * @param sock: sock编号
  583. */
  584. void mqtt_disconnectServer(int sock)
  585. {
  586. int len;
  587. uint8_t buf[2];
  588. memset(buf, 0, sizeof(buf));
  589. len = MQTTSerialize_disconnect(buf, sizeof(buf));
  590. transport_send(sock, buf, len);
  591. transport_close(sock);
  592. }