mqttclient.c 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505
  1. /*
  2. * @Author: jiejie
  3. * @Github: https://github.com/jiejieTop
  4. * @Date: 2019-12-09 21:31:25
  5. * @LastEditTime : 2023-03-26 17:18:35
  6. * @Description: the code belongs to jiejie, please keep the author information and source code according to the license.
  7. */
  8. #include "mqttclient.h"
  9. #define MQTT_MIN_PAYLOAD_SIZE 2
  10. #define MQTT_MAX_PAYLOAD_SIZE 268435455 // MQTT imposes a maximum payload size of 268435455 bytes.
  11. static void default_msg_handler(void* client, message_data_t* msg)
  12. {
  13. MQTT_LOG_I("%s:%d %s()...\ntopic: %s, qos: %d, \nmessage:%s", __FILE__, __LINE__, __FUNCTION__,
  14. msg->topic_name, msg->message->qos, (char*)msg->message->payload);
  15. }
  16. static client_state_t mqtt_get_client_state(mqtt_client_t* c)
  17. {
  18. return c->mqtt_client_state;
  19. }
  20. static void mqtt_set_client_state(mqtt_client_t* c, client_state_t state)
  21. {
  22. platform_mutex_lock(&c->mqtt_global_lock);
  23. c->mqtt_client_state = state;
  24. platform_mutex_unlock(&c->mqtt_global_lock);
  25. }
  26. static int mqtt_is_connected(mqtt_client_t* c)
  27. {
  28. client_state_t state;
  29. state = mqtt_get_client_state(c);
  30. if (CLIENT_STATE_CLEAN_SESSION == state) {
  31. RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
  32. } else if (CLIENT_STATE_CONNECTED != state) {
  33. RETURN_ERROR(MQTT_NOT_CONNECT_ERROR);
  34. }
  35. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  36. }
  37. static int mqtt_set_publish_dup(mqtt_client_t* c, uint8_t dup)
  38. {
  39. uint8_t *read_data = c->mqtt_write_buf;
  40. uint8_t *write_data = c->mqtt_write_buf;
  41. MQTTHeader header = {0};
  42. if (NULL == c->mqtt_write_buf)
  43. RETURN_ERROR(MQTT_SET_PUBLISH_DUP_FAILED_ERROR);
  44. header.byte = readChar(&read_data); /* read header */
  45. if (header.bits.type != PUBLISH)
  46. RETURN_ERROR(MQTT_SET_PUBLISH_DUP_FAILED_ERROR);
  47. header.bits.dup = dup;
  48. writeChar(&write_data, header.byte); /* write header */
  49. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  50. }
  51. static int mqtt_ack_handler_is_maximum(mqtt_client_t* c)
  52. {
  53. return (c->mqtt_ack_handler_number >= MQTT_ACK_HANDLER_NUM_MAX) ? 1 : 0;
  54. }
  55. static void mqtt_add_ack_handler_num(mqtt_client_t* c)
  56. {
  57. platform_mutex_lock(&c->mqtt_global_lock);
  58. c->mqtt_ack_handler_number++;
  59. platform_mutex_unlock(&c->mqtt_global_lock);
  60. }
  61. static int mqtt_subtract_ack_handler_num(mqtt_client_t* c)
  62. {
  63. int rc = MQTT_SUCCESS_ERROR;
  64. platform_mutex_lock(&c->mqtt_global_lock);
  65. if (c->mqtt_ack_handler_number <= 0) {
  66. goto exit;
  67. }
  68. c->mqtt_ack_handler_number--;
  69. exit:
  70. platform_mutex_unlock(&c->mqtt_global_lock);
  71. RETURN_ERROR(rc);
  72. }
  73. static uint16_t mqtt_get_next_packet_id(mqtt_client_t *c)
  74. {
  75. platform_mutex_lock(&c->mqtt_global_lock);
  76. c->mqtt_packet_id = (c->mqtt_packet_id == MQTT_MAX_PACKET_ID) ? 1 : c->mqtt_packet_id + 1;
  77. platform_mutex_unlock(&c->mqtt_global_lock);
  78. return c->mqtt_packet_id;
  79. }
  80. static int mqtt_decode_packet(mqtt_client_t* c, int* value, int timeout)
  81. {
  82. uint8_t i;
  83. int multiplier = 1;
  84. int len = 0;
  85. const int MAX_NO_OF_REMAINING_LENGTH_BYTES = 4;
  86. *value = 0;
  87. do {
  88. int rc = MQTTPACKET_READ_ERROR;
  89. if (++len > MAX_NO_OF_REMAINING_LENGTH_BYTES) {
  90. rc = MQTTPACKET_READ_ERROR; /* bad data */
  91. goto exit;
  92. }
  93. rc = network_read(c->mqtt_network, &i, 1, timeout); /* read network data */
  94. if (rc != 1)
  95. goto exit;
  96. *value += (i & 127) * multiplier; /* decode data length according to mqtt protocol */
  97. multiplier *= 128;
  98. } while ((i & 128) != 0);
  99. exit:
  100. return len;
  101. }
  102. static void mqtt_packet_drain(mqtt_client_t* c, platform_timer_t *timer, int packet_len)
  103. {
  104. int total_bytes_read = 0, read_len = 0, bytes2read = 0;
  105. if (packet_len < c->mqtt_read_buf_size) {
  106. bytes2read = packet_len;
  107. } else {
  108. bytes2read = c->mqtt_read_buf_size;
  109. }
  110. do {
  111. read_len = network_read(c->mqtt_network, c->mqtt_read_buf, bytes2read, platform_timer_remain(timer));
  112. if (0 != read_len) {
  113. total_bytes_read += read_len;
  114. if ((packet_len - total_bytes_read) >= c->mqtt_read_buf_size) {
  115. bytes2read = c->mqtt_read_buf_size;
  116. } else {
  117. bytes2read = packet_len - total_bytes_read;
  118. }
  119. }
  120. } while ((total_bytes_read < packet_len) && (0 != read_len)); /* read and discard all corrupted data */
  121. }
  122. static int mqtt_read_packet(mqtt_client_t* c, int* packet_type, platform_timer_t* timer)
  123. {
  124. MQTTHeader header = {0};
  125. int rc;
  126. int len = 1;
  127. int remain_len = 0;
  128. if (NULL == packet_type)
  129. RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
  130. platform_timer_init(timer);
  131. platform_timer_cutdown(timer, c->mqtt_cmd_timeout);
  132. /* 1. read the header byte. This has the packet type in it */
  133. rc = network_read(c->mqtt_network, c->mqtt_read_buf, len, platform_timer_remain(timer));
  134. if (rc != len)
  135. RETURN_ERROR(MQTT_NOTHING_TO_READ_ERROR);
  136. /* 2. read the remaining length. This is variable in itself */
  137. mqtt_decode_packet(c, &remain_len, platform_timer_remain(timer));
  138. /* put the original remaining length back into the buffer */
  139. len += MQTTPacket_encode(c->mqtt_read_buf + len, remain_len);
  140. if ((len + remain_len) > c->mqtt_read_buf_size) {
  141. /* mqtt buffer is too short, read and discard all corrupted data */
  142. mqtt_packet_drain(c, timer, remain_len);
  143. RETURN_ERROR(MQTT_BUFFER_TOO_SHORT_ERROR);
  144. }
  145. /* 3. read the rest of the buffer using a callback to supply the rest of the data */
  146. if ((remain_len > 0) && ((rc = network_read(c->mqtt_network, c->mqtt_read_buf + len, remain_len, platform_timer_remain(timer))) != remain_len))
  147. RETURN_ERROR(MQTT_NOTHING_TO_READ_ERROR);
  148. header.byte = c->mqtt_read_buf[0];
  149. *packet_type = header.bits.type;
  150. platform_timer_cutdown(&c->mqtt_last_received, (c->mqtt_keep_alive_interval * 1000));
  151. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  152. }
  153. static int mqtt_send_packet(mqtt_client_t* c, int length, platform_timer_t* timer)
  154. {
  155. int len = 0;
  156. int sent = 0;
  157. platform_timer_init(timer);
  158. platform_timer_cutdown(timer, c->mqtt_cmd_timeout);
  159. /* send mqtt packet in a blocking manner or exit when it timer is expired */
  160. while ((sent < length) && (!platform_timer_is_expired(timer))) {
  161. len = network_write(c->mqtt_network, &c->mqtt_write_buf[sent], length, platform_timer_remain(timer));
  162. if (len <= 0) // there was an error writing the data
  163. break;
  164. sent += len;
  165. }
  166. if (sent == length) {
  167. platform_timer_cutdown(&c->mqtt_last_sent, (c->mqtt_keep_alive_interval * 1000));
  168. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  169. }
  170. RETURN_ERROR(MQTT_SEND_PACKET_ERROR);
  171. }
  172. static int mqtt_is_topic_equals(const char *topic_filter, const char *topic)
  173. {
  174. int topic_len = 0;
  175. topic_len = strlen(topic);
  176. if (strlen(topic_filter) != topic_len) {
  177. return 0;
  178. }
  179. if (strncmp(topic_filter, topic, topic_len) == 0) {
  180. return 1;
  181. }
  182. return 0;
  183. }
  184. static char mqtt_topic_is_matched(char* topic_filter, MQTTString* topic_name)
  185. {
  186. char* curf = topic_filter;
  187. char* curn = topic_name->lenstring.data;
  188. char* curn_end = curn + topic_name->lenstring.len;
  189. while (*curf && curn < curn_end)
  190. {
  191. if (*curn == '/' && *curf != '/')
  192. break;
  193. /* support wildcards for MQTT topics, such as '#' '+' */
  194. if (*curf != '+' && *curf != '#' && *curf != *curn)
  195. break;
  196. if (*curf == '+') {
  197. char* nextpos = curn + 1;
  198. while (nextpos < curn_end && *nextpos != '/')
  199. nextpos = ++curn + 1;
  200. }
  201. else if (*curf == '#')
  202. curn = curn_end - 1;
  203. curf++;
  204. curn++;
  205. };
  206. return (curn == curn_end) && (*curf == '\0');
  207. }
  208. static void mqtt_new_message_data(message_data_t* md, MQTTString* topic_name, mqtt_message_t* message)
  209. {
  210. int len;
  211. len = (topic_name->lenstring.len < MQTT_TOPIC_LEN_MAX - 1) ? topic_name->lenstring.len : MQTT_TOPIC_LEN_MAX - 1;
  212. memcpy(md->topic_name, topic_name->lenstring.data, len);
  213. md->topic_name[len] = '\0'; /* the topic name is too long and will be truncated */
  214. md->message = message;
  215. }
  216. static message_handlers_t *mqtt_get_msg_handler(mqtt_client_t* c, MQTTString* topic_name)
  217. {
  218. mqtt_list_t *curr, *next;
  219. message_handlers_t *msg_handler;
  220. /* traverse the msg_handler_list to find the matching message handler */
  221. LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_msg_handler_list) {
  222. msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
  223. /* judge topic is equal or match, support wildcard, such as '#' '+' */
  224. if ((NULL != msg_handler->topic_filter) && ((MQTTPacket_equals(topic_name, (char*)msg_handler->topic_filter)) ||
  225. (mqtt_topic_is_matched((char*)msg_handler->topic_filter, topic_name)))) {
  226. return msg_handler;
  227. }
  228. }
  229. return NULL;
  230. }
  231. static int mqtt_deliver_message(mqtt_client_t* c, MQTTString* topic_name, mqtt_message_t* message)
  232. {
  233. int rc = MQTT_FAILED_ERROR;
  234. message_handlers_t *msg_handler;
  235. /* get mqtt message handler */
  236. msg_handler = mqtt_get_msg_handler(c, topic_name);
  237. if (NULL != msg_handler) {
  238. message_data_t md;
  239. mqtt_new_message_data(&md, topic_name, message); /* make a message data */
  240. msg_handler->handler(c, &md); /* deliver the message */
  241. rc = MQTT_SUCCESS_ERROR;
  242. } else if (NULL != c->mqtt_interceptor_handler) {
  243. message_data_t md;
  244. mqtt_new_message_data(&md, topic_name, message); /* make a message data */
  245. c->mqtt_interceptor_handler(c, &md);
  246. rc = MQTT_SUCCESS_ERROR;
  247. }
  248. memset(message->payload, 0, strlen(message->payload));
  249. memset(topic_name->lenstring.data, 0, topic_name->lenstring.len);
  250. RETURN_ERROR(rc);
  251. }
  252. static ack_handlers_t *mqtt_ack_handler_create(mqtt_client_t* c, int type, uint16_t packet_id, uint16_t payload_len, message_handlers_t* handler)
  253. {
  254. ack_handlers_t *ack_handler = NULL;
  255. ack_handler = (ack_handlers_t *) platform_memory_alloc(sizeof(ack_handlers_t) + payload_len);
  256. if (NULL == ack_handler)
  257. return NULL;
  258. mqtt_list_init(&ack_handler->list);
  259. platform_timer_init(&ack_handler->timer);
  260. platform_timer_cutdown(&ack_handler->timer, c->mqtt_cmd_timeout); /* No response within timeout will be destroyed or resent */
  261. ack_handler->type = type;
  262. ack_handler->packet_id = packet_id;
  263. ack_handler->payload_len = payload_len;
  264. ack_handler->payload = (uint8_t *)ack_handler + sizeof(ack_handlers_t);
  265. ack_handler->handler = handler;
  266. memcpy(ack_handler->payload, c->mqtt_write_buf, payload_len); /* save the data in ack handler*/
  267. return ack_handler;
  268. }
  269. static void mqtt_ack_handler_destroy(ack_handlers_t* ack_handler)
  270. {
  271. if (NULL != &ack_handler->list) {
  272. mqtt_list_del(&ack_handler->list);
  273. platform_memory_free(ack_handler); /* delete ack handler from the list, and free memory */
  274. }
  275. }
  276. static void mqtt_ack_handler_resend(mqtt_client_t* c, ack_handlers_t* ack_handler)
  277. {
  278. platform_timer_t timer;
  279. platform_timer_init(&timer);
  280. platform_timer_cutdown(&timer, c->mqtt_cmd_timeout);
  281. platform_timer_cutdown(&ack_handler->timer, c->mqtt_cmd_timeout); /* timeout, recutdown */
  282. platform_mutex_lock(&c->mqtt_write_lock);
  283. memcpy(c->mqtt_write_buf, ack_handler->payload, ack_handler->payload_len); /* copy data to write buf form ack handler */
  284. mqtt_send_packet(c, ack_handler->payload_len, &timer); /* resend data */
  285. platform_mutex_unlock(&c->mqtt_write_lock);
  286. MQTT_LOG_W("%s:%d %s()... resend %d package, packet_id is %d ", __FILE__, __LINE__, __FUNCTION__, ack_handler->type, ack_handler->packet_id);
  287. }
  288. static int mqtt_ack_list_node_is_exist(mqtt_client_t* c, int type, uint16_t packet_id)
  289. {
  290. mqtt_list_t *curr, *next;
  291. ack_handlers_t *ack_handler;
  292. if (mqtt_list_is_empty(&c->mqtt_ack_handler_list))
  293. return 0;
  294. LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_ack_handler_list) {
  295. ack_handler = LIST_ENTRY(curr, ack_handlers_t, list);
  296. /* For mqtt packets of qos1 and qos2, you can use the packet id and type as the unique
  297. identifier to determine whether the node already exists and avoid repeated addition. */
  298. if ((packet_id == ack_handler->packet_id) && (type == ack_handler->type))
  299. return 1;
  300. }
  301. return 0;
  302. }
  303. static int mqtt_ack_list_record(mqtt_client_t* c, int type, uint16_t packet_id, uint16_t payload_len, message_handlers_t* handler)
  304. {
  305. int rc = MQTT_SUCCESS_ERROR;
  306. ack_handlers_t *ack_handler = NULL;
  307. /* Determine if the node already exists */
  308. if (mqtt_ack_list_node_is_exist(c, type, packet_id))
  309. RETURN_ERROR(MQTT_ACK_NODE_IS_EXIST_ERROR);
  310. /* create a ack handler node */
  311. ack_handler = mqtt_ack_handler_create(c, type, packet_id, payload_len, handler);
  312. if (NULL == ack_handler)
  313. RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
  314. mqtt_add_ack_handler_num(c);
  315. mqtt_list_add_tail(&ack_handler->list, &c->mqtt_ack_handler_list);
  316. RETURN_ERROR(rc);
  317. }
  318. static int mqtt_ack_list_unrecord(mqtt_client_t* c, int type, uint16_t packet_id, message_handlers_t **handler)
  319. {
  320. mqtt_list_t *curr, *next;
  321. ack_handlers_t *ack_handler;
  322. if (mqtt_list_is_empty(&c->mqtt_ack_handler_list))
  323. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  324. LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_ack_handler_list) {
  325. ack_handler = LIST_ENTRY(curr, ack_handlers_t, list);
  326. if ((packet_id != ack_handler->packet_id) || (type != ack_handler->type))
  327. continue;
  328. if (handler)
  329. *handler = ack_handler->handler;
  330. /* destroy a ack handler node */
  331. mqtt_ack_handler_destroy(ack_handler);
  332. mqtt_subtract_ack_handler_num(c);
  333. }
  334. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  335. }
  336. static message_handlers_t *mqtt_msg_handler_create(const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
  337. {
  338. message_handlers_t *msg_handler = NULL;
  339. msg_handler = (message_handlers_t *) platform_memory_alloc(sizeof(message_handlers_t));
  340. if (NULL == msg_handler)
  341. return NULL;
  342. mqtt_list_init(&msg_handler->list);
  343. msg_handler->qos = qos;
  344. msg_handler->handler = handler; /* register callback handler */
  345. msg_handler->topic_filter = topic_filter;
  346. return msg_handler;
  347. }
  348. static void mqtt_msg_handler_destory(message_handlers_t *msg_handler)
  349. {
  350. if (NULL != &msg_handler->list) {
  351. mqtt_list_del(&msg_handler->list);
  352. platform_memory_free(msg_handler);
  353. }
  354. }
  355. static int mqtt_msg_handler_is_exist(mqtt_client_t* c, message_handlers_t *handler)
  356. {
  357. mqtt_list_t *curr, *next;
  358. message_handlers_t *msg_handler;
  359. if (mqtt_list_is_empty(&c->mqtt_msg_handler_list))
  360. return 0;
  361. LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_msg_handler_list) {
  362. msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
  363. /* determine whether a node already exists by mqtt topic, but wildcards are not supported */
  364. if ((NULL != msg_handler->topic_filter) && (mqtt_is_topic_equals(msg_handler->topic_filter, handler->topic_filter))) {
  365. MQTT_LOG_W("%s:%d %s()...msg_handler->topic_filter: %s, handler->topic_filter: %s",
  366. __FILE__, __LINE__, __FUNCTION__, msg_handler->topic_filter, handler->topic_filter);
  367. return 1;
  368. }
  369. }
  370. return 0;
  371. }
  372. static int mqtt_msg_handlers_install(mqtt_client_t* c, message_handlers_t *handler)
  373. {
  374. if ((NULL == c) || (NULL == handler))
  375. RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
  376. if (mqtt_msg_handler_is_exist(c, handler)) {
  377. mqtt_msg_handler_destory(handler);
  378. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  379. }
  380. /* install to msg_handler_list*/
  381. mqtt_list_add_tail(&handler->list, &c->mqtt_msg_handler_list);
  382. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  383. }
  384. static void mqtt_clean_session(mqtt_client_t* c)
  385. {
  386. mqtt_list_t *curr, *next;
  387. ack_handlers_t *ack_handler;
  388. message_handlers_t *msg_handler;
  389. /* release all ack_handler_list memory */
  390. if (!(mqtt_list_is_empty(&c->mqtt_ack_handler_list))) {
  391. LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_ack_handler_list) {
  392. ack_handler = LIST_ENTRY(curr, ack_handlers_t, list);
  393. mqtt_list_del(&ack_handler->list);
  394. //@lchnu, 2020-10-08, avoid socket disconnet when waiting for suback/unsuback....
  395. if(NULL != ack_handler->handler) {
  396. mqtt_msg_handler_destory(ack_handler->handler);
  397. ack_handler->handler = NULL;
  398. }
  399. platform_memory_free(ack_handler);
  400. }
  401. mqtt_list_del_init(&c->mqtt_ack_handler_list);
  402. }
  403. /* need clean mqtt_ack_handler_number value, find the bug by @lchnu */
  404. c->mqtt_ack_handler_number = 0;
  405. /* release all msg_handler_list memory */
  406. if (!(mqtt_list_is_empty(&c->mqtt_msg_handler_list))) {
  407. LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_msg_handler_list) {
  408. msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
  409. mqtt_list_del(&msg_handler->list);
  410. msg_handler->topic_filter = NULL;
  411. platform_memory_free(msg_handler);
  412. }
  413. // MQTT_LOG_D("%s:%d %s() mqtt_msg_handler_list delete", __FILE__, __LINE__, __FUNCTION__);
  414. mqtt_list_del_init(&c->mqtt_msg_handler_list);
  415. }
  416. mqtt_set_client_state(c, CLIENT_STATE_INVALID);
  417. }
  418. /**
  419. * see if there is a message waiting for the server to answer in the ack list, if there is, then process it according to the flag.
  420. * flag : 0 means it does not need to wait for the timeout to process these packets immediately. usually immediately after reconnecting.
  421. * 1 means it needs to wait for timeout before processing these messages, usually timeout processing in a stable connection.
  422. */
  423. static void mqtt_ack_list_scan(mqtt_client_t* c, uint8_t flag)
  424. {
  425. mqtt_list_t *curr, *next;
  426. ack_handlers_t *ack_handler;
  427. if ((mqtt_list_is_empty(&c->mqtt_ack_handler_list)) || (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c)))
  428. return;
  429. LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_ack_handler_list) {
  430. ack_handler = LIST_ENTRY(curr, ack_handlers_t, list);
  431. if ((!platform_timer_is_expired(&ack_handler->timer)) && (flag == 1))
  432. continue;
  433. if ((ack_handler->type == PUBACK) || (ack_handler->type == PUBREC) || (ack_handler->type == PUBREL) || (ack_handler->type == PUBCOMP)) {
  434. /* timeout has occurred. for qos1 and qos2 packets, you need to resend them. */
  435. mqtt_ack_handler_resend(c, ack_handler);
  436. continue;
  437. } else if ((ack_handler->type == SUBACK) || (ack_handler->type == UNSUBACK)) {
  438. /*@lchnu, 2020-10-08, destory handler memory, if suback/unsuback is overdue!*/
  439. if (NULL != ack_handler->handler) {
  440. mqtt_msg_handler_destory(ack_handler->handler);
  441. ack_handler->handler = NULL;
  442. }
  443. }
  444. /* if it is not a qos1 or qos2 message, it will be destroyed in every processing */
  445. mqtt_ack_handler_destroy(ack_handler);
  446. mqtt_subtract_ack_handler_num(c); /*@lchnu, 2020-10-08 */
  447. }
  448. }
  449. static int mqtt_try_resubscribe(mqtt_client_t* c)
  450. {
  451. int rc = MQTT_RESUBSCRIBE_ERROR;
  452. mqtt_list_t *curr, *next;
  453. message_handlers_t *msg_handler;
  454. MQTT_LOG_W("%s:%d %s()... mqtt try resubscribe ...", __FILE__, __LINE__, __FUNCTION__);
  455. if (mqtt_list_is_empty(&c->mqtt_msg_handler_list)) {
  456. // MQTT_LOG_D("%s:%d %s() mqtt_msg_handler_list is empty", __FILE__, __LINE__, __FUNCTION__);
  457. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  458. }
  459. LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_msg_handler_list) {
  460. msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
  461. /* resubscribe topic */
  462. if ((rc = mqtt_subscribe(c, msg_handler->topic_filter, msg_handler->qos, msg_handler->handler)) == MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR)
  463. MQTT_LOG_W("%s:%d %s()... mqtt ack handler num too much ...", __FILE__, __LINE__, __FUNCTION__);
  464. }
  465. RETURN_ERROR(rc);
  466. }
  467. static int mqtt_try_do_reconnect(mqtt_client_t* c)
  468. {
  469. int rc = MQTT_CONNECT_FAILED_ERROR;
  470. if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c))
  471. rc = mqtt_connect(c); /* reconnect */
  472. if (MQTT_SUCCESS_ERROR == rc) {
  473. rc = mqtt_try_resubscribe(c); /* resubscribe */
  474. /* process these ack messages immediately after reconnecting */
  475. mqtt_ack_list_scan(c, 0);
  476. }
  477. MQTT_LOG_D("%s:%d %s()... mqtt try connect result is -0x%04x", __FILE__, __LINE__, __FUNCTION__, -rc);
  478. RETURN_ERROR(rc);
  479. }
  480. static int mqtt_try_reconnect(mqtt_client_t* c)
  481. {
  482. int rc = MQTT_SUCCESS_ERROR;
  483. /*before connect, call reconnect handler, it can used to update the mqtt password, eg: onenet platform need*/
  484. if (NULL != c->mqtt_reconnect_handler) {
  485. c->mqtt_reconnect_handler(c, c->mqtt_reconnect_data);
  486. }
  487. rc = mqtt_try_do_reconnect(c);
  488. if(MQTT_SUCCESS_ERROR != rc) {
  489. /*connect fail must delay reconnect try duration time and let cpu time go out, the lowest priority task can run */
  490. mqtt_sleep_ms(c->mqtt_reconnect_try_duration);
  491. RETURN_ERROR(MQTT_RECONNECT_TIMEOUT_ERROR);
  492. }
  493. RETURN_ERROR(rc);
  494. }
  495. static int mqtt_publish_ack_packet(mqtt_client_t *c, uint16_t packet_id, int packet_type)
  496. {
  497. int len = 0;
  498. int rc = MQTT_SUCCESS_ERROR;
  499. platform_timer_t timer;
  500. platform_timer_init(&timer);
  501. platform_timer_cutdown(&timer, c->mqtt_cmd_timeout);
  502. platform_mutex_lock(&c->mqtt_write_lock);
  503. switch (packet_type) {
  504. case PUBREC:
  505. len = MQTTSerialize_ack(c->mqtt_write_buf, c->mqtt_write_buf_size, PUBREL, 0, packet_id); /* make a PUBREL ack packet */
  506. rc = mqtt_ack_list_record(c, PUBCOMP, packet_id, len, NULL); /* record ack, expect to receive PUBCOMP*/
  507. if (MQTT_SUCCESS_ERROR != rc)
  508. goto exit;
  509. break;
  510. case PUBREL:
  511. len = MQTTSerialize_ack(c->mqtt_write_buf, c->mqtt_write_buf_size, PUBCOMP, 0, packet_id); /* make a PUBCOMP ack packet */
  512. break;
  513. default:
  514. rc = MQTT_PUBLISH_ACK_TYPE_ERROR;
  515. goto exit;
  516. }
  517. if (len <= 0) {
  518. rc = MQTT_PUBLISH_ACK_PACKET_ERROR;
  519. goto exit;
  520. }
  521. rc = mqtt_send_packet(c, len, &timer);
  522. exit:
  523. platform_mutex_unlock(&c->mqtt_write_lock);
  524. RETURN_ERROR(rc);
  525. }
  526. static int mqtt_puback_and_pubcomp_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  527. {
  528. int rc = MQTT_FAILED_ERROR;
  529. uint16_t packet_id;
  530. uint8_t dup, packet_type;
  531. rc = mqtt_is_connected(c);
  532. if (MQTT_SUCCESS_ERROR != rc)
  533. RETURN_ERROR(rc);
  534. if (MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->mqtt_read_buf, c->mqtt_read_buf_size) != 1)
  535. rc = MQTT_PUBREC_PACKET_ERROR;
  536. (void) dup;
  537. rc = mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL); /* unrecord ack handler */
  538. RETURN_ERROR(rc);
  539. }
  540. static int mqtt_suback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  541. {
  542. int rc = MQTT_FAILED_ERROR;
  543. int count = 0;
  544. int granted_qos = 0;
  545. uint16_t packet_id;
  546. int is_nack = 0;
  547. message_handlers_t *msg_handler = NULL;
  548. rc = mqtt_is_connected(c);
  549. if (MQTT_SUCCESS_ERROR != rc)
  550. RETURN_ERROR(rc);
  551. /* deserialize subscribe ack packet */
  552. if (MQTTDeserialize_suback(&packet_id, 1, &count, (int*)&granted_qos, c->mqtt_read_buf, c->mqtt_read_buf_size) != 1)
  553. RETURN_ERROR(MQTT_SUBSCRIBE_ACK_PACKET_ERROR);
  554. is_nack = (granted_qos == SUBFAIL);
  555. rc = mqtt_ack_list_unrecord(c, SUBACK, packet_id, &msg_handler);
  556. if (!msg_handler)
  557. RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
  558. if (is_nack) {
  559. mqtt_msg_handler_destory(msg_handler); /* subscribe topic failed, destory message handler */
  560. MQTT_LOG_D("subscribe topic failed...");
  561. RETURN_ERROR(MQTT_SUBSCRIBE_NOT_ACK_ERROR);
  562. }
  563. rc = mqtt_msg_handlers_install(c, msg_handler);
  564. RETURN_ERROR(rc);
  565. }
  566. static int mqtt_unsuback_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  567. {
  568. int rc = MQTT_FAILED_ERROR;
  569. message_handlers_t *msg_handler;
  570. uint16_t packet_id = 0;
  571. rc = mqtt_is_connected(c);
  572. if (MQTT_SUCCESS_ERROR != rc)
  573. RETURN_ERROR(rc);
  574. if (MQTTDeserialize_unsuback(&packet_id, c->mqtt_read_buf, c->mqtt_read_buf_size) != 1)
  575. RETURN_ERROR(MQTT_UNSUBSCRIBE_ACK_PACKET_ERROR);
  576. rc = mqtt_ack_list_unrecord(c, UNSUBACK, packet_id, &msg_handler); /* unrecord ack handler, and get message handler */
  577. if (!msg_handler)
  578. RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
  579. mqtt_msg_handler_destory(msg_handler); /* destory message handler */
  580. RETURN_ERROR(rc);
  581. }
  582. static int mqtt_publish_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  583. {
  584. int len = 0, rc = MQTT_SUCCESS_ERROR;
  585. MQTTString topic_name;
  586. mqtt_message_t msg;
  587. int qos;
  588. msg.payloadlen = 0;
  589. rc = mqtt_is_connected(c);
  590. if (MQTT_SUCCESS_ERROR != rc)
  591. RETURN_ERROR(rc);
  592. if (MQTTDeserialize_publish(&msg.dup, &qos, &msg.retained, &msg.id, &topic_name,
  593. (uint8_t**)&msg.payload, (int*)&msg.payloadlen, c->mqtt_read_buf, c->mqtt_read_buf_size) != 1)
  594. RETURN_ERROR(MQTT_PUBLISH_PACKET_ERROR);
  595. msg.qos = (mqtt_qos_t)qos;
  596. /* for qos1 and qos2, you need to send a ack packet */
  597. if (msg.qos != QOS0) {
  598. platform_mutex_lock(&c->mqtt_write_lock);
  599. if (msg.qos == QOS1)
  600. len = MQTTSerialize_ack(c->mqtt_write_buf, c->mqtt_write_buf_size, PUBACK, 0, msg.id);
  601. else if (msg.qos == QOS2)
  602. len = MQTTSerialize_ack(c->mqtt_write_buf, c->mqtt_write_buf_size, PUBREC, 0, msg.id);
  603. if (len <= 0)
  604. rc = MQTT_SERIALIZE_PUBLISH_ACK_PACKET_ERROR;
  605. else
  606. rc = mqtt_send_packet(c, len, timer);
  607. platform_mutex_unlock(&c->mqtt_write_lock);
  608. }
  609. if (rc < 0)
  610. RETURN_ERROR(rc);
  611. if (msg.qos != QOS2)
  612. mqtt_deliver_message(c, &topic_name, &msg);
  613. else {
  614. /* record the received of a qos2 message and only processes it when the qos2 message is received for the first time */
  615. if ((rc = mqtt_ack_list_record(c, PUBREL, msg.id, len, NULL)) != MQTT_ACK_NODE_IS_EXIST_ERROR)
  616. mqtt_deliver_message(c, &topic_name, &msg);
  617. }
  618. RETURN_ERROR(rc);
  619. }
  620. static int mqtt_pubrec_and_pubrel_packet_handle(mqtt_client_t *c, platform_timer_t *timer)
  621. {
  622. int rc = MQTT_FAILED_ERROR;
  623. uint16_t packet_id;
  624. uint8_t dup, packet_type;
  625. rc = mqtt_is_connected(c);
  626. if (MQTT_SUCCESS_ERROR != rc)
  627. RETURN_ERROR(rc);
  628. if (MQTTDeserialize_ack(&packet_type, &dup, &packet_id, c->mqtt_read_buf, c->mqtt_read_buf_size) != 1)
  629. RETURN_ERROR(MQTT_PUBREC_PACKET_ERROR);
  630. (void) dup;
  631. rc = mqtt_publish_ack_packet(c, packet_id, packet_type); /* make a ack packet and send it */
  632. rc = mqtt_ack_list_unrecord(c, packet_type, packet_id, NULL);
  633. RETURN_ERROR(rc);
  634. }
  635. static int mqtt_packet_handle(mqtt_client_t* c, platform_timer_t* timer)
  636. {
  637. int rc = MQTT_SUCCESS_ERROR;
  638. int packet_type = 0;
  639. rc = mqtt_read_packet(c, &packet_type, timer);
  640. switch (packet_type) {
  641. case 0: /* timed out reading packet or an error occurred while reading data*/
  642. if (MQTT_BUFFER_TOO_SHORT_ERROR == rc) {
  643. MQTT_LOG_E("the client read buffer is too short, please call mqtt_set_read_buf_size() to reset the buffer size");
  644. /* don't return directly, you need to stay active, because there is data readable now, but the buffer is too small */
  645. }
  646. break;
  647. case CONNACK: /* has been processed */
  648. goto exit;
  649. case PUBACK:
  650. case PUBCOMP:
  651. rc = mqtt_puback_and_pubcomp_packet_handle(c, timer);
  652. break;
  653. case SUBACK:
  654. rc = mqtt_suback_packet_handle(c, timer);
  655. break;
  656. case UNSUBACK:
  657. rc = mqtt_unsuback_packet_handle(c, timer);
  658. break;
  659. case PUBLISH:
  660. rc = mqtt_publish_packet_handle(c, timer);
  661. break;
  662. case PUBREC:
  663. case PUBREL:
  664. rc = mqtt_pubrec_and_pubrel_packet_handle(c, timer);
  665. break;
  666. case PINGRESP:
  667. c->mqtt_ping_outstanding = 0; /* keep alive ping success */
  668. break;
  669. default:
  670. goto exit;
  671. }
  672. rc = mqtt_keep_alive(c);
  673. exit:
  674. if (rc == MQTT_SUCCESS_ERROR)
  675. rc = packet_type;
  676. RETURN_ERROR(rc);
  677. }
  678. static int mqtt_wait_packet(mqtt_client_t* c, int packet_type, platform_timer_t* timer)
  679. {
  680. int rc = MQTT_FAILED_ERROR;
  681. do {
  682. if (platform_timer_is_expired(timer))
  683. break;
  684. rc = mqtt_packet_handle(c, timer);
  685. } while (rc != packet_type && rc >= 0);
  686. RETURN_ERROR(rc);
  687. }
  688. static int mqtt_yield(mqtt_client_t* c, int timeout_ms)
  689. {
  690. int rc = MQTT_SUCCESS_ERROR;
  691. client_state_t state;
  692. platform_timer_t timer;
  693. if (NULL == c)
  694. RETURN_ERROR(MQTT_FAILED_ERROR);
  695. if (0 == timeout_ms)
  696. timeout_ms = c->mqtt_cmd_timeout;
  697. platform_timer_init(&timer);
  698. platform_timer_cutdown(&timer, timeout_ms);
  699. while (!platform_timer_is_expired(&timer)) {
  700. state = mqtt_get_client_state(c);
  701. if (CLIENT_STATE_CLEAN_SESSION == state) {
  702. RETURN_ERROR(MQTT_CLEAN_SESSION_ERROR);
  703. } else if (CLIENT_STATE_CONNECTED != state) {
  704. /* mqtt not connect, need reconnect */
  705. rc = mqtt_try_reconnect(c);
  706. if (MQTT_RECONNECT_TIMEOUT_ERROR == rc)
  707. RETURN_ERROR(rc);
  708. continue;
  709. }
  710. /* mqtt connected, handle mqtt packet */
  711. rc = mqtt_packet_handle(c, &timer);
  712. if (rc >= 0) {
  713. /* scan ack list, destroy ack handler that have timed out or resend them */
  714. mqtt_ack_list_scan(c, 1);
  715. } else if (MQTT_NOT_CONNECT_ERROR == rc) {
  716. MQTT_LOG_E("%s:%d %s()... mqtt not connect", __FILE__, __LINE__, __FUNCTION__);
  717. } else {
  718. break;
  719. }
  720. }
  721. RETURN_ERROR(rc);
  722. }
  723. static void mqtt_yield_thread(void *arg)
  724. {
  725. int rc;
  726. client_state_t state;
  727. mqtt_client_t *c = (mqtt_client_t *)arg;
  728. platform_thread_t *thread_to_be_destoried = NULL;
  729. state = mqtt_get_client_state(c);
  730. if (CLIENT_STATE_CONNECTED != state) {
  731. MQTT_LOG_W("%s:%d %s()..., mqtt is not connected to the server...", __FILE__, __LINE__, __FUNCTION__);
  732. platform_thread_stop(c->mqtt_thread); /* mqtt is not connected to the server, stop thread */
  733. }
  734. while (1) {
  735. rc = mqtt_yield(c, c->mqtt_cmd_timeout);
  736. if (MQTT_CLEAN_SESSION_ERROR == rc) {
  737. MQTT_LOG_W("%s:%d %s()..., mqtt clean session....", __FILE__, __LINE__, __FUNCTION__);
  738. network_disconnect(c->mqtt_network);
  739. mqtt_clean_session(c);
  740. goto exit;
  741. } else if (MQTT_RECONNECT_TIMEOUT_ERROR == rc) {
  742. MQTT_LOG_W("%s:%d %s()..., mqtt reconnect timeout....", __FILE__, __LINE__, __FUNCTION__);
  743. }
  744. }
  745. exit:
  746. thread_to_be_destoried = c->mqtt_thread;
  747. if(NULL != thread_to_be_destoried)
  748. {
  749. platform_thread_destroy(thread_to_be_destoried);
  750. platform_memory_free(thread_to_be_destoried);
  751. thread_to_be_destoried = NULL;
  752. }
  753. }
  754. static int mqtt_connect_with_results(mqtt_client_t* c)
  755. {
  756. int len = 0;
  757. int rc = MQTT_CONNECT_FAILED_ERROR;
  758. platform_timer_t connect_timer;
  759. mqtt_connack_data_t connack_data = {0};
  760. MQTTPacket_connectData connect_data = MQTTPacket_connectData_initializer;
  761. if (NULL == c)
  762. RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
  763. if (CLIENT_STATE_CONNECTED == mqtt_get_client_state(c))
  764. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  765. #ifndef MQTT_NETWORK_TYPE_NO_TLS
  766. rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, c->mqtt_ca);
  767. #else
  768. rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
  769. #endif
  770. rc = network_connect(c->mqtt_network);
  771. if (MQTT_SUCCESS_ERROR != rc) {
  772. if (NULL != c->mqtt_network) {
  773. network_release(c->mqtt_network);
  774. RETURN_ERROR(rc);
  775. }
  776. }
  777. MQTT_LOG_I("%s:%d %s()... mqtt connect success...", __FILE__, __LINE__, __FUNCTION__);
  778. connect_data.keepAliveInterval = c->mqtt_keep_alive_interval;
  779. connect_data.cleansession = c->mqtt_clean_session;
  780. connect_data.MQTTVersion = c->mqtt_version;
  781. connect_data.clientID.cstring= c->mqtt_client_id;
  782. connect_data.username.cstring = c->mqtt_user_name;
  783. connect_data.password.cstring = c->mqtt_password;
  784. if (c->mqtt_will_flag) {
  785. connect_data.willFlag = c->mqtt_will_flag;
  786. connect_data.will.message.cstring = c->mqtt_will_options->will_message;
  787. connect_data.will.qos = c->mqtt_will_options->will_qos;
  788. connect_data.will.retained = c->mqtt_will_options->will_retained;
  789. connect_data.will.topicName.cstring = c->mqtt_will_options->will_topic;
  790. }
  791. platform_timer_cutdown(&c->mqtt_last_received, (c->mqtt_keep_alive_interval * 1000));
  792. platform_mutex_lock(&c->mqtt_write_lock);
  793. /* serialize connect packet */
  794. if ((len = MQTTSerialize_connect(c->mqtt_write_buf, c->mqtt_write_buf_size, &connect_data)) <= 0)
  795. goto exit;
  796. platform_timer_cutdown(&connect_timer, c->mqtt_cmd_timeout);
  797. /* send connect packet */
  798. if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
  799. goto exit;
  800. if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {
  801. if (MQTTDeserialize_connack(&connack_data.session_present, &connack_data.rc, c->mqtt_read_buf, c->mqtt_read_buf_size) == 1)
  802. rc = connack_data.rc;
  803. else
  804. rc = MQTT_CONNECT_FAILED_ERROR;
  805. } else
  806. rc = MQTT_CONNECT_FAILED_ERROR;
  807. exit:
  808. if (rc == MQTT_SUCCESS_ERROR) {
  809. if(NULL == c->mqtt_thread) {
  810. /* connect success, and need init mqtt thread */
  811. c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread, c, MQTT_THREAD_STACK_SIZE, MQTT_THREAD_PRIO, MQTT_THREAD_TICK);
  812. if (NULL != c->mqtt_thread) {
  813. mqtt_set_client_state(c, CLIENT_STATE_CONNECTED);
  814. platform_thread_startup(c->mqtt_thread);
  815. platform_thread_start(c->mqtt_thread); /* start run mqtt thread */
  816. } else {
  817. /*creat the thread fail and disconnect the mqtt socket connect*/
  818. network_release(c->mqtt_network);
  819. rc = MQTT_CONNECT_FAILED_ERROR;
  820. MQTT_LOG_W("%s:%d %s()... mqtt yield thread creat failed...", __FILE__, __LINE__, __FUNCTION__);
  821. }
  822. } else {
  823. mqtt_set_client_state(c, CLIENT_STATE_CONNECTED); /* reconnect, mqtt thread is already exists */
  824. }
  825. c->mqtt_ping_outstanding = 0; /* reset ping outstanding */
  826. } else {
  827. network_release(c->mqtt_network);
  828. mqtt_set_client_state(c, CLIENT_STATE_INITIALIZED); /* connect failed */
  829. }
  830. platform_mutex_unlock(&c->mqtt_write_lock);
  831. RETURN_ERROR(rc);
  832. }
  833. static uint32_t mqtt_read_buf_malloc(mqtt_client_t* c, uint32_t size)
  834. {
  835. MQTT_ROBUSTNESS_CHECK(c, 0);
  836. if (NULL != c->mqtt_read_buf)
  837. platform_memory_free(c->mqtt_read_buf);
  838. c->mqtt_read_buf_size = size;
  839. /* limit the size of the read buffer */
  840. if ((MQTT_MIN_PAYLOAD_SIZE >= c->mqtt_read_buf_size) || (MQTT_MAX_PAYLOAD_SIZE <= c->mqtt_read_buf_size))
  841. c->mqtt_read_buf_size = MQTT_DEFAULT_BUF_SIZE;
  842. c->mqtt_read_buf = (uint8_t*) platform_memory_alloc(c->mqtt_read_buf_size);
  843. if (NULL == c->mqtt_read_buf) {
  844. MQTT_LOG_E("%s:%d %s()... malloc read buf failed...", __FILE__, __LINE__, __FUNCTION__);
  845. RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
  846. }
  847. return c->mqtt_read_buf_size;
  848. }
  849. static uint32_t mqtt_write_buf_malloc(mqtt_client_t* c, uint32_t size)
  850. {
  851. MQTT_ROBUSTNESS_CHECK(c, 0);
  852. if (NULL != c->mqtt_write_buf)
  853. platform_memory_free(c->mqtt_write_buf);
  854. c->mqtt_write_buf_size = size;
  855. /* limit the size of the read buffer */
  856. if ((MQTT_MIN_PAYLOAD_SIZE >= c->mqtt_write_buf_size) || (MQTT_MAX_PAYLOAD_SIZE <= c->mqtt_write_buf_size))
  857. c->mqtt_write_buf_size = MQTT_DEFAULT_BUF_SIZE;
  858. c->mqtt_write_buf = (uint8_t*) platform_memory_alloc(c->mqtt_write_buf_size);
  859. if (NULL == c->mqtt_write_buf) {
  860. MQTT_LOG_E("%s:%d %s()... malloc write buf failed...", __FILE__, __LINE__, __FUNCTION__);
  861. RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
  862. }
  863. return c->mqtt_write_buf_size;
  864. }
  865. static int mqtt_init(mqtt_client_t* c)
  866. {
  867. /* network init */
  868. c->mqtt_network = (network_t*) platform_memory_alloc(sizeof(network_t));
  869. if (NULL == c->mqtt_network) {
  870. MQTT_LOG_E("%s:%d %s()... malloc memory failed...", __FILE__, __LINE__, __FUNCTION__);
  871. RETURN_ERROR(MQTT_MEM_NOT_ENOUGH_ERROR);
  872. }
  873. memset(c->mqtt_network, 0, sizeof(network_t));
  874. c->mqtt_packet_id = 1;
  875. c->mqtt_clean_session = 0; //no clear session by default
  876. c->mqtt_will_flag = 0;
  877. c->mqtt_cmd_timeout = MQTT_DEFAULT_CMD_TIMEOUT;
  878. c->mqtt_client_state = CLIENT_STATE_INITIALIZED;
  879. c->mqtt_ping_outstanding = 0;
  880. c->mqtt_ack_handler_number = 0;
  881. c->mqtt_client_id_len = 0;
  882. c->mqtt_user_name_len = 0;
  883. c->mqtt_password_len = 0;
  884. c->mqtt_keep_alive_interval = MQTT_KEEP_ALIVE_INTERVAL;
  885. c->mqtt_version = MQTT_VERSION;
  886. c->mqtt_reconnect_try_duration = MQTT_RECONNECT_DEFAULT_DURATION;
  887. c->mqtt_will_options = NULL;
  888. c->mqtt_reconnect_data = NULL;
  889. c->mqtt_reconnect_handler = NULL;
  890. c->mqtt_interceptor_handler = NULL;
  891. mqtt_read_buf_malloc(c, MQTT_DEFAULT_BUF_SIZE);
  892. mqtt_write_buf_malloc(c, MQTT_DEFAULT_BUF_SIZE);
  893. mqtt_list_init(&c->mqtt_msg_handler_list);
  894. mqtt_list_init(&c->mqtt_ack_handler_list);
  895. platform_mutex_init(&c->mqtt_write_lock);
  896. platform_mutex_init(&c->mqtt_global_lock);
  897. platform_timer_init(&c->mqtt_last_sent);
  898. platform_timer_init(&c->mqtt_last_received);
  899. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  900. }
  901. /********************************************************* mqttclient global function ********************************************************/
  902. MQTT_CLIENT_SET_DEFINE(client_id, char*, NULL)
  903. MQTT_CLIENT_SET_DEFINE(user_name, char*, NULL)
  904. MQTT_CLIENT_SET_DEFINE(password, char*, NULL)
  905. MQTT_CLIENT_SET_DEFINE(host, char*, NULL)
  906. MQTT_CLIENT_SET_DEFINE(port, char*, NULL)
  907. MQTT_CLIENT_SET_DEFINE(ca, char*, NULL)
  908. MQTT_CLIENT_SET_DEFINE(reconnect_data, void*, NULL)
  909. MQTT_CLIENT_SET_DEFINE(keep_alive_interval, uint16_t, 0)
  910. MQTT_CLIENT_SET_DEFINE(will_flag, uint32_t, 0)
  911. MQTT_CLIENT_SET_DEFINE(clean_session, uint32_t, 0)
  912. MQTT_CLIENT_SET_DEFINE(version, uint32_t, 0)
  913. MQTT_CLIENT_SET_DEFINE(cmd_timeout, uint32_t, 0)
  914. MQTT_CLIENT_SET_DEFINE(reconnect_try_duration, uint32_t, 0)
  915. MQTT_CLIENT_SET_DEFINE(reconnect_handler, reconnect_handler_t, NULL)
  916. MQTT_CLIENT_SET_DEFINE(interceptor_handler, interceptor_handler_t, NULL)
  917. uint32_t mqtt_set_read_buf_size(mqtt_client_t *c, uint32_t size)
  918. {
  919. return mqtt_read_buf_malloc(c, size);
  920. }
  921. uint32_t mqtt_set_write_buf_size(mqtt_client_t *c, uint32_t size)
  922. {
  923. return mqtt_write_buf_malloc(c, size);
  924. }
  925. void mqtt_sleep_ms(int ms)
  926. {
  927. platform_timer_usleep(ms * 1000);
  928. }
  929. int mqtt_keep_alive(mqtt_client_t* c)
  930. {
  931. int rc = MQTT_SUCCESS_ERROR;
  932. rc = mqtt_is_connected(c);
  933. if (MQTT_SUCCESS_ERROR != rc)
  934. RETURN_ERROR(rc);
  935. if (platform_timer_is_expired(&c->mqtt_last_sent) || platform_timer_is_expired(&c->mqtt_last_received)) {
  936. if (c->mqtt_ping_outstanding) {
  937. MQTT_LOG_W("%s:%d %s()... ping outstanding", __FILE__, __LINE__, __FUNCTION__);
  938. /*must realse the socket file descriptor zhaoshimin 20200629*/
  939. network_release(c->mqtt_network);
  940. mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
  941. rc = MQTT_NOT_CONNECT_ERROR; /* PINGRESP not received in keepalive interval */
  942. } else {
  943. platform_timer_t timer;
  944. int len = MQTTSerialize_pingreq(c->mqtt_write_buf, c->mqtt_write_buf_size);
  945. if (len > 0)
  946. rc = mqtt_send_packet(c, len, &timer); // 100ask, send the ping packet
  947. c->mqtt_ping_outstanding++;
  948. }
  949. }
  950. RETURN_ERROR(rc);
  951. }
  952. mqtt_client_t *mqtt_lease(void)
  953. {
  954. int rc;
  955. mqtt_client_t* c;
  956. c = (mqtt_client_t *)platform_memory_alloc(sizeof(mqtt_client_t));
  957. if (NULL == c)
  958. return NULL;
  959. memset(c, 0, sizeof(mqtt_client_t));
  960. rc = mqtt_init(c);
  961. if (MQTT_SUCCESS_ERROR != rc)
  962. return NULL;
  963. return c;
  964. }
  965. int mqtt_release(mqtt_client_t* c)
  966. {
  967. platform_timer_t timer;
  968. if (NULL == c)
  969. RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
  970. platform_timer_init(&timer);
  971. platform_timer_cutdown(&timer, c->mqtt_cmd_timeout);
  972. /* wait for the clean session to complete */
  973. while ((CLIENT_STATE_INVALID != mqtt_get_client_state(c))) {
  974. // platform_timer_usleep(1000); // 1ms avoid compiler optimization.
  975. if (platform_timer_is_expired(&timer)) {
  976. MQTT_LOG_E("%s:%d %s()... mqtt release failed...", __FILE__, __LINE__, __FUNCTION__);
  977. RETURN_ERROR(MQTT_FAILED_ERROR)
  978. }
  979. }
  980. if (NULL != c->mqtt_network) {
  981. platform_memory_free(c->mqtt_network);
  982. c->mqtt_network = NULL;
  983. }
  984. if (NULL != c->mqtt_read_buf) {
  985. platform_memory_free(c->mqtt_read_buf);
  986. c->mqtt_read_buf = NULL;
  987. }
  988. if (NULL != c->mqtt_write_buf) {
  989. platform_memory_free(c->mqtt_write_buf);
  990. c->mqtt_write_buf = NULL;
  991. }
  992. platform_mutex_destroy(&c->mqtt_write_lock);
  993. platform_mutex_destroy(&c->mqtt_global_lock);
  994. memset(c, 0, sizeof(mqtt_client_t));
  995. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  996. }
  997. int mqtt_connect(mqtt_client_t* c)
  998. {
  999. /* connect server in blocking mode and wait for connection result */
  1000. return mqtt_connect_with_results(c);
  1001. }
  1002. int mqtt_disconnect(mqtt_client_t* c)
  1003. {
  1004. int rc = MQTT_FAILED_ERROR;
  1005. platform_timer_t timer;
  1006. int len = 0;
  1007. platform_timer_init(&timer);
  1008. platform_timer_cutdown(&timer, c->mqtt_cmd_timeout);
  1009. platform_mutex_lock(&c->mqtt_write_lock);
  1010. /* serialize disconnect packet and send it */
  1011. len = MQTTSerialize_disconnect(c->mqtt_write_buf, c->mqtt_write_buf_size);
  1012. if (len > 0)
  1013. rc = mqtt_send_packet(c, len, &timer);
  1014. platform_mutex_unlock(&c->mqtt_write_lock);
  1015. mqtt_set_client_state(c, CLIENT_STATE_CLEAN_SESSION);
  1016. RETURN_ERROR(rc);
  1017. }
  1018. int mqtt_subscribe(mqtt_client_t* c, const char* topic_filter, mqtt_qos_t qos, message_handler_t handler)
  1019. {
  1020. int rc = MQTT_SUBSCRIBE_ERROR;
  1021. int len = 0;
  1022. int qos_level = qos; /* This can avoid alignment bugs, because enum is not int on some compilers */
  1023. uint16_t packet_id;
  1024. platform_timer_t timer;
  1025. MQTTString topic = MQTTString_initializer;
  1026. topic.cstring = (char *)topic_filter;
  1027. message_handlers_t *msg_handler = NULL;
  1028. if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c))
  1029. RETURN_ERROR(MQTT_NOT_CONNECT_ERROR);
  1030. platform_mutex_lock(&c->mqtt_write_lock);
  1031. packet_id = mqtt_get_next_packet_id(c);
  1032. /* serialize subscribe packet and send it */
  1033. len = MQTTSerialize_subscribe(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, packet_id, 1, &topic, (int *)&qos_level);
  1034. if (len <= 0)
  1035. goto exit;
  1036. if ((rc = mqtt_send_packet(c, len, &timer)) != MQTT_SUCCESS_ERROR)
  1037. goto exit;
  1038. if (NULL == handler)
  1039. handler = default_msg_handler; /* if handler is not specified, the default handler is used */
  1040. /* create a message and record it */
  1041. msg_handler = mqtt_msg_handler_create(topic_filter, qos, handler);
  1042. if (NULL == msg_handler) {
  1043. rc = MQTT_MEM_NOT_ENOUGH_ERROR;
  1044. goto exit;
  1045. }
  1046. rc = mqtt_ack_list_record(c, SUBACK, packet_id, len, msg_handler);
  1047. exit:
  1048. platform_mutex_unlock(&c->mqtt_write_lock);
  1049. RETURN_ERROR(rc);
  1050. }
  1051. int mqtt_unsubscribe(mqtt_client_t* c, const char* topic_filter)
  1052. {
  1053. int len = 0;
  1054. int rc = MQTT_FAILED_ERROR;
  1055. uint16_t packet_id;
  1056. platform_timer_t timer;
  1057. MQTTString topic = MQTTString_initializer;
  1058. topic.cstring = (char *)topic_filter;
  1059. message_handlers_t *msg_handler = NULL;
  1060. if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c))
  1061. RETURN_ERROR(MQTT_NOT_CONNECT_ERROR);
  1062. platform_mutex_lock(&c->mqtt_write_lock);
  1063. packet_id = mqtt_get_next_packet_id(c);
  1064. /* serialize unsubscribe packet and send it */
  1065. if ((len = MQTTSerialize_unsubscribe(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, packet_id, 1, &topic)) <= 0)
  1066. goto exit;
  1067. if ((rc = mqtt_send_packet(c, len, &timer)) != MQTT_SUCCESS_ERROR)
  1068. goto exit;
  1069. /* get a already subscribed message handler */
  1070. msg_handler = mqtt_get_msg_handler(c, &topic);
  1071. if (NULL == msg_handler) {
  1072. rc = MQTT_MEM_NOT_ENOUGH_ERROR;
  1073. goto exit;
  1074. }
  1075. rc = mqtt_ack_list_record(c, UNSUBACK, packet_id, len, msg_handler);
  1076. exit:
  1077. platform_mutex_unlock(&c->mqtt_write_lock);
  1078. RETURN_ERROR(rc);
  1079. }
  1080. int mqtt_publish(mqtt_client_t* c, const char* topic_filter, mqtt_message_t* msg)
  1081. {
  1082. int len = 0;
  1083. int rc = MQTT_FAILED_ERROR;
  1084. platform_timer_t timer;
  1085. MQTTString topic = MQTTString_initializer;
  1086. topic.cstring = (char *)topic_filter;
  1087. if (CLIENT_STATE_CONNECTED != mqtt_get_client_state(c)) {
  1088. msg->payloadlen = 0; // clear
  1089. rc = MQTT_NOT_CONNECT_ERROR;
  1090. RETURN_ERROR(rc);
  1091. // goto exit; /* 100ask */
  1092. }
  1093. if ((NULL != msg->payload) && (0 == msg->payloadlen))
  1094. msg->payloadlen = strlen((char*)msg->payload);
  1095. if (msg->payloadlen > c->mqtt_write_buf_size) {
  1096. MQTT_LOG_E("publish payload len is is greater than client write buffer...");
  1097. RETURN_ERROR(MQTT_BUFFER_TOO_SHORT_ERROR);
  1098. }
  1099. platform_mutex_lock(&c->mqtt_write_lock);
  1100. if (QOS0 != msg->qos) {
  1101. if (mqtt_ack_handler_is_maximum(c)) {
  1102. rc = MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR; /* the recorded ack handler has reached the maximum */
  1103. goto exit;
  1104. }
  1105. msg->id = mqtt_get_next_packet_id(c);
  1106. }
  1107. /* serialize publish packet and send it */
  1108. len = MQTTSerialize_publish(c->mqtt_write_buf, c->mqtt_write_buf_size, 0, msg->qos, msg->retained, msg->id,
  1109. topic, (uint8_t*)msg->payload, msg->payloadlen);
  1110. if (len <= 0)
  1111. goto exit;
  1112. if ((rc = mqtt_send_packet(c, len, &timer)) != MQTT_SUCCESS_ERROR)
  1113. goto exit;
  1114. if (QOS0 != msg->qos) {
  1115. mqtt_set_publish_dup(c, 1); /* may resend this data, set the dup flag in advance */
  1116. if (QOS1 == msg->qos) {
  1117. /* expect to receive PUBACK, otherwise data will be resent */
  1118. rc = mqtt_ack_list_record(c, PUBACK, msg->id, len, NULL);
  1119. } else if (QOS2 == msg->qos) {
  1120. /* expect to receive PUBREC, otherwise data will be resent */
  1121. rc = mqtt_ack_list_record(c, PUBREC, msg->id, len, NULL);
  1122. }
  1123. }
  1124. exit:
  1125. msg->payloadlen = 0; // clear
  1126. platform_mutex_unlock(&c->mqtt_write_lock);
  1127. if ((MQTT_ACK_HANDLER_NUM_TOO_MUCH_ERROR == rc) || (MQTT_MEM_NOT_ENOUGH_ERROR == rc)) {
  1128. MQTT_LOG_W("%s:%d %s()... there is not enough memory space to record...", __FILE__, __LINE__, __FUNCTION__);
  1129. /*must realse the socket file descriptor zhaoshimin 20200629*/
  1130. network_release(c->mqtt_network);
  1131. /* record too much retransmitted data, may be disconnected, need to reconnect */
  1132. mqtt_set_client_state(c, CLIENT_STATE_DISCONNECTED);
  1133. }
  1134. RETURN_ERROR(rc);
  1135. }
  1136. int mqtt_list_subscribe_topic(mqtt_client_t* c)
  1137. {
  1138. int i = 0;
  1139. mqtt_list_t *curr, *next;
  1140. message_handlers_t *msg_handler;
  1141. if (NULL == c)
  1142. RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
  1143. if (mqtt_list_is_empty(&c->mqtt_msg_handler_list))
  1144. MQTT_LOG_I("%s:%d %s()... there are no subscribed topics...", __FILE__, __LINE__, __FUNCTION__);
  1145. LIST_FOR_EACH_SAFE(curr, next, &c->mqtt_msg_handler_list) {
  1146. msg_handler = LIST_ENTRY(curr, message_handlers_t, list);
  1147. /* determine whether a node already exists by mqtt topic, but wildcards are not supported */
  1148. if (NULL != msg_handler->topic_filter) {
  1149. MQTT_LOG_I("%s:%d %s()...[%d] subscribe topic: %s", __FILE__, __LINE__, __FUNCTION__, ++i ,msg_handler->topic_filter);
  1150. }
  1151. }
  1152. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  1153. }
  1154. int mqtt_set_will_options(mqtt_client_t* c, char *topic, mqtt_qos_t qos, uint8_t retained, char *message)
  1155. {
  1156. if ((NULL == c) || (NULL == topic))
  1157. RETURN_ERROR(MQTT_NULL_VALUE_ERROR);
  1158. if (NULL == c->mqtt_will_options) {
  1159. c->mqtt_will_options = (mqtt_will_options_t *)platform_memory_alloc(sizeof(mqtt_will_options_t));
  1160. MQTT_ROBUSTNESS_CHECK(c->mqtt_will_options, MQTT_MEM_NOT_ENOUGH_ERROR);
  1161. }
  1162. if (0 == c->mqtt_will_flag)
  1163. c->mqtt_will_flag = 1;
  1164. c->mqtt_will_options->will_topic = topic;
  1165. c->mqtt_will_options->will_qos = qos;
  1166. c->mqtt_will_options->will_retained = retained;
  1167. c->mqtt_will_options->will_message = message;
  1168. RETURN_ERROR(MQTT_SUCCESS_ERROR);
  1169. }