MQTTPacket.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. /*******************************************************************************
  2. * Copyright (c) 2014 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Sergio R. Caprile - non-blocking packet read functions for stream transport
  16. *******************************************************************************/
  17. #include "StackTrace.h"
  18. #include "MQTTPacket.h"
  19. #include "gateway_message.h"
  20. #include <string.h>
  21. /**
  22. * Encodes the message length according to the MQTT algorithm
  23. * @param buf the buffer into which the encoded data is written
  24. * @param length the length to be encoded
  25. * @return the number of bytes written to buffer
  26. */
  27. int MQTTPacket_encode(unsigned char* buf, int length)
  28. {
  29. int rc = 0;
  30. FUNC_ENTRY;
  31. do
  32. {
  33. char d = length % 128;
  34. length /= 128;
  35. /* if there are more digits to encode, set the top bit of this digit */
  36. if (length > 0)
  37. d |= 0x80;
  38. buf[rc++] = d;
  39. } while (length > 0);
  40. FUNC_EXIT_RC(rc);
  41. return rc;
  42. }
  43. /**
  44. * Decodes the message length according to the MQTT algorithm
  45. * @param getcharfn pointer to function to read the next character from the data source
  46. * @param value the decoded length returned
  47. * @return the number of bytes read from the socket
  48. */
  49. int MQTTPacket_decode(int (*getcharfn)(unsigned char*, int), int* value)
  50. {
  51. unsigned char c;
  52. int multiplier = 1;
  53. int len = 0;
  54. #define MAX_NO_OF_REMAINING_LENGTH_BYTES 4
  55. FUNC_ENTRY;
  56. *value = 0;
  57. do
  58. {
  59. int rc = MQTTPACKET_READ_ERROR;
  60. if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES)
  61. {
  62. rc = MQTTPACKET_READ_ERROR; /* bad data */
  63. goto exit;
  64. }
  65. rc = (*getcharfn)(&c, 1);
  66. if (rc != 1)
  67. goto exit;
  68. *value += (c & 127) * multiplier;
  69. multiplier *= 128;
  70. } while ((c & 128) != 0);
  71. exit:
  72. FUNC_EXIT_RC(len);
  73. return len;
  74. }
  75. int MQTTPacket_len(int rem_len)
  76. {
  77. rem_len += 1; /* header byte */
  78. /* now remaining_length field */
  79. if (rem_len < 128)
  80. rem_len += 1;
  81. else if (rem_len < 16384)
  82. rem_len += 2;
  83. else if (rem_len < 2097151)
  84. rem_len += 3;
  85. else
  86. rem_len += 4;
  87. return rem_len;
  88. }
  89. static unsigned char* bufptr;
  90. int bufchar(unsigned char* c, int count)
  91. {
  92. int i;
  93. for (i = 0; i < count; ++i)
  94. *c = *bufptr++;
  95. return count;
  96. }
  97. int MQTTPacket_decodeBuf(unsigned char* buf, int* value)
  98. {
  99. bufptr = buf;
  100. return MQTTPacket_decode(bufchar, value);
  101. }
  102. /**
  103. * Calculates an integer from two bytes read from the input buffer
  104. * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  105. * @return the integer value calculated
  106. */
  107. int readInt(unsigned char** pptr)
  108. {
  109. unsigned char* ptr = *pptr;
  110. int len = 256*(*ptr) + (*(ptr+1));
  111. *pptr += 2;
  112. return len;
  113. }
  114. /**
  115. * Reads one character from the input buffer.
  116. * @param pptr pointer to the input buffer - incremented by the number of bytes used & returned
  117. * @return the character read
  118. */
  119. char readChar(unsigned char** pptr)
  120. {
  121. char c = **pptr;
  122. (*pptr)++;
  123. return c;
  124. }
  125. /**
  126. * Writes one character to an output buffer.
  127. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  128. * @param c the character to write
  129. */
  130. void writeChar(unsigned char** pptr, char c)
  131. {
  132. **pptr = c;
  133. (*pptr)++;
  134. }
  135. /**
  136. * Writes an integer as 2 bytes to an output buffer.
  137. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  138. * @param anInt the integer to write
  139. */
  140. void writeInt(unsigned char** pptr, int anInt)
  141. {
  142. **pptr = (unsigned char)(anInt / 256);
  143. (*pptr)++;
  144. **pptr = (unsigned char)(anInt % 256);
  145. (*pptr)++;
  146. }
  147. /**
  148. * Writes a "UTF" string to an output buffer. Converts C string to length-delimited.
  149. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  150. * @param string the C string to write
  151. */
  152. void writeCString(unsigned char** pptr, const char* string)
  153. {
  154. int len = strlen(string);
  155. writeInt(pptr, len);
  156. memcpy(*pptr, string, len);
  157. *pptr += len;
  158. }
  159. int getLenStringLen(char* ptr)
  160. {
  161. int len = 256*((unsigned char)(*ptr)) + (unsigned char)(*(ptr+1));
  162. return len;
  163. }
  164. void writeMQTTString(unsigned char** pptr, MQTTString mqttstring)
  165. {
  166. if (mqttstring.lenstring.len > 0)
  167. {
  168. writeInt(pptr, mqttstring.lenstring.len);
  169. memcpy(*pptr, mqttstring.lenstring.data, mqttstring.lenstring.len);
  170. *pptr += mqttstring.lenstring.len;
  171. }
  172. else if (mqttstring.cstring)
  173. writeCString(pptr, mqttstring.cstring);
  174. else
  175. writeInt(pptr, 0);
  176. }
  177. /**
  178. * @param mqttstring the MQTTString structure into which the data is to be read
  179. * @param pptr pointer to the output buffer - incremented by the number of bytes used & returned
  180. * @param enddata pointer to the end of the data: do not read beyond
  181. * @return 1 if successful, 0 if not
  182. */
  183. int readMQTTLenString(MQTTString* mqttstring, unsigned char** pptr, unsigned char* enddata)
  184. {
  185. int rc = 0;
  186. FUNC_ENTRY;
  187. /* the first two bytes are the length of the string */
  188. if (enddata - (*pptr) > 1) /* enough length to read the integer? */
  189. {
  190. mqttstring->lenstring.len = readInt(pptr); /* increments pptr to point past length */
  191. if (&(*pptr)[mqttstring->lenstring.len] <= enddata)
  192. {
  193. mqttstring->lenstring.data = (char*)*pptr;
  194. *pptr += mqttstring->lenstring.len;
  195. rc = 1;
  196. }
  197. }
  198. mqttstring->cstring = NULL;
  199. FUNC_EXIT_RC(rc);
  200. return rc;
  201. }
  202. /**
  203. * Return the length of the MQTTstring - C string if there is one, otherwise the length delimited string
  204. * @param mqttstring the string to return the length of
  205. * @return the length of the string
  206. */
  207. int MQTTstrlen(MQTTString mqttstring)
  208. {
  209. int rc = 0;
  210. if (mqttstring.cstring)
  211. rc = strlen(mqttstring.cstring);
  212. else
  213. rc = mqttstring.lenstring.len;
  214. return rc;
  215. }
  216. /**
  217. * Compares an MQTTString to a C string
  218. * @param a the MQTTString to compare
  219. * @param bptr the C string to compare
  220. * @return boolean - equal or not
  221. */
  222. int MQTTPacket_equals(MQTTString* a, char* bptr)
  223. {
  224. int alen = 0,
  225. blen = 0;
  226. char *aptr;
  227. if (a->cstring)
  228. {
  229. aptr = a->cstring;
  230. alen = strlen(a->cstring);
  231. }
  232. else
  233. {
  234. aptr = a->lenstring.data;
  235. alen = a->lenstring.len;
  236. }
  237. blen = strlen(bptr);
  238. return (alen == blen) && (strncmp(aptr, bptr, alen) == 0);
  239. }
  240. /**
  241. * Helper function to read packet data from some source into a buffer
  242. * @param buf the buffer into which the packet will be serialized
  243. * @param buflen the length in bytes of the supplied buffer
  244. * @param getfn pointer to a function which will read any number of bytes from the needed source
  245. * @return integer MQTT packet type, or -1 on error
  246. * @note the whole message must fit into the caller's buffer
  247. */
  248. int MQTTPacket_read(unsigned char* buf, int buflen, int (*getfn)(unsigned char*, int))
  249. {
  250. int rc = -1;
  251. MQTTHeader header = {0};
  252. int len = 0;
  253. int rem_len = 0;
  254. /* 1. read the header byte. This has the packet type in it */
  255. if ((*getfn)(buf, 1) != 1)
  256. goto exit;
  257. len = 1;
  258. /* 2. read the remaining length. This is variable in itself */
  259. MQTTPacket_decode(getfn, &rem_len);
  260. len += MQTTPacket_encode(buf + 1, rem_len); /* put the original remaining length back into the buffer */
  261. /* 3. read the rest of the buffer using a callback to supply the rest of the data */
  262. if((rem_len + len) > buflen)
  263. goto exit;
  264. if (rem_len && ((*getfn)(buf + len, rem_len) != rem_len))
  265. goto exit;
  266. header.byte = buf[0];
  267. rc = header.bits.type;
  268. exit:
  269. return rc;
  270. }
  271. /**
  272. * Decodes the message length according to the MQTT algorithm, non-blocking
  273. * @param trp pointer to a transport structure holding what is needed to solve getting data from it
  274. * @param value the decoded length returned
  275. * @return integer the number of bytes read from the socket, 0 for call again, or -1 on error
  276. */
  277. static int MQTTPacket_decodenb(MQTTTransport *trp)
  278. {
  279. unsigned char c;
  280. int rc = MQTTPACKET_READ_ERROR;
  281. FUNC_ENTRY;
  282. if(trp->len == 0){ /* initialize on first call */
  283. trp->multiplier = 1;
  284. trp->rem_len = 0;
  285. }
  286. do {
  287. int frc;
  288. if (trp->len >= MAX_NO_OF_REMAINING_LENGTH_BYTES)
  289. goto exit;
  290. if ((frc=(*trp->getfn)(trp->sck, &c, 1)) == -1)
  291. goto exit;
  292. if (frc == 0){
  293. rc = 0;
  294. goto exit;
  295. }
  296. ++(trp->len);
  297. trp->rem_len += (c & 127) * trp->multiplier;
  298. trp->multiplier *= 128;
  299. } while ((c & 128) != 0);
  300. rc = trp->len;
  301. exit:
  302. FUNC_EXIT_RC(rc);
  303. return rc;
  304. }
  305. /**
  306. * Helper function to read packet data from some source into a buffer, non-blocking
  307. * @param buf the buffer into which the packet will be serialized
  308. * @param buflen the length in bytes of the supplied buffer
  309. * @param trp pointer to a transport structure holding what is needed to solve getting data from it
  310. * @return integer MQTT packet type, 0 for call again, or -1 on error
  311. * @note the whole message must fit into the caller's buffer
  312. */
  313. int MQTTPacket_readnb(unsigned char* buf, int buflen, MQTTTransport *trp)
  314. {
  315. int rc = -1, frc;
  316. MQTTHeader header = {0};
  317. switch(trp->state){
  318. default:
  319. trp->state = 0;
  320. /*FALLTHROUGH*/
  321. case 0:
  322. /* read the header byte. This has the packet type in it */
  323. if ((frc=(*trp->getfn)(trp->sck, buf, 1)) == -1)
  324. goto exit;
  325. if (frc == 0)
  326. return 0;
  327. trp->len = 0;
  328. ++trp->state;
  329. /*FALLTHROUGH*/
  330. /* read the remaining length. This is variable in itself */
  331. case 1:
  332. if((frc=MQTTPacket_decodenb(trp)) == MQTTPACKET_READ_ERROR)
  333. goto exit;
  334. if(frc == 0)
  335. return 0;
  336. trp->len = 1 + MQTTPacket_encode(buf + 1, trp->rem_len); /* put the original remaining length back into the buffer */
  337. if((trp->rem_len + trp->len) > buflen)
  338. goto exit;
  339. ++trp->state;
  340. /*FALLTHROUGH*/
  341. case 2:
  342. if(trp->rem_len){
  343. /* read the rest of the buffer using a callback to supply the rest of the data */
  344. if ((frc=(*trp->getfn)(trp->sck, buf + trp->len, trp->rem_len)) == -1)
  345. goto exit;
  346. if (frc == 0)
  347. return 0;
  348. trp->rem_len -= frc;
  349. trp->len += frc;
  350. if(trp->rem_len)
  351. return 0;
  352. }
  353. header.byte = buf[0];
  354. rc = header.bits.type;
  355. break;
  356. }
  357. exit:
  358. trp->state = 0;
  359. return rc;
  360. }