#include <math.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#include "task.h"
#include "cjson.h"
#include "myFile.h"
#include "gateway_message.h"
#include "log.h"
#include "malloc.h"
#include "sx1276.h"
#include "dlt645.h"
#include "usart.h"
#include "node_data_acquisition.h"
#include "sys_mqtt.h"
#include "sys_http.h"
#include "node_message.h"
#include "usart.h"
#include "mmodbus.h"
#include "sys_mqtt.h"
#include "gateway_message.h"
#include "MQTTClient.h"
#include "cJSON.h"
#include "time_count.h"
#include "dlt645_1997_private.h"

char string[512];
uint8_t read_cnt = 0;
uint8_t count = 0;
uint8_t jsonCunt = 1;
/***********************************************************************************************************	函 数 名: void data_task(void *pdata)*	功能说明: 主要是data_task处理线程,优先级高。其运行逻辑是将nandflash中的数据解析出来轮询发送数据*	形    参:无*	返 回 值: 无**********************************************************************************************************/void data_task(void *pdata){				OS_CPU_SR cpu_sr;		pdata = pdata;		dlt645_init(100);		mmodbus_init(1);						char *device_config_json = mymalloc(SRAMEX, 9 * 1024);		read_file("device.txt", device_config_json);		addGatewayParams(device_config_json);		myfree(SRAMEX, device_config_json);				GATEWAY_PARAMS *get;		get= get_gateway_config_params();		DEVICE_PARAMS *current_device=get->device_params;//		Config_485_Port(get->baudrate, get->dataBits, get->stopBit, get->parity, get->flowControl);		char *buf = mymalloc(SRAMEX, 9 * 1024);	// 接收读取的数据		memset(buf, 0, 9 * 1024);				while (current_device!=NULL)		{					time1 = GetCurrentTime();			if(mqtt_connectFlag)			{					if(jsonCunt || time2  <= time1 - (10 * 1000))// 10s进行一次全数据发送					{							read_device_data1(current_device, buf);							send_mqtt(buf);							jsonCunt = 0;							memset(buf,0,strlen(buf));								current_device=get->device_params;								time2 = GetCurrentTime();//							LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "data for all");					}					else					{						read_device_data2(current_device, buf);						if(count > 0)// count检测buf内是否含有数据						{							send_mqtt(buf);							memset(buf,0,strlen(buf));								current_device=get->device_params;							count = 0;//							LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "different data");						}					}				}				OSTimeDly(100);		}		myfree(SRAMEX, buf);}	/***********************************************************************************************************	函 数 名: void mqtt_to_device()*	功能说明: 将接收到的数据发送至设备*	形    参:*	返 回 值:  **********************************************************************************************************/void 
mqtt_to_device(){		uint8_t err;		StringInfo *message;		message	= (StringInfo*)OSMboxPend(mqtt_recvMseeageMbox, 1000, &err);					if(message != NULL)	//包含消息		{				write_modbus_data(message->p);	//写入数据				myfree(SRAMEX ,message->p);//释放内部数据				//OSTimeDly(1000);		}	}void find_diff(char* buf, char* string) {	}/***********************************************************************************************************	函 数 名: int READ_MODBUS_DATA(DEVICE_PARAMS *device)*	功能说明: 读取当前节点上的modbus数据*	形    参: DEVICE_PARAMS *device  当前设备*	返 回 值: 1: 成功 0:失败**********************************************************************************************************/int read_device_data1(DEVICE_PARAMS *device, char* buf){				DEVICE_PARAMS *current_device=device;			GATEWAY_READ_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_read_modbus_command;		GATEWAY_READ_DLT645_COMMAND *currentDLT645Params = current_device->params->gateway_read_dlt645_command;			while(current_device->params != NULL)		{								if (current_device->protocol == MODBUS_READ)				{						protocol_485=1;						uint16_t data[currentModbusParams->registerByteNum /2]; // modbus寄存器长度						mmodbus_set16bitOrder(current_device->MDBbigLittleFormat);						if (currentModbusParams->functionCode == 0x03 | currentModbusParams->functionCode == 0x01)						{								bool success = mmodbus_readHoldingRegisters16i(currentModbusParams->slaveAddress,																		 currentModbusParams->registerAddress,																		 currentModbusParams->registerByteNum /2,																		 data);								if (success)								{										uint32_t value;										if (currentModbusParams->registerByteNum == 4)										{												value = (uint32_t)data[0] | data[1];										}										else if (currentModbusParams->registerByteNum == 2)										{												value = data[0];										}												if (currentModbusParams->decimalPoint == 0)										{												currentModbusParams->value = value;										}										else										{						
						float convertedValue = (float)value / pow(10, currentModbusParams->decimalPoint);												currentModbusParams->value=convertedValue;										}										sprintf(buf + strlen(buf), "{\"deviceId\":\"%s\",\"%s\":%d},", 																							current_device->deviceID, currentModbusParams->keyword, value);																	}//								else//								{//										printf("read modbus register fail\n");//										return 0;//								}																/* 每读完一个寄存器,进行message判断 */								mqtt_to_device();																currentModbusParams = currentModbusParams->nextParams;								if (currentModbusParams == NULL)									{										current_device = current_device->nextDevice;										currentModbusParams = current_device->params->gateway_read_modbus_command;										if(current_device == NULL)										{												sprintf(buf + strlen(buf) - 1, ""); 												return 1;										}								}																		}									}				else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)				{						protocol_485=2;						uint8_t read_buf[10];						uint32_t dltValue;												currentDLT645Params->rxLen = 0;						memset(read_buf, 0, 10);						memset(currentDLT645Params->data, 0, 10);						dlt645_set_addr(&dlt645, currentDLT645Params->deviceID645);						int8_t rs;						if (current_device->protocol == DLT645_2007)						{								rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_2007);						}						else if (current_device->protocol == DLT645_1997)						{								rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_1997);						}						if (rs != -1)						{								if (rs <= 4)								{										memcpy(currentDLT645Params->data, read_buf, 4);										currentDLT645Params->rxLen = 4;								}								else if (rs == 5)								{										memcpy(currentDLT645Params->data, read_buf, 5);										currentDLT645Params->rxLen = 5;								}							else if (rs > 5)								{										
