#include "data_task.h"
#include "usart.h"
#include "sys_mqtt.h"
#include "sys_http.h"
#include "mmodbus.h"
#include "gateway_message.h"
#include "dlt645_port.h"
#include "myFile.h"
#include "timer.h"
#include "led.h"
#include "tcp_server.h"
#include "log.h"
#include "app_ethernet.h"

/* Buffer sizes and timing constants used throughout this file. */
enum {
    DT_DEVICE_CONFIG_BUF_SIZE = 20 * 1024, /* scratch buffer for device.txt */
    DT_DATA_STRING_BUF_SIZE   = 5 * 1024,  /* JSON report buffer owned by data_task */
    DT_PUB_JSON_BUF_SIZE      = 256,       /* reply buffer for MQTT commands */
    DT_DLT645_DATA_LEN        = 10,        /* bytes cleared/compared in DLT645 data buffers */
    DT_FULL_REPORT_PERIOD_MS  = 10 * 1000, /* interval between full reports */
    DT_MODBUS_FC_READ_HOLDING = 0x03       /* only function code handled by the pollers */
};

/* Numeric values of the "action" field handled by write_modbus_data(). */
enum {
    DT_ACTION_WRITE  = 0,
    DT_ACTION_READ   = 1,
    DT_ACTION_REBOOT = 3
};

void protocolsModeFunc(GATEWAY_PARAMS* current_device, char* string);
void transparentModeFunc(DEVICE_PARAMS* current_device);
uint32_t my_pow(int base, int exponent);

/*
 * recv_state: result flag of the last command read/write (0: failed, 1: ok)
 * mode:       how data is collected (0: every value, 1: only changed values)
 * def:        set while mode == 1 when at least one entry was appended
 * startFlag:  set once the first polling pass has completed
 */
uint8_t recv_state = 0, mode = 0, def = 0, startFlag = 0;
/* time1: tick sampled at the start of a polling pass; time2: tick of the last publish. */
int time1,time2;

/*
 * Undo a truncated snprintf() append so the report buffer never ends with a
 * partial JSON entry.
 *   buf:     report buffer
 *   used:    length of buf before the append
 *   written: value returned by snprintf()
 * Returns true when the append fitted completely.
 */
static bool commit_append(char *buf, size_t used, int written)
{
    if (written < 0 || (size_t)written >= (size_t)DT_DATA_STRING_BUF_SIZE - used)
    {
        buf[used] = '\0';
        LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"report buffer full");
        return false;
    }
    return true;
}

/*
 * Read the holding registers described by cmd and decode them.
 * A 4-byte command is combined as (first register << 16) | second register,
 * a 2-byte command yields the single register; other widths are read but
 * decoded as 0.
 * NOTE(review): high-word-first order is assumed for 4-byte values; confirm
 * against the connected devices.
 * Returns true when the bus read succeeded and *value was written.
 */
static bool read_modbus_value(GATEWAY_READ_MODBUS_COMMAND *cmd, uint32_t *value)
{
    uint16_t reg_count = (uint16_t)(cmd->registerByteNum / 2);
    if (reg_count == 0)
    {
        return false;
    }

    /* One uint16_t per register: the buffer needs reg_count * 2 bytes. */
    uint16_t *regs = mymalloc(SRAMEX, (uint32_t)reg_count * sizeof *regs);
    if (regs == NULL)
    {
        LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"register buffer malloc fail");
        return false;
    }

    bool success = mmodbus_readHoldingRegisters16i(cmd->slaveAddress, cmd->registerAddress, reg_count, regs);
    if (success)
    {
        *value = 0;
        if (cmd->registerByteNum == 4)
        {
            *value = ((uint32_t)regs[0] << 16) | regs[1];
        }
        else if (cmd->registerByteNum == 2)
        {
            *value = regs[0];
        }
    }

    myfree(SRAMEX, regs);
    return success;
}

/*
 * Address the meter described by cmd and read its data item into read_buf.
 * Returns the value of dlt645_read_data(), or -1 when the device protocol is
 * neither DLT645_2007 nor DLT645_1997.
 */
