123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- #include "string.h"
- #include "stm32f2xx.h"
- #include "ucos_ii.h"
- #include "hd_eth.h"
- #include "lwip/arch.h"
- #include "sys_mqtt.h"
- #include "transport.h"
- #include "MQTTFormat.h"
- #include "cJSON.h"
- #include "led.h"
- #include "MQTTClient.h"
- #include "malloc.h"
- #include "mmodbus.h"
- #include "myFile.h"
- #include "gateway_message.h"
- #include "task.h"
- int mqtt_userConnect(void)
- {
- GATEWAY_PARAMS* get;
- get = get_gateway_config_params();
- char* MQTT_SERVER_ADDR = (char*)get->host;
- int MQTT_SERVER_PORT = get->port;
- int sock = -1;
-
-
- sock = transport_open(MQTT_SERVER_ADDR, MQTT_SERVER_PORT);
- if(sock < 0)
- {
- MQTT_PRINTF("connect tcp server error \r\n");
- return -1;
- }
- MQTT_PRINTF("connect tcp server success \r\n");
-
- if(mqtt_connectToMqttServer(sock) <= 0)
- {
- MQTT_PRINTF("connect mqtt server error \r\n");
- return -1;
- }
- MQTT_PRINTF("connect mqtt server success \r\n");
-
- return sock;
- }
- int mqtt_userSubscribeTopic(int sock)
- {
- GATEWAY_PARAMS* get;
- get = get_gateway_config_params();
- char* TOPICPC = (char*)get->commandTopic;
- return mqtt_subscribeTopic(sock, TOPICPC, 2);
- }
- void *json_message[10];
- #define QUEUE_SIZE 10
- OS_EVENT *JsonQ;
- void mqtt_outputMsg(MQTTString *name, uint8_t *msgbuf, int msglen, uint16_t id, int qos)
- {
- int lenght=msglen;
- MQTT_PRINTF("receive a msg: id=%d qos=%d topic=%s msg=%s \r\n", id, qos, name->lenstring.data, msgbuf);
- StringInfo message;
- message.stringLength=msglen;
- message.p=mymalloc(SRAMIN ,msglen+1);
- memcpy(message.p,msgbuf,msglen);
- uint8_t err;
- OSTimeDly(1500);
- err=OSQPost(JsonQ,(void *)&message);
- commd = 0;
- switch(err)
- {
- case OS_ERR_NONE:
- break;
- case OS_ERR_Q_FULL:
- MQTT_PRINTF("receive a msg queue is full \r\n");
- break;
- default:
- break;
- }
- }
- int mqtt_recvPublishMessage(int sock, uint8_t *pbuf, int buflen)
- {
- uint8_t dup, retained;
- int qos, payloadlen;
- uint16_t packetid;
- MQTTString topicName;
- uint8_t *payload;
- int msg;
-
- if(MQTTDeserialize_publish(&dup, &qos, &retained, &packetid, &topicName, &payload, &payloadlen, pbuf, buflen) == 1)
- {
- mqtt_outputMsg(&topicName, payload, payloadlen, packetid, qos);
- }
- else return -2;
-
- if(qos == 1)
- {
- mqtt_recvPublishQos1_packid = packetid;
- msg = MBOX_MQTT_QOS1PUBACK;
- OSMboxPost(mqtt_sendMseeageMbox, &msg);
- }
-
- if(qos == 2)
- {
- mqtt_recvPublishQos2_packid = packetid;
- msg = MBOX_MQTT_QOS2PUBREC;
- OSMboxPost(mqtt_sendMseeageMbox, &msg);
- }
-
- return 1;
- }
- int mqtt_userReceiveMessage(int sock, int type, uint8_t *pbuf, int len)
- {
- int rc;
-
- switch(type)
- {
- case PUBLISH: rc = mqtt_recvPublishMessage(sock, pbuf, len); break;
- case PUBACK: rc = mqtt_publishMessage_qos1_PUBACK(pbuf, len); break;
- case PUBREC: rc = mqtt_publishMessage_qos2_PUBREC(pbuf, len); break;
- case PUBREL: rc = mqtt_recvPublishMessage_qos2_PUBREL(sock, pbuf, len); break;
- case PUBCOMP: rc = mqtt_publishMessage_qos2_PUBCOMP(pbuf, len); break;
- case SUBACK: rc = mqtt_subscribeTopic_SUBACK(pbuf, len);break;
- case UNSUBACK: rc = mqtt_unSubscribeTopic_UNSUBACK(pbuf, len); break;
- case PINGRESP: rc = mqtt_pingResponse(); break;
- default: rc = 1; break;
- }
-
- return rc;
- }
- char pubJsonString[jsonMaxSize];
- char pubJsonStringCopy[jsonMaxSize];
- int mqtt_userSendMessage(int sock, int boxMsg)
- {
- GATEWAY_PARAMS* get;
- get = get_gateway_config_params();
- char* TOPIC = (char*)get->messageTopic;
- int rc = 1;
-
- if((boxMsg & 0xf0000000) == 0x20000000)
- {
- switch(boxMsg)
- {
- case MBOX_USER_PUBLISHQOS0: rc = mqtt_publishMessage_qos0(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
- case MBOX_USER_PUBLISHQOS1: rc = mqtt_publishMessage_qos1(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
- case MBOX_USER_PUBLISHQOS2: rc = mqtt_publishMessage_qos2(sock, TOPIC, (uint8_t *)pubJsonString, strlen(pubJsonString)); break;
- }
- }
-
- else if((boxMsg & 0xf0000000) == 0x10000000)
- {
- switch(boxMsg)
- {
- case MBOX_MQTT_QOS1PUBACK: rc = mqtt_recvPublishMessage_qos1_PUBACK(sock, mqtt_recvPublishQos1_packid); break;
- case MBOX_MQTT_QOS2PUBREC: rc = mqtt_recvPublishMessage_qos2_PUBREC(sock, mqtt_recvPublishQos2_packid); break;
- case MBOX_MQTT_QOS2PUBCOMP: rc = mqtt_recvPublishMessage_qos2_PUBCOMP(sock, mqtt_recvPublishQos2_packid); break;
- }
- }
-
- return rc;
- }
- OS_EVENT *mqtt_sendMseeageMbox;
- int mqtt_connectFlag;
- static int mysock;
- void mqtt_userManThread(void *arg)
- {
- int rc;
- uint8_t err;
- void *mboxMsg;
- MQTT_PRINTF("mqtt mainthread start \r\n");
-
- __MQTT_START:
-
-
- mysock = -1;
- mqtt_connectFlag = 0;
- while(mysock < 0)
- {
- mysock = mqtt_userConnect();
- OSTimeDly(2000);
- }
- OSMboxAccept(mqtt_sendMseeageMbox);
- mqtt_connectFlag = 1;
-
-
- rc = mqtt_userSubscribeTopic(mysock);
- if(rc <= 0)
- {
- MQTT_PRINTF("subscribe error \r\n");
- if(rc == -1) goto __MQTT_START;
- }
- else MQTT_PRINTF("subscribe success \r\n");
-
-
- while(1)
- {
- mboxMsg = OSMboxPend(mqtt_sendMseeageMbox, 60000, &err);
- if(OS_ERR_NONE == err)
- {
-
- if((*(unsigned int *)mboxMsg) == MBOX_NETWORK_ERROR) goto __MQTT_START;
- else
- {
- if(mqtt_userSendMessage(mysock, *(unsigned int *)mboxMsg) == -1)
- goto __MQTT_START;
- memset(pubJsonString,0,strlen(pubJsonString));
- }
- }
- else
- {
- rc = mqtt_pingReq(mysock);
- if(rc == -1) goto __MQTT_START;
- }
- OSTimeDly(1);
- }
- }
- void mqtt_userReceiveThread(void *arg)
- {
- int len;
- int packetType;
- int msg;
- MQTT_PRINTF("mqtt receivethread start \r\n");
-
- while(1)
- {
- if(mqtt_connectFlag == 1)
- {
- len = transport_receive(mysock);
- if(len <=0)
- {
- if(len == EWOULDBLOCK)
- {
- MQTT_PRINTF("receive data timeout \r\n");
- continue;
- }
- else
- {
- MQTT_PRINTF("sock close \r\n");
- transport_close(mysock);
-
- mqtt_connectFlag = 0;
- msg = MBOX_NETWORK_ERROR;
- OSMboxPost(mqtt_sendMseeageMbox, &msg);
- }
- }
-
- memset(mqtt_recvbuffer, 0, MQTT_RECVBUF_LENTH);
- packetType = MQTTPacket_read(mqtt_recvbuffer, len, transport_getdata);
- mqtt_userReceiveMessage(mysock, packetType, mqtt_recvbuffer, len);
- }
- OSTimeDly(1);
- }
- }
- #define APP_TASK_MQTTMAIN_PRIO 6
- #define APP_TASK_MQTTMAIN_STK_SIZE 1024
- static OS_STK mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE];
- #define APP_TASK_MQTTRECEIVE_PRIO 7
- #define APP_TASK_MQTTRECEIVE_STK_SIZE 1024
- static OS_STK mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE];
- void mqtt_threadCreate(void)
- {
- mqtt_connectFlag = 0;
- mqtt_sendMseeageMbox = OSMboxCreate(NULL);
- JsonQ = OSQCreate(json_message[0], QUEUE_SIZE);
- OSTaskCreate(mqtt_userManThread, NULL, &mqttmainTaskStack[APP_TASK_MQTTMAIN_STK_SIZE-1], APP_TASK_MQTTMAIN_PRIO);
- OSTaskCreate(mqtt_userReceiveThread, NULL, &mqttreceiveTaskStack[APP_TASK_MQTTRECEIVE_STK_SIZE-1], APP_TASK_MQTTRECEIVE_PRIO);
- }
|