sys_mqtt.c 11 KB


  1. #include "string.h"
  2. #include "stm32f2xx.h"
  3. #include "ucos_ii.h"
  4. #include "hd_eth.h"
  5. #include "lwip/arch.h"
  6. #include "sys_mqtt.h"
  7. #include "transport.h"
  8. #include "MQTTFormat.h"
  9. #include "cJSON.h"
  10. #include "led.h"
  11. #include "MQTTClient.h"
  12. #include "malloc.h"
  13. #include "mmodbus.h"
  14. #include "myFile.h"
  15. #include "gateway_message.h"
  16. #include "task.h"
  17. #include "log.h"
  18. /********************************************************
  19. * @brief 连接到服务器
  20. * @param sock: sock编号
  21. *********************************************************/
  22. int mqtt_userConnect(void)
  23. {
  24. GATEWAY_PARAMS* get;
  25. get = get_gateway_config_params();
  26. char* MQTT_SERVER_ADDR = "36.134.23.11";//(char*)get->host;
  27. int MQTT_SERVER_PORT = get->port;
  28. int sock = -1;
  29. //连接到tcp服务器
  30. sock = transport_open(MQTT_SERVER_ADDR, MQTT_SERVER_PORT);
  31. if(sock < 0)
  32. {
  33. MQTT_PRINTF("connect tcp server error \r\n");
  34. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect tcp server error");
  35. return -1;
  36. }
  37. MQTT_PRINTF("connect tcp server success \r\n");
  38. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect tcp server success");
  39. //连接到mqtt服务器
  40. if(mqtt_connectToMqttServer(sock) <= 0)
  41. {
  42. MQTT_PRINTF("connect mqtt server error \r\n");
  43. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect mqtt server error");
  44. return -1;
  45. }
  46. MQTT_PRINTF("connect mqtt server success \r\n");
  47. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect mqtt server success");
  48. return sock;
  49. }
  50. /************************************************************
  51. * @brief 订阅主题
  52. * @param sock: sock编号
  53. * @retval 1: 订阅成功
  54. *************************************************************/
  55. int mqtt_userSubscribeTopic(int sock)
  56. {
  57. GATEWAY_PARAMS* get;
  58. get = get_gateway_config_params();
  59. char* TOPICPC = (char*)get->commandTopic;
  60. // char* TOPICPC = "/device/DTtest0003/command";
  61. return mqtt_subscribeTopic(sock, TOPICPC, 2);
  62. }
  63. /*
  64. *接收mqtt下发消息并存储进一个数据队列
  65. *name MQTT主题相关信息
  66. */
  67. void *json_message[10];
  68. #define QUEUE_SIZE 10 //队列深度
  69. //消息队列指针
  70. OS_EVENT *JsonQ;
  71. void mqtt_outputMsg(MQTTString *name, uint8_t *msgbuf, int msglen, uint16_t id, int qos)
  72. {
  73. int lenght=msglen;
  74. MQTT_PRINTF("receive a msg: id=%d qos=%d topic=%s msg=%s \r\n", id, qos, name->lenstring.data, msgbuf);
  75. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "receive a msg: id=%d qos=%d topic=%s msg=%s \r\n", id, qos, name->lenstring.data, msgbuf);
  76. StringInfo message;
  77. message.p=mymalloc(SRAMEX ,msglen);
  78. memcpy(message.p,msgbuf,msglen);
  79. OSMboxPost(mqtt_recvMseeageMbox,(void*)&message);
  80. }
  81. /************************************************************
  82. * @brief 处理来自平台发布的信息
  83. * @param sock: sock编号
  84. pbuf: 接收的数据
  85. buflen: 数据的长度
  86. * @retval 1: 消息正常处理
  87. * -2: 帧数据包错误
  88. *************************************************************/
  89. int mqtt_recvPublishMessage(int sock, uint8_t *pbuf, int buflen)
  90. {
  91. uint8_t dup, retained;
  92. int qos, payloadlen;
  93. uint16_t packetid;
  94. MQTTString topicName;
  95. uint8_t *payload;
  96. int msg;
  97. if(MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, &payload, &payloadlen, pbuf, buflen) == 1)
  98. {
  99. mqtt_outputMsg(&topicName, payload, payloadlen, packetid, qos);
  100. }
  101. else return -2;
  102. if(qos == 1)
  103. {
  104. mqtt_recvPublishQos1_packid = packetid;
  105. msg = MBOX_MQTT_QOS1PUBACK;
  106. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  107. }
  108. if(qos == 2)
  109. {
  110. mqtt_recvPublishQos2_packid = packetid;
  111. msg = MBOX_MQTT_QOS2PUBREC;
  112. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  113. }
  114. return 1;
  115. }
  116. /************************************************************
  117. * @brief 解析所有平台下发的数据包
  118. * @param sock: sock编号
  119. type: 数据帧的类型
  120. pbuf: 数据
  121. buflen: 数据的长度
  122. * @retval 1: 消息正常处理
  123. * -2: 帧数据包错误
  124. *************************************************************/
  125. int mqtt_userReceiveMessage(int sock, int type, uint8_t *pbuf, int len)
  126. {
  127. int rc;
  128. switch(type)
  129. {
  130. case PUBLISH: rc = mqtt_recvPublishMessage(sock, pbuf, len); break; //平台向设备发布消息
  131. case PUBACK: rc = mqtt_publishMessage_qos1_PUBACK(pbuf, len); break; //设备向平台发布 QOS1 确认报文
  132. case PUBREC: rc = mqtt_publishMessage_qos2_PUBREC(pbuf, len); break; //设备向平台发布 QOS2 确认报文
  133. case PUBREL: rc = mqtt_recvPublishMessage_qos2_PUBREL(sock, pbuf, len); break; //平台向设备发布 QOS2 消息释放报文
  134. case PUBCOMP: rc = mqtt_publishMessage_qos2_PUBCOMP(pbuf, len); break; //设备向平台发布 QOS2 消息释放报文的确认报文
  135. case SUBACK: rc = mqtt_subscribeTopic_SUBACK(pbuf, len);break; //设备向平台订阅报文时的响应报文
  136. case UNSUBACK: rc = mqtt_unSubscribeTopic_UNSUBACK(pbuf, len); break; //设备向平台发取消订阅报文时的响应报文
  137. case PINGRESP: rc = mqtt_pingResponse(); break; //心跳响应报文
  138. default: rc = 1; break;
  139. }
  140. return rc;
  141. }
  142. /************************************************************
  143. * @brief 处理发送邮箱 mqtt_recvMbox 中的消息,根据消息的类型,发送对应的数据包
  144. * @param sock: sock编号
  145. boxMsg: 消息内容
  146. * @retval 1: 消息正常处理
  147. * -1: 网络错误
  148. * -2: 组建发送数据时发送错误
  149. *************************************************************/
  150. char pubJsonString[jsonMaxSize];
  151. char pubJsonStringCopy[jsonMaxSize];
  152. int mqtt_userSendMessage(int sock, int boxMsg)
  153. {
  154. GATEWAY_PARAMS* get;
  155. get = get_gateway_config_params();
  156. char* TOPIC = (char*)get->messageTopic;
  157. // char* TOPIC = "/device/DTtest0003";
  158. int rc = 1;
  159. //用户发送的数据
  160. if((boxMsg & 0xf0000000) == 0x20000000)
  161. {
  162. switch(boxMsg)
  163. {
  164. case MBOX_USER_PUBLISHQOS0: rc = mqtt_publishMessage_qos0(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
  165. case MBOX_USER_PUBLISHQOS1: rc = mqtt_publishMessage_qos1(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
  166. case MBOX_USER_PUBLISHQOS2: rc = mqtt_publishMessage_qos2(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
  167. }
  168. }
  169. //协议站使用的数据
  170. else if((boxMsg & 0xf0000000) == 0x10000000)
  171. {
  172. switch(boxMsg)
  173. {
  174. case MBOX_MQTT_QOS1PUBACK: rc = mqtt_recvPublishMessage_qos1_PUBACK(sock, mqtt_recvPublishQos1_packid); break;
  175. case MBOX_MQTT_QOS2PUBREC: rc = mqtt_recvPublishMessage_qos2_PUBREC(sock, mqtt_recvPublishQos2_packid); break;
  176. case MBOX_MQTT_QOS2PUBCOMP: rc = mqtt_recvPublishMessage_qos2_PUBCOMP(sock, mqtt_recvPublishQos2_packid); break;
  177. }
  178. }
  179. return rc;
  180. }
  181. OS_EVENT *mqtt_sendMseeageMbox; //发送消息邮箱
  182. OS_EVENT *mqtt_recvMseeageMbox; //接收消息邮箱
  183. int mqtt_connectFlag; //成功连接服务器标志
  184. static int mysock; //连接mqtt服务器的sock编号
  185. /************************************************************
  186. * @brief MQTT主线程
  187. * @param arg: 未使用
  188. * @retval
  189. *************************************************************/
  190. void mqtt_userManThread(void *arg)
  191. {
  192. int rc;
  193. uint8_t err;
  194. void *mboxMsg;
  195. MQTT_PRINTF("mqtt mainthread start \r\n");
  196. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "mqtt mainthread start");
  197. __MQTT_START:
  198. //1.建立与服务器的连接
  199. mysock = -1;
  200. mqtt_connectFlag = 0;
  201. while(mysock < 0)
  202. {
  203. mysock = mqtt_userConnect();
  204. OSTimeDly(2000);
  205. }
  206. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "mqtt connect success");
  207. OSMboxAccept(mqtt_sendMseeageMbox); //清空mbox的数据
  208. mqtt_connectFlag = 1;
  209. //2.订阅服务器的主题
  210. rc = mqtt_userSubscribeTopic(mysock);
  211. if(rc <= 0)
  212. {
  213. MQTT_PRINTF("subscribe error \r\n");
  214. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "subscribe error");
  215. if(rc == -1) goto __MQTT_START; //如果网络发生错误,重新建立连接
  216. }
  217. else
  218. {
  219. MQTT_PRINTF("subscribe success \r\n");
  220. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "subscribe success");
  221. }
  222. //3.循环发送数据
  223. while(1)
  224. {
  225. mboxMsg = OSMboxPend(mqtt_sendMseeageMbox, 2000, &err);
  226. if(OS_ERR_NONE == err)
  227. {
  228. //接收到了 网络异常消息
  229. if((*(unsigned int *)mboxMsg) == MBOX_NETWORK_ERROR)
  230. goto __MQTT_START;
  231. else
  232. {
  233. if(mqtt_userSendMessage(mysock, *(unsigned int *)mboxMsg) == -1)
  234. goto __MQTT_START;
  235. memset(pubJsonString,0,strlen(pubJsonString));
  236. }
  237. }
  238. else //超时没有发送数据包,发送心跳包
  239. {
  240. rc = mqtt_pingReq(mysock);
  241. if(rc == -1)
  242. goto __MQTT_START;
  243. }
  244. OSTimeDly(100);
  245. }
  246. }
  247. /************************************************************
  248. * @brief MQTT接收线程,处理所有平台下发的数据帧, CONNACK除外
  249. * @param arg: 未使用
  250. * @retval
  251. *************************************************************/
  252. void mqtt_userReceiveThread(void *arg)
  253. {
  254. int len;
  255. int packetType;
  256. int msg;
  257. MQTT_PRINTF("mqtt receivethread start \r\n");
  258. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "mqtt receivethread start");
  259. while(1)
  260. {
  261. if(mqtt_connectFlag == 1)
  262. {
  263. len = transport_receive(mysock); //阻塞接收数据
  264. if(len <=0)
  265. {
  266. if(len == EWOULDBLOCK) //接收数据超时,重新接收
  267. {
  268. MQTT_PRINTF("receive data timeout \r\n");
  269. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "receive data timeout");
  270. continue;
  271. }
  272. else //接收数据时,网络发生了异常,给主线程发送消息,重新建立连接
  273. {
  274. MQTT_PRINTF("sock close \r\n");
  275. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "sock close");
  276. transport_close(mysock);
  277. mqtt_connectFlag = 0;
  278. msg = MBOX_NETWORK_ERROR;
  279. OSMboxPost(mqtt_sendMseeageMbox, &msg);
  280. }
  281. }
  282. memset(mqtt_recvbuffer, 0, MQTT_RECVBUF_LENTH);
  283. packetType = MQTTPacket_read(mqtt_recvbuffer, len, transport_getdata);
  284. mqtt_userReceiveMessage(mysock, packetType, mqtt_recvbuffer, len);
  285. }
  286. OSTimeDly(100);
  287. }
  288. }
  289. #define APP_TASK_MQTTMAIN_PRIO 6
  290. #define APP_TASK_MQTTMAIN_STK_SIZE 1024
  291. static OS_STK mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE];
  292. #define APP_TASK_MQTTRECEIVE_PRIO 7
  293. #define APP_TASK_MQTTRECEIVE_STK_SIZE 1024
  294. static OS_STK mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE];
  295. /************************************************************
  296. * @brief 创建MQTT发送和接收线程,初始化相关数据
  297. * @param 此函数必须在LWIP协议栈初始完成以后再调用
  298. * @retval
  299. *************************************************************/
  300. void mqtt_threadCreate(void)
  301. {
  302. mqtt_connectFlag = 0;
  303. mqtt_sendMseeageMbox = OSMboxCreate(NULL);
  304. mqtt_recvMseeageMbox = OSMboxCreate(NULL);
  305. JsonQ = OSQCreate(json_message[0], QUEUE_SIZE); //创建json队列
  306. OSTaskCreate(mqtt_userManThread, NULL, &mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE-1], APP_TASK_MQTTMAIN_PRIO);
  307. OSTaskCreate(mqtt_userReceiveThread, NULL, &mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE-1], APP_TASK_MQTTRECEIVE_PRIO);
  308. }