static int8_t read_dlt645_item(DEVICE_PARAMS *dev, GATEWAY_READ_DLT645_COMMAND *cmd, uint8_t *read_buf)
{
    int8_t rs = -1;

    dlt645_set_addr(&dlt645, cmd->deviceID645);
    if (dev->protocol == DLT645_2007)
    {
        rs = dlt645_read_data(&dlt645, cmd->Identification, read_buf, DLT645_2007);
    }
    else if (dev->protocol == DLT645_1997)
    {
        rs = dlt645_read_data(&dlt645, cmd->Identification, read_buf, DLT645_1997);
    }
    return rs;
}

/* Append one Modbus reading to the report; returns false if it did not fit. */
static bool append_modbus_entry(char *string, DEVICE_PARAMS *dev, GATEWAY_READ_MODBUS_COMMAND *cmd, uint32_t value)
{
    size_t used = strlen(string);
    if (used >= (size_t)DT_DATA_STRING_BUF_SIZE)
    {
        return false;
    }
    int written = snprintf(string + used, (size_t)DT_DATA_STRING_BUF_SIZE - used,
                           "{\"deviceId\":\"%s\",\"%s\":%lu},",
                           dev->deviceID, cmd->keyword, (unsigned long)value);
    return commit_append(string, used, written);
}

/* Append one DLT645 reading to the report; returns false if it did not fit. */
static bool append_dlt645_entry(char *string, GATEWAY_READ_DLT645_COMMAND *cmd, uint32_t value)
{
    size_t used = strlen(string);
    if (used >= (size_t)DT_DATA_STRING_BUF_SIZE)
    {
        return false;
    }
    /* The trailing comma separates entries; read_device_data() strips the last one. */
    int written = snprintf(string + used, (size_t)DT_DATA_STRING_BUF_SIZE - used,
                           "{\"identifier\":\"%s\",\"deviceID645\":\"%02x%02x%02x%02x%02x%02x\",\"identifier645\":%d,\"value\":%lX},",
                           cmd->keyword,
                           cmd->deviceID645[0], cmd->deviceID645[1], cmd->deviceID645[2],
                           cmd->deviceID645[3], cmd->deviceID645[4], cmd->deviceID645[5],
                           cmd->Identification, (unsigned long)value);
    return commit_append(string, used, written);
}

/* Poll every Modbus read command of one device and append the readings selected by `mode`. */
static void poll_modbus_device(DEVICE_PARAMS *dev, char *string)
{
    for (GATEWAY_READ_MODBUS_COMMAND *cmd = dev->params->gateway_read_modbus_command;
         cmd != NULL; cmd = cmd->nextParams)
    {
        mmodbus_set16bitOrder(dev->MDBbigLittleFormat);
        if (cmd->functionCode != DT_MODBUS_FC_READ_HOLDING)
        {
            continue;
        }

        uint32_t value = 0;
        if (!read_modbus_value(cmd, &value))
        {
            continue;
        }

        /* NOTE(review): when decimalPoint != 0 the cached value is scaled but the
         * comparison below uses the raw reading, so such values are normally
         * reported as changed on every pass - confirm the intended semantics. */
        bool report = (mode == 0) || (mode == 1 && (value - cmd->value) != 0);
        if (report)
        {
            if (!append_modbus_entry(string, dev, cmd, value))
            {
                continue; /* keep the old cache so the value is retried next pass */
            }
            if (mode == 1)
            {
                def = 1;
            }
        }

        if (cmd->decimalPoint == 0)
        {
            cmd->value = value;
        }
        else
        {
            cmd->value = value / my_pow(10, cmd->decimalPoint);
        }
    }
}

/* Poll every DLT645 read command of one device and append the readings selected by `mode`. */
static void poll_dlt645_device(DEVICE_PARAMS *dev, char *string)
{
    for (GATEWAY_READ_DLT645_COMMAND *cmd = dev->params->gateway_read_dlt645_command;
         cmd != NULL; cmd = cmd->nextParams)
    {
        uint8_t read_buf[DT_DLT645_DATA_LEN];
        memset(read_buf, 0, sizeof read_buf);

        int8_t rs = read_dlt645_item(dev, cmd, read_buf);
        if (rs == -1)
        {
            continue; /* keep the previously stored data for the next comparison */
        }

        /* The stored data is NOT cleared before this comparison, otherwise every
         * non-zero reading would count as a change. compareArrays() returns 1
         * when the buffers differ. */
        bool report = (mode == 0) ||
                      (mode == 1 && compareArrays(read_buf, cmd->data, DT_DLT645_DATA_LEN));
        if (!report)
        {
            continue;
        }

        uint8_t len = 9;
        if (rs <= 4)
        {
            len = 4;
        }
        else if (rs == 5)
        {
            len = 5;
        }
        memset(cmd->data, 0, DT_DLT645_DATA_LEN);
        memcpy(cmd->data, read_buf, len);
        cmd->rxLen = len;

        /* Cast before shifting: a promoted uint8_t shifted by 24 can overflow int. */
        uint32_t dltValue = ((uint32_t)cmd->data[0] << 24) | ((uint32_t)cmd->data[1] << 16) |
                            ((uint32_t)cmd->data[2] << 8) | (uint32_t)cmd->data[3];

        if (append_dlt645_entry(string, cmd, dltValue) && mode == 1)
        {
            def = 1;
        }
    }
}

