123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- #include "LTE_MQTT.h"
- #include "lte.h"
- #include "stdint.h"
- #include "data_task.h"
- // 配置mqtt参数
- int MQTT_config()
- {
- uint16_t cat1_timeout = 0;
- char configMessage[64];
- snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"version\",%d,%d \r\n",client_send_idx, MQTT_4);
- while(Iot_SendCmd(configMessage,"OK", 1)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- return TIMEOUT;
- }
- }
-
- cat1_timeout = 0;
- memset(configMessage, 0, sizeof(configMessage));
- snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"keepalive\",%d,120 \r\n",client_send_idx);
- while(Iot_SendCmd(configMessage,"OK", 1)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- return TIMEOUT;
- }
- }
- // 配置需求参数
- cat1_timeout = 0;
- memset(configMessage, 0, sizeof(configMessage));
- snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"session\",%d,1 \r\n",client_send_idx);
- while(Iot_SendCmd(configMessage,"OK", 1)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- return TIMEOUT;
- }
- }
-
- cat1_timeout = 0;
- memset(configMessage, 0, sizeof(configMessage));
- snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"timeout\",%d,10,3,1 \r\n",client_send_idx);
- while(Iot_SendCmd(configMessage,"OK", 1)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- return TIMEOUT;
- }
- }
-
- cat1_timeout = 0;
- memset(configMessage, 0, sizeof(configMessage));
- snprintf(configMessage, sizeof(configMessage), "AT+QMTCFG=\"will\",%d \r\n",client_send_idx);
- while(Iot_SendCmd(configMessage,"OK", 1)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- return TIMEOUT;
- }
- }
- return SUCCESS;
- }
- uint8_t LTE_mqtt_pulish(char* message, void* buf )
- {
- uint8_t i;
- HAL_UART_Transmit(&USART_InitStruct_DEBUG, (uint8_t*)message, strlen(message), HAL_MAX_DELAY);
- while(strstr((char*)&UART6_RX_BUF,">") == NULL){
- i++;
- delay_ms(100);
- if(i > 20)
- return 0;// 失败
- }
- HAL_UART_Transmit(&USART_InitStruct_DEBUG, (uint8_t*)buf, strlen(buf), HAL_MAX_DELAY);
- return 1;// 成功
- }
- // 连接MQTT- 发送模式
- void MQTT_send_connect(void const* arg)
- {
- // MQTT_config();
- uint16_t cat1_timeout = 0;
- __Start:
- // 选择模式
- while(Iot_SendCmd("AT+QMTCFG=\"send/mode\",0,0 \r\n","OK",1)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
-
- GATEWAY_PARAMS *get;
- get= get_gateway_config_params();
- // 打开mqtt
- cat1_timeout = 0;
- char message[64];
- snprintf(message, sizeof(message),"AT+QMTOPEN=%d,%s,%d \r\n",client_send_idx,get->host,get->port);
- while(Iot_SendCmd(message,"OK",20)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
-
- // 连接服务器
- mqtt_connectFlag = 0;
- cat1_timeout = 0;
- memset(message, 0, sizeof(message));
- snprintf(message, sizeof(message), "AT+QMTCONN=%d,%s,%s,%s\r\n",
- client_send_idx,(char*)&clientid1,get->username, get->passwd);
- while(Iot_SendCmd(message,"OK",10)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
- int result,index;
- char* mqttStr = strstr((char*)&UART6_RX_BUF,"+QMTCONN:");
- sscanf(mqttStr, "+QMTCONN: %d,%d, ", &index, &result);
- #if 0
- // 拿返回的result做判断
- switch(result){
- case NETSUCCESS:
- break;
- case NETERR:
- goto __Start;
- case 1:
- goto __Start;
-
- }
- #endif
-
- // 订阅主题
- cat1_timeout = 0;
- memset(message, 0, sizeof(message));
- snprintf(message, sizeof(message), "AT+QMTSUB=%d,1,%s,0 \r\n",client_send_idx,get->commandTopic);
- while(Iot_SendCmd(message,"OK", 5)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
- mqtt_connectFlag = 1;
- mqtt_message_t msg;
- memset(&msg, 0, sizeof(msg));
- struct Pub_Queue *pxMessage;
- //发布消息
- while(1)
- {
- if(xQueue1 != 0)
- {
- if(xQueueReceive(xQueue1, &(pxMessage), (TickType_t)10))
- {
- // 处理队列内部数据并上传
- msg.payloadlen = pxMessage->pubLength;
- msg.qos = pxMessage->qos;
- msg.payload = (void *)pxMessage->message;
-
- cat1_timeout = 0;
- memset(message, 0, sizeof(message));
- snprintf(message, sizeof(message), "AT+QMTPUBEX=%d,0,0,0,%s,%d\r\n",
- client_send_idx, get->messageTopic, strlen(msg.payload));
-
- LTE_mqtt_pulish(message, msg.payload);
-
- memset(UART6_RX_BUF, 0, BUFF_SIZE);
- myfree(SRAMEX, pxMessage->pub_topic);
- myfree(SRAMEX, pxMessage->message);
- myfree(SRAMEX, pxMessage);
-
- }
- // watchdog_feed();
- LTE_Delay(100);
- }
- }
- }
- // 连接MQTT- 接收模式
- void MQTT_recv_connect(void const* arg)
- {
- uint16_t cat1_timeout = 0;
- char message[64];
- while(1)
- {
- if(mqtt_connectFlag == 1)
- {
- // 选择模式
- memset(message, 0, sizeof(message));
- snprintf(message, sizeof(message), "AT+QMTCFG=\"recv/mode\",1,0,1 \r\n");
- while(Iot_SendCmd(message,"OK", 1)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- break;
- }
- }
- //接收消息
- memset(UART6_RX_BUF, 0, BUFF_SIZE);
- char* cmd = "AT+QMTRECV=0,1\r\n";
- HAL_UART_Transmit(&USART_InitStruct_DEBUG, (uint8_t*)cmd, strlen(cmd), HAL_MAX_DELAY);
- while(1){
- char* payloadstr = mymalloc(SRAMEX, 1024);
- payloadstr = strstr((char*)UART6_RX_BUF, "+QMTRECV:");
- if (payloadstr)
- {
- payloadstr = strstr((char*)UART6_RX_BUF, "{");
- sprintf(payloadstr + strlen(payloadstr) - 3 , " ");
- write_modbus_data(payloadstr);
- }
- myfree(SRAMEX, payloadstr);
- memset(UART6_RX_BUF, 0, BUFF_SIZE);
- LTE_Delay(100);
- }
- }
- // watchdog_feed();
- LTE_Delay(100);
- }
- }
// Disabled (#if 0) earlier variant of MQTT_send_connect, kept for reference only.
// It waits on a mailbox with OSMboxPend() instead of a queue and uses a
// hard-coded broker address ("36.134.23.11", port 1883) and hard-coded
// "test0003" topics. Every step that keeps failing jumps back to __Start.
- #if 0
- void MQTT_send_connect(void* arg)
- {
- uint16_t cat1_timeout = 0;
- void *mboxMsg;
- uint8_t err;
- char message[64];
- __Start:
- memset(message, 0, sizeof(message));
- snprintf(message, sizeof(message), "AT+QMTCFG=\"recv/mode\",%d,0,1 \r\n",client_recv_idx);
- while(Iot_SendCmd(message,"OK", 5)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
- // Open the MQTT network connection
- cat1_timeout = 0;
- snprintf(message, sizeof(message),"AT+QMTOPEN=%d,\"36.134.23.11\",1883 \r\n",client_recv_idx);
- while(Iot_SendCmd(message,"OK",20)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
- // Connect to the server
- mqtt_connectFlag = 0;
- cat1_timeout = 0;
- memset(message, 0, sizeof(message));
- snprintf(message, sizeof(message), "AT+QMTCONN=%d,%s\r\n",client_send_idx,(char*)&clientid1);
- while(Iot_SendCmd(message,"OK",10)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
- // Subscribe to the command topic
- cat1_timeout = 0;
- memset(message, 0, sizeof(message));
- snprintf(message, sizeof(message), "AT+QMTSUB=%d,1,\"test0003/command\",0 \r\n",client_send_idx);
- while(Iot_SendCmd(message,"OK",10)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
- while(1)
- {
- 
// Wait for a mailbox message; a MBOX_NETWORK_ERROR value restarts the
// connect sequence, anything else triggers a publish of pubJsonString.
- mboxMsg = OSMboxPend(mqtt_sendMseeageMbox, 1000, &err);
- if(OS_ERR_NONE == err)
- {
- if((*(unsigned int *)mboxMsg) == MBOX_NETWORK_ERROR) goto __Start;
- else
- {
- cat1_timeout = 0;
- memset(message, 0, sizeof(message));
- snprintf(message, sizeof(message), "AT+QMTPUBEX=%d,0,0,0,\"test0003\",%d\r\n",client_send_idx,strlen(pubJsonString));
- while(Iot_SendCmd(message,">",2)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
- cat1_timeout = 0;
- while(Iot_SendCmd(pubJsonString,"+QMTPUBEX", 5)){
- LTE_Delay(1);
- cat1_timeout ++;
- if(cat1_timeout >= 2000){
- goto __Start;
- }
- }
- memset(pubJsonString, 0, strlen(pubJsonString));
- }
- }
- LTE_Delay(100);
- }
- 
- }
- #endif
- void MQ_threadCreate()
- {
- // mqtt_client_t *client = NULL; // 创建一个客户端
- mqtt_log_init();
- printf("\nwelcome to mqttclient test...\n");
- xQueue1 = xQueueCreate(10, sizeof(struct Pub_Queue *)); // 创建一个mqtt上传的队列
- osThreadDef(MQTT_send_task, MQTT_send_connect, osPriorityNormal, 0, configMINIMAL_STACK_SIZE * 8);
- osThreadCreate (osThread(MQTT_send_task), NULL);
- osThreadDef(MQTT_recv_task, MQTT_recv_connect, osPriorityNormal, 0, configMINIMAL_STACK_SIZE * 8);
- osThreadCreate (osThread(MQTT_recv_task), NULL);
- }
|