data_task.c 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. #include "data_task.h"
  2. #include "usart.h"
  3. #include "sys_mqtt.h"
  4. #include "sys_http.h"
  5. #include "mmodbus.h"
  6. #include "dlt645_port.h"
  7. #include "gd32_flash.h"
  8. #include "protocol.h"
  9. #include "timer.h"
  10. #include "led.h"
  11. #include "tcp_server.h"
  12. #include "log.h"
  13. void protocolsModeFunc(GATEWAY_PARAMS* current_device, char* string);
  14. void transparentModeFunc(DEVICE_PARAMS* current_device);
  15. uint8_t recv_state = 0;
  16. uint8_t mode = 0;
  17. uint8_t def = 0;
  18. uint8_t startFlag = 0;// 读取数据初始标志位
  19. int time1,time2,size;
  20. void data_task(void *pdata)
  21. {
  22. dlt645_init(1); // 若读不到数据,则延时 参数 秒
  23. mmodbus_init(10);// 若读不到数据,则延时 参数 秒
  24. GATEWAY_PARAMS *get;
  25. char *device_config_json=pvPortMalloc( 10 *1024 );
  26. memset(device_config_json,0,strlen(device_config_json));
  27. portENTER_CRITICAL();
  28. read_data_from_flash(device_config_json);
  29. addGatewayParams(device_config_json);
  30. vPortFree(device_config_json);
  31. portEXIT_CRITICAL();
  32. get= get_gateway_config_params();
  33. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"device params not empty");
  34. DEVICE_PARAMS *current_device=get->device_params;
  35. char *string = pvPortMalloc(3 * 1024); // 接收读取数据
  36. if(string == NULL)
  37. {
  38. LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"buf or string malloc fail");
  39. }
  40. memset(string,0,strlen(string));
  41. while (current_device!=NULL)
  42. {
  43. if(ProtocolsModeFlag)
  44. {
  45. protocolsModeFunc(get,string);
  46. }
  47. else if(TransparentModeFlag)
  48. {
  49. transparentModeFunc(current_device);
  50. }
  51. current_device=get->device_params;
  52. vTaskDelay(500);
  53. }
  54. vPortFree(string);
  55. LogPrint(LOG_ERROR,__FILE__,__FUNCTION__,__LINE__,"data_task return");
  56. }
  57. /*
  58. *********************************************************************************************************
  59. * 函 数 名: int compareArrays(uint8_t arr1[], uint8_t arr2[], int size)
  60. * 功能说明: 比较两个数组是否相同
  61. * 形 参: arr1[] 数组1,arr2[] 数组2,size 比较数组的大小
  62. * 返 回 值: 1: 相同 0:不相同
  63. *********************************************************************************************************
  64. */
  65. int compareArrays(uint8_t arr1[], uint8_t arr2[], int size) {
  66. for (int i = 0; i < size; ++i) {
  67. if (arr1[i] != arr2[i]) {
  68. return 1; // 两个数组不相同,返回0
  69. }
  70. }
  71. return 0; // 两个数组相同,返回1
  72. }
  73. /*
  74. *********************************************************************************************************
  75. * 函 数 名: int READ_MODBUS_DATA(DEVICE_PARAMS *device)
  76. * 功能说明: 读取当前节点上的modbus数据
  77. * 形 参: DEVICE_PARAMS *device 当前设备
  78. * 返 回 值: 1: 成功 0:失败
  79. *********************************************************************************************************
  80. */
  81. int read_device_data(DEVICE_PARAMS *device, char* string)
  82. {
  83. DEVICE_PARAMS *current_device=device;
  84. GATEWAY_READ_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_read_modbus_command;
  85. GATEWAY_READ_DLT645_COMMAND *currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  86. while(current_device->params != NULL)
  87. {
  88. gd_eval_led_toggle(LED_485TX);
  89. if (current_device->protocol == MODBUS_READ)
  90. {
  91. uint16_t data[currentModbusParams->registerByteNum /2]; // modbus寄存器长度
  92. mmodbus_set16bitOrder(current_device->MDBbigLittleFormat);
  93. // 读单个寄存器
  94. if (currentModbusParams->functionCode == 0x03)
  95. {
  96. // bool success = mmodbus_readHoldingRegisters16i(0x17,0x00,0x02,data);
  97. bool success = mmodbus_readHoldingRegisters16i(currentModbusParams->slaveAddress,
  98. currentModbusParams->registerAddress,
  99. currentModbusParams->registerByteNum /2,
  100. data);
  101. if (success)
  102. {
  103. uint32_t value;
  104. if (currentModbusParams->registerByteNum == 4)
  105. {
  106. value = (uint32_t)data[0] | data[1];
  107. }
  108. else if (currentModbusParams->registerByteNum == 2)
  109. {
  110. value = data[0];
  111. }
  112. if(mode == 0)// all
  113. {
  114. sprintf(string + strlen(string), "{\"deviceId\":\"%s\",\"%s\":%d},",
  115. current_device->deviceID, currentModbusParams->keyword, value);
  116. }
  117. else if(mode == 1)// def
  118. {
  119. if((value - currentModbusParams->value) != 0)
  120. {
  121. sprintf(string + strlen(string), "{\"deviceId\":\"%s\",\"%s\":%d},",
  122. current_device->deviceID, currentModbusParams->keyword, value);
  123. def = 1;
  124. }
  125. }
  126. if (currentModbusParams->decimalPoint == 0)
  127. {
  128. currentModbusParams->value = value;
  129. }
  130. else
  131. {
  132. currentModbusParams->value = value / my_pow(10,currentModbusParams->decimalPoint);
  133. }
  134. }
  135. currentModbusParams = currentModbusParams->nextParams;
  136. if (currentModbusParams == NULL)
  137. {
  138. current_device = current_device->nextDevice;
  139. currentModbusParams = current_device->params->gateway_read_modbus_command;
  140. if(current_device == NULL)
  141. {
  142. sprintf(string + strlen(string) - 1, "");
  143. return 1;
  144. }
  145. }
  146. }
  147. }
  148. else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)
  149. {
  150. uint8_t read_buf[10];
  151. uint32_t dltValue;
  152. currentDLT645Params->rxLen = 0;
  153. memset(read_buf, 0, 10);
  154. memset(currentDLT645Params->data, 0, 10);
  155. dlt645_set_addr(&dlt645, currentDLT645Params->deviceID645);
  156. int8_t rs;
  157. if (current_device->protocol == DLT645_2007)
  158. {
  159. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_2007);
  160. }
  161. else if (current_device->protocol == DLT645_1997)
  162. {
  163. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_1997);
  164. }
  165. if (rs != -1)
  166. {
  167. if(mode == 0)// all
  168. {
  169. if (rs <= 4)
  170. {
  171. memcpy(currentDLT645Params->data, read_buf, 4);
  172. currentDLT645Params->rxLen = 4;
  173. }
  174. else if (rs == 5)
  175. {
  176. memcpy(currentDLT645Params->data, read_buf, 5);
  177. currentDLT645Params->rxLen = 5;
  178. }
  179. else if (rs > 5)
  180. {
  181. memcpy(currentDLT645Params->data, read_buf, 9);
  182. currentDLT645Params->rxLen = 9;
  183. }
  184. dltValue = currentDLT645Params->data[0] << 24 | currentDLT645Params->data[1] << 16|
  185. currentDLT645Params->data[2] << 8 | currentDLT645Params->data[3];
  186. sprintf(string + strlen(string), "{\"identifier\":\"%s\",\"deviceID645\":\"%02x%02x%02x%02x%02x%02x\",\"identifier645\":%d,\"value\":%X}",
  187. currentDLT645Params->keyword, currentDLT645Params->deviceID645[0],
  188. currentDLT645Params->deviceID645[1],currentDLT645Params->deviceID645[2],
  189. currentDLT645Params->deviceID645[3],currentDLT645Params->deviceID645[4],
  190. currentDLT645Params->deviceID645[5],currentDLT645Params->Identification,dltValue);
  191. }
  192. else if(mode == 1)//def
  193. {
  194. if(compareArrays(read_buf,currentDLT645Params->data,10))// 不相同1,相同0
  195. {
  196. if (rs <= 4)
  197. {
  198. memcpy(currentDLT645Params->data, read_buf, 4);
  199. currentDLT645Params->rxLen = 4;
  200. }
  201. else if (rs == 5)
  202. {
  203. memcpy(currentDLT645Params->data, read_buf, 5);
  204. currentDLT645Params->rxLen = 5;
  205. }
  206. else if (rs > 5)
  207. {
  208. memcpy(currentDLT645Params->data, read_buf, 9);
  209. currentDLT645Params->rxLen = 9;
  210. }
  211. dltValue = currentDLT645Params->data[0] << 24 | currentDLT645Params->data[1] << 16|
  212. currentDLT645Params->data[2] << 8 | currentDLT645Params->data[3];
  213. sprintf(string + strlen(string), "{\"identifier\":\"%s\",\"deviceID645\":\"%02x%02x%02x%02x%02x%02x\",\"identifier645\":%d,\"value\":%X}",
  214. currentDLT645Params->keyword, currentDLT645Params->deviceID645[0],
  215. currentDLT645Params->deviceID645[1],currentDLT645Params->deviceID645[2],
  216. currentDLT645Params->deviceID645[3],currentDLT645Params->deviceID645[4],
  217. currentDLT645Params->deviceID645[5],currentDLT645Params->Identification,dltValue);
  218. def = 1;
  219. }
  220. }
  221. }
  222. currentDLT645Params = currentDLT645Params->nextParams;
  223. if (currentDLT645Params == NULL)
  224. {
  225. current_device = current_device->nextDevice;
  226. currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  227. if(current_device == NULL)
  228. {
  229. sprintf(string + strlen(string) - 1, "");
  230. return 1;
  231. }
  232. }
  233. }
  234. }
  235. return 1;
  236. }
  237. /*
  238. *********************************************************************************************************
  239. * 函 数 名: void send_mqtt(char*buf, int jsonCunt)
  240. * 功能说明: 将数据发送到mqtt
  241. * 形 参: 参数1:读取数据 参数2:第一次发送标志
  242. * 返 回 值: 无
  243. *********************************************************************************************************
  244. */
  245. //int size;
  246. //void send_mqtt(char*buf){
  247. // LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"send to mqtt");
  248. // GATEWAY_PARAMS *get;
  249. // get= get_gateway_config_params();
  250. // size = xPortGetFreeHeapSize();
  251. // char *pubJsonString = pvPortMalloc(5 * 1024);
  252. // if(pubJsonString == NULL)
  253. // {
  254. // LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"mqtt_data malloc fail");
  255. // }
  256. // sprintf(pubJsonString,"{\"DEVICEID\":\"%s\",\"data\":[%s]}",get->deviceId, buf); // 组成要发送的json语句
  257. // mqtt_publish_data(pubJsonString, QOS0, strlen(pubJsonString), (char*)&get->messageTopic);
  258. // vPortFree(pubJsonString);
  259. //}
  260. /*
  261. *********************************************************************************************************
  262. * 函 数 名:void WRITE_MODBUS_DATA(char* cJSONstring)
  263. * 功能说明: 接收mqtt数据并写入modbus寄存器
  264. * 形 参:char* cJSONstring mqtt接收到的数据
  265. * 返 回 值: 无
  266. *********************************************************************************************************
  267. */
  268. void write_modbus_data(char* JSON_STRING)
  269. {
  270. JSON_CMD jsonMsg;
  271. GATEWAY_PARAMS* get;
  272. get = get_gateway_config_params();
  273. DEVICE_PARAMS* current_device = get->device_params;
  274. jsonMsg.parameter =parseIntField(JSON_STRING, "\"parameter\":");
  275. parseStringField(JSON_STRING, "\"deviceId\":\"", (char*)&jsonMsg.deviceId);
  276. parseStringField(JSON_STRING, "\"identifier\":\"", (char*)&jsonMsg.identifier);
  277. parseStringField(JSON_STRING, "\"messageId\":\"", (char*)&jsonMsg.messageId);
  278. parseStringField(JSON_STRING, "\"action\":\"", (char*)&jsonMsg.action);
  279. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"write to mqtt");
  280. while(current_device)
  281. {
  282. char* device_ID = (char*)current_device->deviceID;
  283. GATEWAY_WRITE_MODBUS_COMMAND *currentModbusWriteParams = current_device->params->gateway_write_modbus_command;
  284. GATEWAY_READ_MODBUS_COMMAND *currentModbusReadParams = current_device->params->gateway_read_modbus_command;
  285. char* pubJsonString = pvPortMalloc(150);
  286. switch(atoi((char*)&jsonMsg.action))
  287. {
  288. case 0:/* write */
  289. if(!strcmp(device_ID,(char*)&jsonMsg.deviceId))
  290. {
  291. while(currentModbusWriteParams != NULL)
  292. {
  293. if(!strcmp((char*)&currentModbusWriteParams->keyword,(char*)&jsonMsg.identifier)) //匹配ID和属性
  294. {
  295. recv_state = 0;
  296. delay_ms(100);
  297. mmodbus_writeHoldingRegister16i(currentModbusWriteParams->slaveAddress, currentModbusWriteParams->registerAddress, jsonMsg.parameter);
  298. // sprintf(pubJsonString,"{\"action\":\"%s\",\"identifier\":\"%s\",\"deviceId\":\"%s\",\"messageId\":\"%s\",\"state\":%d}",
  299. // jsonMsg.action,jsonMsg.identifier,jsonMsg.deviceId,jsonMsg.messageId,recv_state); // 组成要发送的json语句
  300. // mqtt_publish_data(pubJsonString, QOS0, strlen(pubJsonString), (char*)&get->messageTopic);
  301. delay_ms(100);
  302. }
  303. currentModbusWriteParams = currentModbusWriteParams->nextParams;
  304. }
  305. }
  306. break;
  307. case 1:/* read */
  308. if(!strcmp(device_ID,(char*)&jsonMsg.deviceId))
  309. {
  310. while(currentModbusReadParams != NULL)
  311. {
  312. if(!strcmp((char*)&currentModbusReadParams->keyword,(char*)&jsonMsg.identifier)) //匹配ID和属性
  313. {
  314. delay_ms(100);
  315. recv_state = 0;
  316. uint16_t data[currentModbusReadParams->registerByteNum /2]; // modbus寄存器长度
  317. bool success = mmodbus_readHoldingRegisters16i(currentModbusReadParams->slaveAddress,currentModbusReadParams->registerAddress,
  318. currentModbusReadParams->registerByteNum /2,data);
  319. if (success)
  320. {
  321. recv_state = 1;
  322. uint32_t value;
  323. if (currentModbusReadParams->registerByteNum == 4)
  324. {
  325. value = (uint32_t)data[0] | data[1];
  326. }
  327. else if (currentModbusReadParams->registerByteNum == 2)
  328. {
  329. value = data[0];
  330. }
  331. sprintf(pubJsonString,"{\"action\":\"%s\",\"identifier\":\"%s\",\"deviceId\":\"%s\",\"messageId\":\"%s\",\"state\":%d,\"parameter\":%d}",
  332. jsonMsg.action,jsonMsg.identifier,jsonMsg.deviceId,jsonMsg.messageId,recv_state,value); // 组成要发送的json语句
  333. mqtt_publish_data(pubJsonString, QOS0, strlen(pubJsonString), (char*)&get->messageTopic);
  334. delay_ms(100);
  335. }
  336. }
  337. currentModbusReadParams = currentModbusReadParams->nextParams;
  338. }
  339. }
  340. break;
  341. case 3:/* reboot */
  342. __set_PRIMASK(1);
  343. NVIC_SystemReset();
  344. break;
  345. }
  346. current_device = current_device->nextDevice;
  347. vPortFree(pubJsonString);
  348. }
  349. }
  350. // 重定义pow函数
  351. uint32_t my_pow(int base, int exponent) {
  352. uint32_t result = 1;
  353. for(int i = 0; i < exponent; i++) {
  354. result *= base;
  355. }
  356. return result;
  357. }
  358. void protocolsModeFunc(GATEWAY_PARAMS* get, char* string)
  359. {
  360. if(mqtt_connectFlag)
  361. {
  362. time1 = GetCurrentTime();
  363. sprintf(string,"{\"deviceId\":\"%s\",\"data\":[",get->deviceId); // 组成要发送的json语句
  364. if(startFlag && time2 <= time1 - ( 10 * 1000))// 60s进行一次全数据发送
  365. {
  366. mode = 0;//all
  367. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"protocolsMode:All data");
  368. read_device_data(get->device_params,string);
  369. sprintf(string + strlen(string),"]}");
  370. mqtt_publish_data(string, QOS0, strlen(string), (char*)&get->messageTopic);
  371. time2 = GetCurrentTime();
  372. }
  373. else
  374. {
  375. mode = 1;// def
  376. read_device_data(get->device_params, string);
  377. if(def)// 检测string是否含有数据
  378. {
  379. def = 0;
  380. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"protocolsMode:Different data");
  381. sprintf(string + strlen(string),"]}");
  382. mqtt_publish_data(string, QOS0, strlen(string), (char*)&get->messageTopic);
  383. time2 = GetCurrentTime();
  384. }
  385. }
  386. startFlag = 1;
  387. memset(string,0,strlen(string));
  388. }
  389. }
  390. int transparent_data(DEVICE_PARAMS *device)
  391. {
  392. DEVICE_PARAMS *current_device=device;
  393. GATEWAY_READ_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_read_modbus_command;
  394. GATEWAY_READ_DLT645_COMMAND *currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  395. while(current_device->params != NULL)
  396. {
  397. gd_eval_led_toggle(LED_485TX);
  398. if (current_device->protocol == MODBUS_READ)
  399. {
  400. uint16_t data[currentModbusParams->registerByteNum /2]; // modbus寄存器长度
  401. mmodbus_set16bitOrder(current_device->MDBbigLittleFormat);
  402. // 读单个寄存器
  403. if (currentModbusParams->functionCode == 0x03)
  404. {
  405. // bool success = mmodbus_readHoldingRegisters16i(0x17,0x00,0x02,data);
  406. bool success = mmodbus_readHoldingRegisters16i(currentModbusParams->slaveAddress,
  407. currentModbusParams->registerAddress,
  408. currentModbusParams->registerByteNum /2,
  409. data);
  410. if (success)
  411. {
  412. uint32_t value;
  413. if((value - currentModbusParams->value) != 0)
  414. {
  415. for (uint8_t i = 0; i < mmodbus.rxBuf[2]; i += 2)
  416. {
  417. uint8_t H = mmodbus.rxBuf[i + 3];
  418. mmodbus.rxBuf[i + 3] = mmodbus.rxBuf[i + 3 + 1];
  419. mmodbus.rxBuf[i + 3 + 1] = H;
  420. }
  421. gd_com_485_send(mmodbus.rxBuf,mmodbus.rxIndex);
  422. vTaskDelay(100);
  423. }
  424. }
  425. currentModbusParams = currentModbusParams->nextParams;
  426. if (currentModbusParams == NULL)
  427. {
  428. current_device = current_device->nextDevice;
  429. currentModbusParams = current_device->params->gateway_read_modbus_command;
  430. if(current_device == NULL)
  431. {
  432. return 1;
  433. }
  434. }
  435. }
  436. }
  437. else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)
  438. {
  439. uint8_t read_buf[10];
  440. memset(read_buf, 0, 10);
  441. memset(currentDLT645Params->data, 0, 10);
  442. dlt645_set_addr(&dlt645, currentDLT645Params->deviceID645);
  443. int8_t rs;
  444. if (current_device->protocol == DLT645_2007)
  445. {
  446. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_2007);
  447. }
  448. else if (current_device->protocol == DLT645_1997)
  449. {
  450. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_1997);
  451. }
  452. if (rs != -1)
  453. {
  454. if(compareArrays(read_buf,currentDLT645Params->data,10))// 不相同1,相同0
  455. {
  456. portENTER_CRITICAL();
  457. gd_com_485_send(mmodbus.rxBuf,mmodbus.rxIndex);
  458. portEXIT_CRITICAL();
  459. vTaskDelay(100);
  460. }
  461. }
  462. currentDLT645Params = currentDLT645Params->nextParams;
  463. if (currentDLT645Params == NULL)
  464. {
  465. current_device = current_device->nextDevice;
  466. currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  467. if(current_device == NULL)
  468. {
  469. return 1;
  470. }
  471. }
  472. }
  473. gd_eval_led_off(LED_485TX);
  474. }
  475. return 1;
  476. }
  477. void transparentModeFunc(DEVICE_PARAMS* current_device)
  478. {
  479. LogPrint(LOG_INFO,__FILE__,__FUNCTION__,__LINE__,"transparentMode:All data");
  480. printf("transparentMode:All data\n");
  481. transparent_data(current_device);
  482. }