/*
 * Close the JSON report with "]}", publish it on the gateway message topic and
 * remember the publish time in time2.
 */
static void publish_report(GATEWAY_PARAMS *get, char *string)
{
    size_t used = strlen(string);
    if (used < (size_t)DT_DATA_STRING_BUF_SIZE)
    {
        int written = snprintf(string + used, (size_t)DT_DATA_STRING_BUF_SIZE - used, "]}");
        commit_append(string, used, written);
    }
    mqtt_publish_data(string, QOS0, strlen(string), (char*)&get->messageTopic);
    time2 = GetCurrentTime();
}

/* Publish a command reply unless snprintf() reported an error or truncation. */
static void publish_reply(GATEWAY_PARAMS *get, char *json, int written)
{
    if (written < 0 || written >= DT_PUB_JSON_BUF_SIZE)
    {
        LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"reply json too long");
        return;
    }
    mqtt_publish_data(json, QOS0, strlen(json), (char*)&get->messageTopic);
}

/*
 * Task entry: loads device.txt into the gateway parameters, then polls the
 * configured devices every 500 ms in protocol or transparent mode.
 *   argument: unused task argument
 * The loop only ends when the gateway has no device list; the task then
 * deletes itself because a FreeRTOS task function must not return.
 */
void data_task(void const * argument)
{
    (void)argument;

    dlt645_init(10);  /* if no data can be read, delay for <argument> seconds */
    mmodbus_init(10); /* if no data can be read, delay for <argument> seconds */

    char *device_config_json = mymalloc(SRAMEX, DT_DEVICE_CONFIG_BUF_SIZE);
    if (device_config_json == NULL)
    {
        LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"device_config_json malloc fail");
    }
    else
    {
        /* Clear the whole allocation; strlen() on fresh heap memory is undefined. */
        memset(device_config_json, 0, DT_DEVICE_CONFIG_BUF_SIZE);
        read_file("device.txt", device_config_json);
        addGatewayParams(device_config_json);
        myfree(SRAMEX, device_config_json);
    }

    GATEWAY_PARAMS *get = get_gateway_config_params();

    char *string = mymalloc(SRAMEX, DT_DATA_STRING_BUF_SIZE); /* receives the collected data */
    if (string == NULL)
    {
        LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"string malloc fail");
    }

    if (get != NULL && string != NULL)
    {
        memset(string, 0, DT_DATA_STRING_BUF_SIZE);

        DEVICE_PARAMS *current_device = get->device_params;
        if (current_device != NULL)
        {
            LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"device params not empty");
        }

        while (current_device != NULL)
        {
            if (ProtocolsModeFlag)
            {
                protocolsModeFunc(get, string);
            }
            else if (TransparentModeFlag)
            {
                transparentModeFunc(current_device);
            }
            current_device = get->device_params;
            HAL_GPIO_WritePin(GPIOF,GPIO_PIN_6,GPIO_PIN_RESET);
            vTaskDelay(500);
        }
    }

    if (string != NULL)
    {
        /* Release with the allocator that produced the buffer. */
        myfree(SRAMEX, string);
    }
    LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"data_task return");
    vTaskDelete(NULL);
}

/*
*********************************************************************************************************
*	Function: int compareArrays(uint8_t arr1[], uint8_t arr2[], int size)
*	Purpose:  check whether two byte arrays differ
*	Params:   arr1[] first array, arr2[] second array, size number of bytes to compare
*	Returns:  1: the arrays differ   0: the arrays are identical (also when size <= 0)
*********************************************************************************************************
*/
int compareArrays(uint8_t arr1[], uint8_t arr2[], int size)
{
    for (int i = 0; i < size; ++i)
    {
        if (arr1[i] != arr2[i])
        {
            return 1; /* first mismatch: the arrays differ */
        }
    }
    return 0; /* no mismatch found */
}

