task.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. #include "task.h"
  2. #include "cjson.h"
  3. #include "myFile.h"
  4. #include "gateway_message.h"
  5. #include "log.h"
  6. #include "malloc.h"
  7. #include "sx1276.h"
  8. #include "dlt645.h"
  9. #include "usart.h"
  10. #include "node_data_acquisition.h"
  11. #include "sys_mqtt.h"
  12. #include "sys_http.h"
  13. #include "node_message.h"
  14. #include "usart.h"
  15. #include "mmodbus.h"
  16. #include "sys_mqtt.h"
  17. #include "gateway_message.h"
  18. #include "MQTTClient.h"
  19. #include "cJSON.h"
  20. #include "time_count.h"
  21. #include "dlt645_1997_private.h"
  22. char string[512];
  23. uint8_t read_cnt = 0;
  24. uint8_t count = 0;
  25. uint8_t jsonCunt = 1;
  26. /*
  27. *********************************************************************************************************
  28. * 函 数 名: void data_task(void *pdata)
  29. * 功能说明: 主要是data_task处理线程,优先级高。其运行逻辑是将nandflash中的数据解析出来轮询发送数据
  30. * 形 参:无
  31. * 返 回 值: 无
  32. *********************************************************************************************************
  33. */
  34. void data_task(void *pdata)
  35. {
  36. OS_CPU_SR cpu_sr;
  37. pdata = pdata;
  38. dlt645_init(100);
  39. mmodbus_init(1);
  40. char *device_config_json = mymalloc(SRAMEX, 9 * 1024);
  41. read_file("device.txt", device_config_json);
  42. addGatewayParams(device_config_json);
  43. myfree(SRAMEX, device_config_json);
  44. GATEWAY_PARAMS *get;
  45. get= get_gateway_config_params();
  46. DEVICE_PARAMS *current_device=get->device_params;
  47. // Config_485_Port(get->baudrate, get->dataBits, get->stopBit, get->parity, get->flowControl);
  48. char *buf = mymalloc(SRAMEX, 9 * 1024); // 接收读取的数据
  49. memset(buf, 0, 9 * 1024);
  50. while (current_device!=NULL)
  51. {
  52. time1 = GetCurrentTime();
  53. if(mqtt_connectFlag)
  54. {
  55. if(jsonCunt || time2 <= time1 - (10 * 1000))// 10s进行一次全数据发送
  56. {
  57. read_device_data1(current_device, buf);
  58. send_mqtt(buf);
  59. jsonCunt = 0;
  60. memset(buf,0,strlen(buf));
  61. current_device=get->device_params;
  62. time2 = GetCurrentTime();
  63. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "data for all");
  64. }
  65. else
  66. {
  67. read_device_data2(current_device, buf);
  68. if(count > 0)// count检测buf内是否含有数据
  69. {
  70. send_mqtt(buf);
  71. memset(buf,0,strlen(buf));
  72. current_device=get->device_params;
  73. count = 0;
  74. // LogPrint(LOG_INFO,__FILE__, __FUNCTION__, __LINE__, "different data");
  75. }
  76. }
  77. }
  78. OSTimeDly(100);
  79. }
  80. myfree(SRAMEX, buf);
  81. }
  82. /*
  83. *********************************************************************************************************
  84. * 函 数 名: void mqtt_to_device()
  85. * 功能说明: 将接收到的数据发送至设备
  86. * 形 参:
  87. * 返 回 值:
  88. *********************************************************************************************************
  89. */
  90. void mqtt_to_device(){
  91. uint8_t err;
  92. StringInfo *message;
  93. message = (StringInfo*)OSMboxPend(mqtt_recvMseeageMbox, 1000, &err);
  94. if(message != NULL) //包含消息
  95. {
  96. write_modbus_data(message->p); //写入数据
  97. myfree(SRAMEX ,message->p);//释放内部数据
  98. //OSTimeDly(1000);
  99. }
  100. }
  101. void find_diff(char* buf, char* string) {
  102. }
  103. /*
  104. *********************************************************************************************************
  105. * 函 数 名: int READ_MODBUS_DATA(DEVICE_PARAMS *device)
  106. * 功能说明: 读取当前节点上的modbus数据
  107. * 形 参: DEVICE_PARAMS *device 当前设备
  108. * 返 回 值: 1: 成功 0:失败
  109. *********************************************************************************************************
  110. */
  111. int read_device_data1(DEVICE_PARAMS *device, char* buf)
  112. {
  113. DEVICE_PARAMS *current_device=device;
  114. GATEWAY_READ_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_read_modbus_command;
  115. GATEWAY_READ_DLT645_COMMAND *currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  116. while(current_device->params != NULL)
  117. {
  118. if (current_device->protocol == MODBUS_READ)
  119. {
  120. protocol_485=1;
  121. uint16_t data[currentModbusParams->registerByteNum /2]; // modbus寄存器长度
  122. mmodbus_set16bitOrder(current_device->MDBbigLittleFormat);
  123. if (currentModbusParams->functionCode == 0x03 | currentModbusParams->functionCode == 0x01)
  124. {
  125. bool success = mmodbus_readHoldingRegisters16i(currentModbusParams->slaveAddress,
  126. currentModbusParams->registerAddress,
  127. currentModbusParams->registerByteNum /2,
  128. data);
  129. if (success)
  130. {
  131. uint32_t value;
  132. if (currentModbusParams->registerByteNum == 4)
  133. {
  134. value = (uint32_t)data[0] | data[1];
  135. }
  136. else if (currentModbusParams->registerByteNum == 2)
  137. {
  138. value = data[0];
  139. }
  140. if (currentModbusParams->decimalPoint == 0)
  141. {
  142. currentModbusParams->value = value;
  143. }
  144. else
  145. {
  146. float convertedValue = (float)value / pow(10, currentModbusParams->decimalPoint);
  147. currentModbusParams->value=convertedValue;
  148. }
  149. sprintf(buf + strlen(buf), "{\"deviceId\":\"%s\",\"%s\":%d},",
  150. current_device->deviceID, currentModbusParams->keyword, value);
  151. }
  152. // else
  153. // {
  154. // printf("read modbus register fail\n");
  155. // return 0;
  156. // }
  157. /* 每读完一个寄存器,进行message判断 */
  158. mqtt_to_device();
  159. currentModbusParams = currentModbusParams->nextParams;
  160. if (currentModbusParams == NULL)
  161. {
  162. current_device = current_device->nextDevice;
  163. currentModbusParams = current_device->params->gateway_read_modbus_command;
  164. if(current_device == NULL)
  165. {
  166. sprintf(buf + strlen(buf) - 1, "");
  167. return 1;
  168. }
  169. }
  170. }
  171. }
  172. else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)
  173. {
  174. protocol_485=2;
  175. uint8_t read_buf[10];
  176. uint32_t dltValue;
  177. currentDLT645Params->rxLen = 0;
  178. memset(read_buf, 0, 10);
  179. memset(currentDLT645Params->data, 0, 10);
  180. dlt645_set_addr(&dlt645, currentDLT645Params->deviceID645);
  181. int8_t rs;
  182. if (current_device->protocol == DLT645_2007)
  183. {
  184. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_2007);
  185. }
  186. else if (current_device->protocol == DLT645_1997)
  187. {
  188. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_1997);
  189. }
  190. if (rs != -1)
  191. {
  192. if (rs <= 4)
  193. {
  194. memcpy(currentDLT645Params->data, read_buf, 4);
  195. currentDLT645Params->rxLen = 4;
  196. }
  197. else if (rs == 5)
  198. {
  199. memcpy(currentDLT645Params->data, read_buf, 5);
  200. currentDLT645Params->rxLen = 5;
  201. }
  202. else if (rs > 5)
  203. {
  204. memcpy(currentDLT645Params->data, read_buf, 9);
  205. currentDLT645Params->rxLen = 9;
  206. }
  207. dltValue = currentDLT645Params->data[0] << 24 | currentDLT645Params->data[1] << 16|
  208. currentDLT645Params->data[2] << 8 | currentDLT645Params->data[3];
  209. sprintf(buf + strlen(buf), "{\"identifier\":\"%s\",\"deviceID645\":\"%02x%02x%02x%02x%02x%02x\",\"identifier645\":%d,\"value\":%X}",
  210. currentDLT645Params->keyword, currentDLT645Params->deviceID645[0],
  211. currentDLT645Params->deviceID645[1],currentDLT645Params->deviceID645[2],
  212. currentDLT645Params->deviceID645[3],currentDLT645Params->deviceID645[4],
  213. currentDLT645Params->deviceID645[5],currentDLT645Params->Identification,dltValue);
  214. }
  215. // else
  216. // {
  217. // currentDLT645Params->rxLen = 0;
  218. // printf("read DLT current data fail\n");
  219. // }
  220. currentDLT645Params = currentDLT645Params->nextParams;
  221. if (currentDLT645Params == NULL)
  222. {
  223. current_device = current_device->nextDevice;
  224. currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  225. if(current_device == NULL)
  226. {
  227. sprintf(buf + strlen(buf) - 1, "");
  228. return 1;
  229. }
  230. }
  231. }
  232. }
  233. return 1;
  234. }
  235. int read_device_data2(DEVICE_PARAMS *device, char* buf)
  236. {
  237. DEVICE_PARAMS *current_device=device;
  238. GATEWAY_READ_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_read_modbus_command;
  239. GATEWAY_READ_DLT645_COMMAND *currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  240. while(current_device->params != NULL)
  241. {
  242. if (current_device->protocol == MODBUS_READ)
  243. {
  244. protocol_485=1;
  245. uint16_t data[currentModbusParams->registerByteNum /2]; // modbus寄存器长度
  246. mmodbus_set16bitOrder(current_device->MDBbigLittleFormat);
  247. if (currentModbusParams->functionCode == 0x03 | currentModbusParams->functionCode == 0x01)
  248. {
  249. bool success = mmodbus_readHoldingRegisters16i(currentModbusParams->slaveAddress,
  250. currentModbusParams->registerAddress,
  251. currentModbusParams->registerByteNum /2,
  252. data);
  253. if (success)
  254. {
  255. uint32_t value;
  256. if (currentModbusParams->registerByteNum == 4)
  257. {
  258. value = (uint32_t)data[0] | data[1];
  259. }
  260. else if (currentModbusParams->registerByteNum == 2)
  261. {
  262. value = data[0];
  263. }
  264. if((value - currentModbusParams->value) != 0)
  265. {
  266. sprintf(buf + strlen(buf), "{\"deviceId\":\"%s\",\"%s\":%d},",
  267. current_device->deviceID, currentModbusParams->keyword, value);
  268. count++;
  269. }
  270. if (currentModbusParams->decimalPoint == 0)
  271. {
  272. currentModbusParams->value = value;
  273. }
  274. else
  275. {
  276. float convertedValue = (float)value / pow(10, currentModbusParams->decimalPoint);
  277. currentModbusParams->value=convertedValue;
  278. }
  279. }
  280. currentModbusParams = currentModbusParams->nextParams;
  281. if (currentModbusParams == NULL)
  282. {
  283. current_device = current_device->nextDevice;
  284. currentModbusParams = current_device->params->gateway_read_modbus_command;
  285. if(current_device == NULL)
  286. {
  287. sprintf(buf + strlen(buf) - 1, "");
  288. return 1;
  289. }
  290. }
  291. }
  292. }
  293. else if (current_device->protocol == DLT645_2007 || current_device->protocol == DLT645_97)
  294. {
  295. protocol_485=2;
  296. uint8_t read_buf[10];
  297. uint32_t dltValue;
  298. currentDLT645Params->rxLen = 0;
  299. memset(read_buf, 0, 10);
  300. memset(currentDLT645Params->data, 0, 10);
  301. dlt645_set_addr(&dlt645, currentDLT645Params->deviceID645);
  302. int8_t rs;
  303. if (current_device->protocol == DLT645_2007)
  304. {
  305. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_2007);
  306. }
  307. else if (current_device->protocol == DLT645_1997)
  308. {
  309. rs = dlt645_read_data(&dlt645, currentDLT645Params->Identification, read_buf, DLT645_1997);
  310. }
  311. if (rs != -1)
  312. {
  313. if (rs <= 4)
  314. {
  315. memcpy(currentDLT645Params->data, read_buf, 4);
  316. currentDLT645Params->rxLen = 4;
  317. }
  318. else if (rs == 5)
  319. {
  320. memcpy(currentDLT645Params->data, read_buf, 5);
  321. currentDLT645Params->rxLen = 5;
  322. }
  323. else if (rs > 5)
  324. {
  325. memcpy(currentDLT645Params->data, read_buf, 9);
  326. currentDLT645Params->rxLen = 9;
  327. }
  328. dltValue = currentDLT645Params->data[0] << 24 | currentDLT645Params->data[1] << 16|
  329. currentDLT645Params->data[2] << 8 | currentDLT645Params->data[3];
  330. sprintf(buf + strlen(buf), "{\"identifier\":\"%s\",\"deviceID645\":\"%02x%02x%02x%02x%02x%02x\",\"identifier645\":%d,\"value\":%X}",
  331. currentDLT645Params->keyword, currentDLT645Params->deviceID645[0],
  332. currentDLT645Params->deviceID645[1],currentDLT645Params->deviceID645[2],
  333. currentDLT645Params->deviceID645[3],currentDLT645Params->deviceID645[4],
  334. currentDLT645Params->deviceID645[5],currentDLT645Params->Identification,dltValue);
  335. }
  336. currentDLT645Params = currentDLT645Params->nextParams;
  337. if (currentDLT645Params == NULL)
  338. {
  339. current_device = current_device->nextDevice;
  340. currentDLT645Params = current_device->params->gateway_read_dlt645_command;
  341. if(current_device == NULL)
  342. {
  343. sprintf(buf + strlen(buf) - 1, "");
  344. return 1;
  345. }
  346. }
  347. }
  348. }
  349. return 1;
  350. }
  351. /*
  352. *********************************************************************************************************
  353. * 函 数 名:void WRITE_MODBUS_DATA(char* cJSONstring)
  354. * 功能说明: 接收mqtt数据并写入modbus寄存器
  355. * 形 参:char* cJSONstring mqtt接收到的数据
  356. * 返 回 值: 无
  357. *********************************************************************************************************
  358. */
  359. void write_modbus_data(char* cJSONstring)
  360. {
  361. GATEWAY_PARAMS* get;
  362. get = get_gateway_config_params();
  363. DEVICE_PARAMS* current_device = get->device_params;
  364. /* 利用cJSOn_Parse解析数据,获取各类型数据 */
  365. cJSON *root = cJSON_Parse(cJSONstring);
  366. const char *deviceId = cJSON_GetStringValue(cJSON_GetObjectItem(root, "deviceId"));
  367. const cJSON *power = cJSON_GetObjectItemCaseSensitive(root, "power");
  368. const cJSON *temp = cJSON_GetObjectItemCaseSensitive(root, "temp");
  369. const cJSON *mode = cJSON_GetObjectItemCaseSensitive(root, "mode");
  370. const cJSON *fan = cJSON_GetObjectItemCaseSensitive(root, "fan");
  371. while(current_device)
  372. {
  373. char* device_ID = (char*)current_device->deviceID;
  374. GATEWAY_WRITE_MODBUS_COMMAND *currentModbusParams = current_device->params->gateway_write_modbus_command;
  375. if(!strcmp(device_ID,deviceId)) //匹配ID
  376. {
  377. OSTimeDly(100);
  378. /* 写入寄存器操作 */
  379. if(power)
  380. {
  381. mmodbus_writeHoldingRegister16i(currentModbusParams->slaveAddress,
  382. currentModbusParams->registerAddress,
  383. power->valueint);
  384. }
  385. OSTimeDly(100);
  386. if(temp)
  387. {
  388. currentModbusParams = currentModbusParams->nextParams;
  389. mmodbus_writeHoldingRegister16i(currentModbusParams->slaveAddress,
  390. currentModbusParams->registerAddress,
  391. temp->valueint);
  392. }
  393. OSTimeDly(100);
  394. if(mode)
  395. {
  396. currentModbusParams = currentModbusParams->nextParams;
  397. mmodbus_writeHoldingRegister16i(currentModbusParams->slaveAddress,
  398. currentModbusParams->registerAddress,
  399. mode->valueint);
  400. }
  401. OSTimeDly(100);
  402. if(fan)
  403. {
  404. currentModbusParams = currentModbusParams->nextParams;
  405. mmodbus_writeHoldingRegister16i(currentModbusParams->slaveAddress,
  406. currentModbusParams->registerAddress,
  407. fan->valueint);
  408. }
  409. }
  410. current_device = current_device->nextDevice;
  411. }
  412. cJSON_Delete(root);
  413. }
  414. /*
  415. *********************************************************************************************************
  416. * 函 数 名: void find_difference(char* buf, char* pubJsonStringCopy, char* string)
  417. * 功能说明: 比较出参数1和参数2的不同处
  418. * 形 参: 参数1:新数据 参数2:旧数据 参数3:输出参数
  419. * 返 回 值: 无
  420. *********************************************************************************************************
  421. */
  422. void find_difference(char* buf, char* pubJsonStringCopy, char* string)
  423. {
  424. const char* delimiter = "{}";
  425. char* saveptr1;
  426. char* saveptr2;
  427. char* data1 = mymalloc(SRAMEX, strlen(buf));
  428. char* data2 = mymalloc(SRAMEX, strlen(pubJsonStringCopy));
  429. memcpy(data1, buf, strlen(buf));
  430. memcpy(data2, pubJsonStringCopy, strlen(pubJsonStringCopy));
  431. // 利用strtok_r函数分割字符串,并逐一比较
  432. char* token1 = strtok_r((char*)data1, delimiter, &saveptr1);
  433. char* token2 = strtok_r((char*)data2, delimiter, &saveptr2);
  434. while (token1 != NULL && token2 != NULL)
  435. {
  436. if (strcmp(token1, token2) != 0)
  437. {
  438. memcpy(string + strlen(string), token1, strlen(token1));
  439. }
  440. token1 = strtok_r(NULL, delimiter, &saveptr1);
  441. token2 = strtok_r(NULL, delimiter, &saveptr2);
  442. }
  443. // // 如果有剩余字符串未比较,则打印剩余字符串
  444. // while (token1 != NULL) {
  445. // sprintf(string + strlen(string),"%s,", token1);
  446. // token1 = strtok_r(NULL, delimiter, &saveptr1);
  447. // }
  448. // while (token2 != NULL) {
  449. // //sprintf(string + strlen(string),"{%s},", token2);
  450. // token2 = strtok_r(NULL, delimiter, &saveptr2);
  451. // }
  452. myfree(SRAMEX, data1);
  453. myfree(SRAMEX, data2);
  454. }
  455. /*
  456. *********************************************************************************************************
  457. * 函 数 名: void send_mqtt(char*buf, int jsonCunt)
  458. * 功能说明: 将数据发送到mqtt
  459. * 形 参: 参数1:读取数据 参数2:第一次发送标志
  460. * 返 回 值: 无
  461. *********************************************************************************************************
  462. */
  463. void send_mqtt(char*buf){
  464. GATEWAY_PARAMS *get;
  465. get= get_gateway_config_params();
  466. sprintf(pubJsonString,"{\"DEVICEID\":\"%s\",\"data\":[%s]}",get->deviceId, buf); // 组成要发送的json语句
  467. int msg = MBOX_USER_PUBLISHQOS0;
  468. if(mqtt_connectFlag==1) OSMboxPost(mqtt_sendMseeageMbox, &msg);
  469. }