lte_mqtt.c 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. #include "LTE_MQTT.h"
  2. #include "lte.h"
  3. #include "stdint.h"
  4. #include "data_task.h"
  5. // 配置mqtt参数
  6. int MQTT_config()
  7. {
  8. uint16_t cat1_timeout = 0;
  9. char configMessage[64];
  10. snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"version\",%d,%d \r\n",client_send_idx, MQTT_4);
  11. while(Iot_SendCmd(configMessage,"OK", 1)){
  12. LTE_Delay(1);
  13. cat1_timeout ++;
  14. if(cat1_timeout >= 2000){
  15. return TIMEOUT;
  16. }
  17. }
  18. cat1_timeout = 0;
  19. memset(configMessage, 0, sizeof(configMessage));
  20. snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"keepalive\",%d,120 \r\n",client_send_idx);
  21. while(Iot_SendCmd(configMessage,"OK", 1)){
  22. LTE_Delay(1);
  23. cat1_timeout ++;
  24. if(cat1_timeout >= 2000){
  25. return TIMEOUT;
  26. }
  27. }
  28. // 配置需求参数
  29. cat1_timeout = 0;
  30. memset(configMessage, 0, sizeof(configMessage));
  31. snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"session\",%d,1 \r\n",client_send_idx);
  32. while(Iot_SendCmd(configMessage,"OK", 1)){
  33. LTE_Delay(1);
  34. cat1_timeout ++;
  35. if(cat1_timeout >= 2000){
  36. return TIMEOUT;
  37. }
  38. }
  39. cat1_timeout = 0;
  40. memset(configMessage, 0, sizeof(configMessage));
  41. snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"timeout\",%d,10,3,1 \r\n",client_send_idx);
  42. while(Iot_SendCmd(configMessage,"OK", 1)){
  43. LTE_Delay(1);
  44. cat1_timeout ++;
  45. if(cat1_timeout >= 2000){
  46. return TIMEOUT;
  47. }
  48. }
  49. cat1_timeout = 0;
  50. memset(configMessage, 0, sizeof(configMessage));
  51. snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"will\",%d \r\n",client_send_idx);
  52. while(Iot_SendCmd(configMessage,"OK", 1)){
  53. LTE_Delay(1);
  54. cat1_timeout ++;
  55. if(cat1_timeout >= 2000){
  56. return TIMEOUT;
  57. }
  58. }
  59. return SUCCESS;
  60. }
  61. uint8_t LTE_mqtt_pulish(char* message, void* buf )
  62. {
  63. uint8_t i;
  64. HAL_UART_Transmit(&USART_InitStruct_DEBUG, (uint8_t*)message, strlen(message), HAL_MAX_DELAY);
  65. while(strstr((char*)&UART6_RX_BUF,">") == NULL){
  66. i++;
  67. delay_ms(100);
  68. if(i > 20)
  69. return 0;// 失败
  70. }
  71. HAL_UART_Transmit(&USART_InitStruct_DEBUG, (uint8_t*)buf, strlen(buf), HAL_MAX_DELAY);
  72. return 1;// 成功
  73. }
  74. // 连接MQTT- 发送模式
  75. void MQTT_send_connect(void const* arg)
  76. {
  77. // MQTT_config();
  78. uint16_t cat1_timeout = 0;
  79. __Start:
  80. // 选择模式
  81. while(Iot_SendCmd("AT+QMTCFG=\"send/mode\",0,0 \r\n","OK",1)){
  82. LTE_Delay(1);
  83. cat1_timeout ++;
  84. if(cat1_timeout >= 2000){
  85. goto __Start;
  86. }
  87. }
  88. GATEWAY_PARAMS *get;
  89. get= get_gateway_config_params();
  90. // 打开mqtt
  91. cat1_timeout = 0;
  92. char message[64];
  93. snprintf(message, sizeof(message),"AT+QMTOPEN=%d,%s,%d \r\n",client_send_idx,get->host,get->port);
  94. while(Iot_SendCmd(message,"OK",20)){
  95. LTE_Delay(1);
  96. cat1_timeout ++;
  97. if(cat1_timeout >= 2000){
  98. goto __Start;
  99. }
  100. }
  101. // 连接服务器
  102. mqtt_connectFlag = 0;
  103. cat1_timeout = 0;
  104. memset(message, 0, sizeof(message));
  105. snprintf(message, sizeof(message), "AT+QMTCONN=%d,%s,%s,%s\r\n",
  106. client_send_idx,(char*)&clientid1,get->username, get->passwd);
  107. while(Iot_SendCmd(message,"OK",10)){
  108. LTE_Delay(1);
  109. cat1_timeout ++;
  110. if(cat1_timeout >= 2000){
  111. goto __Start;
  112. }
  113. }
  114. int result,index;
  115. char* mqttStr = strstr((char*)&UART6_RX_BUF,"+QMTCONN:");
  116. sscanf(mqttStr, "+QMTCONN: %d,%d, ", &index, &result);
  117. #if 0
  118. // 拿返回的result做判断
  119. switch(result){
  120. case NETSUCCESS:
  121. break;
  122. case NETERR:
  123. goto __Start;
  124. case 1:
  125. goto __Start;
  126. }
  127. #endif
  128. // 订阅主题
  129. cat1_timeout = 0;
  130. memset(message, 0, sizeof(message));
  131. snprintf(message, sizeof(message), "AT+QMTSUB=%d,1,%s,0 \r\n",client_send_idx,get->commandTopic);
  132. while(Iot_SendCmd(message,"OK", 5)){
  133. LTE_Delay(1);
  134. cat1_timeout ++;
  135. if(cat1_timeout >= 2000){
  136. goto __Start;
  137. }
  138. }
  139. mqtt_connectFlag = 1;
  140. mqtt_message_t msg;
  141. memset(&msg, 0, sizeof(msg));
  142. struct Pub_Queue *pxMessage;
  143. //发布消息
  144. while(1)
  145. {
  146. if(xQueue1 != 0)
  147. {
  148. if(xQueueReceive(xQueue1, &(pxMessage), (TickType_t)10))
  149. {
  150. // 处理队列内部数据并上传
  151. msg.payloadlen = pxMessage->pubLength;
  152. msg.qos = pxMessage->qos;
  153. msg.payload = (void *)pxMessage->message;
  154. cat1_timeout = 0;
  155. memset(message, 0, sizeof(message));
  156. snprintf(message, sizeof(message), "AT+QMTPUBEX=%d,0,0,0,%s,%d\r\n",
  157. client_send_idx, get->messageTopic, strlen(msg.payload));
  158. LTE_mqtt_pulish(message, msg.payload);
  159. memset(UART6_RX_BUF, 0, BUFF_SIZE);
  160. myfree(SRAMEX, pxMessage->pub_topic);
  161. myfree(SRAMEX, pxMessage->message);
  162. myfree(SRAMEX, pxMessage);
  163. }
  164. // watchdog_feed();
  165. LTE_Delay(100);
  166. }
  167. }
  168. }
  169. // 连接MQTT- 接收模式
  170. void MQTT_recv_connect(void const* arg)
  171. {
  172. uint16_t cat1_timeout = 0;
  173. char message[64];
  174. while(1)
  175. {
  176. if(mqtt_connectFlag == 1)
  177. {
  178. // 选择模式
  179. memset(message, 0, sizeof(message));
  180. snprintf(message, sizeof(message), "AT+QMTCFG=\"recv/mode\",1,0,1 \r\n");
  181. while(Iot_SendCmd(message,"OK", 1)){
  182. LTE_Delay(1);
  183. cat1_timeout ++;
  184. if(cat1_timeout >= 2000){
  185. break;
  186. }
  187. }
  188. //接收消息
  189. memset(UART6_RX_BUF, 0, BUFF_SIZE);
  190. char* cmd = "AT+QMTRECV=0,1\r\n";
  191. HAL_UART_Transmit(&USART_InitStruct_DEBUG, (uint8_t*)cmd, strlen(cmd), HAL_MAX_DELAY);
  192. while(1){
  193. char* payloadstr = mymalloc(SRAMEX, 1024);
  194. payloadstr = strstr((char*)UART6_RX_BUF, "+QMTRECV:");
  195. if (payloadstr)
  196. {
  197. payloadstr = strstr((char*)UART6_RX_BUF, "{");
  198. sprintf(payloadstr + strlen(payloadstr) - 3 , " ");
  199. write_modbus_data(payloadstr);
  200. }
  201. myfree(SRAMEX, payloadstr);
  202. memset(UART6_RX_BUF, 0, BUFF_SIZE);
  203. LTE_Delay(100);
  204. }
  205. }
  206. // watchdog_feed();
  207. LTE_Delay(100);
  208. }
  209. }
  210. #if 0
  211. void MQTT_send_connect(void* arg)
  212. {
  213. uint16_t cat1_timeout = 0;
  214. void *mboxMsg;
  215. uint8_t err;
  216. char message[64];
  217. __Start:
  218. memset(message, 0, sizeof(message));
  219. snprintf(message, sizeof(message), "AT+QMTCFG=\"recv/mode\",%d,0,1 \r\n",client_recv_idx);
  220. while(Iot_SendCmd(message,"OK", 5)){
  221. LTE_Delay(1);
  222. cat1_timeout ++;
  223. if(cat1_timeout >= 2000){
  224. goto __Start;
  225. }
  226. }
  227. // 打开mqtt
  228. cat1_timeout = 0;
  229. snprintf(message, sizeof(message),"AT+QMTOPEN=%d,\"36.134.23.11\",1883 \r\n",client_recv_idx);
  230. while(Iot_SendCmd(message,"OK",20)){
  231. LTE_Delay(1);
  232. cat1_timeout ++;
  233. if(cat1_timeout >= 2000){
  234. goto __Start;
  235. }
  236. }
  237. // 连接服务器
  238. mqtt_connectFlag = 0;
  239. cat1_timeout = 0;
  240. memset(message, 0, sizeof(message));
  241. snprintf(message, sizeof(message), "AT+QMTCONN=%d,%s\r\n",client_send_idx,(char*)&clientid1);
  242. while(Iot_SendCmd(message,"OK",10)){
  243. LTE_Delay(1);
  244. cat1_timeout ++;
  245. if(cat1_timeout >= 2000){
  246. goto __Start;
  247. }
  248. }
  249. // 订阅主题
  250. cat1_timeout = 0;
  251. memset(message, 0, sizeof(message));
  252. snprintf(message, sizeof(message), "AT+QMTSUB=%d,1,\"test0003/command\",0 \r\n",client_send_idx);
  253. while(Iot_SendCmd(message,"OK",10)){
  254. LTE_Delay(1);
  255. cat1_timeout ++;
  256. if(cat1_timeout >= 2000){
  257. goto __Start;
  258. }
  259. }
  260. while(1)
  261. {
  262. mboxMsg = OSMboxPend(mqtt_sendMseeageMbox, 1000, &err);
  263. if(OS_ERR_NONE == err)
  264. {
  265. if((*(unsigned int *)mboxMsg) == MBOX_NETWORK_ERROR) goto __Start;
  266. else
  267. {
  268. cat1_timeout = 0;
  269. memset(message, 0, sizeof(message));
  270. snprintf(message, sizeof(message), "AT+QMTPUBEX=%d,0,0,0,\"test0003\",%d\r\n",client_send_idx,strlen(pubJsonString));
  271. while(Iot_SendCmd(message,">",2)){
  272. LTE_Delay(1);
  273. cat1_timeout ++;
  274. if(cat1_timeout >= 2000){
  275. goto __Start;
  276. }
  277. }
  278. cat1_timeout = 0;
  279. while(Iot_SendCmd(pubJsonString,"+QMTPUBEX", 5)){
  280. LTE_Delay(1);
  281. cat1_timeout ++;
  282. if(cat1_timeout >= 2000){
  283. goto __Start;
  284. }
  285. }
  286. memset(pubJsonString, 0, strlen(pubJsonString));
  287. }
  288. }
  289. LTE_Delay(100);
  290. }
  291. }
  292. #endif
  293. void MQ_threadCreate()
  294. {
  295. // mqtt_client_t *client = NULL; // 创建一个客户端
  296. mqtt_log_init();
  297. printf("\nwelcome to mqttclient test...\n");
  298. xQueue1 = xQueueCreate(10, sizeof(struct Pub_Queue *)); // 创建一个mqtt上传的队列
  299. osThreadDef(MQTT_send_task, MQTT_send_connect, osPriorityNormal, 0, configMINIMAL_STACK_SIZE * 8);
  300. osThreadCreate (osThread(MQTT_send_task), NULL);
  301. osThreadDef(MQTT_recv_task, MQTT_recv_connect, osPriorityNormal, 0, configMINIMAL_STACK_SIZE * 8);
  302. osThreadCreate (osThread(MQTT_recv_task), NULL);
  303. }