sys_mqtt.c 9.4 KB


  1. #include "string.h"
  2. #include "stm32f2xx.h"
  3. #include "ucos_ii.h"
  4. #include "hd_eth.h"
  5. #include "lwip/arch.h"
  6. #include "sys_mqtt.h"
  7. #include "transport.h"
  8. #include "MQTTFormat.h"
  9. #include "cJSON.h"
  10. #include "led.h"
  11. #include "MQTTClient.h"
  12. #include "malloc.h"
  13. /********************************************************
  14. * @brief 连接到服务器
  15. * @param sock: sock编号
  16. *********************************************************/
  17. int mqtt_userConnect(void)
  18. {
  19. int sock = -1;
  20. //连接到tcp服务器
  21. sock = transport_open(MQTT_SERVER_ADDR, MQTT_SERVER_PORT);
  22. if(sock < 0)
  23. {
  24. MQTT_PRINTF("connect tcp server error \r\n");
  25. return -1;
  26. }
  27. MQTT_PRINTF("connect tcp server success \r\n");
  28. //连接到mqtt服务器
  29. if(mqtt_connectToMqttServer(sock) <= 0)
  30. {
  31. MQTT_PRINTF("connect mqtt server error \r\n");
  32. return -1;
  33. }
  34. MQTT_PRINTF("connect mqtt server success \r\n");
  35. return sock;
  36. }
  37. /************************************************************
  38. * @brief 订阅主题
  39. * @param sock: sock编号
  40. * @retval 1: 订阅成功
  41. *************************************************************/
  42. int mqtt_userSubscribeTopic(int sock)
  43. {
  44. return mqtt_subscribeTopic(sock, TOPICPC, 2);
  45. }
  46. /*
  47. *接收mqtt下发消息并存储进一个数据队列
  48. *name MQTT主题相关信息
  49. */
  50. void *json_message[10];
  51. #define QUEUE_SIZE 10 //队列深度
  52. //消息队列指针
  53. OS_EVENT *JsonQ;
  54. void mqtt_outputMsg(MQTTString *name, uint8_t *msgbuf, int msglen, uint16_t id, int qos)
  55. {
  56. int lenght=msglen;
  57. MQTT_PRINTF("receive a msg: id=%d qos=%d topic=%s msg=%s \r\n", id, qos, name->lenstring.data, msgbuf);
  58. StringInfo message;
  59. message.p=mymalloc(SRAMEX ,msglen);
  60. memcpy(message.p,msgbuf,msglen);
  61. uint8_t err;
  62. err=OSQPost(JsonQ,(void *)&message);
  63. switch(err)
  64. {
  65. case OS_ERR_NONE:
  66. break;
  67. case OS_ERR_Q_FULL:
  68. MQTT_PRINTF("receive a msg queue is full \r\n");
  69. break;
  70. default:
  71. break;
  72. }
  73. }
  74. /************************************************************
  75. * @brief 处理来自平台发布的信息
  76. * @param sock: sock编号
  77. pbuf: 接收的数据
  78. buflen: 数据的长度
  79. * @retval 1: 消息正常处理
  80. * -2: 帧数据包错误
  81. *************************************************************/
  82. int mqtt_recvPublishMessage(int sock, uint8_t *pbuf, int buflen)
  83. {
  84. uint8_t dup, retained;
  85. int qos, payloadlen;
  86. uint16_t packetid;
  87. MQTTString topicName;
  88. uint8_t *payload;
  89. int msg;
  90. if(MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, &payload, &payloadlen, pbuf, buflen) == 1)
  91. {
  92. mqtt_outputMsg(&topicName, payload, payloadlen, packetid, qos);
  93. }
  94. else return -2;
  95. if(qos == 1)
  96. {
  97. mqtt_recvPublishQos1_packid = packetid;
  98. msg = MBOX_MQTT_QOS1PUBACK;
  99. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  100. }
  101. if(qos == 2)
  102. {
  103. mqtt_recvPublishQos2_packid = packetid;
  104. msg = MBOX_MQTT_QOS2PUBREC;
  105. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  106. }
  107. return 1;
  108. }
  109. /************************************************************
  110. * @brief 解析所有平台下发的数据包
  111. * @param sock: sock编号
  112. type: 数据帧的类型
  113. pbuf: 数据
  114. buflen: 数据的长度
  115. * @retval 1: 消息正常处理
  116. * -2: 帧数据包错误
  117. *************************************************************/
  118. int mqtt_userReceiveMessage(int sock, int type, uint8_t *pbuf, int len)
  119. {
  120. int rc;
  121. switch(type)
  122. {
  123. case PUBLISH: rc = mqtt_recvPublishMessage(sock, pbuf, len); break; //平台向设备发布消息
  124. case PUBACK: rc = mqtt_publishMessage_qos1_PUBACK(pbuf, len); break; //设备向平台发布 QOS1 确认报文
  125. case PUBREC: rc = mqtt_publishMessage_qos2_PUBREC(pbuf, len); break; //设备向平台发布 QOS2 确认报文
  126. case PUBREL: rc = mqtt_recvPublishMessage_qos2_PUBREL(sock, pbuf, len); break; //平台向设备发布 QOS2 消息释放报文
  127. case PUBCOMP: rc = mqtt_publishMessage_qos2_PUBCOMP(pbuf, len); break; //设备向平台发布 QOS2 消息释放报文的确认报文
  128. case SUBACK: rc = mqtt_subscribeTopic_SUBACK(pbuf, len);break; //设备向平台订阅报文时的响应报文
  129. case UNSUBACK: rc = mqtt_unSubscribeTopic_UNSUBACK(pbuf, len); break; //设备向平台发取消订阅报文时的响应报文
  130. case PINGRESP: rc = mqtt_pingResponse(); break; //心跳响应报文
  131. default: rc = 1; break;
  132. }
  133. return rc;
  134. }
  135. /************************************************************
  136. * @brief 处理发送邮箱 mqtt_recvMbox 中的消息,根据消息的类型,发送对应的数据包
  137. * @param sock: sock编号
  138. boxMsg: 消息内容
  139. * @retval 1: 消息正常处理
  140. * -1: 网络错误
  141. * -2: 组建发送数据时发送错误
  142. *************************************************************/
  143. char pubJsonString[jsonMaxSize];
  144. int mqtt_userSendMessage(int sock, int boxMsg)
  145. {
  146. int rc = 1;
  147. //用户发送的数据
  148. if((boxMsg & 0xf0000000) == 0x20000000)
  149. {
  150. switch(boxMsg)
  151. {
  152. case MBOX_USER_PUBLISHQOS0: rc = mqtt_publishMessage_qos0(sock, TOPICSTM32, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
  153. case MBOX_USER_PUBLISHQOS1: rc = mqtt_publishMessage_qos1(sock, TOPICSTM32, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
  154. case MBOX_USER_PUBLISHQOS2: rc = mqtt_publishMessage_qos2(sock, TOPICSTM32, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
  155. }
  156. }
  157. //协议站使用的数据
  158. else if((boxMsg & 0xf0000000) == 0x10000000)
  159. {
  160. switch(boxMsg)
  161. {
  162. case MBOX_MQTT_QOS1PUBACK: rc = mqtt_recvPublishMessage_qos1_PUBACK(sock, mqtt_recvPublishQos1_packid); break;
  163. case MBOX_MQTT_QOS2PUBREC: rc = mqtt_recvPublishMessage_qos2_PUBREC(sock, mqtt_recvPublishQos2_packid); break;
  164. case MBOX_MQTT_QOS2PUBCOMP: rc = mqtt_recvPublishMessage_qos2_PUBCOMP(sock, mqtt_recvPublishQos2_packid); break;
  165. }
  166. }
  167. return rc;
  168. }
  169. OS_EVENT *mqtt_sendMseeageMbox; //发送消息邮箱
  170. int mqtt_connectFlag; //成功连接服务器标志
  171. static int mysock; //连接mqtt服务器的sock编号
  172. /************************************************************
  173. * @brief MQTT主线程
  174. * @param arg: 未使用
  175. * @retval
  176. *************************************************************/
  177. void mqtt_userManThread(void *arg)
  178. {
  179. int rc;
  180. uint8_t err;
  181. void *mboxMsg;
  182. MQTT_PRINTF("mqtt mainthread start \r\n");
  183. __MQTT_START:
  184. //1.建立与服务器的连接
  185. mysock = -1;
  186. mqtt_connectFlag = 0;
  187. while(mysock < 0)
  188. {
  189. mysock = mqtt_userConnect();
  190. OSTimeDly(2000);
  191. }
  192. OSMboxAccept(mqtt_sendMseeageMbox); //清空mbox的数据
  193. mqtt_connectFlag = 1;
  194. //2.订阅服务器的主题
  195. rc = mqtt_userSubscribeTopic(mysock);
  196. if(rc <= 0)
  197. {
  198. MQTT_PRINTF("subscribe error \r\n");
  199. if(rc == -1) goto __MQTT_START; //如果网络发生错误,重新建立连接
  200. }
  201. else MQTT_PRINTF("subscribe success \r\n");
  202. //3.循环发送数据
  203. while(1)
  204. {
  205. mboxMsg = OSMboxPend(mqtt_sendMseeageMbox, 60000, &err);
  206. if(OS_ERR_NONE == err)
  207. {
  208. //接收到了 网络异常消息
  209. if((*(unsigned int *)mboxMsg) == MBOX_NETWORK_ERROR) goto __MQTT_START;
  210. else
  211. {
  212. if(mqtt_userSendMessage(mysock, *(unsigned int *)mboxMsg) == -1)
  213. goto __MQTT_START;
  214. }
  215. }
  216. else //超时没有发送数据包,发送心跳包
  217. {
  218. rc = mqtt_pingReq(mysock);
  219. if(rc == -1) goto __MQTT_START;
  220. }
  221. OSTimeDly(1);
  222. }
  223. }
  224. /************************************************************
  225. * @brief MQTT接收线程,处理所有平台下发的数据帧, CONNACK除外
  226. * @param arg: 未使用
  227. * @retval
  228. *************************************************************/
  229. void mqtt_userReceiveThread(void *arg)
  230. {
  231. int len;
  232. int packetType;
  233. int msg;
  234. MQTT_PRINTF("mqtt receivethread start \r\n");
  235. while(1)
  236. {
  237. if(mqtt_connectFlag == 1)
  238. {
  239. len = transport_receive(mysock); //阻塞接收数据
  240. if(len <=0)
  241. {
  242. if(len == EWOULDBLOCK) //接收数据超时,重新接收
  243. {
  244. MQTT_PRINTF("receive data timeout \r\n");
  245. continue;
  246. }
  247. else //接收数据时,网络发生了异常,给主线程发送消息,重新建立连接
  248. {
  249. MQTT_PRINTF("sock close \r\n");
  250. transport_close(mysock);
  251. mqtt_connectFlag = 0;
  252. msg = MBOX_NETWORK_ERROR;
  253. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  254. }
  255. }
  256. memset(mqtt_recvbuffer, 0, MQTT_RECVBUF_LENTH);
  257. packetType = MQTTPacket_read(mqtt_recvbuffer, len, transport_getdata);
  258. mqtt_userReceiveMessage(mysock, packetType, mqtt_recvbuffer, len);
  259. }
  260. OSTimeDly(1);
  261. }
  262. }
  263. #define APP_TASK_MQTTMAIN_PRIO 6
  264. #define APP_TASK_MQTTMAIN_STK_SIZE 1024
  265. static OS_STK mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE];
  266. #define APP_TASK_MQTTRECEIVE_PRIO 7
  267. #define APP_TASK_MQTTRECEIVE_STK_SIZE 1024
  268. static OS_STK mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE];
  269. /************************************************************
  270. * @brief 创建MQTT发送和接收线程,初始化相关数据
  271. * @param 此函数必须在LWIP协议栈初始完成以后再调用
  272. * @retval
  273. *************************************************************/
  274. void mqtt_threadCreate(void)
  275. {
  276. mqtt_connectFlag = 0;
  277. mqtt_sendMseeageMbox = OSMboxCreate(NULL);
  278. JsonQ = OSQCreate(json_message[0], QUEUE_SIZE); //创建json队列
  279. OSTaskCreate(mqtt_userManThread, NULL, &mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE-1], APP_TASK_MQTTMAIN_PRIO);
  280. OSTaskCreate(mqtt_userReceiveThread, NULL, &mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE-1], APP_TASK_MQTTRECEIVE_PRIO);
  281. }