/*
*********************************************************************************************************
*	Function: int read_device_data(DEVICE_PARAMS *device, char* string)
*	Purpose:  poll every device in the list (Modbus or DLT645) and append one JSON object per
*	          reading to string. With mode == 0 every reading is appended; with mode == 1 only
*	          readings that differ from the cached value are appended and def is set to 1.
*	          The trailing ',' of the last entry is removed before returning.
*	Params:   device  head of the device list
*	          string  NUL-terminated report buffer; entries are bounded to
*	                  DT_DATA_STRING_BUF_SIZE bytes, the size data_task allocates
*	Returns:  1: list processed   0: device or string is NULL
*	Note:     devices without parameters or with an unknown protocol are skipped.
*********************************************************************************************************
*/
int read_device_data(DEVICE_PARAMS *device, char* string)
{
    if (device == NULL || string == NULL)
    {
        return 0;
    }

    for (DEVICE_PARAMS *current_device = device; current_device != NULL;
         current_device = current_device->nextDevice)
    {
        if (current_device->params == NULL)
        {
            continue;
        }

        if (current_device->protocol == MODBUS_READ)
        {
            poll_modbus_device(current_device, string);
        }
        else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)
        {
            poll_dlt645_device(current_device, string);
        }
    }

    /* Drop the separator after the last entry, but never any other character. */
    size_t len = strlen(string);
    if (len > 0 && string[len - 1] == ',')
    {
        string[len - 1] = '\0';
    }
    return 1;
}

