|
- #include "string.h"
- #include "stm32f2xx.h"
- #include "ucos_ii.h"
- #include "hd_eth.h"
- #include "lwip/arch.h"
- #include "sys_mqtt.h"
- #include "transport.h"
- #include "MQTTFormat.h"
- #include "cJSON.h"
- #include "led.h"
- #include "MQTTClient.h"
- #include "malloc.h"
- #include "mmodbus.h"
- #include "myFile.h"
- #include "gateway_message.h"
- #include "task.h"
- #include "log.h"
- /********************************************************
- * @brief 连接到服务器
- * @param sock: sock编号
- *********************************************************/
- int mqtt_userConnect(void)
- {
- GATEWAY_PARAMS* get;
- get = get_gateway_config_params();
- char* MQTT_SERVER_ADDR = "36.134.23.11";//(char*)get->host;
- int MQTT_SERVER_PORT = get->port;
- int sock = -1;
-
- //连接到tcp服务器
- sock = transport_open(MQTT_SERVER_ADDR, MQTT_SERVER_PORT);
- if(sock < 0)
- {
- MQTT_PRINTF("connect tcp server error \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect tcp server error");
- return -1;
- }
- MQTT_PRINTF("connect tcp server success \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect tcp server success");
- //连接到mqtt服务器
- if(mqtt_connectToMqttServer(sock) <= 0)
- {
- MQTT_PRINTF("connect mqtt server error \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect mqtt server error");
- return -1;
- }
- MQTT_PRINTF("connect mqtt server success \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "connect mqtt server success");
- return sock;
- }
- /************************************************************
- * @brief 订阅主题
- * @param sock: sock编号
- * @retval 1: 订阅成功
- *************************************************************/
- int mqtt_userSubscribeTopic(int sock)
- {
- GATEWAY_PARAMS* get;
- get = get_gateway_config_params();
- char* TOPICPC = (char*)get->commandTopic;
- // char* TOPICPC = "/device/DTtest0003/command";
- return mqtt_subscribeTopic(sock, TOPICPC, 2);
- }
- /*
- *接收mqtt下发消息并存储进一个数据队列
- *name MQTT主题相关信息
- */
- void *json_message[10];
- #define QUEUE_SIZE 10 //队列深度
- //消息队列指针
- OS_EVENT *JsonQ;
- void mqtt_outputMsg(MQTTString *name, uint8_t *msgbuf, int msglen, uint16_t id, int qos)
- {
- int lenght=msglen;
- MQTT_PRINTF("receive a msg: id=%d qos=%d topic=%s msg=%s \r\n", id, qos, name->lenstring.data, msgbuf);
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "receive a msg: id=%d qos=%d topic=%s msg=%s \r\n", id, qos, name->lenstring.data, msgbuf);
- StringInfo message;
- message.p=mymalloc(SRAMEX ,msglen);
- memcpy(message.p,msgbuf,msglen);
- OSMboxPost(mqtt_recvMseeageMbox,(void*)&message);
- }
- /************************************************************
- * @brief 处理来自平台发布的信息
- * @param sock: sock编号
- pbuf: 接收的数据
- buflen: 数据的长度
- * @retval 1: 消息正常处理
- * -2: 帧数据包错误
- *************************************************************/
- int mqtt_recvPublishMessage(int sock, uint8_t *pbuf, int buflen)
- {
- uint8_t dup, retained;
- int qos, payloadlen;
- uint16_t packetid;
- MQTTString topicName;
- uint8_t *payload;
- int msg;
-
- if(MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, &payload, &payloadlen, pbuf, buflen) == 1)
- {
- mqtt_outputMsg(&topicName, payload, payloadlen, packetid, qos);
- }
- else return -2;
-
- if(qos == 1)
- {
- mqtt_recvPublishQos1_packid = packetid;
- msg = MBOX_MQTT_QOS1PUBACK;
- OSMboxPost(mqtt_sendMseeageMbox, &msg);
- }
-
- if(qos == 2)
- {
- mqtt_recvPublishQos2_packid = packetid;
- msg = MBOX_MQTT_QOS2PUBREC;
- OSMboxPost(mqtt_sendMseeageMbox, &msg);
- }
-
- return 1;
- }
- /************************************************************
- * @brief 解析所有平台下发的数据包
- * @param sock: sock编号
- type: 数据帧的类型
- pbuf: 数据
- buflen: 数据的长度
- * @retval 1: 消息正常处理
- * -2: 帧数据包错误
- *************************************************************/
- int mqtt_userReceiveMessage(int sock, int type, uint8_t *pbuf, int len)
- {
- int rc;
-
- switch(type)
- {
- case PUBLISH: rc = mqtt_recvPublishMessage(sock, pbuf, len); break; //平台向设备发布消息
- case PUBACK: rc = mqtt_publishMessage_qos1_PUBACK(pbuf, len); break; //设备向平台发布 QOS1 确认报文
- case PUBREC: rc = mqtt_publishMessage_qos2_PUBREC(pbuf, len); break; //设备向平台发布 QOS2 确认报文
- case PUBREL: rc = mqtt_recvPublishMessage_qos2_PUBREL(sock, pbuf, len); break; //平台向设备发布 QOS2 消息释放报文
- case PUBCOMP: rc = mqtt_publishMessage_qos2_PUBCOMP(pbuf, len); break; //设备向平台发布 QOS2 消息释放报文的确认报文
- case SUBACK: rc = mqtt_subscribeTopic_SUBACK(pbuf, len);break; //设备向平台订阅报文时的响应报文
- case UNSUBACK: rc = mqtt_unSubscribeTopic_UNSUBACK(pbuf, len); break; //设备向平台发取消订阅报文时的响应报文
- case PINGRESP: rc = mqtt_pingResponse(); break; //心跳响应报文
- default: rc = 1; break;
- }
-
- return rc;
- }
- /************************************************************
- * @brief 处理发送邮箱 mqtt_recvMbox 中的消息,根据消息的类型,发送对应的数据包
- * @param sock: sock编号
- boxMsg: 消息内容
- * @retval 1: 消息正常处理
- * -1: 网络错误
- * -2: 组建发送数据时发送错误
- *************************************************************/
- char pubJsonString[jsonMaxSize];
- char pubJsonStringCopy[jsonMaxSize];
- int mqtt_userSendMessage(int sock, int boxMsg)
- {
- GATEWAY_PARAMS* get;
- get = get_gateway_config_params();
- char* TOPIC = (char*)get->messageTopic;
- // char* TOPIC = "/device/DTtest0003";
- int rc = 1;
- //用户发送的数据
- if((boxMsg & 0xf0000000) == 0x20000000)
- {
- switch(boxMsg)
- {
- case MBOX_USER_PUBLISHQOS0: rc = mqtt_publishMessage_qos0(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
- case MBOX_USER_PUBLISHQOS1: rc = mqtt_publishMessage_qos1(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
- case MBOX_USER_PUBLISHQOS2: rc = mqtt_publishMessage_qos2(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
- }
- }
- //协议站使用的数据
- else if((boxMsg & 0xf0000000) == 0x10000000)
- {
- switch(boxMsg)
- {
- case MBOX_MQTT_QOS1PUBACK: rc = mqtt_recvPublishMessage_qos1_PUBACK(sock, mqtt_recvPublishQos1_packid); break;
- case MBOX_MQTT_QOS2PUBREC: rc = mqtt_recvPublishMessage_qos2_PUBREC(sock, mqtt_recvPublishQos2_packid); break;
- case MBOX_MQTT_QOS2PUBCOMP: rc = mqtt_recvPublishMessage_qos2_PUBCOMP(sock, mqtt_recvPublishQos2_packid); break;
- }
- }
-
- return rc;
- }
- OS_EVENT *mqtt_sendMseeageMbox; //发送消息邮箱
- OS_EVENT *mqtt_recvMseeageMbox; //接收消息邮箱
- int mqtt_connectFlag; //成功连接服务器标志
- static int mysock; //连接mqtt服务器的sock编号
- /************************************************************
- * @brief MQTT主线程
- * @param arg: 未使用
- * @retval
- *************************************************************/
- void mqtt_userManThread(void *arg)
- {
- int rc;
- uint8_t err;
- void *mboxMsg;
- MQTT_PRINTF("mqtt mainthread start \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "mqtt mainthread start");
- __MQTT_START:
-
- //1.建立与服务器的连接
- mysock = -1;
- mqtt_connectFlag = 0;
- while(mysock < 0)
- {
- mysock = mqtt_userConnect();
- OSTimeDly(2000);
- }
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "mqtt connect success");
- OSMboxAccept(mqtt_sendMseeageMbox); //清空mbox的数据
- mqtt_connectFlag = 1;
-
- //2.订阅服务器的主题
- rc = mqtt_userSubscribeTopic(mysock);
- if(rc <= 0)
- {
- MQTT_PRINTF("subscribe error \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "subscribe error");
- if(rc == -1) goto __MQTT_START; //如果网络发生错误,重新建立连接
- }
- else
- {
- MQTT_PRINTF("subscribe success \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "subscribe success");
- }
- //3.循环发送数据
- while(1)
- {
- mboxMsg = OSMboxPend(mqtt_sendMseeageMbox, 2000, &err);
- if(OS_ERR_NONE == err)
- {
- //接收到了 网络异常消息
- if((*(unsigned int *)mboxMsg) == MBOX_NETWORK_ERROR)
- goto __MQTT_START;
- else
- {
- if(mqtt_userSendMessage(mysock, *(unsigned int *)mboxMsg) == -1)
- goto __MQTT_START;
- memset(pubJsonString,0,strlen(pubJsonString));
- }
- }
- else //超时没有发送数据包,发送心跳包
- {
- rc = mqtt_pingReq(mysock);
- if(rc == -1)
- goto __MQTT_START;
- }
- OSTimeDly(100);
- }
- }
- /************************************************************
- * @brief MQTT接收线程,处理所有平台下发的数据帧, CONNACK除外
- * @param arg: 未使用
- * @retval
- *************************************************************/
- void mqtt_userReceiveThread(void *arg)
- {
- int len;
- int packetType;
- int msg;
- MQTT_PRINTF("mqtt receivethread start \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "mqtt receivethread start");
- while(1)
- {
- if(mqtt_connectFlag == 1)
- {
- len = transport_receive(mysock); //阻塞接收数据
- if(len <=0)
- {
- if(len == EWOULDBLOCK) //接收数据超时,重新接收
- {
- MQTT_PRINTF("receive data timeout \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "receive data timeout");
- continue;
- }
- else //接收数据时,网络发生了异常,给主线程发送消息,重新建立连接
- {
- MQTT_PRINTF("sock close \r\n");
- // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "sock close");
- transport_close(mysock);
-
- mqtt_connectFlag = 0;
- msg = MBOX_NETWORK_ERROR;
- OSMboxPost(mqtt_sendMseeageMbox, &msg);
- }
- }
-
- memset(mqtt_recvbuffer, 0, MQTT_RECVBUF_LENTH);
- packetType = MQTTPacket_read(mqtt_recvbuffer, len, transport_getdata);
- mqtt_userReceiveMessage(mysock, packetType, mqtt_recvbuffer, len);
- }
- OSTimeDly(100);
- }
- }
- #define APP_TASK_MQTTMAIN_PRIO 6
- #define APP_TASK_MQTTMAIN_STK_SIZE 1024
- static OS_STK mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE];
- #define APP_TASK_MQTTRECEIVE_PRIO 7
- #define APP_TASK_MQTTRECEIVE_STK_SIZE 1024
- static OS_STK mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE];
- /************************************************************
- * @brief 创建MQTT发送和接收线程,初始化相关数据
- * @param 此函数必须在LWIP协议栈初始完成以后再调用
- * @retval
- *************************************************************/
- void mqtt_threadCreate(void)
- {
- mqtt_connectFlag = 0;
- mqtt_sendMseeageMbox = OSMboxCreate(NULL);
- mqtt_recvMseeageMbox = OSMboxCreate(NULL);
- JsonQ = OSQCreate(json_message[0], QUEUE_SIZE); //创建json队列
- OSTaskCreate(mqtt_userManThread, NULL, &mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE-1], APP_TASK_MQTTMAIN_PRIO);
- OSTaskCreate(mqtt_userReceiveThread, NULL, &mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE-1], APP_TASK_MQTTRECEIVE_PRIO);
- }
|