cs104_slave.c 106 KB


  1. #include "iec_include.h"
  2. #define CS104_DEFAULT_PORT 2404
  3. typedef struct sMessageQueue* MessageQueue;
  4. typedef struct sHighPriorityASDUQueue* HighPriorityASDUQueue;
  5. typedef struct sMasterConnection* MasterConnection;
  6. typedef struct sHighPriorityASDUQueue* HighPriorityASDUQueue;
  7. /***************************************************
  8. * HighPriorityASDUQueue
  9. ***************************************************/
  10. struct sHighPriorityASDUQueue {
  11. int size; /* size of buffer in bytes */
  12. int entryCounter; /* number of messages (ASDU) in the queue */
  13. uint8_t* firstEntry;
  14. uint8_t* lastEntry;
  15. uint8_t* lastInBufferEntry;
  16. uint8_t* buffer;
  17. #if (CONFIG_USE_SEMAPHORES == 1)
  18. platform_mutex_t* queueLock;
  19. #endif
  20. };
  21. typedef struct {
  22. uint8_t msg[256];
  23. int msgSize;
  24. } FrameBuffer;
  25. struct sMessageQueueEntryInfo {
  26. uint64_t entryId;
  27. unsigned int entryState:2;
  28. unsigned int size:8;
  29. };
  30. struct sMessageQueue {
  31. int size; /* size of buffer in bytes */
  32. int entryCounter; /* number of messages (ASDU) in the queue */
  33. uint8_t* firstEntry; /* first entry in FIFO */
  34. uint8_t* lastEntry; /* last entry in FIFO */
  35. uint8_t* lastInBufferEntry; /* entry with highest address in FIFO buffer */
  36. uint64_t entryId; /* ID of next entry; will be increased by one for each new entry */
  37. uint8_t* buffer;
  38. #if (CONFIG_USE_SEMAPHORES == 1)
  39. platform_mutex_t* queueLock;
  40. #endif
  41. };
  42. struct sCS104_RedundancyGroup {
  43. char* name; /**< name of the group to be shown in debug messages, or NULL */
  44. MessageQueue asduQueue; /**< low priority ASDU queue and buffer */
  45. HighPriorityASDUQueue connectionAsduQueue; /**< high priority ASDU queue */
  46. LinkedList allowedClients;
  47. };
  48. typedef enum {
  49. QUEUE_ENTRY_STATE_NOT_USED_OR_CONFIRMED,
  50. QUEUE_ENTRY_STATE_WAITING_FOR_TRANSMISSION,
  51. QUEUE_ENTRY_STATE_SENT_BUT_NOT_CONFIRMED
  52. } QueueEntryState;
  53. typedef enum {
  54. M_CON_STATE_STOPPED, /* only U frames allowed */
  55. M_CON_STATE_STARTED, /* U, I, S frames allowed */
  56. M_CON_STATE_UNCONFIRMED_STOPPED /* only U, S frames allowed */
  57. } MasterConnectionState;
  58. /***************************************************
  59. * Slave
  60. ***************************************************/
  61. struct sCS104_Slave {
  62. CS101_InterrogationHandler interrogationHandler;
  63. void* interrogationHandlerParameter;
  64. CS101_CounterInterrogationHandler counterInterrogationHandler;
  65. void* counterInterrogationHandlerParameter;
  66. CS101_ReadHandler readHandler;
  67. void* readHandlerParameter;
  68. CS101_ClockSynchronizationHandler clockSyncHandler;
  69. void* clockSyncHandlerParameter;
  70. CS101_ResetProcessHandler resetProcessHandler;
  71. void* resetProcessHandlerParameter;
  72. CS101_DelayAcquisitionHandler delayAcquisitionHandler;
  73. void* delayAcquisitionHandlerParameter;
  74. CS101_ASDUHandler asduHandler;
  75. void* asduHandlerParameter;
  76. CS104_ConnectionRequestHandler connectionRequestHandler;
  77. void* connectionRequestHandlerParameter;
  78. CS104_ConnectionEventHandler connectionEventHandler;
  79. void* connectionEventHandlerParameter;
  80. CS104_SlaveRawMessageHandler rawMessageHandler;
  81. void* rawMessageHandlerParameter;
  82. #if (CONFIG_CS104_SUPPORT_TLS == 1)
  83. TLSConfiguration tlsConfig;
  84. #endif
  85. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP)
  86. MessageQueue asduQueue; /**< low priority ASDU queue and buffer */
  87. HighPriorityASDUQueue connectionAsduQueue; /**< high priority ASDU queue */
  88. #endif
  89. int maxLowPrioQueueSize;
  90. int maxHighPrioQueueSize;
  91. int openConnections; /**< number of connected clients */
  92. MasterConnection masterConnections[CONFIG_CS104_MAX_CLIENT_CONNECTIONS]; /**< references to all MasterConnection objects */
  93. #if (CONFIG_USE_SEMAPHORES == 1)
  94. platform_mutex_t* openConnectionsLock;
  95. #endif
  96. #if (CONFIG_USE_THREADS == 1)
  97. bool isThreadlessMode;
  98. #endif
  99. int maxOpenConnections; /**< maximum accepted open client connections */
  100. struct sCS104_APCIParameters conParameters;
  101. struct sCS101_AppLayerParameters alParameters;
  102. bool isStarting;
  103. bool isRunning;
  104. bool stopRunning;
  105. #if (CONFIG_USE_SEMAPHORES == 1)
  106. platform_mutex_t* stateLock; /* protect isStarting, isRunning, stopRunning */
  107. #endif
  108. int tcpPort;
  109. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS)
  110. LinkedList redundancyGroups;
  111. #endif
  112. CS104_ServerMode serverMode;
  113. char* localAddress;
  114. #if (CONFIG_USE_THREADS == 1)
  115. platform_thread_t* listeningThread;
  116. #endif
  117. ServerSocket serverSocket;
  118. LinkedList plugins;
  119. };
  120. typedef struct {
  121. uint64_t entryId; /* required to identify message in server (low-priority) queue */
  122. uint8_t* queueEntry; /* NULL if ASDU is not from low-priority queue */
  123. uint64_t sentTime; /* required for T1 timeout */
  124. int seqNo;
  125. } SentASDUSlave;
  126. struct sMasterConnection {
  127. Socket socket;
  128. #if (CONFIG_CS104_SUPPORT_TLS == 1)
  129. TLSSocket tlsSocket;
  130. #endif
  131. /* can be moved to CS104_Slave struct */
  132. struct sIMasterConnection iMasterConnection;
  133. CS104_Slave slave;
  134. MasterConnectionState state;
  135. unsigned int isUsed:1;
  136. unsigned int isRunning:1;
  137. unsigned int timeoutT2Triggered:1;
  138. unsigned int waitingForTestFRcon:1;
  139. uint16_t maxSentASDUs; /* k-parameter */
  140. int16_t oldestSentASDU; /* oldest sent ASDU in k-buffer */
  141. int16_t newestSentASDU; /* newest sent ASDU in k-buffer */
  142. uint16_t sendCount; /* sent messages - sequence counter */
  143. uint16_t receiveCount; /* received messages - sequence counter */
  144. int unconfirmedReceivedIMessages; /* number of unconfirmed messages received */
  145. /* timeout T2 handling */
  146. uint64_t lastConfirmationTime; /* timestamp when the last confirmation message (for I messages) was sent */
  147. uint64_t nextT3Timeout;
  148. uint64_t nextTestFRConTimeout; /* timeout T1 when waiting for TEST FR con */
  149. SentASDUSlave* sentASDUs;
  150. #if (CONFIG_USE_THREADS == 1)
  151. platform_thread_t* connectionThread;
  152. #endif
  153. #if (CONFIG_USE_SEMAPHORES == 1)
  154. platform_mutex_t* sentASDUsLock;
  155. platform_mutex_t* stateLock;
  156. #endif
  157. HandleSet handleSet;
  158. uint8_t recvBuffer[260];
  159. int recvBufPos;
  160. uint8_t sendBuffer[260];
  161. MessageQueue lowPrioQueue;
  162. HighPriorityASDUQueue highPrioQueue;
  163. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  164. CS104_RedundancyGroup redundancyGroup;
  165. #endif
  166. };
  167. static uint8_t STARTDT_CON_MSG[] = { 0x68, 0x04, 0x0b, 0x00, 0x00, 0x00 };
  168. #define STARTDT_CON_MSG_SIZE 6
  169. static uint8_t STOPDT_CON_MSG[] = { 0x68, 0x04, 0x23, 0x00, 0x00, 0x00 };
  170. #define STOPDT_CON_MSG_SIZE 6
  171. static uint8_t TESTFR_CON_MSG[] = { 0x68, 0x04, 0x83, 0x00, 0x00, 0x00 };
  172. #define TESTFR_CON_MSG_SIZE 6
  173. static uint8_t TESTFR_ACT_MSG[] = { 0x68, 0x04, 0x43, 0x00, 0x00, 0x00 };
  174. #define TESTFR_ACT_MSG_SIZE 6
  175. static MasterConnection
  176. MasterConnection_create(CS104_Slave slave);
  177. static bool
  178. HighPriorityASDUQueue_enqueue(HighPriorityASDUQueue self, CS101_ASDU asdu)
  179. {
  180. int asduSize = asdu->asduHeaderLength + asdu->payloadSize;
  181. if (asduSize > 256 - IEC60870_5_104_APCI_LENGTH) {
  182. printf("CS104 SLAVE: ASDU too large!\n");
  183. return false;
  184. }
  185. int entrySize = sizeof(uint16_t) + asduSize;
  186. #if (CONFIG_USE_SEMAPHORES == 1)
  187. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  188. #endif
  189. bool enqueued = true;
  190. uint16_t msgSize;
  191. uint8_t* nextMsgPtr;
  192. if (self->entryCounter == 0) {
  193. self->firstEntry = self->buffer;
  194. self->lastInBufferEntry = self->firstEntry;
  195. nextMsgPtr = self->buffer;
  196. }
  197. else {
  198. memcpy(&msgSize, self->lastEntry, sizeof(uint16_t));
  199. nextMsgPtr = self->lastEntry + sizeof(uint16_t) + msgSize;
  200. }
  201. if (nextMsgPtr + entrySize > self->buffer + self->size) {
  202. nextMsgPtr = self->buffer;
  203. self->lastInBufferEntry = self->lastEntry;
  204. }
  205. if (self->entryCounter > 0) {
  206. if (nextMsgPtr <= self->firstEntry) {
  207. if (nextMsgPtr + entrySize > self->firstEntry) {
  208. enqueued = false;
  209. }
  210. }
  211. else {
  212. self->lastInBufferEntry = nextMsgPtr;
  213. }
  214. }
  215. if (enqueued) {
  216. self->lastEntry = nextMsgPtr;
  217. self->entryCounter++;
  218. struct sBufferFrame bufferFrame;
  219. Frame frame = BufferFrame_initialize(&bufferFrame, nextMsgPtr + sizeof(uint16_t), 0);
  220. CS101_ASDU_encode(asdu, frame);
  221. msgSize = asduSize;
  222. memcpy(nextMsgPtr, &msgSize, sizeof(uint16_t));
  223. printf("CS104 SLAVE: ASDUs in PRIO-FIFO: %i (new(size=%i/%i): %p, first: %p, last: %p lastInBuf: %p)\n", self->entryCounter, entrySize, asduSize, nextMsgPtr,
  224. self->firstEntry, self->lastEntry, self->lastInBufferEntry);
  225. }
  226. #if (CONFIG_USE_SEMAPHORES == 1)
  227. xSemaphoreGive(self->queueLock);
  228. #endif
  229. return enqueued;
  230. }
  231. static struct sCS104_APCIParameters defaultConnectionParameters = {
  232. /* .k = */ 12,
  233. /* .w = */ 8,
  234. /* .t0 = */ 10,
  235. /* .t1 = */ 15,
  236. /* .t2 = */ 10,
  237. /* .t3 = */ 20
  238. };
  239. static struct sCS101_AppLayerParameters defaultAppLayerParameters = {
  240. /* .sizeOfTypeId = */ 1,
  241. /* .sizeOfVSQ = */ 1,
  242. /* .sizeOfCOT = */ 2,
  243. /* .originatorAddress = */ 0,
  244. /* .sizeOfCA = */ 2,
  245. /* .sizeOfIOA = */ 3,
  246. /* .maxSizeOfASDU = */ 249
  247. };
  248. static CS104_Slave
  249. createSlave(int maxLowPrioQueueSize, int maxHighPrioQueueSize)
  250. {
  251. CS104_Slave self = (CS104_Slave) GLOBAL_CALLOC(1, sizeof(struct sCS104_Slave));
  252. if (self != NULL) {
  253. self->conParameters = defaultConnectionParameters;
  254. self->alParameters = defaultAppLayerParameters;
  255. self->asduHandler = NULL;
  256. self->interrogationHandler = NULL;
  257. self->counterInterrogationHandler = NULL;
  258. self->readHandler = NULL;
  259. self->clockSyncHandler = NULL;
  260. self->resetProcessHandler = NULL;
  261. self->delayAcquisitionHandler = NULL;
  262. self->connectionRequestHandler = NULL;
  263. self->connectionEventHandler = NULL;
  264. self->rawMessageHandler = NULL;
  265. self->maxLowPrioQueueSize = maxLowPrioQueueSize;
  266. self->maxHighPrioQueueSize = maxHighPrioQueueSize;
  267. {
  268. int i;
  269. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++) {
  270. self->masterConnections[i] = MasterConnection_create(self);
  271. }
  272. }
  273. self->maxOpenConnections = CONFIG_CS104_MAX_CLIENT_CONNECTIONS;
  274. #if (CONFIG_USE_SEMAPHORES == 1)
  275. self->openConnectionsLock = xSemaphoreCreateMutex();
  276. self->stateLock = xSemaphoreCreateMutex();
  277. #endif
  278. #if (CONFIG_USE_THREADS == 1)
  279. self->isThreadlessMode = false;
  280. #endif
  281. self->isRunning = false;
  282. self->stopRunning = false;
  283. self->localAddress = NULL;
  284. self->tcpPort = CS104_DEFAULT_PORT;
  285. self->openConnections = 0;
  286. #if (CONFIG_USE_THREADS == 1)
  287. self->listeningThread = NULL;
  288. #endif
  289. self->serverSocket = NULL;
  290. self->plugins = NULL;
  291. #if (CONFIG_CS104_SUPPORT_TLS == 1)
  292. self->tlsConfig = NULL;
  293. #endif
  294. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  295. self->redundancyGroups = NULL;
  296. #endif
  297. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1)
  298. self->serverMode = CS104_MODE_SINGLE_REDUNDANCY_GROUP;
  299. #else
  300. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_CONNECTION_IS_REDUNDANCY_GROUP == 1)
  301. self->serverMode = CS104_MODE_CONNECTION_IS_REDUNDANCY_GROUP;
  302. #endif
  303. #endif
  304. }
  305. return self;
  306. }
  307. CS104_Slave
  308. CS104_Slave_create(int maxLowPrioQueueSize, int maxHighPrioQueueSize)
  309. {
  310. return createSlave(maxLowPrioQueueSize, maxHighPrioQueueSize);
  311. }
  312. /* Depends on ASDU size! */
  313. static bool
  314. HighPriorityASDUQueue_isFull(HighPriorityASDUQueue self)
  315. {
  316. bool full = false;
  317. int entrySize = sizeof(uint16_t) + (256 - IEC60870_5_104_APCI_LENGTH);
  318. #if (CONFIG_USE_SEMAPHORES == 1)
  319. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  320. #endif
  321. uint16_t msgSize;
  322. uint8_t* nextMsgPtr;
  323. if (self->entryCounter > 0) {
  324. memcpy(&msgSize, self->lastEntry, sizeof(uint16_t));
  325. nextMsgPtr = self->lastEntry + sizeof(uint16_t) + msgSize;
  326. if (nextMsgPtr + entrySize > self->buffer + self->size) {
  327. nextMsgPtr = self->buffer;
  328. }
  329. if (nextMsgPtr <= self->firstEntry) {
  330. if (nextMsgPtr + entrySize > self->firstEntry) {
  331. full = true;
  332. }
  333. }
  334. }
  335. #if (CONFIG_USE_SEMAPHORES == 1)
  336. xSemaphoreGive(self->queueLock);
  337. #endif
  338. return full;
  339. }
  340. /********************************************
  341. * IMasterConnection
  342. *******************************************/
  343. static bool
  344. MasterConnection_isActive(MasterConnection self)
  345. {
  346. bool isActive = false;
  347. #if (CONFIG_USE_SEMAPHORES == 1)
  348. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  349. #endif
  350. if (self->state == M_CON_STATE_STARTED)
  351. isActive = true;
  352. #if (CONFIG_USE_SEMAPHORES == 1)
  353. xSemaphoreGive(self->stateLock);
  354. #endif
  355. return isActive;
  356. }
  357. static bool
  358. isSentBufferFull(MasterConnection self)
  359. {
  360. /* locking of k-buffer has to be done by caller! */
  361. if (self->oldestSentASDU == -1)
  362. return false;
  363. int newIndex = (self->newestSentASDU + 1) % (self->maxSentASDUs);
  364. if (newIndex == self->oldestSentASDU)
  365. return true;
  366. else
  367. return false;
  368. }
  369. static bool
  370. _IMasterConnection_isReady(IMasterConnection self)
  371. {
  372. MasterConnection con = (MasterConnection) self->object;
  373. if (MasterConnection_isActive(con)) {
  374. if (isSentBufferFull(con) == false)
  375. return true;
  376. if (HighPriorityASDUQueue_isFull(con->highPrioQueue))
  377. return false;
  378. return true;
  379. }
  380. else
  381. return false;
  382. }
  383. static int
  384. writeToSocket(MasterConnection self, uint8_t* buf, int size)
  385. {
  386. if (self->slave->rawMessageHandler)
  387. self->slave->rawMessageHandler(self->slave->rawMessageHandlerParameter,
  388. &(self->iMasterConnection), buf, size, true);
  389. #if (CONFIG_CS104_SUPPORT_TLS == 1)
  390. if (self->tlsSocket)
  391. return TLSSocket_write(self->tlsSocket, buf, size);
  392. else
  393. return Socket_write(self->socket, buf, size);
  394. #else
  395. return Socket_write(self->socket, buf, size);
  396. #endif
  397. }
  398. static int
  399. sendIMessage(MasterConnection self, uint8_t* buffer, int msgSize)
  400. {
  401. #if (CONFIG_USE_SEMAPHORES == 1)
  402. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  403. #endif
  404. buffer[0] = (uint8_t) 0x68;
  405. buffer[1] = (uint8_t) (msgSize - 2);
  406. buffer[2] = (uint8_t) ((self->sendCount % 128) * 2);
  407. buffer[3] = (uint8_t) (self->sendCount / 128);
  408. buffer[4] = (uint8_t) ((self->receiveCount % 128) * 2);
  409. buffer[5] = (uint8_t) (self->receiveCount / 128);
  410. if (writeToSocket(self, buffer, msgSize) > 0) {
  411. printf("CS104 SLAVE: SEND I (size = %i) N(S) = %i N(R) = %i\n", msgSize, self->sendCount, self->receiveCount);
  412. self->sendCount = (self->sendCount + 1) % 32768;
  413. self->unconfirmedReceivedIMessages = 0;
  414. self->timeoutT2Triggered = false;
  415. }
  416. else
  417. self->isRunning = false;
  418. self->unconfirmedReceivedIMessages = 0;
  419. int sendCount = self->sendCount;
  420. #if (CONFIG_USE_SEMAPHORES == 1)
  421. xSemaphoreGive(self->stateLock);
  422. #endif
  423. return sendCount;
  424. }
  425. /********************************************************
  426. * MasterConnection
  427. *********************************************************/
  428. static void
  429. printSendBuffer(MasterConnection self)
  430. {
  431. if (self->oldestSentASDU != -1) {
  432. int currentIndex = self->oldestSentASDU;
  433. int nextIndex = 0;
  434. printf ("CS104 SLAVE: ------k-buffer------\n");
  435. do {
  436. printf("CS104 SLAVE: %02i : SeqNo=%i time=%llu : queueEntry=%p\n", currentIndex,
  437. self->sentASDUs[currentIndex].seqNo,
  438. self->sentASDUs[currentIndex].sentTime,
  439. self->sentASDUs[currentIndex].queueEntry);
  440. if (currentIndex == self->newestSentASDU)
  441. nextIndex = -1;
  442. else
  443. currentIndex = (currentIndex + 1) % self->maxSentASDUs;
  444. } while (nextIndex != -1);
  445. printf ("CS104 SLAVE: --------------------\n");
  446. }
  447. else
  448. printf("CS104 SLAVE: k-buffer is empty\n");
  449. }
  450. static void
  451. sendASDU(MasterConnection self, uint8_t* buffer, int msgSize, uint64_t entryId, uint8_t* queueEntry)
  452. {
  453. int currentIndex = 0;
  454. if (self->oldestSentASDU == -1) {
  455. self->oldestSentASDU = 0;
  456. self->newestSentASDU = 0;
  457. }
  458. else {
  459. currentIndex = (self->newestSentASDU + 1) % self->maxSentASDUs;
  460. }
  461. self->sentASDUs[currentIndex].entryId = entryId;
  462. self->sentASDUs[currentIndex].queueEntry = queueEntry;
  463. self->sentASDUs[currentIndex].seqNo = sendIMessage(self, buffer, msgSize);
  464. self->sentASDUs[currentIndex].sentTime = Hal_getTimeInMs();
  465. self->newestSentASDU = currentIndex;
  466. printSendBuffer(self);
  467. }
  468. static bool
  469. sendASDUInternal(MasterConnection self, CS101_ASDU asdu)
  470. {
  471. bool asduSent;
  472. if (MasterConnection_isActive(self))
  473. {
  474. #if (CONFIG_USE_SEMAPHORES == 1)
  475. xSemaphoreTake(self->sentASDUsLock, portMAX_DELAY);
  476. #endif
  477. if (isSentBufferFull(self) == false) {
  478. FrameBuffer frameBuffer;
  479. struct sBufferFrame bufferFrame;
  480. Frame frame = BufferFrame_initialize(&bufferFrame, frameBuffer.msg, IEC60870_5_104_APCI_LENGTH);
  481. CS101_ASDU_encode(asdu, frame);
  482. frameBuffer.msgSize = Frame_getMsgSize(frame);
  483. sendASDU(self, frameBuffer.msg, frameBuffer.msgSize, 0, NULL);
  484. #if (CONFIG_USE_SEMAPHORES == 1)
  485. xSemaphoreGive(self->sentASDUsLock);
  486. #endif
  487. asduSent = true;
  488. }
  489. else {
  490. #if (CONFIG_USE_SEMAPHORES == 1)
  491. xSemaphoreGive(self->sentASDUsLock);
  492. #endif
  493. asduSent = HighPriorityASDUQueue_enqueue(self->highPrioQueue, asdu);
  494. }
  495. }
  496. else
  497. asduSent = false;
  498. if (asduSent == false)
  499. printf("CS104 SLAVE: unable to send response (state=%i)\n", self->state);
  500. return asduSent;
  501. }
  502. static bool
  503. _IMasterConnection_sendASDU(IMasterConnection self, CS101_ASDU asdu)
  504. {
  505. MasterConnection con = (MasterConnection) self->object;
  506. return sendASDUInternal(con, asdu);
  507. }
  508. static bool
  509. _IMasterConnection_sendACT_CON(IMasterConnection self, CS101_ASDU asdu, bool negative)
  510. {
  511. CS101_ASDU_setCOT(asdu, CS101_COT_ACTIVATION_CON);
  512. CS101_ASDU_setNegative(asdu, negative);
  513. return _IMasterConnection_sendASDU(self, asdu);
  514. }
  515. static bool
  516. _IMasterConnection_sendACT_TERM(IMasterConnection self, CS101_ASDU asdu)
  517. {
  518. CS101_ASDU_setCOT(asdu, CS101_COT_ACTIVATION_TERMINATION);
  519. CS101_ASDU_setNegative(asdu, false);
  520. return _IMasterConnection_sendASDU(self, asdu);
  521. }
  522. static void
  523. MasterConnection_close(MasterConnection self)
  524. {
  525. #if (CONFIG_USE_SEMAPHORES == 1)
  526. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  527. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  528. self->isRunning = false;
  529. self->state = M_CON_STATE_STOPPED;
  530. #if (CONFIG_USE_SEMAPHORES == 1)
  531. xSemaphoreGive(self->stateLock);
  532. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  533. }
  534. static void
  535. _IMasterConnection_close(IMasterConnection self)
  536. {
  537. MasterConnection con = (MasterConnection) self->object;
  538. MasterConnection_close(con);
  539. }
  540. static int
  541. _IMasterConnection_getPeerAddress(IMasterConnection self, char* addrBuf, int addrBufSize)
  542. {
  543. MasterConnection con = (MasterConnection) self->object;
  544. char buf[54];
  545. if (con->socket == NULL) {
  546. return 0;
  547. }
  548. char* addrStr = Socket_getPeerAddressStatic(con->socket, buf);
  549. if (addrStr == NULL)
  550. return 0;
  551. int len = (int) strlen(buf);
  552. if (len < addrBufSize) {
  553. strcpy(addrBuf, buf);
  554. return len;
  555. }
  556. else
  557. return 0;
  558. }
  559. static CS101_AppLayerParameters
  560. _IMasterConnection_getApplicationLayerParameters(IMasterConnection self)
  561. {
  562. MasterConnection con = (MasterConnection) self->object;
  563. return &(con->slave->alParameters);
  564. }
  565. /********************************************
  566. * END IMasterConnection
  567. *******************************************/
  568. static MasterConnection
  569. MasterConnection_create(CS104_Slave slave)
  570. {
  571. MasterConnection self = (MasterConnection) GLOBAL_CALLOC(1, sizeof(struct sMasterConnection));
  572. if (self != NULL)
  573. {
  574. self->state = M_CON_STATE_STOPPED;
  575. self->isUsed = false;
  576. self->slave = slave;
  577. self->maxSentASDUs = 0;
  578. self->sentASDUs = NULL;
  579. self->iMasterConnection.object = self;
  580. self->iMasterConnection.getApplicationLayerParameters = _IMasterConnection_getApplicationLayerParameters;
  581. self->iMasterConnection.isReady = _IMasterConnection_isReady;
  582. self->iMasterConnection.sendASDU = _IMasterConnection_sendASDU;
  583. self->iMasterConnection.sendACT_CON = _IMasterConnection_sendACT_CON;
  584. self->iMasterConnection.sendACT_TERM = _IMasterConnection_sendACT_TERM;
  585. self->iMasterConnection.close = _IMasterConnection_close;
  586. self->iMasterConnection.getPeerAddress = _IMasterConnection_getPeerAddress;
  587. #if (CONFIG_USE_THREADS == 1)
  588. self->connectionThread = NULL;
  589. #endif
  590. #if (CONFIG_USE_SEMAPHORES == 1)
  591. self->sentASDUsLock = xSemaphoreCreateMutex();
  592. self->stateLock = xSemaphoreCreateMutex();
  593. #endif
  594. self->handleSet = Handleset_new();
  595. /* initialize pointers with NULL to avoid segmentation fault on destroy call */
  596. self->socket = NULL;
  597. #if (CONFIG_CS104_SUPPORT_TLS == 1)
  598. self->tlsSocket = NULL;
  599. #endif
  600. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  601. self->redundancyGroup = NULL;
  602. #endif
  603. self->lowPrioQueue = NULL;
  604. self->highPrioQueue = NULL;
  605. }
  606. return self;
  607. }
  608. void
  609. CS104_Slave_setLocalAddress(CS104_Slave self, const char* ipAddress)
  610. {
  611. if (self->localAddress)
  612. GLOBAL_FREEMEM(self->localAddress);
  613. self->localAddress = (char*) GLOBAL_MALLOC(strlen(ipAddress) + 1);
  614. if (self->localAddress)
  615. strcpy(self->localAddress, ipAddress);
  616. }
  617. void
  618. CS104_Slave_setServerMode(CS104_Slave self, CS104_ServerMode serverMode)
  619. {
  620. self->serverMode = serverMode;
  621. }
  622. CS104_APCIParameters
  623. CS104_Slave_getConnectionParameters(CS104_Slave self)
  624. {
  625. return &(self->conParameters);
  626. }
  627. CS101_AppLayerParameters
  628. CS104_Slave_getAppLayerParameters(CS104_Slave self)
  629. {
  630. return &(self->alParameters);
  631. }
  632. static bool
  633. isRunning(CS104_Slave self)
  634. {
  635. bool isRunning;
  636. #if (CONFIG_USE_SEMAPHORES == 1)
  637. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  638. #endif
  639. isRunning = self->isRunning;
  640. #if (CONFIG_USE_SEMAPHORES == 1)
  641. xSemaphoreGive(self->stateLock);
  642. #endif
  643. return isRunning;
  644. }
  645. static bool
  646. isStarting(CS104_Slave self)
  647. {
  648. bool isStarting;
  649. #if (CONFIG_USE_SEMAPHORES == 1)
  650. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  651. #endif
  652. isStarting = self->isStarting;
  653. #if (CONFIG_USE_SEMAPHORES == 1)
  654. xSemaphoreGive(self->stateLock);
  655. #endif
  656. return isStarting;
  657. }
  658. static bool
  659. isStopRunningSet(CS104_Slave self)
  660. {
  661. bool isStopRunningSet;
  662. #if (CONFIG_USE_SEMAPHORES == 1)
  663. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  664. #endif
  665. isStopRunningSet = self->stopRunning;
  666. #if (CONFIG_USE_SEMAPHORES == 1)
  667. xSemaphoreGive(self->stateLock);
  668. #endif
  669. return isStopRunningSet;
  670. }
  671. bool
  672. CS104_Slave_isRunning(CS104_Slave self)
  673. {
  674. return isRunning(self);
  675. }
  676. static void
  677. MessageQueue_initialize(MessageQueue self)
  678. {
  679. self->entryCounter = 0;
  680. self->firstEntry = NULL;
  681. self->lastEntry = NULL;
  682. self->lastInBufferEntry = NULL;
  683. self->entryId = 1;
  684. }
  685. static MessageQueue
  686. MessageQueue_create(int maxQueueSize)
  687. {
  688. MessageQueue self = (MessageQueue) GLOBAL_MALLOC(sizeof(struct sMessageQueue));
  689. if (self) {
  690. self->size = maxQueueSize * (sizeof(struct sMessageQueueEntryInfo) + 256);
  691. printf("CS104 SLAVE: event queue buffer size: %i bytes\n", self->size);
  692. self->buffer = (uint8_t*) GLOBAL_CALLOC(1, self->size);
  693. #if (CONFIG_USE_SEMAPHORES == 1)
  694. self->queueLock = xSemaphoreCreateMutex();
  695. #endif
  696. MessageQueue_initialize(self);
  697. }
  698. return self;
  699. }
  700. static void
  701. HighPriorityASDUQueue_initialize(HighPriorityASDUQueue self)
  702. {
  703. self->entryCounter = 0;
  704. self->firstEntry = NULL;
  705. self->lastEntry = NULL;
  706. self->lastInBufferEntry = NULL;
  707. }
  708. static HighPriorityASDUQueue
  709. HighPriorityASDUQueue_create(int maxQueueSize)
  710. {
  711. HighPriorityASDUQueue self = (HighPriorityASDUQueue) GLOBAL_MALLOC(sizeof(struct sHighPriorityASDUQueue));
  712. if (self) {
  713. self->size = maxQueueSize * (sizeof(uint16_t) + 256);
  714. self->buffer = (uint8_t*) GLOBAL_CALLOC(1, self->size);
  715. #if (CONFIG_USE_SEMAPHORES == 1)
  716. self->queueLock = xSemaphoreCreateMutex();
  717. #endif
  718. HighPriorityASDUQueue_initialize(self);
  719. }
  720. return self;
  721. }
  722. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1)
  723. static void
  724. initializeMessageQueues(CS104_Slave self, int lowPrioMaxQueueSize, int highPrioMaxQueueSize)
  725. {
  726. /* initialized low priority queue */
  727. if (lowPrioMaxQueueSize < 1)
  728. lowPrioMaxQueueSize = CONFIG_CS104_MESSAGE_QUEUE_SIZE;
  729. self->asduQueue = MessageQueue_create(lowPrioMaxQueueSize);
  730. /* initialize high priority queue */
  731. if (highPrioMaxQueueSize < 1)
  732. highPrioMaxQueueSize = CONFIG_CS104_MESSAGE_QUEUE_HIGH_PRIO_SIZE;
  733. self->connectionAsduQueue = HighPriorityASDUQueue_create(highPrioMaxQueueSize);
  734. }
  735. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1) */
  736. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  737. static void
  738. CS104_RedundancyGroup_initializeMessageQueues(CS104_RedundancyGroup self, int lowPrioMaxQueueSize, int highPrioMaxQueueSize)
  739. {
  740. /* initialized low priority queue */
  741. if (lowPrioMaxQueueSize < 1)
  742. lowPrioMaxQueueSize = CONFIG_CS104_MESSAGE_QUEUE_SIZE;
  743. self->asduQueue = MessageQueue_create(lowPrioMaxQueueSize);
  744. /* initialize high priority queue */
  745. if (highPrioMaxQueueSize < 1)
  746. highPrioMaxQueueSize = CONFIG_CS104_MESSAGE_QUEUE_HIGH_PRIO_SIZE;
  747. self->connectionAsduQueue = HighPriorityASDUQueue_create(highPrioMaxQueueSize);
  748. }
  749. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1) */
  750. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_CONNECTION_IS_REDUNDANCY_GROUP == 1)
  751. static void
  752. initializeConnectionSpecificQueues(CS104_Slave self)
  753. {
  754. int i;
  755. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++) {
  756. self->masterConnections[i]->lowPrioQueue = MessageQueue_create(self->maxLowPrioQueueSize);
  757. self->masterConnections[i]->highPrioQueue = HighPriorityASDUQueue_create(self->maxHighPrioQueueSize);
  758. }
  759. }
  760. static void
  761. MessageQueue_destroy(MessageQueue self)
  762. {
  763. if (self != NULL) {
  764. #if (CONFIG_USE_SEMAPHORES == 1)
  765. platform_mutex_destroy(self->queueLock);
  766. #endif
  767. GLOBAL_FREEMEM(self->buffer);
  768. GLOBAL_FREEMEM(self);
  769. }
  770. }
  771. static void
  772. HighPriorityASDUQueue_destroy(HighPriorityASDUQueue self)
  773. {
  774. if (self) {
  775. if (self->buffer)
  776. GLOBAL_FREEMEM(self->buffer);
  777. #if (CONFIG_USE_SEMAPHORES == 1)
  778. platform_mutex_destroy(self->queueLock);
  779. #endif
  780. GLOBAL_FREEMEM(self);
  781. }
  782. }
  783. static void
  784. deleteConnectionSpecificQueues(CS104_Slave self)
  785. {
  786. int i;
  787. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++) {
  788. if (self->masterConnections[i]->lowPrioQueue) {
  789. MessageQueue_destroy(self->masterConnections[i]->lowPrioQueue);
  790. self->masterConnections[i]->lowPrioQueue = NULL;
  791. }
  792. if (self->masterConnections[i]->highPrioQueue) {
  793. HighPriorityASDUQueue_destroy(self->masterConnections[i]->highPrioQueue);
  794. self->masterConnections[i]->highPrioQueue = NULL;
  795. }
  796. }
  797. }
  798. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_CONNECTION_IS_REDUNDANCY_GROUP == 1) */
  799. char *strdup(const char *s)
  800. {
  801. size_t len = strlen(s) + 1;
  802. void *new = mymalloc(SRAMEX,len);
  803. if (new == NULL)
  804. return NULL;
  805. return (char *)memcpy(new, s, len);
  806. }
  807. CS104_RedundancyGroup
  808. CS104_RedundancyGroup_create(const char* name)
  809. {
  810. CS104_RedundancyGroup self = (CS104_RedundancyGroup) GLOBAL_MALLOC(sizeof(struct sCS104_RedundancyGroup));
  811. if (self) {
  812. if (name)
  813. self->name = strdup(name);
  814. else
  815. self->name = NULL;
  816. self->asduQueue = NULL;
  817. self->connectionAsduQueue = NULL;
  818. self->allowedClients = NULL;
  819. }
  820. return self;
  821. }
  822. void
  823. CS104_Slave_addRedundancyGroup(CS104_Slave self, CS104_RedundancyGroup redundancyGroup)
  824. {
  825. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  826. if (self->serverMode == CS104_MODE_MULTIPLE_REDUNDANCY_GROUPS) {
  827. if (self->redundancyGroups == NULL)
  828. self->redundancyGroups = LinkedList_create();
  829. LinkedList_add(self->redundancyGroups, redundancyGroup);
  830. }
  831. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1) */
  832. }
  #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  /**
   * Make sure the slave has at least one redundancy group and that every
   * group has its message queues.
   *
   * When no group has been added yet, an unnamed group without an
   * allowed-client list is created and added. Afterwards the message queues
   * are initialized for every group whose asduQueue is still NULL.
   *
   * \param self the slave instance
   * \param lowPrioMaxQueueSize passed on to CS104_RedundancyGroup_initializeMessageQueues
   * \param highPrioMaxQueueSize passed on to CS104_RedundancyGroup_initializeMessageQueues
   */
  static void
  initializeRedundancyGroups(CS104_Slave self, int lowPrioMaxQueueSize, int highPrioMaxQueueSize)
  {
      if (self->redundancyGroups == NULL) {
          CS104_RedundancyGroup redGroup = CS104_RedundancyGroup_create(NULL);

          if (redGroup == NULL)
              return; /* out of memory: never put a NULL group into the list */

          CS104_Slave_addRedundancyGroup(self, redGroup);

          if (self->redundancyGroups == NULL) {
              /* The group was not stored (CS104_Slave_addRedundancyGroup only
               * stores groups in multiple-redundancy-groups mode). It was
               * created with a NULL name and no queues, so the struct itself
               * is all there is to release. */
              GLOBAL_FREEMEM(redGroup);
              return;
          }
      }

      LinkedList element = LinkedList_getNext(self->redundancyGroups);

      while (element) {
          CS104_RedundancyGroup redGroup = (CS104_RedundancyGroup) LinkedList_getData(element);

          /* skip NULL entries: callers may have added the result of a failed create */
          if ((redGroup != NULL) && (redGroup->asduQueue == NULL))
              CS104_RedundancyGroup_initializeMessageQueues(redGroup, lowPrioMaxQueueSize, highPrioMaxQueueSize);

          element = LinkedList_getNext(element);
      }
  }
  #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1) */
  850. /***************************************************
  851. * RedundancyGroup
  852. ***************************************************/
  853. typedef struct sCS104_IPAddress* CS104_IPAddress;
  854. struct sCS104_IPAddress
  855. {
  856. uint8_t address[16];
  857. eCS104_IPAddressType type;
  858. };
  859. static void
  860. CS104_IPAddress_setFromString(CS104_IPAddress self, const char* ipAddrStr)
  861. {
  862. if (strchr(ipAddrStr, '.') != NULL) {
  863. /* parse IPv4 string */
  864. self->type = IP_ADDRESS_TYPE_IPV4;
  865. int i;
  866. for (i = 0; i < 4; i++) {
  867. self->address[i] = (uint8_t) strtoul(ipAddrStr, NULL, 10);
  868. ipAddrStr = strchr(ipAddrStr, '.');
  869. if ((ipAddrStr == NULL) || (*ipAddrStr == 0))
  870. break;
  871. ipAddrStr++;
  872. }
  873. }
  874. else {
  875. self->type = IP_ADDRESS_TYPE_IPV6;
  876. int i;
  877. for (i = 0; i < 8; i++) {
  878. uint32_t val = strtoul(ipAddrStr, NULL, 16);
  879. self->address[i * 2] = val / 0x100;
  880. self->address[i * 2 + 1] = val % 0x100;
  881. ipAddrStr = strchr(ipAddrStr, ':');
  882. if ((ipAddrStr == NULL) || (*ipAddrStr == 0))
  883. break;
  884. ipAddrStr++;
  885. }
  886. }
  887. }
  888. static bool
  889. CS104_IPAddress_equals(CS104_IPAddress self, CS104_IPAddress other)
  890. {
  891. if (self->type != other->type)
  892. return false;
  893. int size;
  894. if (self->type == IP_ADDRESS_TYPE_IPV4)
  895. size = 4;
  896. else
  897. size = 16;
  898. int i;
  899. for (i = 0; i < size; i++) {
  900. if (self->address[i] != other->address[i])
  901. return false;
  902. }
  903. return true;
  904. }
  905. static bool
  906. CS104_RedundancyGroup_isCatchAll(CS104_RedundancyGroup self)
  907. {
  908. if (self->allowedClients)
  909. return false;
  910. else
  911. return true;
  912. }
  913. static bool
  914. CS104_RedundancyGroup_matches(CS104_RedundancyGroup self, CS104_IPAddress ipAddress)
  915. {
  916. if (self->allowedClients == NULL)
  917. return false;
  918. LinkedList element = LinkedList_getNext(self->allowedClients);
  919. while (element) {
  920. CS104_IPAddress allowedAddress = (CS104_IPAddress) LinkedList_getData(element);
  921. if (CS104_IPAddress_equals(ipAddress, allowedAddress))
  922. return true;
  923. element = LinkedList_getNext(element);
  924. }
  925. return false;
  926. }
  #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  /**
   * Select the redundancy group responsible for a client address.
   *
   * The first group whose allowed-client list contains the address wins.
   * When no group lists the address, the last catch-all group seen while
   * walking the list is used instead.
   *
   * \param self the slave instance
   * \param ipAddrStr textual IP address of the client
   *
   * \return the selected group, or NULL when neither a matching nor a
   *         catch-all group exists
   */
  static CS104_RedundancyGroup
  getMatchingRedundancyGroup(CS104_Slave self, char* ipAddrStr)
  {
      struct sCS104_IPAddress peerAddress;
      CS104_RedundancyGroup fallbackGroup = NULL;
      LinkedList node;

      CS104_IPAddress_setFromString(&peerAddress, ipAddrStr);

      for (node = LinkedList_getNext(self->redundancyGroups); node != NULL; node = LinkedList_getNext(node)) {
          CS104_RedundancyGroup candidate = (CS104_RedundancyGroup) LinkedList_getData(node);

          if (CS104_RedundancyGroup_matches(candidate, &peerAddress))
              return candidate; /* explicit match takes precedence */

          if (CS104_RedundancyGroup_isCatchAll(candidate))
              fallbackGroup = candidate;
      }

      return fallbackGroup;
  }
  #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1) */
  951. static void
  952. _resetT3Timeout(MasterConnection self, uint64_t currentTime)
  953. {
  954. self->nextT3Timeout = currentTime + (uint64_t) (self->slave->conParameters.t3 * 1000);
  955. }
  956. static void
  957. resetT3Timeout(MasterConnection self, uint64_t currentTime)
  958. {
  959. #if (CONFIG_USE_SEMAPHORES == 1)
  960. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  961. #endif
  962. _resetT3Timeout(self, currentTime);
  963. #if (CONFIG_USE_SEMAPHORES == 1)
  964. xSemaphoreGive(self->stateLock);
  965. #endif
  966. }
  967. static void
  968. MessageQueue_releaseAllQueuedASDUs(MessageQueue self)
  969. {
  970. #if (CONFIG_USE_SEMAPHORES == 1)
  971. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  972. #endif
  973. self->firstEntry = NULL;
  974. self->lastEntry = NULL;
  975. self->lastInBufferEntry = NULL;
  976. self->entryCounter = 0;
  977. #if (CONFIG_USE_SEMAPHORES == 1)
  978. xSemaphoreGive(self->queueLock);
  979. #endif
  980. }
  981. static void
  982. HighPriorityASDUQueue_resetConnectionQueue(HighPriorityASDUQueue self)
  983. {
  984. #if (CONFIG_USE_SEMAPHORES == 1)
  985. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  986. #endif
  987. self->firstEntry = 0;
  988. self->lastEntry = 0;
  989. self->lastInBufferEntry = 0;
  990. self->entryCounter = 0;
  991. #if (CONFIG_USE_SEMAPHORES == 1)
  992. xSemaphoreGive(self->queueLock);
  993. #endif
  994. }
  995. static bool
  996. MasterConnection_init(MasterConnection self, Socket skt, MessageQueue lowPrioQueue, HighPriorityASDUQueue highPrioQueue)
  997. {
  998. if (self)
  999. {
  1000. self->socket = skt;
  1001. self->isRunning = false;
  1002. self->receiveCount = 0;
  1003. self->sendCount = 0;
  1004. self->recvBufPos = 0;
  1005. if (self->maxSentASDUs != self->slave->conParameters.k)
  1006. {
  1007. if (self->sentASDUs)
  1008. {
  1009. GLOBAL_FREEMEM(self->sentASDUs);
  1010. self->sentASDUs = NULL;
  1011. }
  1012. }
  1013. if (self->sentASDUs == NULL)
  1014. {
  1015. self->maxSentASDUs = self->slave->conParameters.k;
  1016. self->sentASDUs = (SentASDUSlave*) GLOBAL_CALLOC(self->maxSentASDUs, sizeof(SentASDUSlave));
  1017. if (self->sentASDUs == NULL)
  1018. {
  1019. printf("CS104 SLAVE: Failed to allocate memory for sent ASDU buffer\n");
  1020. return false;
  1021. }
  1022. }
  1023. self->unconfirmedReceivedIMessages = 0;
  1024. self->lastConfirmationTime = 0xffffffffffffffffULL;// /* 18446744073709551615ULL */
  1025. self->timeoutT2Triggered = false;
  1026. self->oldestSentASDU = -1;
  1027. self->newestSentASDU = -1;
  1028. resetT3Timeout(self, Hal_getTimeInMs());
  1029. #if (CONFIG_CS104_SUPPORT_TLS == 1)
  1030. if (self->slave->tlsConfig != NULL) {
  1031. self->tlsSocket = TLSSocket_create(skt, self->slave->tlsConfig, false);
  1032. if (self->tlsSocket == NULL) {
  1033. printf("CS104 SLAVE: Failed to create TLS context. Close connection\n");
  1034. self->isUsed = false;
  1035. return false;
  1036. }
  1037. }
  1038. else
  1039. self->tlsSocket = NULL;
  1040. #endif
  1041. /* for the mode CS104_MODE_CONNECTION_IS_REDUNDANCY_GROUP we use the connection specific queues */
  1042. if (lowPrioQueue)
  1043. self->lowPrioQueue = lowPrioQueue;
  1044. else {
  1045. MessageQueue_releaseAllQueuedASDUs(self->lowPrioQueue);
  1046. }
  1047. if (highPrioQueue)
  1048. self->highPrioQueue = highPrioQueue;
  1049. HighPriorityASDUQueue_resetConnectionQueue(self->highPrioQueue);
  1050. self->waitingForTestFRcon = false;
  1051. return true;
  1052. }
  1053. else {
  1054. return false;
  1055. }
  1056. }
  #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  /**
   * Initialize a connection with the queues of a redundancy group and, on
   * success, remember the group in the connection.
   *
   * \param self the connection to initialize (may be NULL)
   * \param skt the socket of the new connection
   * \param redGroup the redundancy group the connection belongs to
   *
   * \return the result of MasterConnection_init, or false when self is NULL
   */
  static bool
  MasterConnection_initEx(MasterConnection self, Socket skt, CS104_RedundancyGroup redGroup)
  {
      if (self == NULL)
          return false;

      if (MasterConnection_init(self, skt, redGroup->asduQueue, redGroup->connectionAsduQueue) == false)
          return false;

      self->redundancyGroup = redGroup;

      return true;
  }
  #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1) */
  1070. static char*
  1071. getPeerAddress(Socket socket, char* ipAddress)
  1072. {
  1073. char* ipAddrStr = NULL;
  1074. if (Socket_getPeerAddressStatic(socket, ipAddress)) {
  1075. /* remove TCP port part */
  1076. if (ipAddress[0] == '[') {
  1077. /* IPV6 address */
  1078. ipAddrStr = ipAddress + 1;
  1079. char* separator = strchr(ipAddrStr, ']');
  1080. if (separator != NULL)
  1081. *separator = 0;
  1082. }
  1083. else {
  1084. /* IPV4 address */
  1085. ipAddrStr = ipAddress;
  1086. char* separator = strchr(ipAddrStr, ':');
  1087. if (separator != NULL)
  1088. *separator = 0;
  1089. }
  1090. }
  1091. return ipAddrStr;
  1092. }
  1093. static bool
  1094. callConnectionRequestHandler(CS104_Slave self, Socket newSocket)
  1095. {
  1096. char ipAddress[60];
  1097. char* ipAddrStr = getPeerAddress(newSocket, ipAddress);
  1098. if (ipAddrStr == NULL)
  1099. return false;
  1100. if (self->connectionRequestHandler != NULL) {
  1101. return self->connectionRequestHandler(self->connectionRequestHandlerParameter,
  1102. ipAddrStr);
  1103. }
  1104. else
  1105. return true;
  1106. }
  1107. /**
  1108. * \return number of bytes read, or -1 in case of an error
  1109. */
  1110. static int
  1111. readFromSocket(MasterConnection self, uint8_t* buffer, int size)
  1112. {
  1113. #if (CONFIG_CS104_SUPPORT_TLS == 1)
  1114. if (self->tlsSocket != NULL)
  1115. return TLSSocket_read(self->tlsSocket, buffer, size);
  1116. else
  1117. return Socket_read(self->socket, buffer, size);
  1118. #else
  1119. return Socket_read(self->socket, buffer, size);
  1120. #endif
  1121. }
  1122. /**
  1123. * \brief Read message part into receive buffer
  1124. *
  1125. * \return -1 in case of an error, 0 when no complete message can be read, > 0 when a complete message is in buffer
  1126. */
  1127. static int
  1128. receiveMessage(MasterConnection self)
  1129. {
  1130. uint8_t* buffer = self->recvBuffer;
  1131. int bufPos = self->recvBufPos;
  1132. /* read start byte */
  1133. if (bufPos == 0) {
  1134. int readFirst = readFromSocket(self, buffer, 1);
  1135. if (readFirst < 1)
  1136. return readFirst;
  1137. if (buffer[0] != 0x68)
  1138. return -1; /* message error */
  1139. bufPos++;
  1140. }
  1141. /* read length byte */
  1142. if (bufPos == 1) {
  1143. if (readFromSocket(self, buffer + 1, 1) != 1) {
  1144. self->recvBufPos = 0;
  1145. return -1;
  1146. }
  1147. bufPos++;
  1148. }
  1149. /* read remaining frame */
  1150. if (bufPos > 1) {
  1151. int length = buffer[1];
  1152. int remainingLength = length - bufPos + 2;
  1153. int readCnt = readFromSocket(self, buffer + bufPos, remainingLength);
  1154. if (readCnt == remainingLength) {
  1155. self->recvBufPos = 0;
  1156. return length + 2;
  1157. }
  1158. else if (readCnt == -1) {
  1159. self->recvBufPos = 0;
  1160. return -1;
  1161. }
  1162. else {
  1163. self->recvBufPos = bufPos + readCnt;
  1164. return 0;
  1165. }
  1166. }
  1167. self->recvBufPos = bufPos;
  1168. return 0;
  1169. }
  1170. static MasterConnection
  1171. getFreeConnection(CS104_Slave self)
  1172. {
  1173. MasterConnection connection = NULL;
  1174. int i;
  1175. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++)
  1176. {
  1177. MasterConnection con = self->masterConnections[i];
  1178. if (con) {
  1179. #if (CONFIG_USE_SEMAPHORES)
  1180. xSemaphoreTake(con->stateLock, portMAX_DELAY);
  1181. #endif
  1182. if (con->isUsed == false) {
  1183. connection = con;
  1184. connection->isUsed = true;
  1185. }
  1186. #if (CONFIG_USE_SEMAPHORES)
  1187. xSemaphoreGive(con->stateLock);
  1188. #endif
  1189. }
  1190. if (connection)
  1191. break;
  1192. }
  1193. return connection;
  1194. }
  1195. static bool
  1196. MasterConnection_isRunning(MasterConnection self)
  1197. {
  1198. bool retVal;
  1199. #if (CONFIG_USE_SEMAPHORES == 1)
  1200. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1201. #endif
  1202. retVal = self->isRunning;
  1203. #if (CONFIG_USE_SEMAPHORES == 1)
  1204. xSemaphoreGive(self->stateLock);
  1205. #endif
  1206. return retVal;
  1207. }
  1208. static void
  1209. removeFirstEntry(MessageQueue self)
  1210. {
  1211. if (self->firstEntry == self->lastInBufferEntry)
  1212. {
  1213. if (self->firstEntry == self->lastEntry)
  1214. {
  1215. self->firstEntry = NULL;
  1216. self->lastEntry = NULL;
  1217. self->lastInBufferEntry = NULL;
  1218. }
  1219. else {
  1220. self->firstEntry = self->buffer;
  1221. self->lastInBufferEntry = self->lastEntry;
  1222. }
  1223. }
  1224. else
  1225. {
  1226. struct sMessageQueueEntryInfo entryInfo;
  1227. memcpy(&entryInfo, self->firstEntry, sizeof(struct sMessageQueueEntryInfo));
  1228. self->firstEntry = self->firstEntry + sizeof(struct sMessageQueueEntryInfo) + entryInfo.size;
  1229. }
  1230. self->entryCounter--;
  1231. }
  1232. static void
  1233. MessageQueue_markAsduAsConfirmed(MessageQueue self, uint8_t* queueEntry, uint64_t entryId)
  1234. {
  1235. if (self->entryCounter > 0)
  1236. {
  1237. /* entryId plausibility check */
  1238. uint64_t entryIdDiff = self->entryId - 1 - entryId;
  1239. if (entryIdDiff < (unsigned) self->entryCounter)
  1240. {
  1241. struct sMessageQueueEntryInfo entryInfo;
  1242. memcpy(&entryInfo, queueEntry, sizeof(struct sMessageQueueEntryInfo));
  1243. /* check if ASDU is matching */
  1244. if (entryInfo.entryId == entryId)
  1245. {
  1246. entryInfo.entryState = QUEUE_ENTRY_STATE_NOT_USED_OR_CONFIRMED;
  1247. memcpy(queueEntry, &entryInfo, sizeof(struct sMessageQueueEntryInfo));
  1248. if (queueEntry == self->firstEntry) {
  1249. removeFirstEntry(self);
  1250. }
  1251. }
  1252. else {
  1253. /* we shouldn't be here - probably bug in queue handling code */
  1254. printf("CS104 SLAVE: message queue corrupted\n");
  1255. }
  1256. }
  1257. }
  1258. }
  1259. static bool
  1260. checkSequenceNumber(MasterConnection self, int seqNo)
  1261. {
  1262. #if (CONFIG_USE_SEMAPHORES == 1)
  1263. xSemaphoreTake(self->sentASDUsLock, portMAX_DELAY);
  1264. #endif
  1265. /* check if received sequence number is valid */
  1266. bool seqNoIsValid = false;
  1267. bool counterOverflowDetected = false;
  1268. int oldestValidSeqNo = -1;
  1269. if (self->oldestSentASDU == -1) { /* if k-Buffer is empty */
  1270. if (seqNo == self->sendCount)
  1271. seqNoIsValid = true;
  1272. }
  1273. else {
  1274. /* two cases are required to reflect sequence number overflow */
  1275. int oldestAsduSeqNo = self->sentASDUs[self->oldestSentASDU].seqNo;
  1276. int newestAsduSeqNo = self->sentASDUs[self->newestSentASDU].seqNo;
  1277. if (oldestAsduSeqNo <= newestAsduSeqNo) {
  1278. if ((seqNo >= oldestAsduSeqNo) && (seqNo <= newestAsduSeqNo))
  1279. seqNoIsValid = true;
  1280. }
  1281. else {
  1282. if ((seqNo >= oldestAsduSeqNo) || (seqNo <= newestAsduSeqNo))
  1283. seqNoIsValid = true;
  1284. counterOverflowDetected = true;
  1285. }
  1286. /* check if confirmed message was already removed from list */
  1287. if (oldestAsduSeqNo == 0)
  1288. oldestValidSeqNo = 32767;
  1289. else
  1290. oldestValidSeqNo = (oldestAsduSeqNo - 1) % 32768;
  1291. if (oldestValidSeqNo == seqNo)
  1292. seqNoIsValid = true;
  1293. }
  1294. if (seqNoIsValid)
  1295. {
  1296. if (self->oldestSentASDU != -1)
  1297. {
  1298. do
  1299. {
  1300. int oldestAsduSeqNo = self->sentASDUs[self->oldestSentASDU].seqNo;
  1301. if (counterOverflowDetected == false) {
  1302. if (seqNo < oldestAsduSeqNo)
  1303. break;
  1304. }
  1305. if (seqNo == oldestValidSeqNo)
  1306. break;
  1307. /* remove from server (low-priority) queue if required */
  1308. if (self->sentASDUs[self->oldestSentASDU].queueEntry != NULL)
  1309. {
  1310. xSemaphoreTake(self->lowPrioQueue, portMAX_DELAY);
  1311. MessageQueue_markAsduAsConfirmed(self->lowPrioQueue,
  1312. self->sentASDUs[self->oldestSentASDU].queueEntry,
  1313. self->sentASDUs[self->oldestSentASDU].entryId);
  1314. self->sentASDUs[self->oldestSentASDU].queueEntry = NULL;
  1315. self->sentASDUs[self->oldestSentASDU].seqNo = -1;
  1316. xSemaphoreGive(self->lowPrioQueue);
  1317. }
  1318. if (oldestAsduSeqNo == seqNo)
  1319. {
  1320. /* we arrived at the seq# that has been confirmed */
  1321. if (self->oldestSentASDU == self->newestSentASDU)
  1322. self->oldestSentASDU = -1;
  1323. else
  1324. self->oldestSentASDU = (self->oldestSentASDU + 1) % self->maxSentASDUs;
  1325. break;
  1326. }
  1327. self->oldestSentASDU = (self->oldestSentASDU + 1) % self->maxSentASDUs;
  1328. int checkIndex = (self->newestSentASDU + 1) % self->maxSentASDUs;
  1329. if (self->oldestSentASDU == checkIndex) {
  1330. self->oldestSentASDU = -1;
  1331. break;
  1332. }
  1333. } while (true);
  1334. }
  1335. }
  1336. else
  1337. printf("CS104 SLAVE: Received sequence number out of range");
  1338. #if (CONFIG_USE_SEMAPHORES == 1)
  1339. xSemaphoreGive(self->sentASDUsLock);
  1340. #endif
  1341. return seqNoIsValid;
  1342. }
  1343. static void
  1344. responseCOTUnknown(CS101_ASDU asdu, MasterConnection self)
  1345. {
  1346. printf("CS104 SLAVE: with unknown COT\n");
  1347. CS101_ASDU_setCOT(asdu, CS101_COT_UNKNOWN_COT);
  1348. CS101_ASDU_setNegative(asdu, true);
  1349. sendASDUInternal(self, asdu);
  1350. }
  1351. /*
  1352. * Handle received ASDUs
  1353. *
  1354. * Call the appropriate callbacks according to ASDU type and CoT
  1355. *
  1356. * \return true when ASDU is valid, false otherwise (e.g. corrupted message data)
  1357. */
  1358. static bool
  1359. handleASDU(MasterConnection self, CS101_ASDU asdu)
  1360. {
  1361. bool messageHandled = false;
  1362. CS104_Slave slave = self->slave;
  1363. /* call plugins */
  1364. if (slave->plugins) {
  1365. LinkedList pluginElem = LinkedList_getNext(slave->plugins);
  1366. while (pluginElem) {
  1367. CS101_SlavePlugin plugin = (CS101_SlavePlugin) LinkedList_getData(pluginElem);
  1368. CS101_SlavePlugin_Result result = plugin->handleAsdu(plugin->parameter, &(self->iMasterConnection), asdu);
  1369. if (result == CS101_PLUGIN_RESULT_HANDLED)
  1370. return true;
  1371. pluginElem = LinkedList_getNext(pluginElem);
  1372. }
  1373. }
  1374. uint8_t cot = CS101_ASDU_getCOT(asdu);
  1375. switch (CS101_ASDU_getTypeID(asdu)) {
  1376. case C_IC_NA_1: /* 100 - interrogation command */
  1377. printf("CS104 SLAVE: Rcvd interrogation command C_IC_NA_1\n");
  1378. if ((cot == CS101_COT_ACTIVATION) || (cot == CS101_COT_DEACTIVATION)) {
  1379. if (slave->interrogationHandler != NULL) {
  1380. union uInformationObject _io;
  1381. InterrogationCommand irc = (InterrogationCommand) CS101_ASDU_getElementEx(asdu, (InformationObject) &_io, 0);
  1382. if (irc) {
  1383. if (slave->interrogationHandler(slave->interrogationHandlerParameter,
  1384. &(self->iMasterConnection), asdu, InterrogationCommand_getQOI(irc)))
  1385. messageHandled = true;
  1386. }
  1387. else
  1388. return false;
  1389. }
  1390. }
  1391. else {
  1392. responseCOTUnknown(asdu, self);
  1393. messageHandled = true;
  1394. }
  1395. break;
  1396. case C_CI_NA_1: /* 101 - counter interrogation command */
  1397. printf("CS104 SLAVE: Rcvd counter interrogation command C_CI_NA_1\n");
  1398. if ((cot == CS101_COT_ACTIVATION) || (cot == CS101_COT_DEACTIVATION)) {
  1399. if (slave->counterInterrogationHandler != NULL) {
  1400. union uInformationObject _io;
  1401. CounterInterrogationCommand cic = (CounterInterrogationCommand) CS101_ASDU_getElementEx(asdu, (InformationObject) &_io, 0);
  1402. if (cic) {
  1403. if (slave->counterInterrogationHandler(slave->counterInterrogationHandlerParameter,
  1404. &(self->iMasterConnection), asdu, CounterInterrogationCommand_getQCC(cic)))
  1405. messageHandled = true;
  1406. }
  1407. else
  1408. return false;
  1409. }
  1410. }
  1411. else {
  1412. responseCOTUnknown(asdu, self);
  1413. messageHandled = true;
  1414. }
  1415. break;
  1416. case C_RD_NA_1: /* 102 - read command */
  1417. printf("CS104 SLAVE: Rcvd read command C_RD_NA_1\n");
  1418. if (cot == CS101_COT_REQUEST) {
  1419. if (slave->readHandler != NULL) {
  1420. union uInformationObject _io;
  1421. ReadCommand rc = (ReadCommand) CS101_ASDU_getElementEx(asdu, (InformationObject) &_io, 0);
  1422. if (rc) {
  1423. if (slave->readHandler(slave->readHandlerParameter,
  1424. &(self->iMasterConnection), asdu, InformationObject_getObjectAddress((InformationObject) rc)))
  1425. messageHandled = true;
  1426. }
  1427. else
  1428. return false;
  1429. }
  1430. }
  1431. else {
  1432. responseCOTUnknown(asdu, self);
  1433. messageHandled = true;
  1434. }
  1435. break;
  1436. case C_CS_NA_1: /* 103 - Clock synchronization command */
  1437. printf("CS104 SLAVE: Rcvd clock sync command C_CS_NA_1\n");
  1438. if (cot == CS101_COT_ACTIVATION) {
  1439. if (slave->clockSyncHandler != NULL) {
  1440. union uInformationObject _io;
  1441. ClockSynchronizationCommand csc = (ClockSynchronizationCommand) CS101_ASDU_getElementEx(asdu, (InformationObject) &_io, 0);
  1442. if (csc) {
  1443. CP56Time2a newTime = ClockSynchronizationCommand_getTime(csc);
  1444. if (slave->clockSyncHandler(slave->clockSyncHandlerParameter,
  1445. &(self->iMasterConnection), asdu, newTime)) {
  1446. CS101_ASDU_removeAllElements(asdu);
  1447. ClockSynchronizationCommand_create(csc, 0, newTime);
  1448. CS101_ASDU_addInformationObject(asdu, (InformationObject) csc);
  1449. CS101_ASDU_setCOT(asdu, CS101_COT_ACTIVATION_CON);
  1450. sendASDUInternal(self, asdu);
  1451. }
  1452. else {
  1453. CS101_ASDU_setCOT(asdu, CS101_COT_ACTIVATION_CON);
  1454. CS101_ASDU_setNegative(asdu, true);
  1455. sendASDUInternal(self, asdu);
  1456. }
  1457. messageHandled = true;
  1458. }
  1459. else
  1460. return false;
  1461. }
  1462. }
  1463. else {
  1464. responseCOTUnknown(asdu, self);
  1465. messageHandled = true;
  1466. }
  1467. break;
  1468. case C_TS_NA_1: /* 104 - test command */
  1469. #if (CONFIG_ALLOW_C_TS_NA_1_FOR_CS104 == 1)
  1470. printf("CS104 SLAVE: Rcvd test command C_TS_NA_1\n");
  1471. if (cot == CS101_COT_ACTIVATION) {
  1472. CS101_ASDU_setCOT(asdu, CS101_COT_ACTIVATION_CON);
  1473. sendASDUInternal(self, asdu);
  1474. messageHandled = true;
  1475. }
  1476. else {
  1477. responseCOTUnknown(asdu, self);
  1478. messageHandled = true;
  1479. }
  1480. #else
  1481. /* this command is not supported/allowed for IEC 104 */
  1482. printf("CS104 SLAVE: Rcvd test command C_TS_NA_1 -> not allowed\n");
  1483. messageHandled = false;
  1484. #endif /* (CONFIG_ALLOW_C_TS_NA_1_FOR_CS104 == 1) */
  1485. break;
  1486. case C_RP_NA_1: /* 105 - Reset process command */
  1487. printf("CS104 SLAVE: Rcvd reset process command C_RP_NA_1\n");
  1488. if (cot == CS101_COT_ACTIVATION) {
  1489. if (slave->resetProcessHandler != NULL) {
  1490. union uInformationObject _io;
  1491. ResetProcessCommand rpc = (ResetProcessCommand) CS101_ASDU_getElementEx(asdu, (InformationObject) &_io, 0);
  1492. if (rpc) {
  1493. if (slave->resetProcessHandler(slave->resetProcessHandlerParameter,
  1494. &(self->iMasterConnection), asdu, ResetProcessCommand_getQRP(rpc)))
  1495. messageHandled = true;
  1496. }
  1497. else
  1498. return false;
  1499. }
  1500. }
  1501. else {
  1502. responseCOTUnknown(asdu, self);
  1503. messageHandled = true;
  1504. }
  1505. break;
  1506. case C_CD_NA_1: /* 106 - Delay acquisition command */
  1507. printf("CS104 SLAVE: Rcvd delay acquisition command C_CD_NA_1\n");
  1508. if ((cot == CS101_COT_ACTIVATION) || (cot == CS101_COT_SPONTANEOUS)) {
  1509. if (slave->delayAcquisitionHandler != NULL) {
  1510. union uInformationObject _io;
  1511. DelayAcquisitionCommand dac = (DelayAcquisitionCommand) CS101_ASDU_getElementEx(asdu, (InformationObject) &_io, 0);
  1512. if (dac) {
  1513. if (slave->delayAcquisitionHandler(slave->delayAcquisitionHandlerParameter,
  1514. &(self->iMasterConnection), asdu, DelayAcquisitionCommand_getDelay(dac)))
  1515. messageHandled = true;
  1516. }
  1517. else
  1518. return false;
  1519. }
  1520. }
  1521. else {
  1522. responseCOTUnknown(asdu, self);
  1523. messageHandled = true;
  1524. }
  1525. break;
  1526. case C_TS_TA_1: /* 107 - test command with timestamp */
  1527. printf("CS104 SLAVE: Rcvd test command with CP56Time2a C_TS_TA_1\n");
  1528. if (cot != CS101_COT_ACTIVATION) {
  1529. CS101_ASDU_setCOT(asdu, CS101_COT_UNKNOWN_COT);
  1530. CS101_ASDU_setNegative(asdu, true);
  1531. }
  1532. else
  1533. CS101_ASDU_setCOT(asdu, CS101_COT_ACTIVATION_CON);
  1534. sendASDUInternal(self, asdu);
  1535. messageHandled = true;
  1536. break;
  1537. default: /* no special handler available -> use default handler */
  1538. break;
  1539. }
  1540. if ((messageHandled == false) && (slave->asduHandler != NULL))
  1541. if (slave->asduHandler(slave->asduHandlerParameter, &(self->iMasterConnection), asdu))
  1542. messageHandled = true;
  1543. if (messageHandled == false) {
  1544. /* send error response */
  1545. CS101_ASDU_setCOT(asdu, CS101_COT_UNKNOWN_TYPE_ID);
  1546. CS101_ASDU_setNegative(asdu, true);
  1547. sendASDUInternal(self, asdu);
  1548. }
  1549. return true;
  1550. }
  1551. static void
  1552. MasterConnection_deactivate(MasterConnection self)
  1553. {
  1554. #if (CONFIG_USE_SEMAPHORES == 1)
  1555. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1556. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  1557. if (self->isUsed)
  1558. {
  1559. if (self->state == M_CON_STATE_STARTED)
  1560. {
  1561. if (self->slave->connectionEventHandler) {
  1562. self->slave->connectionEventHandler(self->slave->connectionEventHandlerParameter, &(self->iMasterConnection), CS104_CON_EVENT_DEACTIVATED);
  1563. }
  1564. }
  1565. }
  1566. self->state = M_CON_STATE_UNCONFIRMED_STOPPED;
  1567. #if (CONFIG_USE_SEMAPHORES == 1)
  1568. xSemaphoreGive(self->stateLock);
  1569. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  1570. }
  1571. static void
  1572. MasterConnection_activate(MasterConnection self)
  1573. {
  1574. #if (CONFIG_USE_SEMAPHORES == 1)
  1575. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1576. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  1577. if (self->state != M_CON_STATE_STARTED) {
  1578. if (self->slave->connectionEventHandler) {
  1579. self->slave->connectionEventHandler(self->slave->connectionEventHandlerParameter, &(self->iMasterConnection), CS104_CON_EVENT_ACTIVATED);
  1580. }
  1581. }
  1582. self->state = M_CON_STATE_STARTED;
  1583. #if (CONFIG_USE_SEMAPHORES == 1)
  1584. xSemaphoreGive(self->stateLock);
  1585. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  1586. }
  1587. /**
  1588. * Activate connection and deactivate existing active connections if required
  1589. */
  1590. static void
  1591. CS104_Slave_activate(CS104_Slave self, MasterConnection connectionToActivate)
  1592. {
  1593. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1)
  1594. if (self->serverMode == CS104_MODE_SINGLE_REDUNDANCY_GROUP) {
  1595. /* Deactivate all other connections */
  1596. #if (CONFIG_USE_SEMAPHORES == 1)
  1597. xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  1598. #endif
  1599. int i;
  1600. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++) {
  1601. MasterConnection con = self->masterConnections[i];
  1602. if (con && con->isUsed) {
  1603. if (con != connectionToActivate)
  1604. MasterConnection_deactivate(con);
  1605. }
  1606. }
  1607. #if (CONFIG_USE_SEMAPHORES == 1)
  1608. xSemaphoreGive(self->openConnectionsLock);
  1609. #endif
  1610. }
  1611. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1) */
  1612. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  1613. if (self->serverMode == CS104_MODE_MULTIPLE_REDUNDANCY_GROUPS) {
  1614. /* Deactivate all other connections of the same redundancy group */
  1615. #if (CONFIG_USE_SEMAPHORES == 1)
  1616. xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  1617. #endif
  1618. int i;
  1619. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++) {
  1620. MasterConnection con = self->masterConnections[i];
  1621. if (con && con->isUsed) {
  1622. if (con->redundancyGroup == connectionToActivate->redundancyGroup) {
  1623. if (con != connectionToActivate)
  1624. MasterConnection_deactivate(con);
  1625. }
  1626. }
  1627. }
  1628. #if (CONFIG_USE_SEMAPHORES == 1)
  1629. xSemaphoreGive(self->openConnectionsLock);
  1630. #endif
  1631. }
  1632. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1) */
  1633. MasterConnection_activate(connectionToActivate);
  1634. }
  1635. /* unprotected version of sendSMessage */
  1636. static void
  1637. _sendSMessage(MasterConnection self)
  1638. {
  1639. uint8_t msg[6];
  1640. msg[0] = 0x68;
  1641. msg[1] = 0x04;
  1642. msg[2] = 0x01;
  1643. msg[3] = 0;
  1644. msg[4] = (uint8_t) ((self->receiveCount % 128) * 2);
  1645. msg[5] = (uint8_t) (self->receiveCount / 128);
  1646. if (writeToSocket(self, msg, 6) < 0)
  1647. self->isRunning = false;
  1648. }
  1649. static void
  1650. sendSMessage(MasterConnection self)
  1651. {
  1652. #if (CONFIG_USE_SEMAPHORES == 1)
  1653. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1654. #endif
  1655. _sendSMessage(self);
  1656. #if (CONFIG_USE_SEMAPHORES == 1)
  1657. xSemaphoreGive(self->stateLock);
  1658. #endif
  1659. }
  1660. static bool
  1661. MessageQueue_hasUnconfirmedIMessages(MessageQueue self)
  1662. {
  1663. bool retVal = false;
  1664. if (self->entryCounter != 0)
  1665. {
  1666. uint8_t* entryPtr = self->firstEntry;
  1667. struct sMessageQueueEntryInfo entryInfo;
  1668. while (entryPtr)
  1669. {
  1670. memcpy(&entryInfo, entryPtr, sizeof(struct sMessageQueueEntryInfo));
  1671. if (entryInfo.entryState == QUEUE_ENTRY_STATE_SENT_BUT_NOT_CONFIRMED)
  1672. {
  1673. retVal = true;
  1674. break;
  1675. }
  1676. if (entryPtr == self->lastEntry)
  1677. break;
  1678. /* move to next entry */
  1679. if (entryPtr == self->lastInBufferEntry)
  1680. entryPtr = self->buffer;
  1681. else
  1682. entryPtr = entryPtr + sizeof(struct sMessageQueueEntryInfo) + entryInfo.size;
  1683. }
  1684. }
  1685. return retVal;
  1686. }
  1687. static bool
  1688. HighPriorityASDUQueue_hasUnconfirmedIMessages(HighPriorityASDUQueue self)
  1689. {
  1690. bool retVal = false;
  1691. if (self->entryCounter != 0)
  1692. {
  1693. uint8_t* entryPtr = self->firstEntry;
  1694. struct sMessageQueueEntryInfo entryInfo;
  1695. while (entryPtr)
  1696. {
  1697. memcpy(&entryInfo, entryPtr, sizeof(struct sMessageQueueEntryInfo));
  1698. if (entryInfo.entryState == QUEUE_ENTRY_STATE_SENT_BUT_NOT_CONFIRMED)
  1699. {
  1700. retVal = true;
  1701. break;
  1702. }
  1703. if (entryPtr == self->lastEntry)
  1704. break;
  1705. /* move to next entry */
  1706. if (entryPtr == self->lastInBufferEntry)
  1707. entryPtr = self->buffer;
  1708. else
  1709. entryPtr = entryPtr + sizeof(struct sMessageQueueEntryInfo) + entryInfo.size;
  1710. }
  1711. }
  1712. return retVal;
  1713. }
  1714. static bool
  1715. MasterConnection_hasUnconfirmedMessages(MasterConnection self)
  1716. {
  1717. bool retVal = false;
  1718. if (self->lowPrioQueue)
  1719. {
  1720. if (MessageQueue_hasUnconfirmedIMessages(self->lowPrioQueue))
  1721. return true;
  1722. if (HighPriorityASDUQueue_hasUnconfirmedIMessages(self->highPrioQueue))
  1723. return true;
  1724. }
  1725. return retVal;
  1726. }
  1727. static bool
  1728. handleMessage(MasterConnection self, uint8_t* buffer, int msgSize)
  1729. {
  1730. uint64_t currentTime = Hal_getTimeInMs();
  1731. if (msgSize >= 3)
  1732. {
  1733. if (buffer[0] != 0x68) {
  1734. printf("CS104 SLAVE: Invalid START character!");
  1735. return false;
  1736. }
  1737. uint8_t lengthOfApdu = buffer[1];
  1738. if (lengthOfApdu != msgSize - 2) {
  1739. printf("CS104 SLAVE: Invalid length of APDU");
  1740. return false;
  1741. }
  1742. if ((buffer[2] & 1) == 0) /* I message */
  1743. {
  1744. if (msgSize < 7) {
  1745. printf("CS104 SLAVE: Received I msg too small!");
  1746. return false;
  1747. }
  1748. #if (CONFIG_USE_SEMAPHORES == 1)
  1749. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1750. #endif
  1751. if (self->state != M_CON_STATE_STARTED)
  1752. {
  1753. printf("CS104 SLAVE: Received I message while connection not active -> close connection");
  1754. #if (CONFIG_USE_SEMAPHORES == 1)
  1755. xSemaphoreGive(self->stateLock);
  1756. #endif
  1757. return false;
  1758. }
  1759. if (self->timeoutT2Triggered == false) {
  1760. self->timeoutT2Triggered = true;
  1761. self->lastConfirmationTime = currentTime; /* start timeout T2 */
  1762. }
  1763. #if (CONFIG_USE_SEMAPHORES == 1)
  1764. xSemaphoreGive(self->stateLock);
  1765. #endif
  1766. int frameSendSequenceNumber = ((buffer [3] * 0x100) + (buffer [2] & 0xfe)) / 2;
  1767. int frameRecvSequenceNumber = ((buffer [5] * 0x100) + (buffer [4] & 0xfe)) / 2;
  1768. printf("CS104 SLAVE: Received I frame: N(S) = %i N(R) = %i\n", frameSendSequenceNumber, frameRecvSequenceNumber);
  1769. #if (CONFIG_USE_SEMAPHORES == 1)
  1770. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1771. #endif
  1772. if (frameSendSequenceNumber != self->receiveCount) {
  1773. #if (CONFIG_USE_SEMAPHORES == 1)
  1774. xSemaphoreGive(self->stateLock);
  1775. #endif
  1776. printf("CS104 SLAVE: Sequence error - close connection");
  1777. return false;
  1778. }
  1779. #if (CONFIG_USE_SEMAPHORES == 1)
  1780. xSemaphoreGive(self->stateLock);
  1781. #endif
  1782. if (checkSequenceNumber (self, frameRecvSequenceNumber) == false) {
  1783. printf("CS104 SLAVE: Sequence number check failed - close connection");
  1784. return false;
  1785. }
  1786. #if (CONFIG_USE_SEMAPHORES == 1)
  1787. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1788. #endif
  1789. self->receiveCount = (self->receiveCount + 1) % 32768;
  1790. self->unconfirmedReceivedIMessages++;
  1791. #if (CONFIG_USE_SEMAPHORES == 1)
  1792. xSemaphoreGive(self->stateLock);
  1793. #endif
  1794. if (MasterConnection_isActive(self))
  1795. {
  1796. CS101_ASDU asdu = CS101_ASDU_createFromBuffer(&(self->slave->alParameters), buffer + 6, msgSize - 6);
  1797. if (asdu)
  1798. {
  1799. bool validAsdu = handleASDU(self, asdu);
  1800. CS101_ASDU_destroy(asdu);
  1801. if (validAsdu == false) {
  1802. printf("CS104 SLAVE: ASDU corrupted");
  1803. return false;
  1804. }
  1805. }
  1806. else {
  1807. printf("CS104 SLAVE: Invalid ASDU");
  1808. return false;
  1809. }
  1810. }
  1811. else {
  1812. printf("CS104 SLAVE: Received I message while connection not activate -> close connection");
  1813. return false;
  1814. }
  1815. }
  1816. /* Check for TESTFR_ACT message */
  1817. else if ((buffer[2] & 0x43) == 0x43) {
  1818. printf("CS104 SLAVE: Send TESTFR_CON\n");
  1819. if (writeToSocket(self, TESTFR_CON_MSG, TESTFR_CON_MSG_SIZE) < 0)
  1820. return false;
  1821. }
  1822. /* Check for STARTDT_ACT message */
  1823. else if ((buffer [2] & 0x07) == 0x07) {
  1824. CS104_Slave_activate(self->slave, self);
  1825. HighPriorityASDUQueue_resetConnectionQueue(self->highPrioQueue);
  1826. printf("CS104 SLAVE: Send STARTDT_CON\n");
  1827. if (writeToSocket(self, STARTDT_CON_MSG, STARTDT_CON_MSG_SIZE) < 0)
  1828. return false;
  1829. }
  1830. /* Check for STOPDT_ACT message */
  1831. else if ((buffer [2] & 0x13) == 0x13)
  1832. {
  1833. printf("CS104 SLAVE: Received STARTDT_ACT\n");
  1834. MasterConnection_deactivate(self);
  1835. /* Send S-Message to confirm all outstanding messages */
  1836. #if (CONFIG_USE_SEMAPHORES == 1)
  1837. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1838. #endif
  1839. self->lastConfirmationTime = Hal_getTimeInMs();
  1840. self->unconfirmedReceivedIMessages = 0;
  1841. self->timeoutT2Triggered = false;
  1842. _sendSMessage(self);
  1843. #if (CONFIG_USE_SEMAPHORES == 1)
  1844. xSemaphoreGive(self->stateLock);
  1845. #endif
  1846. if(MasterConnection_hasUnconfirmedMessages(self)) {
  1847. printf("CS104 SLAVE: Unconfirmed messages after STOPDT_ACT -> pending unconfirmed stopped state\n");
  1848. }
  1849. else
  1850. {
  1851. printf("CS104 SLAVE: Send STOPDT_CON\n");
  1852. self->state = M_CON_STATE_STOPPED;
  1853. if (writeToSocket(self, STOPDT_CON_MSG, STOPDT_CON_MSG_SIZE) < 0)
  1854. {
  1855. #if (CONFIG_USE_SEMAPHORES == 1)
  1856. xSemaphoreGive(self->stateLock);
  1857. #endif
  1858. return false;
  1859. }
  1860. }
  1861. #if (CONFIG_USE_SEMAPHORES == 1)
  1862. xSemaphoreGive(self->stateLock);
  1863. #endif
  1864. }
  1865. /* Check for TESTFR_CON message */
  1866. else if ((buffer[2] & 0x83) == 0x83)
  1867. {
  1868. printf("CS104 SLAVE: Recv TESTFR_CON\n");
  1869. #if (CONFIG_USE_SEMAPHORES == 1)
  1870. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1871. #endif
  1872. self->waitingForTestFRcon = false;
  1873. #if (CONFIG_USE_SEMAPHORES == 1)
  1874. xSemaphoreGive(self->stateLock);
  1875. #endif
  1876. }
  1877. else if (buffer [2] == 0x01) /* S-message */
  1878. {
  1879. int seqNo = (buffer[4] + buffer[5] * 0x100) / 2;
  1880. printf("CS104 SLAVE: Rcvd S(%i) (own sendcounter = %i)\n", seqNo, self->sendCount);
  1881. if (checkSequenceNumber(self, seqNo) == false) {
  1882. printf("CS104 SLAVE: S message - sequence number mismatch");
  1883. return false;
  1884. }
  1885. if (self->state == M_CON_STATE_UNCONFIRMED_STOPPED)
  1886. {
  1887. if (MasterConnection_hasUnconfirmedMessages(self) == false)
  1888. {
  1889. self->state = M_CON_STATE_STOPPED;
  1890. printf("CS104 SLAVE: Send STOPDT_CON\n");
  1891. if (writeToSocket(self, STOPDT_CON_MSG, STOPDT_CON_MSG_SIZE) < 0)
  1892. return false;
  1893. }
  1894. }
  1895. else if (self->state == M_CON_STATE_STOPPED)
  1896. {
  1897. printf("CS104 SLAVE: S message in stopped state -> active close\n");
  1898. /* actively close connection */
  1899. return false;
  1900. }
  1901. }
  1902. else {
  1903. printf("CS104 SLAVE: unknown message - IGNORE\n");
  1904. return true;
  1905. }
  1906. resetT3Timeout(self, currentTime);
  1907. return true;
  1908. }
  1909. else {
  1910. printf("CS104 SLAVE: Invalid message (too small)");
  1911. return false;
  1912. }
  1913. }
  1914. static bool
  1915. checkT3Timeout(MasterConnection self, uint64_t currentTime)
  1916. {
  1917. bool retVal = false;
  1918. #if (CONFIG_USE_SEMAPHORES == 1)
  1919. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1920. #endif
  1921. if (self->waitingForTestFRcon)
  1922. goto exit_function;
  1923. if (self->nextT3Timeout > (currentTime + (uint64_t) (self->slave->conParameters.t3 * 1000))) {
  1924. /* timeout value not plausible (maybe system time changed) */
  1925. _resetT3Timeout(self, currentTime);
  1926. }
  1927. if (currentTime > self->nextT3Timeout)
  1928. retVal = true;
  1929. exit_function:
  1930. #if (CONFIG_USE_SEMAPHORES == 1)
  1931. xSemaphoreGive(self->stateLock);
  1932. #endif
  1933. return retVal;
  1934. }
  1935. static void
  1936. resetTestFRConTimeout(MasterConnection self, uint64_t currentTime)
  1937. {
  1938. self->nextTestFRConTimeout = currentTime + (uint64_t) (self->slave->conParameters.t1 * 1000);
  1939. }
  1940. static bool
  1941. checkTestFRConTimeout(MasterConnection self, uint64_t currentTime)
  1942. {
  1943. if (self->nextTestFRConTimeout > (currentTime + (uint64_t) (self->slave->conParameters.t1 * 1000))) {
  1944. /* timeout value not plausible (maybe system time changed) */
  1945. resetTestFRConTimeout(self, currentTime);
  1946. }
  1947. if (currentTime > self->nextTestFRConTimeout)
  1948. return true;
  1949. else
  1950. return false;
  1951. }
  1952. static bool
  1953. handleTimeouts(MasterConnection self)
  1954. {
  1955. uint64_t currentTime = Hal_getTimeInMs();
  1956. bool timeoutsOk = true;
  1957. /* check T3 timeout */
  1958. if (checkT3Timeout(self, currentTime)) {
  1959. if (writeToSocket(self, TESTFR_ACT_MSG, TESTFR_ACT_MSG_SIZE) < 0) {
  1960. printf("CS104 SLAVE: Failed to write TESTFR ACT message\n");
  1961. #if (CONFIG_USE_SEMAPHORES == 1)
  1962. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1963. #endif
  1964. self->isRunning = false;
  1965. #if (CONFIG_USE_SEMAPHORES == 1)
  1966. xSemaphoreGive(self->stateLock);
  1967. #endif
  1968. }
  1969. #if (CONFIG_USE_SEMAPHORES == 1)
  1970. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1971. #endif
  1972. self->waitingForTestFRcon = true;
  1973. resetTestFRConTimeout(self, currentTime);
  1974. #if (CONFIG_USE_SEMAPHORES == 1)
  1975. xSemaphoreGive(self->stateLock);
  1976. #endif
  1977. }
  1978. #if (CONFIG_USE_SEMAPHORES == 1)
  1979. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  1980. #endif
  1981. /* Check for TEST FR con timeout */
  1982. if (self->waitingForTestFRcon) {
  1983. if (checkTestFRConTimeout(self, currentTime)) {
  1984. printf("CS104 SLAVE: Timeout for TESTFR CON message\n");
  1985. /* close connection */
  1986. timeoutsOk = false;
  1987. }
  1988. }
  1989. /* check timeout for others station I messages */
  1990. if (self->unconfirmedReceivedIMessages > 0) {
  1991. /* Check validity of last confirmation time */
  1992. if (self->lastConfirmationTime != 0xffffffffffffffffULL && self->lastConfirmationTime > currentTime) {
  1993. /* last confirmation time is in the future (maybe caused by system time change) */
  1994. self->lastConfirmationTime = currentTime;
  1995. }
  1996. if (currentTime > self->lastConfirmationTime) {
  1997. if ((currentTime - self->lastConfirmationTime) >= (uint64_t) (self->slave->conParameters.t2 * 1000)) {
  1998. self->lastConfirmationTime = currentTime;
  1999. self->unconfirmedReceivedIMessages = 0;
  2000. self->timeoutT2Triggered = false;
  2001. _sendSMessage(self);
  2002. }
  2003. }
  2004. }
  2005. #if (CONFIG_USE_SEMAPHORES == 1)
  2006. xSemaphoreGive(self->stateLock);
  2007. #endif
  2008. #if (CONFIG_USE_SEMAPHORES == 1)
  2009. xSemaphoreTake(self->sentASDUsLock, portMAX_DELAY);
  2010. #endif
  2011. /* check if counterpart confirmed I message */
  2012. if (self->oldestSentASDU != -1) {
  2013. /* check validity of sent time */
  2014. if (self->sentASDUs[self->oldestSentASDU].sentTime > currentTime) {
  2015. /* sent time is in the future (maybe caused by system time change) */
  2016. self->sentASDUs[self->oldestSentASDU].sentTime = currentTime;
  2017. }
  2018. if (currentTime > self->sentASDUs[self->oldestSentASDU].sentTime) {
  2019. if ((currentTime - self->sentASDUs[self->oldestSentASDU].sentTime) >= (uint64_t) (self->slave->conParameters.t1 * 1000)) {
  2020. timeoutsOk = false;
  2021. printSendBuffer(self);
  2022. printf("CS104 SLAVE: I message timeout for %i seqNo: %i\n", self->oldestSentASDU,
  2023. self->sentASDUs[self->oldestSentASDU].seqNo);
  2024. }
  2025. }
  2026. }
  2027. #if (CONFIG_USE_SEMAPHORES == 1)
  2028. xSemaphoreGive(self->sentASDUsLock);
  2029. #endif
  2030. return timeoutsOk;
  2031. }
  2032. static bool
  2033. HighPriorityASDUQueue_isAsduAvailable(HighPriorityASDUQueue self)
  2034. {
  2035. #if (CONFIG_USE_SEMAPHORES == 1)
  2036. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  2037. #endif
  2038. bool retVal;
  2039. if (self->entryCounter > 0)
  2040. retVal = true;
  2041. else
  2042. retVal = false;
  2043. #if (CONFIG_USE_SEMAPHORES == 1)
  2044. xSemaphoreGive(self->queueLock);
  2045. #endif
  2046. return retVal;
  2047. }
  2048. static void
  2049. HighPriorityASDUQueue_lock(HighPriorityASDUQueue self)
  2050. {
  2051. #if (CONFIG_USE_SEMAPHORES == 1)
  2052. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  2053. #endif
  2054. }
  2055. static void
  2056. HighPriorityASDUQueue_unlock(HighPriorityASDUQueue self)
  2057. {
  2058. #if (CONFIG_USE_SEMAPHORES == 1)
  2059. xSemaphoreGive(self->queueLock);
  2060. #endif
  2061. }
  2062. static uint8_t*
  2063. HighPriorityASDUQueue_getNextASDU(HighPriorityASDUQueue self, int* size)
  2064. {
  2065. uint8_t* buffer = NULL;
  2066. if (self->entryCounter > 0) {
  2067. self->entryCounter--;
  2068. uint16_t msgSize;
  2069. memcpy(&msgSize, self->firstEntry, 2);
  2070. *size = (int) msgSize;
  2071. buffer = self->firstEntry + 2;
  2072. if (self->entryCounter > 0) {
  2073. if (self->firstEntry == self->lastEntry) {
  2074. self->firstEntry = NULL;
  2075. self->lastEntry = NULL;
  2076. self->lastInBufferEntry = NULL;
  2077. }
  2078. else {
  2079. if (self->firstEntry == self->lastInBufferEntry) {
  2080. self->firstEntry = self->buffer;
  2081. self->lastInBufferEntry = self->lastEntry;
  2082. }
  2083. else {
  2084. self->firstEntry = self->firstEntry + 2 + msgSize;
  2085. }
  2086. }
  2087. }
  2088. }
  2089. return buffer;
  2090. }
  2091. static bool
  2092. sendNextHighPriorityASDU(MasterConnection self)
  2093. {
  2094. bool retVal = false;
  2095. uint8_t* buffer = NULL;
  2096. int msgSize = 0;
  2097. #if (CONFIG_USE_SEMAPHORES == 1)
  2098. xSemaphoreTake(self->sentASDUsLock, portMAX_DELAY);
  2099. #endif
  2100. if (isSentBufferFull(self))
  2101. goto exit_function;
  2102. HighPriorityASDUQueue_lock(self->highPrioQueue);
  2103. buffer = HighPriorityASDUQueue_getNextASDU(self->highPrioQueue, &msgSize);
  2104. if (buffer) {
  2105. memcpy(self->sendBuffer + IEC60870_5_104_APCI_LENGTH, buffer, msgSize);
  2106. msgSize += IEC60870_5_104_APCI_LENGTH;
  2107. sendASDU(self, self->sendBuffer, msgSize, 0, NULL);
  2108. retVal = true;
  2109. }
  2110. HighPriorityASDUQueue_unlock(self->highPrioQueue);
  2111. exit_function:
  2112. #if (CONFIG_USE_SEMAPHORES == 1)
  2113. xSemaphoreGive(self->sentASDUsLock);
  2114. #endif
  2115. return retVal;
  2116. }
  2117. static void
  2118. MessageQueue_lock(MessageQueue self)
  2119. {
  2120. #if (CONFIG_USE_SEMAPHORES == 1)
  2121. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  2122. #endif
  2123. }
  2124. static void
  2125. MessageQueue_unlock(MessageQueue self)
  2126. {
  2127. #if (CONFIG_USE_SEMAPHORES == 1)
  2128. xSemaphoreGive(self->queueLock);
  2129. #endif
  2130. }
  2131. static uint8_t*
  2132. MessageQueue_getNextWaitingASDU(MessageQueue self, uint64_t* entryId, uint8_t** queueEntry, int* size)
  2133. {
  2134. uint8_t* buffer = NULL;
  2135. if (self->entryCounter != 0)
  2136. {
  2137. uint8_t* entryPtr = self->firstEntry;
  2138. struct sMessageQueueEntryInfo entryInfo;
  2139. memcpy(&entryInfo, entryPtr, sizeof(struct sMessageQueueEntryInfo));
  2140. while (entryInfo.entryState != QUEUE_ENTRY_STATE_WAITING_FOR_TRANSMISSION)
  2141. {
  2142. if (entryPtr == self->lastEntry)
  2143. break;
  2144. /* move to next entry */
  2145. if (entryPtr == self->lastInBufferEntry)
  2146. entryPtr = self->buffer;
  2147. else
  2148. entryPtr = entryPtr + sizeof(struct sMessageQueueEntryInfo) + entryInfo.size;
  2149. memcpy(&entryInfo, entryPtr, sizeof(struct sMessageQueueEntryInfo));
  2150. }
  2151. if (entryInfo.entryState == QUEUE_ENTRY_STATE_WAITING_FOR_TRANSMISSION)
  2152. {
  2153. *entryId = entryInfo.entryId;
  2154. *queueEntry = entryPtr;
  2155. entryInfo.entryState = QUEUE_ENTRY_STATE_SENT_BUT_NOT_CONFIRMED;
  2156. memcpy(entryPtr, &entryInfo, sizeof(struct sMessageQueueEntryInfo));
  2157. buffer = entryPtr + sizeof(struct sMessageQueueEntryInfo);
  2158. *size = entryInfo.size;
  2159. }
  2160. }
  2161. return buffer;
  2162. }
  2163. static void
  2164. sendNextLowPriorityASDU(MasterConnection self)
  2165. {
  2166. #if (CONFIG_USE_SEMAPHORES == 1)
  2167. xSemaphoreTake(self->sentASDUsLock, portMAX_DELAY);
  2168. #endif
  2169. uint8_t* asduBuffer;
  2170. if (isSentBufferFull(self))
  2171. goto exit_function;
  2172. MessageQueue_lock(self->lowPrioQueue);
  2173. uint64_t entryId;
  2174. uint8_t* queueEntry;
  2175. int msgSize;
  2176. asduBuffer = MessageQueue_getNextWaitingASDU(self->lowPrioQueue, &entryId, &queueEntry, &msgSize);
  2177. if (asduBuffer) {
  2178. memcpy(self->sendBuffer + IEC60870_5_104_APCI_LENGTH, asduBuffer, msgSize);
  2179. msgSize += IEC60870_5_104_APCI_LENGTH;
  2180. sendASDU(self, self->sendBuffer, msgSize, entryId, queueEntry);
  2181. }
  2182. MessageQueue_unlock(self->lowPrioQueue);
  2183. exit_function:
  2184. #if (CONFIG_USE_SEMAPHORES == 1)
  2185. xSemaphoreGive(self->sentASDUsLock);
  2186. #endif
  2187. return;
  2188. }
  2189. static bool
  2190. MessageQueue_isAsduAvailable(MessageQueue self)
  2191. {
  2192. #if (CONFIG_USE_SEMAPHORES == 1)
  2193. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  2194. #endif
  2195. bool retVal;
  2196. if (self->entryCounter > 0)
  2197. retVal = true;
  2198. else
  2199. retVal = false;
  2200. #if (CONFIG_USE_SEMAPHORES == 1)
  2201. xSemaphoreGive(self->queueLock);
  2202. #endif
  2203. return retVal;
  2204. }
  2205. /**
  2206. * Send all high-priority ASDUs and the last waiting ASDU from the low-priority queue.
  2207. * Returns true if ASDUs are still waiting. This can happen when there are more ASDUs
  2208. * in the event (low-priority) buffer, or the connection is unavailable to send the high-priority
  2209. * ASDUs (congestion or connection lost).
  2210. */
  2211. static bool
  2212. sendWaitingASDUs(MasterConnection self)
  2213. {
  2214. /* send all available high priority ASDUs first */
  2215. while (HighPriorityASDUQueue_isAsduAvailable(self->highPrioQueue)) {
  2216. if (sendNextHighPriorityASDU(self) == false)
  2217. return true;
  2218. if (MasterConnection_isRunning(self) == false)
  2219. return true;
  2220. }
  2221. /* send messages from low-priority queue */
  2222. sendNextLowPriorityASDU(self);
  2223. if (MessageQueue_isAsduAvailable(self->lowPrioQueue))
  2224. return true;
  2225. else
  2226. return false;
  2227. }
  2228. static void
  2229. MessageQueue_setWaitingForTransmissionWhenNotConfirmed(MessageQueue self)
  2230. {
  2231. #if (CONFIG_USE_SEMAPHORES == 1)
  2232. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  2233. #endif
  2234. if (self->entryCounter != 0)
  2235. {
  2236. uint8_t* entryPtr = self->firstEntry;
  2237. struct sMessageQueueEntryInfo entryInfo;
  2238. while (entryPtr)
  2239. {
  2240. memcpy(&entryInfo, entryPtr, sizeof(struct sMessageQueueEntryInfo));
  2241. if (entryInfo.entryState == QUEUE_ENTRY_STATE_SENT_BUT_NOT_CONFIRMED) {
  2242. entryInfo.entryState = QUEUE_ENTRY_STATE_WAITING_FOR_TRANSMISSION;
  2243. }
  2244. memcpy(entryPtr, &entryInfo, sizeof(struct sMessageQueueEntryInfo));
  2245. if (entryPtr == self->lastEntry)
  2246. break;
  2247. /* move to next entry */
  2248. if (entryPtr == self->lastInBufferEntry)
  2249. entryPtr = self->buffer;
  2250. else
  2251. entryPtr = entryPtr + sizeof(struct sMessageQueueEntryInfo) + entryInfo.size;
  2252. }
  2253. }
  2254. #if (CONFIG_USE_SEMAPHORES == 1)
  2255. xSemaphoreGive(self->queueLock);
  2256. #endif
  2257. }
  2258. void
  2259. Handleset_reset(HandleSet self)
  2260. {
  2261. if (self) {
  2262. if (self->sockets) {
  2263. LinkedList_destroyStatic(self->sockets);
  2264. self->sockets = LinkedList_create();
  2265. self->pollfdIsUpdated = false;
  2266. }
  2267. }
  2268. }
  2269. void
  2270. Handleset_addSocket(HandleSet self, const Socket sock)
  2271. {
  2272. if (self != NULL && sock != NULL && sock->fd != -1) {
  2273. LinkedList_add(self->sockets, sock);
  2274. self->pollfdIsUpdated = false;
  2275. }
  2276. }
  2277. static void
  2278. connectionHandlingThread(void* parameter)
  2279. {
  2280. MasterConnection self = (MasterConnection) parameter;
  2281. resetT3Timeout(self, Hal_getTimeInMs());
  2282. bool isAsduWaiting = false;
  2283. if (self->slave->connectionEventHandler) {
  2284. self->slave->connectionEventHandler(self->slave->connectionEventHandlerParameter, &(self->iMasterConnection), CS104_CON_EVENT_CONNECTION_OPENED);
  2285. }
  2286. while (MasterConnection_isRunning(self))
  2287. {
  2288. Handleset_reset(self->handleSet);
  2289. Handleset_addSocket(self->handleSet, self->socket);
  2290. int socketTimeout;
  2291. /*
  2292. * When an ASDU is waiting only have a short look to see if a client request
  2293. * was received. Otherwise wait to save CPU time.
  2294. */
  2295. if (isAsduWaiting)
  2296. socketTimeout = 1;
  2297. else
  2298. socketTimeout = 100;
  2299. if (Handleset_waitReady(self->handleSet, socketTimeout))
  2300. {
  2301. int bytesRec = receiveMessage(self);
  2302. if (bytesRec == -1) {
  2303. printf("CS104 SLAVE: Error reading from socket\n");
  2304. break;
  2305. }
  2306. if (bytesRec > 0)
  2307. {
  2308. printf("CS104 SLAVE: Connection: rcvd msg(%i bytes)\n", bytesRec);
  2309. if (self->slave->rawMessageHandler)
  2310. self->slave->rawMessageHandler(self->slave->rawMessageHandlerParameter,
  2311. &(self->iMasterConnection), self->recvBuffer, bytesRec, false);
  2312. if (handleMessage(self, self->recvBuffer, bytesRec) == false)
  2313. {
  2314. #if (CONFIG_USE_SEMAPHORES == 1)
  2315. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  2316. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  2317. self->isRunning = false;
  2318. #if (CONFIG_USE_SEMAPHORES == 1)
  2319. xSemaphoreGive(self->stateLock);
  2320. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  2321. }
  2322. if (self->unconfirmedReceivedIMessages >= self->slave->conParameters.w)
  2323. {
  2324. self->lastConfirmationTime = Hal_getTimeInMs();
  2325. self->unconfirmedReceivedIMessages = 0;
  2326. self->timeoutT2Triggered = false;
  2327. sendSMessage(self);
  2328. }
  2329. }
  2330. }
  2331. if (handleTimeouts(self) == false) {
  2332. #if (CONFIG_USE_SEMAPHORES == 1)
  2333. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  2334. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  2335. self->isRunning = false;
  2336. #if (CONFIG_USE_SEMAPHORES == 1)
  2337. xSemaphoreGive(self->stateLock);
  2338. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  2339. }
  2340. if (MasterConnection_isRunning(self))
  2341. {
  2342. if (MasterConnection_isActive(self))
  2343. {
  2344. isAsduWaiting = sendWaitingASDUs(self);
  2345. }
  2346. }
  2347. /* call plugins */
  2348. if (self->slave->plugins)
  2349. {
  2350. LinkedList pluginElem = LinkedList_getNext(self->slave->plugins);
  2351. while (pluginElem)
  2352. {
  2353. CS101_SlavePlugin plugin = (CS101_SlavePlugin) LinkedList_getData(pluginElem);
  2354. plugin->runTask(plugin->parameter, &(self->iMasterConnection));
  2355. pluginElem = LinkedList_getNext(pluginElem);
  2356. }
  2357. }
  2358. }
  2359. if (self->slave->connectionEventHandler) {
  2360. self->slave->connectionEventHandler(self->slave->connectionEventHandlerParameter, &(self->iMasterConnection), CS104_CON_EVENT_CONNECTION_CLOSED);
  2361. }
  2362. #if (CONFIG_USE_SEMAPHORES == 1)
  2363. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  2364. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  2365. self->isRunning = false;
  2366. #if (CONFIG_USE_SEMAPHORES == 1)
  2367. xSemaphoreGive(self->stateLock);
  2368. #endif /* (CONFIG_USE_SEMAPHORES == 1) */
  2369. MessageQueue_setWaitingForTransmissionWhenNotConfirmed(self->lowPrioQueue);
  2370. }
  2371. static void
  2372. MasterConnection_deinit(MasterConnection self)
  2373. {
  2374. if (self)
  2375. {
  2376. #if (CONFIG_CS104_SUPPORT_TLS == 1)
  2377. if (self->tlsSocket != NULL)
  2378. TLSSocket_close(self->tlsSocket);
  2379. #endif
  2380. if (self->socket) {
  2381. Socket_destroy(self->socket);
  2382. self->socket = NULL;
  2383. }
  2384. self->state = M_CON_STATE_STOPPED;
  2385. }
  2386. }
  #if (CONFIG_USE_THREADS == 1)
  /**
   * Start the handling thread of a connection.
   *
   * A thread handle left over from an earlier use of the connection is
   * destroyed first. The connection is marked as running and put into state
   * M_CON_STATE_STOPPED before the new thread running
   * connectionHandlingThread is created and started.
   */
  static void
  MasterConnection_start(MasterConnection self)
  {
      if (self->connectionThread != NULL)
      {
          platform_thread_destroy(self->connectionThread);
          self->connectionThread = NULL;
      }

      self->isRunning = true;
      self->state = M_CON_STATE_STOPPED;

      self->connectionThread = platform_thread_init("masterConnect", connectionHandlingThread, self, 1024, 3, 50);

      platform_thread_start(self->connectionThread);
  }
  #endif /* (CONFIG_USE_THREADS == 1) */
  2403. int
  2404. CS104_Slave_getOpenConnections(CS104_Slave self)
  2405. {
  2406. int openConnections;
  2407. #if (CONFIG_USE_SEMAPHORES == 1)
  2408. xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  2409. #endif
  2410. openConnections = self->openConnections;
  2411. #if (CONFIG_USE_SEMAPHORES == 1)
  2412. xSemaphoreGive(self->openConnectionsLock);
  2413. #endif
  2414. return openConnections;
  2415. }
  #if (CONFIG_USE_THREADS == 1)
  /**
   * Thread function of the listening server.
   *
   * parameter is the CS104_Slave instance. The function creates the server
   * socket (on localAddress when set, otherwise on "0.0.0.0"), starts
   * listening, and then loops until the stop flag is set:
   *  - a new client socket is accepted unless the maximum number of open
   *    connections is reached or the connection request handler rejects it
   *  - an accepted socket is bound to a free connection object (in multiple
   *    redundancy group mode only when a group matches the peer address) and
   *    the connection thread is started; otherwise the socket is destroyed
   *  - connections that are in use but no longer running are cleaned up
   *
   * isStarting is cleared as soon as the server socket was created or the
   * creation failed. isRunning and stopRunning are cleared before the
   * function returns after the loop.
   */
  static void
  serverThread(void* parameter)
  {
      CS104_Slave self = (CS104_Slave) parameter;

      if (self->localAddress)
          self->serverSocket = TcpServerSocket_create(self->localAddress, self->tcpPort);
      else
          self->serverSocket = TcpServerSocket_create("0.0.0.0", self->tcpPort);

      if (self->serverSocket == NULL) {
          printf("CS104 SLAVE: Cannot create server socket\n");

  #if (CONFIG_USE_SEMAPHORES == 1)
          xSemaphoreTake(self->stateLock, portMAX_DELAY);
  #endif
          self->isStarting = false;
  #if (CONFIG_USE_SEMAPHORES == 1)
          xSemaphoreGive(self->stateLock);
  #endif

          return;
      }

      ServerSocket_listen(self->serverSocket);

  #if (CONFIG_USE_SEMAPHORES == 1)
      xSemaphoreTake(self->stateLock, portMAX_DELAY);
  #endif
      self->isRunning = true;
      self->isStarting = false;
  #if (CONFIG_USE_SEMAPHORES == 1)
      xSemaphoreGive(self->stateLock);
  #endif

      while (isStopRunningSet(self) == false)
      {
          Socket newSocket = ServerSocket_accept(self->serverSocket);

          if (newSocket != NULL)
          {
              bool acceptConnection = true;

              /* check if maximum number of open connections is reached */
              if (self->maxOpenConnections > 0) {
                  if (CS104_Slave_getOpenConnections(self) >= self->maxOpenConnections)
                      acceptConnection = false;
              }

              if (acceptConnection)
                  acceptConnection = callConnectionRequestHandler(self, newSocket);

              if (acceptConnection)
              {
                  MessageQueue lowPrioQueue = NULL;
                  HighPriorityASDUQueue highPrioQueue = NULL;

  #if (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1)
                  if (self->serverMode == CS104_MODE_SINGLE_REDUNDANCY_GROUP) {
                      lowPrioQueue = self->asduQueue;
                      highPrioQueue = self->connectionAsduQueue;
                  }
  #endif

  #if (CONFIG_CS104_SUPPORT_SERVER_MODE_CONNECTION_IS_REDUNDANCY_GROUP == 1)
                  if (self->serverMode == CS104_MODE_CONNECTION_IS_REDUNDANCY_GROUP) {
                      lowPrioQueue = NULL;
                      highPrioQueue = NULL;
                  }
  #endif

                  MasterConnection connection = NULL;

  #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
                  if (self->serverMode == CS104_MODE_MULTIPLE_REDUNDANCY_GROUPS)
                  {
                      char ipAddress[60];

                      char* ipAddrStr = getPeerAddress(newSocket, ipAddress);

                      if (ipAddrStr)
                      {
                          CS104_RedundancyGroup matchingGroup = getMatchingRedundancyGroup(self, ipAddrStr);

                          if (matchingGroup != NULL)
                          {
  #if (CONFIG_USE_SEMAPHORES == 1)
                              xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  #endif
                              connection = getFreeConnection(self);

                              if (connection) {
                                  if (MasterConnection_initEx(connection, newSocket, matchingGroup)) {
                                      self->openConnections++;

                                      if (matchingGroup->name) {
                                          printf("CS104 SLAVE: Add connection to group: %s\n", matchingGroup->name);
                                      }
                                  }
                                  else {
                                      connection->isUsed = false;
                                      connection = NULL;
                                  }
                              }
  #if (CONFIG_USE_SEMAPHORES == 1)
                              xSemaphoreGive(self->openConnectionsLock);
  #endif
                          }
                          else {
                              printf("CS104 SLAVE: Found no matching redundancy group -> close connection\n");
                          }
                      }
                      else {
                          printf("CS104 SLAVE: cannot determine peer IP address -> close connection\n");
                      }
                  }
                  else
                  {
  #if (CONFIG_USE_SEMAPHORES == 1)
                      xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  #endif
                      connection = getFreeConnection(self);

                      if (connection) {
                          if (MasterConnection_init(connection, newSocket, lowPrioQueue, highPrioQueue)) {
                              self->openConnections++;
                          }
                          else {
                              connection->isUsed = false;
                              connection = NULL;
                          }
                      }
                      /* release the lock that was taken above (openConnectionsLock, not stateLock) */
  #if (CONFIG_USE_SEMAPHORES == 1)
                      xSemaphoreGive(self->openConnectionsLock);
  #endif
                  }
  #else
                  /* same take/give calls as used for this lock in the rest of the function */
  #if (CONFIG_USE_SEMAPHORES == 1)
                  xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  #endif
                  connection = getFreeConnection(self);

                  if (connection) {
                      if (MasterConnection_init(connection, newSocket, lowPrioQueue, highPrioQueue)) {
                          self->openConnections++;
                      }
                      else {
                          connection->isUsed = false;
                          connection = NULL;
                      }
                  }
  #if (CONFIG_USE_SEMAPHORES == 1)
                  xSemaphoreGive(self->openConnectionsLock);
  #endif
  #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1) */

                  if (connection) {
                      /* now start the connection handling (thread) */
                      MasterConnection_start(connection);
                  }
                  else {
                      Socket_destroy(newSocket);
                      printf("CS104 SLAVE: Connection attempt failed!\n");
                  }
              }
              else {
                  Socket_destroy(newSocket);
              }
          }
          else
              delay_ms(10);

          /* check if there are connections to close */
  #if (CONFIG_USE_SEMAPHORES == 1)
          xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  #endif

          int i;

          for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++)
          {
              if (self->masterConnections[i])
              {
                  MasterConnection connection = self->masterConnections[i];

  #if (CONFIG_USE_SEMAPHORES == 1)
                  xSemaphoreTake(connection->stateLock, portMAX_DELAY);
  #endif /* (CONFIG_USE_SEMAPHORES == 1) */
                  bool isConnectionUsed = connection->isUsed;
  #if (CONFIG_USE_SEMAPHORES == 1)
                  xSemaphoreGive(connection->stateLock);
  #endif /* (CONFIG_USE_SEMAPHORES == 1) */

                  if (isConnectionUsed)
                  {
                      if (MasterConnection_isRunning(connection) == false)
                      {
                          if (connection->connectionThread)
                          {
                              platform_thread_destroy(connection->connectionThread);

  #if (CONFIG_USE_SEMAPHORES == 1)
                              xSemaphoreTake(connection->stateLock, portMAX_DELAY);
  #endif /* (CONFIG_USE_SEMAPHORES == 1) */
                              connection->connectionThread = NULL;
  #if (CONFIG_USE_SEMAPHORES == 1)
                              xSemaphoreGive(connection->stateLock);
  #endif /* (CONFIG_USE_SEMAPHORES == 1) */
                          }

                          MasterConnection_deinit(connection);

                          self->openConnections--;

  #if (CONFIG_USE_SEMAPHORES == 1)
                          xSemaphoreTake(connection->stateLock, portMAX_DELAY);
  #endif /* (CONFIG_USE_SEMAPHORES == 1) */
                          connection->isUsed = false;
  #if (CONFIG_USE_SEMAPHORES == 1)
                          xSemaphoreGive(connection->stateLock);
  #endif /* (CONFIG_USE_SEMAPHORES == 1) */
                      }
                  }
              }
          }

  #if (CONFIG_USE_SEMAPHORES == 1)
          xSemaphoreGive(self->openConnectionsLock);
  #endif
      }

      if (self->serverSocket)
          Socket_destroy((Socket) self->serverSocket);

  #if (CONFIG_USE_SEMAPHORES == 1)
      xSemaphoreTake(self->stateLock, portMAX_DELAY);
  #endif
      self->isRunning = false;
      self->stopRunning = false;
  #if (CONFIG_USE_SEMAPHORES == 1)
      xSemaphoreGive(self->stateLock);
  #endif
  }
  #endif /* (CONFIG_USE_THREADS == 1) */
  2616. void
  2617. CS104_Slave_start(CS104_Slave self)
  2618. {
  2619. #if ((CONFIG_USE_THREADS == 1) && (CONFIG_USE_SEMAPHORES == 1))
  2620. if (isRunning(self) == false) {
  2621. #if (CONFIG_USE_SEMAPHORES == 1)
  2622. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  2623. #endif
  2624. self->isStarting = true;
  2625. self->stopRunning = false;
  2626. #if (CONFIG_USE_SEMAPHORES == 1)
  2627. xSemaphoreGive(self->stateLock);
  2628. #endif
  2629. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1)
  2630. if (self->serverMode == CS104_MODE_SINGLE_REDUNDANCY_GROUP)
  2631. initializeMessageQueues(self, self->maxLowPrioQueueSize, self->maxHighPrioQueueSize);
  2632. #endif
  2633. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  2634. if (self->serverMode == CS104_MODE_MULTIPLE_REDUNDANCY_GROUPS)
  2635. initializeRedundancyGroups(self, self->maxLowPrioQueueSize, self->maxHighPrioQueueSize);
  2636. #endif
  2637. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_CONNECTION_IS_REDUNDANCY_GROUP == 1)
  2638. if (self->serverMode == CS104_MODE_CONNECTION_IS_REDUNDANCY_GROUP)
  2639. initializeConnectionSpecificQueues(self);
  2640. #endif
  2641. self->listeningThread = platform_thread_init("masterStart",serverThread, self, 2048, 2, 50);
  2642. platform_thread_start(self->listeningThread);
  2643. while (isStarting(self))
  2644. delay_ms(1);
  2645. }
  2646. #else
  2647. printf("CS104 SLAVE: ERROR: CS104_Slave_start not supported when CONFIG_USE_TREADS = 0 or CONFIG_USE_SEMAPHORES = 0!\n");
  2648. #endif
  2649. }
  2650. static int
  2651. MessageQueue_countEntriesUntilEndOfBuffer(MessageQueue self, uint8_t* firstEntry)
  2652. {
  2653. int count = 0;
  2654. uint8_t* entryPtr = firstEntry;
  2655. while (entryPtr) {
  2656. struct sMessageQueueEntryInfo entryInfo;
  2657. memcpy(&entryInfo, entryPtr, sizeof(struct sMessageQueueEntryInfo));
  2658. count++;
  2659. /* move to next entry */
  2660. if (entryPtr == self->lastInBufferEntry)
  2661. break;
  2662. else
  2663. entryPtr = entryPtr + sizeof(struct sMessageQueueEntryInfo) + entryInfo.size;
  2664. }
  2665. return count;
  2666. }
  2667. /**
  2668. * Add an ASDU to the queue. When queue is full, override oldest entry.
  2669. */
  2670. static void
  2671. MessageQueue_enqueueASDU(MessageQueue self, CS101_ASDU asdu)
  2672. {
  2673. int asduSize = asdu->asduHeaderLength + asdu->payloadSize;
  2674. if (asduSize > 256 - IEC60870_5_104_APCI_LENGTH) {
  2675. printf("CS104 SLAVE: ASDU too large!\n");
  2676. return;
  2677. }
  2678. int entrySize = sizeof(struct sMessageQueueEntryInfo) + asduSize;
  2679. #if (CONFIG_USE_SEMAPHORES == 1)
  2680. xSemaphoreTake(self->queueLock, portMAX_DELAY);
  2681. #endif
  2682. struct sMessageQueueEntryInfo entryInfo;
  2683. uint8_t* nextMsgPtr;
  2684. if (self->entryCounter == 0) {
  2685. self->firstEntry = self->buffer;
  2686. self->lastInBufferEntry = self->firstEntry;
  2687. nextMsgPtr = self->buffer;
  2688. }
  2689. else {
  2690. memcpy(&entryInfo, self->lastEntry, sizeof(struct sMessageQueueEntryInfo));
  2691. nextMsgPtr = self->lastEntry + sizeof(struct sMessageQueueEntryInfo) + entryInfo.size;
  2692. /* Check if ASDU fits into the buffer */
  2693. if (nextMsgPtr + entrySize > self->buffer + self->size) {
  2694. /* remove all entries from last entry to end of buffer */
  2695. if (nextMsgPtr <= self->firstEntry) {
  2696. self->entryCounter -= MessageQueue_countEntriesUntilEndOfBuffer(self, self->firstEntry);
  2697. self->firstEntry = self->buffer;
  2698. }
  2699. /* put new message at beginning of buffer */
  2700. nextMsgPtr = self->buffer;
  2701. if (self->lastEntry > self->firstEntry)
  2702. self->lastInBufferEntry = self->lastEntry;
  2703. }
  2704. if (nextMsgPtr <= self->firstEntry) {
  2705. /* remove old entries until we have enough space for the new ASDU */
  2706. while ((nextMsgPtr + entrySize > self->firstEntry) && (self->entryCounter > 0)) {
  2707. self->entryCounter--;
  2708. if (self->firstEntry == self->lastInBufferEntry) {
  2709. self->firstEntry = self->buffer;
  2710. self->lastInBufferEntry = nextMsgPtr;
  2711. break;
  2712. }
  2713. else {
  2714. memcpy(&entryInfo, self->firstEntry, sizeof(struct sMessageQueueEntryInfo));
  2715. self->firstEntry = self->firstEntry + sizeof(struct sMessageQueueEntryInfo) + entryInfo.size;
  2716. }
  2717. }
  2718. }
  2719. }
  2720. self->lastEntry = nextMsgPtr;
  2721. if (self->lastEntry > self->lastInBufferEntry)
  2722. self->lastInBufferEntry = self->lastEntry;
  2723. self->entryCounter++;
  2724. struct sBufferFrame bufferFrame;
  2725. Frame frame = BufferFrame_initialize(&bufferFrame, nextMsgPtr + sizeof(struct sMessageQueueEntryInfo), 0);
  2726. CS101_ASDU_encode(asdu, frame);
  2727. entryInfo.size = asduSize;
  2728. entryInfo.entryId = self->entryId++;
  2729. entryInfo.entryState = QUEUE_ENTRY_STATE_WAITING_FOR_TRANSMISSION;
  2730. memcpy(nextMsgPtr, &entryInfo, sizeof(struct sMessageQueueEntryInfo));
  2731. printf("CS104 SLAVE: ASDUs in FIFO: %i (new(size=%i/%i): %p, first: %p, last: %p lastInBuf: %p)\n", self->entryCounter, entrySize, asduSize, nextMsgPtr,
  2732. self->firstEntry, self->lastEntry, self->lastInBufferEntry);
  2733. #if (CONFIG_USE_SEMAPHORES == 1)
  2734. xSemaphoreGive(self->queueLock);
  2735. #endif
  2736. }
  2737. void
  2738. CS104_Slave_enqueueASDU(CS104_Slave self, CS101_ASDU asdu)
  2739. {
  2740. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1)
  2741. if (self->serverMode == CS104_MODE_SINGLE_REDUNDANCY_GROUP)
  2742. MessageQueue_enqueueASDU(self->asduQueue, asdu);
  2743. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1) */
  2744. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  2745. if (self->serverMode == CS104_MODE_MULTIPLE_REDUNDANCY_GROUPS) {
  2746. /************************************************
  2747. * Dispatch event to all redundancy groups
  2748. ************************************************/
  2749. LinkedList element = LinkedList_getNext(self->redundancyGroups);
  2750. while (element) {
  2751. CS104_RedundancyGroup group = (CS104_RedundancyGroup) LinkedList_getData(element);
  2752. MessageQueue_enqueueASDU(group->asduQueue, asdu);
  2753. element = LinkedList_getNext(element);
  2754. }
  2755. }
  2756. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1) */
  2757. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_CONNECTION_IS_REDUNDANCY_GROUP == 1)
  2758. if (self->serverMode == CS104_MODE_CONNECTION_IS_REDUNDANCY_GROUP) {
  2759. #if (CONFIG_USE_SEMAPHORES == 1)
  2760. xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  2761. #endif
  2762. /************************************************
  2763. * Dispatch event to all open client connections
  2764. ************************************************/
  2765. int i;
  2766. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++) {
  2767. MasterConnection con = self->masterConnections[i];
  2768. if (con)
  2769. MessageQueue_enqueueASDU(con->lowPrioQueue, asdu);
  2770. }
  2771. #if (CONFIG_USE_SEMAPHORES == 1)
  2772. xSemaphoreGive(self->openConnectionsLock);
  2773. #endif
  2774. }
  2775. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1) */
  2776. }
  2777. static void
  2778. CS104_Slave_closeAllConnections(CS104_Slave self)
  2779. {
  2780. #if (CONFIG_USE_SEMAPHORES)
  2781. xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  2782. #endif
  2783. int i;
  2784. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++) {
  2785. if (self->masterConnections[i]) {
  2786. if (self->masterConnections[i]->isUsed) {
  2787. self->masterConnections[i]->isUsed = false;
  2788. MasterConnection_deinit(self->masterConnections[i]);
  2789. }
  2790. }
  2791. }
  2792. self->openConnections = 0;
  2793. #if (CONFIG_USE_SEMAPHORES)
  2794. xSemaphoreGive(self->openConnectionsLock);
  2795. #endif
  2796. }
  2797. void
  2798. CS104_Slave_stopThreadless(CS104_Slave self)
  2799. {
  2800. self->isRunning = false;
  2801. if (self->serverSocket) {
  2802. ServerSocket_destroy(self->serverSocket);
  2803. self->serverSocket = NULL;
  2804. }
  2805. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_CONNECTION_IS_REDUNDANCY_GROUP == 1)
  2806. if (self->serverMode == CS104_MODE_CONNECTION_IS_REDUNDANCY_GROUP) {
  2807. deleteConnectionSpecificQueues(self);
  2808. }
  2809. #endif
  2810. CS104_Slave_closeAllConnections(self);
  2811. }
  2812. void
  2813. CS104_Slave_stop(CS104_Slave self)
  2814. {
  2815. #if (CONFIG_USE_THREADS == 1)
  2816. if (self->isThreadlessMode) {
  2817. #endif
  2818. CS104_Slave_stopThreadless(self);
  2819. #if (CONFIG_USE_THREADS == 1)
  2820. }
  2821. else {
  2822. if (isRunning(self)) {
  2823. #if (CONFIG_USE_SEMAPHORES == 1)
  2824. xSemaphoreTake(self->stateLock, portMAX_DELAY);
  2825. #endif
  2826. self->stopRunning = true;
  2827. #if (CONFIG_USE_SEMAPHORES == 1)
  2828. xSemaphoreGive(self->stateLock);
  2829. #endif
  2830. while (isRunning(self))
  2831. delay_ms(1);
  2832. }
  2833. if (self->listeningThread) {
  2834. platform_thread_destroy(self->listeningThread);
  2835. }
  2836. /*
  2837. * Stop all connections
  2838. * */
  2839. {
  2840. int i;
  2841. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++) {
  2842. #if (CONFIG_USE_SEMAPHORES == 1)
  2843. xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  2844. #endif
  2845. MasterConnection connection = self->masterConnections[i];
  2846. if (connection) {
  2847. #if (CONFIG_USE_SEMAPHORES == 1)
  2848. xSemaphoreTake(connection->stateLock, portMAX_DELAY);
  2849. #endif
  2850. bool isUsed = connection->isUsed;
  2851. #if (CONFIG_USE_SEMAPHORES == 1)
  2852. xSemaphoreGive(connection->stateLock);
  2853. #endif
  2854. if (isUsed) {
  2855. MasterConnection_close(connection);
  2856. #if (CONFIG_USE_THREADS == 1)
  2857. if (connection->connectionThread) {
  2858. #if (CONFIG_USE_SEMAPHORES == 1)
  2859. xSemaphoreGive(self->openConnectionsLock);
  2860. #endif
  2861. platform_thread_destroy(connection->connectionThread);
  2862. #if (CONFIG_USE_SEMAPHORES == 1)
  2863. xSemaphoreTake(self->openConnectionsLock, portMAX_DELAY);
  2864. #endif
  2865. MasterConnection_deinit(connection);
  2866. connection->connectionThread = NULL;
  2867. }
  2868. #endif /* (CONFIG_USE_THREADS == 1) */
  2869. self->openConnections--;
  2870. }
  2871. }
  2872. #if (CONFIG_USE_SEMAPHORES == 1)
  2873. xSemaphoreGive(self->openConnectionsLock);
  2874. #endif
  2875. }
  2876. }
  2877. self->listeningThread = NULL;
  2878. }
  2879. #endif
  2880. }
  2881. void
  2882. CS104_Slave_setClockSyncHandler(CS104_Slave self, CS101_ClockSynchronizationHandler handler, void* parameter)
  2883. {
  2884. self->clockSyncHandler = handler;
  2885. self->clockSyncHandlerParameter = parameter;
  2886. }
  2887. void
  2888. CS104_Slave_setInterrogationHandler(CS104_Slave self, CS101_InterrogationHandler handler, void* parameter)
  2889. {
  2890. self->interrogationHandler = handler;
  2891. self->interrogationHandlerParameter = parameter;
  2892. }
  2893. void
  2894. CS104_Slave_setASDUHandler(CS104_Slave self, CS101_ASDUHandler handler, void* parameter)
  2895. {
  2896. self->asduHandler = handler;
  2897. self->asduHandlerParameter = parameter;
  2898. }
  2899. void
  2900. CS104_Slave_setConnectionRequestHandler(CS104_Slave self, CS104_ConnectionRequestHandler handler, void* parameter)
  2901. {
  2902. self->connectionRequestHandler = handler;
  2903. self->connectionRequestHandlerParameter = parameter;
  2904. }
  2905. void
  2906. CS104_Slave_setConnectionEventHandler(CS104_Slave self, CS104_ConnectionEventHandler handler, void* parameter)
  2907. {
  2908. self->connectionEventHandler = handler;
  2909. self->connectionEventHandlerParameter = parameter;
  2910. }
  2911. void
  2912. CS104_Slave_setRawMessageHandler(CS104_Slave self, CS104_SlaveRawMessageHandler handler, void* parameter)
  2913. {
  2914. self->rawMessageHandler = handler;
  2915. self->rawMessageHandlerParameter = parameter;
  2916. }
  2917. static void
  2918. MasterConnection_destroy(MasterConnection self)
  2919. {
  2920. if (self) {
  2921. GLOBAL_FREEMEM(self->sentASDUs);
  2922. #if (CONFIG_USE_SEMAPHORES == 1)
  2923. platform_mutex_destroy(self->sentASDUsLock);
  2924. platform_mutex_destroy(self->stateLock);
  2925. #endif
  2926. Handleset_destroy(self->handleSet);
  2927. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_CONNECTION_IS_REDUNDANCY_GROUP == 1)
  2928. if (self->slave->serverMode == CS104_MODE_CONNECTION_IS_REDUNDANCY_GROUP) {
  2929. MessageQueue_destroy(self->lowPrioQueue);
  2930. HighPriorityASDUQueue_destroy(self->highPrioQueue);
  2931. }
  2932. #endif
  2933. GLOBAL_FREEMEM(self);
  2934. }
  2935. }
  2936. void
  2937. CS104_RedundancyGroup_destroy(CS104_RedundancyGroup self)
  2938. {
  2939. if (self) {
  2940. if (self->name)
  2941. GLOBAL_FREEMEM(self->name);
  2942. MessageQueue_destroy(self->asduQueue);
  2943. HighPriorityASDUQueue_destroy(self->connectionAsduQueue);
  2944. if (self->allowedClients)
  2945. LinkedList_destroy(self->allowedClients);
  2946. GLOBAL_FREEMEM(self);
  2947. }
  2948. }
  2949. void
  2950. CS104_Slave_destroy(CS104_Slave self)
  2951. {
  2952. if (self) {
  2953. CS104_Slave_stop(self);
  2954. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1)
  2955. if (self->serverMode == CS104_MODE_SINGLE_REDUNDANCY_GROUP) {
  2956. if (self->asduQueue)
  2957. MessageQueue_releaseAllQueuedASDUs(self->asduQueue);
  2958. }
  2959. #endif
  2960. if (self->localAddress != NULL)
  2961. GLOBAL_FREEMEM(self->localAddress);
  2962. #if (CONFIG_USE_SEMAPHORES == 1)
  2963. platform_mutex_destroy(self->openConnectionsLock);
  2964. platform_mutex_destroy(self->stateLock);
  2965. #endif
  2966. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1)
  2967. if (self->serverMode == CS104_MODE_SINGLE_REDUNDANCY_GROUP) {
  2968. MessageQueue_destroy(self->asduQueue);
  2969. HighPriorityASDUQueue_destroy(self->connectionAsduQueue);
  2970. }
  2971. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_SINGLE_REDUNDANCY_GROUP == 1) */
  2972. #if (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1)
  2973. if (self->serverMode == CS104_MODE_MULTIPLE_REDUNDANCY_GROUPS) {
  2974. if (self->redundancyGroups)
  2975. LinkedList_destroyDeep(self->redundancyGroups, (LinkedListValueDeleteFunction)CS104_RedundancyGroup_destroy);
  2976. }
  2977. #endif /* (CONFIG_CS104_SUPPORT_SERVER_MODE_MULTIPLE_REDUNDANCY_GROUPS == 1) */
  2978. {
  2979. int i;
  2980. for (i = 0; i < CONFIG_CS104_MAX_CLIENT_CONNECTIONS; i++) {
  2981. if (self->masterConnections[i]) {
  2982. MasterConnection_destroy(self->masterConnections[i]);
  2983. self->masterConnections[i] = NULL;
  2984. }
  2985. }
  2986. }
  2987. if (self->plugins) {
  2988. LinkedList_destroyStatic(self->plugins);
  2989. }
  2990. GLOBAL_FREEMEM(self);
  2991. }
  2992. }