/*
*********************************************************************************************************
*	Function: void write_modbus_data(char* JSON_STRING)
*	Purpose:  handle a command received over MQTT. The "action" field selects the operation:
*	          0 writes "parameter" to the matching Modbus write command, 1 reads the matching
*	          Modbus read command, 3 reboots the MCU. Write and successful read operations
*	          publish a JSON reply on the gateway message topic.
*	Params:   JSON_STRING  the received MQTT payload
*	Returns:  none
*********************************************************************************************************
*/
void write_modbus_data(char* JSON_STRING)
{
    if (JSON_STRING == NULL)
    {
        return;
    }

    JSON_CMD jsonMsg;
    /* Fields that are missing from the payload must read as empty strings. */
    memset(&jsonMsg, 0, sizeof jsonMsg);

    jsonMsg.parameter = parseIntField(JSON_STRING, "\"parameter\":");
    parseStringField(JSON_STRING, "\"deviceId\":\"", (char*)&jsonMsg.deviceId);
    parseStringField(JSON_STRING, "\"identifier\":\"", (char*)&jsonMsg.identifier);
    parseStringField(JSON_STRING, "\"messageId\":\"", (char*)&jsonMsg.messageId);
    parseStringField(JSON_STRING, "\"action\":\"", (char*)&jsonMsg.action);
    LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"write to mqtt");

    /* A missing or non-numeric action must not silently become 0 (write). */
    char *action_text = (char*)&jsonMsg.action;
    char *action_end = NULL;
    long action = strtol(action_text, &action_end, 10);
    if (action_end == action_text)
    {
        LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"invalid action");
        return;
    }

    if (action == DT_ACTION_REBOOT)
    {
        __set_PRIMASK(1);
        NVIC_SystemReset();
        return;
    }
    if (action != DT_ACTION_WRITE && action != DT_ACTION_READ)
    {
        return;
    }

    GATEWAY_PARAMS* get = get_gateway_config_params();
    if (get == NULL)
    {
        return;
    }

    char* pubJsonString = mymalloc(SRAMEX, DT_PUB_JSON_BUF_SIZE);
    if (pubJsonString == NULL)
    {
        LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"pubJsonString malloc fail");
        return;
    }

    for (DEVICE_PARAMS* current_device = get->device_params; current_device != NULL;
         current_device = current_device->nextDevice)
    {
        if (current_device->params == NULL)
        {
            continue;
        }
        if (strcmp((char*)current_device->deviceID, (char*)&jsonMsg.deviceId) != 0)
        {
            continue;
        }

        if (action == DT_ACTION_WRITE)
        {
            for (GATEWAY_WRITE_MODBUS_COMMAND *currentModbusWriteParams = current_device->params->gateway_write_modbus_command;
                 currentModbusWriteParams != NULL;
                 currentModbusWriteParams = currentModbusWriteParams->nextParams)
            {
                /* match device ID and attribute */
                if (strcmp((char*)&currentModbusWriteParams->keyword, (char*)&jsonMsg.identifier) != 0)
                {
                    continue;
                }

                recv_state = 0;
                delay_ms(100);
                while (mmodbus.done != 1)
                {
                    delay_ms(100);
                }
                /* Report the real outcome of the write in the "state" field. */
                if (mmodbus_writeHoldingRegister16i(currentModbusWriteParams->slaveAddress,
                                                    currentModbusWriteParams->registerAddress,
                                                    jsonMsg.parameter))
                {
                    recv_state = 1;
                }

                /* build the JSON reply */
                int written = snprintf(pubJsonString, DT_PUB_JSON_BUF_SIZE,
                                       "{\"action\":\"%s\",\"identifier\":\"%s\",\"deviceId\":\"%s\",\"messageId\":\"%s\",\"state\":%d}",
                                       jsonMsg.action, jsonMsg.identifier, jsonMsg.deviceId, jsonMsg.messageId, recv_state);
                publish_reply(get, pubJsonString, written);
                delay_ms(100);
            }
        }
        else
        {
            for (GATEWAY_READ_MODBUS_COMMAND *currentModbusReadParams = current_device->params->gateway_read_modbus_command;
                 currentModbusReadParams != NULL;
                 currentModbusReadParams = currentModbusReadParams->nextParams)
            {
                /* match device ID and attribute */
                if (strcmp((char*)&currentModbusReadParams->keyword, (char*)&jsonMsg.identifier) != 0)
                {
                    continue;
                }

                delay_ms(100);
                recv_state = 0;
                while (mmodbus.done != 1)
                {
                    delay_ms(100);
                }

                uint32_t value = 0;
                if (read_modbus_value(currentModbusReadParams, &value))
                {
                    recv_state = 1;
                    /* build the JSON reply */
                    int written = snprintf(pubJsonString, DT_PUB_JSON_BUF_SIZE,
                                           "{\"action\":\"%s\",\"identifier\":\"%s\",\"deviceId\":\"%s\",\"messageId\":\"%s\",\"state\":%d,\"parameter\":%lu}",
                                           jsonMsg.action, jsonMsg.identifier, jsonMsg.deviceId, jsonMsg.messageId,
                                           recv_state, (unsigned long)value);
                    publish_reply(get, pubJsonString, written);
                    delay_ms(100);
                }
            }
        }
    }

    myfree(SRAMEX, pubJsonString);
}

/*
 * Integer power helper (replacement for pow()).
 *   base:     value to multiply
 *   exponent: number of multiplications; values <= 0 yield 1
 * Returns base^exponent computed in uint32_t arithmetic (wraps on overflow).
 */
uint32_t my_pow(int base, int exponent)
{
    uint32_t result = 1;
    for (int i = 0; i < exponent; i++)
    {
        result *= base;
    }
    return result;
}

/*
 * One polling pass in protocol mode. Does nothing unless MQTT is connected.
 * Once the first pass has completed, a full report (mode 0) is published when
 * at least DT_FULL_REPORT_PERIOD_MS have elapsed since the last publish;
 * otherwise only changed values (mode 1) are collected and published if any.
 *   get:    gateway parameters (device list, gateway id, message topic)
 *   string: report buffer of DT_DATA_STRING_BUF_SIZE bytes
 */
void protocolsModeFunc(GATEWAY_PARAMS* get, char* string)
{
    if (get == NULL || string == NULL)
    {
        return;
    }
    if (!mqtt_connectFlag)
    {
        return;
    }

    HAL_GPIO_WritePin(GPIOF,GPIO_PIN_6,GPIO_PIN_SET);
    time1 = GetCurrentTime();
    /* start of the JSON report */
    snprintf(string, DT_DATA_STRING_BUF_SIZE, "{\"deviceId\":\"%s\",\"data\":[", get->deviceId);

    /* Unsigned subtraction stays correct when the tick counter wraps. */
    uint32_t elapsed = (uint32_t)time1 - (uint32_t)time2;
    if (startFlag && elapsed >= (uint32_t)DT_FULL_REPORT_PERIOD_MS)
    {
        mode = 0; /* all data */
        LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"protocolsMode:All data");
        read_device_data(get->device_params, string);
        publish_report(get, string);
    }
    else
    {
        mode = 1; /* changed data only */
        read_device_data(get->device_params, string);
        if (def) /* at least one entry was appended */
        {
            def = 0;
            LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"protocolsMode:Different data");
            publish_report(get, string);
        }
    }

    startFlag = 1;
    memset(string, 0, strlen(string));
}