memcpy(currentDLT645Params->data, read_buf, 9);										currentDLT645Params->rxLen = 9;								}																							dltValue = currentDLT645Params->data[0] << 24 | currentDLT645Params->data[1] << 16|																		currentDLT645Params->data[2] << 8  | currentDLT645Params->data[3];													sprintf(buf + strlen(buf), "{\"identifier\":\"%s\",\"deviceID645\":\"%02x%02x%02x%02x%02x%02x\",\"identifier645\":%d,\"value\":%X}",																				currentDLT645Params->keyword, currentDLT645Params->deviceID645[0],																				currentDLT645Params->deviceID645[1],currentDLT645Params->deviceID645[2],																				currentDLT645Params->deviceID645[3],currentDLT645Params->deviceID645[4],																				currentDLT645Params->deviceID645[5],currentDLT645Params->Identification,dltValue);																	}//						else//						{//								currentDLT645Params->rxLen = 0;//								printf("read DLT current data fail\n");//						}												currentDLT645Params = currentDLT645Params->nextParams;										if (currentDLT645Params == NULL)									{										current_device = current_device->nextDevice;										currentDLT645Params = current_device->params->gateway_read_dlt645_command;										if(current_device == NULL)										{												sprintf(buf + strlen(buf) - 1, ""); 												return 1;										}								}														}									}				return 1;}int read_device_data2(DEVICE_PARAMS *device, char* buf){		DEVICE_PARAMS *current_device=device;			GATEWAY_READ_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_read_modbus_command;		GATEWAY_READ_DLT645_COMMAND *currentDLT645Params = current_device->params->gateway_read_dlt645_command;			while(current_device->params != NULL)		{								if (current_device->protocol == MODBUS_READ)				{						protocol_485=1;						uint16_t data[currentModbusParams->registerByteNum /2]; // modbus寄存器长度						mmodbus_set16bitOrder(current_device->MDBbigLittleFormat);						if (currentModbusParams->functionCode == 0x03 | 
currentModbusParams->functionCode == 0x01)						{								bool success = mmodbus_readHoldingRegisters16i(currentModbusParams->slaveAddress,																		 currentModbusParams->registerAddress,																		 currentModbusParams->registerByteNum /2,																		 data);								if (success)								{										uint32_t value;										if (currentModbusParams->registerByteNum == 4)										{												value = (uint32_t)data[0] | data[1];										}										else if (currentModbusParams->registerByteNum == 2)										{												value = data[0];										}										if((value - currentModbusParams->value) != 0)										{												sprintf(buf + strlen(buf), "{\"deviceId\":\"%s\",\"%s\":%d},", 																									current_device->deviceID, currentModbusParams->keyword, value);												count++;										}											if (currentModbusParams->decimalPoint == 0)										{												currentModbusParams->value = value;										}										else										{												float convertedValue = (float)value / pow(10, currentModbusParams->decimalPoint);												currentModbusParams->value=convertedValue;										}																									}													currentModbusParams = currentModbusParams->nextParams;								if (currentModbusParams == NULL)									{										current_device = current_device->nextDevice;										currentModbusParams = current_device->params->gateway_read_modbus_command;										if(current_device == NULL)										{												sprintf(buf + strlen(buf) - 1, ""); 												return 1;										}								}																		}									}				else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)				{						protocol_485=2;						uint8_t read_buf[10];						uint32_t dltValue;												currentDLT645Params->rxLen = 0;						memset(read_buf, 0, 10);						memset(currentDLT645Params->data, 0, 10);						dlt645_set_addr(&dlt645, currentDLT645Params->deviceID645);						int8_t rs;						if (current_device->protocol == 
DLT645_2007)						{								rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_2007);						}						else if (current_device->protocol == DLT645_1997)						{								rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_1997);						}						if (rs != -1)						{								if (rs <= 4)								{										memcpy(currentDLT645Params->data, read_buf, 4);										currentDLT645Params->rxLen = 4;								}								else if (rs == 5)								{										memcpy(currentDLT645Params->data, read_buf, 5);										currentDLT645Params->rxLen = 5;								}							else if (rs > 5)								{										memcpy(currentDLT645Params->data, read_buf, 9);										currentDLT645Params->rxLen = 9;								}																							dltValue = currentDLT645Params->data[0] << 24 | currentDLT645Params->data[1] << 16|																		currentDLT645Params->data[2] << 8  | currentDLT645Params->data[3];													sprintf(buf + strlen(buf), "{\"identifier\":\"%s\",\"deviceID645\":\"%02x%02x%02x%02x%02x%02x\",\"identifier645\":%d,\"value\":%X}",																				currentDLT645Params->keyword, currentDLT645Params->deviceID645[0],																				currentDLT645Params->deviceID645[1],currentDLT645Params->deviceID645[2],																				currentDLT645Params->deviceID645[3],currentDLT645Params->deviceID645[4],																				currentDLT645Params->deviceID645[5],currentDLT645Params->Identification,dltValue);																	}										currentDLT645Params = currentDLT645Params->nextParams;										if (currentDLT645Params == NULL)									{										current_device = current_device->nextDevice;										currentDLT645Params = current_device->params->gateway_read_dlt645_command;										if(current_device == NULL)										{												sprintf(buf + strlen(buf) - 1, ""); 												return 1;										}								}														}									}				return 1;	}/***********************************************************************************************************	
函 数 名:void WRITE_MODBUS_DATA(char* cJSONstring)*	功能说明: 接收mqtt数据并写入modbus寄存器*	形    参:char* cJSONstring mqtt接收到的数据*	返 回 值: 无**********************************************************************************************************/void write_modbus_data(char* cJSONstring){			GATEWAY_PARAMS* get;		get = get_gateway_config_params();		DEVICE_PARAMS* current_device = get->device_params;		/* 利用cJSOn_Parse解析数据,获取各类型数据 */		cJSON *root = cJSON_Parse(cJSONstring);		const char *deviceId = cJSON_GetStringValue(cJSON_GetObjectItem(root, "deviceId"));			const cJSON *power = cJSON_GetObjectItemCaseSensitive(root, "power");		const cJSON *temp = cJSON_GetObjectItemCaseSensitive(root, "temp");		const cJSON *mode = cJSON_GetObjectItemCaseSensitive(root, "mode");		const cJSON *fan = cJSON_GetObjectItemCaseSensitive(root, "fan");				while(current_device)		{				char* device_ID = (char*)current_device->deviceID;				GATEWAY_WRITE_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_write_modbus_command;				if(!strcmp(device_ID,deviceId)) //匹配ID				{						OSTimeDly(100);					/* 写入寄存器操作 */						if(power)						{								mmodbus_writeHoldingRegister16i(currentModbusParams->slaveAddress, 																								currentModbusParams->registerAddress, 																								power->valueint);						}						OSTimeDly(100);						if(temp)						{								currentModbusParams = currentModbusParams->nextParams;								mmodbus_writeHoldingRegister16i(currentModbusParams->slaveAddress, 																								currentModbusParams->registerAddress, 																								temp->valueint);						}						OSTimeDly(100);						if(mode)						{								currentModbusParams = currentModbusParams->nextParams;								mmodbus_writeHoldingRegister16i(currentModbusParams->slaveAddress, 																								currentModbusParams->registerAddress, 																								mode->valueint);						}						OSTimeDly(100);						if(fan)						{								currentModbusParams = currentModbusParams->nextParams;								
mmodbus_writeHoldingRegister16i(currentModbusParams->slaveAddress, 																									currentModbusParams->registerAddress, 																									fan->valueint);											}				}				current_device = current_device->nextDevice;		}		cJSON_Delete(root);}/***********************************************************************************************************	函 数 名: void find_difference(char* buf, char* pubJsonStringCopy, char* string)*	功能说明: 比较出参数1和参数2的不同处*	形    参: 参数1:新数据 参数2:旧数据 参数3:输出参数*	返 回 值: 无**********************************************************************************************************/void find_difference(char* buf, char* pubJsonStringCopy, char* string) {    const char* delimiter = "{}";    char* saveptr1;    char* saveptr2;		char* data1 = mymalloc(SRAMEX, strlen(buf));			char* data2 = mymalloc(SRAMEX, strlen(pubJsonStringCopy));				memcpy(data1, buf, strlen(buf));		memcpy(data2, pubJsonStringCopy, strlen(pubJsonStringCopy));     // 利用strtok_r函数分割字符串,并逐一比较    char* token1 = strtok_r((char*)data1, delimiter, &saveptr1);    char* token2 = strtok_r((char*)data2, delimiter, &saveptr2);    while (token1 != NULL && token2 != NULL) 		{        if (strcmp(token1, token2) != 0) 				{            memcpy(string + strlen(string), token1, strlen(token1));        }        token1 = strtok_r(NULL, delimiter, &saveptr1);        token2 = strtok_r(NULL, delimiter, &saveptr2);    }//     // 如果有剩余字符串未比较,则打印剩余字符串//    while (token1 != NULL) {//        sprintf(string + strlen(string),"%s,", token1);//        token1 = strtok_r(NULL, delimiter, &saveptr1);//    }//    while (token2 != NULL) {//        //sprintf(string + strlen(string),"{%s},", token2);//        token2 = strtok_r(NULL, delimiter, &saveptr2);//    }		myfree(SRAMEX, data1);		myfree(SRAMEX, data2);}/***********************************************************************************************************	函 数 名: void send_mqtt(char*buf, int jsonCunt)*	功能说明: 将数据发送到mqtt*	形    参: 参数1:读取数据 参数2:第一次发送标志*	返 
回 值: 无**********************************************************************************************************/void send_mqtt(char*buf){		GATEWAY_PARAMS *get;		get= get_gateway_config_params();		sprintf(pubJsonString,"{\"DEVICEID\":\"%s\",\"data\":[%s]}",get->deviceId, buf);	// 组成要发送的json语句										int msg = MBOX_USER_PUBLISHQOS0;			if(mqtt_connectFlag==1) OSMboxPost(mqtt_sendMseeageMbox, &msg);}
 |