#include "string.h" #include "stm32f2xx.h" #include "ucos_ii.h" #include "hd_eth.h" #include "lwip/arch.h" #include "sys_mqtt.h" #include "transport.h" #include "MQTTFormat.h" #include "cJSON.h" #include "led.h" #include "MQTTClient.h" #include "malloc.h" #include "mmodbus.h" #include "myFile.h" #include "gateway_message.h" #include "task.h" #include "log.h" /******************************************************** * @brief 连接到服务器 * @param sock: sock编号 *********************************************************/ int mqtt_userConnect(void) { GATEWAY_PARAMS* get; get = get_gateway_config_params(); char* MQTT_SERVER_ADDR = (char*)get->host; int MQTT_SERVER_PORT = get->port; int sock = -1; //连接到tcp服务器 sock = transport_open(MQTT_SERVER_ADDR, MQTT_SERVER_PORT); if(sock < 0) { MQTT_PRINTF("connect tcp server error \r\n"); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect tcp server error"); return -1; } MQTT_PRINTF("connect tcp server success \r\n"); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect tcp server success"); //连接到mqtt服务器 if(mqtt_connectToMqttServer(sock) <= 0) { MQTT_PRINTF("connect mqtt server error \r\n"); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect mqtt server error"); return -1; } MQTT_PRINTF("connect mqtt server success \r\n"); LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect mqtt server success"); return sock; } /************************************************************ * @brief 订阅主题 * @param sock: sock编号 * @retval 1: 订阅成功 *************************************************************/ int mqtt_userSubscribeTopic(int sock) { GATEWAY_PARAMS* get; get = get_gateway_config_params(); char* TOPICPC = (char*)get->commandTopic; return mqtt_subscribeTopic(sock, TOPICPC, 2); } /* *接收mqtt下发消息并存储进一个数据队列 *name MQTT主题相关信息 */ void *json_message[10]; #define QUEUE_SIZE 10 //队列深度 //消息队列指针 OS_EVENT *JsonQ; void mqtt_outputMsg(MQTTString *name, uint8_t *msgbuf, int msglen, uint16_t id, int qos) { int lenght=msglen; MQTT_PRINTF("receive a msg: id=%d qos=%d topic=%s msg=%s \r\n", id, qos, name->lenstring.data, msgbuf); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "receive a msg: id=%d qos=%d topic=%s msg=%s \r\n", id, qos, name->lenstring.data, msgbuf); StringInfo message; message.p=mymalloc(SRAMEX ,msglen); memcpy(message.p,msgbuf,msglen); OSMboxPost(mqtt_recvMseeageMbox,(void*)&message); } /************************************************************ * @brief 处理来自平台发布的信息 * @param sock: sock编号 pbuf: 接收的数据 buflen: 数据的长度 * @retval 1: 消息正常处理 * -2: 帧数据包错误 *************************************************************/ int mqtt_recvPublishMessage(int sock, uint8_t *pbuf, int buflen) { uint8_t dup, retained; int qos, payloadlen; uint16_t packetid; MQTTString topicName; uint8_t *payload; int msg; if(MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, &payload, &payloadlen, pbuf, buflen) == 1) { mqtt_outputMsg(&topicName, payload, payloadlen, packetid, qos); } else return -2; if(qos == 1) { mqtt_recvPublishQos1_packid = packetid; msg = MBOX_MQTT_QOS1PUBACK; OSMboxPost(mqtt_sendMseeageMbox, &msg); } if(qos == 2) { mqtt_recvPublishQos2_packid = packetid; msg = MBOX_MQTT_QOS2PUBREC; OSMboxPost(mqtt_sendMseeageMbox, &msg); } return 1; } /************************************************************ * @brief 解析所有平台下发的数据包 * @param sock: sock编号 type: 数据帧的类型 pbuf: 数据 buflen: 数据的长度 * @retval 1: 消息正常处理 * -2: 帧数据包错误 *************************************************************/ int mqtt_userReceiveMessage(int sock, int type, uint8_t *pbuf, int len) { int rc; switch(type) { case PUBLISH: rc = mqtt_recvPublishMessage(sock, pbuf, len); break; //平台向设备发布消息 case PUBACK: rc = mqtt_publishMessage_qos1_PUBACK(pbuf, len); break; //设备向平台发布 QOS1 确认报文 case PUBREC: rc = mqtt_publishMessage_qos2_PUBREC(pbuf, len); break; //设备向平台发布 QOS2 确认报文 case PUBREL: rc = mqtt_recvPublishMessage_qos2_PUBREL(sock, pbuf, len); break; //平台向设备发布 QOS2 消息释放报文 case PUBCOMP: rc = mqtt_publishMessage_qos2_PUBCOMP(pbuf, len); break; //设备向平台发布 QOS2 消息释放报文的确认报文 case SUBACK: rc = mqtt_subscribeTopic_SUBACK(pbuf, len);break; //设备向平台订阅报文时的响应报文 case UNSUBACK: rc = mqtt_unSubscribeTopic_UNSUBACK(pbuf, len); break; //设备向平台发取消订阅报文时的响应报文 case PINGRESP: rc = mqtt_pingResponse(); break; //心跳响应报文 default: rc = 1; break; } return rc; } /************************************************************ * @brief 处理发送邮箱 mqtt_recvMbox 中的消息,根据消息的类型,发送对应的数据包 * @param sock: sock编号 boxMsg: 消息内容 * @retval 1: 消息正常处理 * -1: 网络错误 * -2: 组建发送数据时发送错误 *************************************************************/ char pubJsonString[jsonMaxSize]; //char pubJsonStringCopy[jsonMaxSize]; int mqtt_userSendMessage(int sock, int boxMsg) { GATEWAY_PARAMS* get; get = get_gateway_config_params(); char* TOPIC = (char*)get->messageTopic; int rc = 1; //用户发送的数据 if((boxMsg & 0xf0000000) == 0x20000000) { switch(boxMsg) { case MBOX_USER_PUBLISHQOS0: rc = mqtt_publishMessage_qos0(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break; case MBOX_USER_PUBLISHQOS1: rc = mqtt_publishMessage_qos1(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break; case MBOX_USER_PUBLISHQOS2: rc = mqtt_publishMessage_qos2(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break; } } //协议站使用的数据 else if((boxMsg & 0xf0000000) == 0x10000000) { switch(boxMsg) { case MBOX_MQTT_QOS1PUBACK: rc = mqtt_recvPublishMessage_qos1_PUBACK(sock, mqtt_recvPublishQos1_packid); break; case MBOX_MQTT_QOS2PUBREC: rc = mqtt_recvPublishMessage_qos2_PUBREC(sock, mqtt_recvPublishQos2_packid); break; case MBOX_MQTT_QOS2PUBCOMP: rc = mqtt_recvPublishMessage_qos2_PUBCOMP(sock, mqtt_recvPublishQos2_packid); break; } } return rc; } OS_EVENT *mqtt_sendMseeageMbox; //发送消息邮箱 OS_EVENT *mqtt_recvMseeageMbox; //接收消息邮箱 int mqtt_connectFlag; //成功连接服务器标志 static int mysock; //连接mqtt服务器的sock编号 /************************************************************ * @brief MQTT主线程 * @param arg: 未使用 * @retval *************************************************************/ void mqtt_userManThread(void *arg) { int rc; uint8_t err; void *mboxMsg; MQTT_PRINTF("mqtt mainthread start \r\n"); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "mqtt mainthread start"); __MQTT_START: //1.建立与服务器的连接 mysock = -1; mqtt_connectFlag = 0; while(mysock < 0) { mysock = mqtt_userConnect(); OSTimeDly(2000); } // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "mqtt connect success"); OSMboxAccept(mqtt_sendMseeageMbox); //清空mbox的数据 mqtt_connectFlag = 1; //2.订阅服务器的主题 rc = mqtt_userSubscribeTopic(mysock); if(rc <= 0) { MQTT_PRINTF("subscribe error \r\n"); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "subscribe error"); if(rc == -1) goto __MQTT_START; //如果网络发生错误,重新建立连接 } else { MQTT_PRINTF("subscribe success \r\n"); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "subscribe success"); } //3.循环发送数据 while(1) { mboxMsg = OSMboxPend(mqtt_sendMseeageMbox, 60000, &err); if(OS_ERR_NONE == err) { //接收到了 网络异常消息 if((*(unsigned int *)mboxMsg) == MBOX_NETWORK_ERROR) goto __MQTT_START; else { if(mqtt_userSendMessage(mysock, *(unsigned int *)mboxMsg) == -1) goto __MQTT_START; memset(pubJsonString,0,strlen(pubJsonString)); } } else //超时没有发送数据包,发送心跳包 { rc = mqtt_pingReq(mysock); if(rc == -1) goto __MQTT_START; } OSTimeDly(100); } } /************************************************************ * @brief MQTT接收线程,处理所有平台下发的数据帧, CONNACK除外 * @param arg: 未使用 * @retval *************************************************************/ void mqtt_userReceiveThread(void *arg) { int len; int packetType; int msg; MQTT_PRINTF("mqtt receivethread start \r\n"); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "mqtt receivethread start"); while(1) { if(mqtt_connectFlag == 1) { len = transport_receive(mysock); //阻塞接收数据 if(len <=0) { if(len == EWOULDBLOCK) //接收数据超时,重新接收 { MQTT_PRINTF("receive data timeout \r\n"); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "receive data timeout"); continue; } else //接收数据时,网络发生了异常,给主线程发送消息,重新建立连接 { MQTT_PRINTF("sock close \r\n"); // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "sock close"); transport_close(mysock); mqtt_connectFlag = 0; msg = MBOX_NETWORK_ERROR; OSMboxPost(mqtt_sendMseeageMbox, &msg); } } memset(mqtt_recvbuffer, 0, MQTT_RECVBUF_LENTH); packetType = MQTTPacket_read(mqtt_recvbuffer, len, transport_getdata); mqtt_userReceiveMessage(mysock, packetType, mqtt_recvbuffer, len); } OSTimeDly(100); } } #define APP_TASK_MQTTMAIN_PRIO 6 #define APP_TASK_MQTTMAIN_STK_SIZE 1024 static OS_STK mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE]; #define APP_TASK_MQTTRECEIVE_PRIO 7 #define APP_TASK_MQTTRECEIVE_STK_SIZE 1024 static OS_STK mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE]; /************************************************************ * @brief 创建MQTT发送和接收线程,初始化相关数据 * @param 此函数必须在LWIP协议栈初始完成以后再调用 * @retval *************************************************************/ void mqtt_threadCreate(void) { mqtt_connectFlag = 0; mqtt_sendMseeageMbox = OSMboxCreate(NULL); mqtt_recvMseeageMbox = OSMboxCreate(NULL); JsonQ = OSQCreate(json_message[0], QUEUE_SIZE); //创建json队列 OSTaskCreate(mqtt_userManThread, NULL, &mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE-1], APP_TASK_MQTTMAIN_PRIO); OSTaskCreate(mqtt_userReceiveThread, NULL, &mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE-1], APP_TASK_MQTTRECEIVE_PRIO); }