/*
 * One polling pass in transparent mode: read every configured item and forward
 * the raw receive buffer over RS-485 instead of building JSON.
 *   device: head of the device list
 * Returns 1.
 * Devices without parameters or with an unknown protocol are skipped.
 */
int transparent_data(DEVICE_PARAMS *device)
{
    for (DEVICE_PARAMS *current_device = device; current_device != NULL;
         current_device = current_device->nextDevice)
    {
        if (current_device->params == NULL)
        {
            continue;
        }

        if (current_device->protocol == MODBUS_READ)
        {
            for (GATEWAY_READ_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_read_modbus_command;
                 currentModbusParams != NULL;
                 currentModbusParams = currentModbusParams->nextParams)
            {
                mmodbus_set16bitOrder(current_device->MDBbigLittleFormat);
                if (currentModbusParams->functionCode != DT_MODBUS_FC_READ_HOLDING)
                {
                    continue;
                }

                uint32_t value = 0;
                if (!read_modbus_value(currentModbusParams, &value))
                {
                    continue;
                }

                /* NOTE(review): the cached value is not updated in this mode, so the
                 * comparison is against whatever the cache already holds - confirm. */
                if ((value - currentModbusParams->value) != 0)
                {
                    /* Swap each byte pair of the payload; rxBuf[2] is used as its
                     * length and the swap stays inside the rxIndex bytes sent below. */
                    size_t byte_count = mmodbus.rxBuf[2];
                    for (size_t i = 0; i + 1 < byte_count && (i + 4) < (size_t)mmodbus.rxIndex; i += 2)
                    {
                        uint8_t H = mmodbus.rxBuf[i + 3];
                        mmodbus.rxBuf[i + 3] = mmodbus.rxBuf[i + 3 + 1];
                        mmodbus.rxBuf[i + 3 + 1] = H;
                    }
                    USART_485_Send(mmodbus.rxBuf,mmodbus.rxIndex);
                    vTaskDelay(100);
                }
            }
        }
        else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)
        {
            for (GATEWAY_READ_DLT645_COMMAND *currentDLT645Params = current_device->params->gateway_read_dlt645_command;
                 currentDLT645Params != NULL;
                 currentDLT645Params = currentDLT645Params->nextParams)
            {
                uint8_t read_buf[DT_DLT645_DATA_LEN];
                memset(read_buf, 0, sizeof read_buf);
                memset(currentDLT645Params->data, 0, DT_DLT645_DATA_LEN);

                int8_t rs = read_dlt645_item(current_device, currentDLT645Params, read_buf);
                if (rs == -1)
                {
                    continue;
                }

                /* data was just cleared, so this forwards whenever the reading is non-zero. */
                if (compareArrays(read_buf, currentDLT645Params->data, DT_DLT645_DATA_LEN))
                {
                    /* NOTE(review): this forwards the Modbus receive buffer after a
                     * DLT645 read - confirm which buffer holds the DLT645 frame. */
                    portENTER_CRITICAL();
                    USART_485_Send(mmodbus.rxBuf,mmodbus.rxIndex);
                    portEXIT_CRITICAL();
                    vTaskDelay(100);
                }
            }
        }
    }
    return 1;
}

/*
 * Transparent-mode entry used by data_task: switches PF6 on, logs the mode and
 * runs one transparent polling pass over the device list.
 *   current_device: head of the device list
 */
void transparentModeFunc(DEVICE_PARAMS* current_device)
{
    if (current_device == NULL)
    {
        return;
    }
    HAL_GPIO_WritePin(GPIOF,GPIO_PIN_6,GPIO_PIN_SET);
    LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"transparentMode:All data");
    printf("transparentMode:All data\n");
    transparent_data(current_device);
}