transport.c 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. /*******************************************************************************
  2. * Copyright (c) 2014 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v1.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * http://www.eclipse.org/legal/epl-v10.html
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial API and implementation and/or initial documentation
  15. * Sergio R. Caprile - "commonalization" from prior samples and/or documentation extension
  16. *******************************************************************************/
  17. #include "transport.h"
  18. #include "lwip/opt.h"
  19. #include "lwip/arch.h"
  20. #include "lwip/api.h"
  21. #include "lwip/inet.h"
  22. #include "lwip/sockets.h"
  23. #include "string.h"
  24. /**
  25. This simple low-level implementation assumes a single connection for a single thread. Thus, a static
  26. variable is used for that connection.
  27. On other scenarios, the user must solve this by taking into account that the current implementation of
  28. MQTTPacket_read() has a function pointer for a function call to get the data to a buffer, but no provisions
  29. to know the caller or other indicator (the socket id): int (*getfn)(unsigned char*, int)
  30. */
  31. static unsigned char transport_recvBuf[3000];
  32. static int transport_dataStart;
  33. static int transport_dataLength;
  34. //从缓存中读取数据,为mqtt协议栈底层提供数据接口
  35. int transport_getdata(unsigned char *buf, int cnt)
  36. {
  37. int len = 0;
  38. if(cnt < transport_dataLength)
  39. {
  40. len = cnt;
  41. memcpy(buf, transport_recvBuf + transport_dataStart, len);
  42. transport_dataStart += len;
  43. transport_dataLength -= len;
  44. }
  45. else
  46. {
  47. if(transport_dataLength == 0) len = 0;
  48. else
  49. {
  50. len = transport_dataLength;
  51. memcpy(buf, transport_recvBuf + transport_dataStart, len);
  52. transport_dataStart = 0;
  53. transport_dataLength = 0;
  54. }
  55. }
  56. return len;
  57. }
  58. //mqtt发送数据
  59. int transport_send(int sock, unsigned char* buf, int buflen)
  60. {
  61. return write(sock, buf, buflen);
  62. }
  63. //mqtt接收数据
  64. int transport_receive(int sock)
  65. {
  66. int rc;
  67. socklen_t optlen = 4;
  68. int optval;
  69. rc = read(sock, transport_recvBuf, sizeof(transport_recvBuf));
  70. if(rc > 0) transport_dataLength = rc;
  71. else
  72. {
  73. transport_dataLength = 0;
  74. getsockopt(sock, SOL_SOCKET, SO_ERROR, &optval, &optlen);
  75. rc = optval;
  76. }
  77. transport_dataStart = 0;
  78. return rc;
  79. }
  80. //为mqtt创建一个socket
  81. int transport_open(char* serverip, int port)
  82. {
  83. struct sockaddr_in address;
  84. int sock = -1;
  85. int rc = -1;
  86. int timeout;
  87. memset(&address, 0, sizeof(address));
  88. address.sin_len = sizeof(address);
  89. address.sin_family = AF_INET;
  90. address.sin_port = htons(port);
  91. address.sin_addr.s_addr = inet_addr((char*)serverip);
  92. sock = socket(AF_INET, SOCK_STREAM, 0);
  93. if(sock < 0) return -1;
  94. rc = connect(sock, (struct sockaddr*)&address, sizeof(address));
  95. if(rc != 0)
  96. {
  97. close(sock);
  98. return -1;
  99. }
  100. timeout = 5000;
  101. setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(int));
  102. setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &timeout, sizeof(int));
  103. return sock;
  104. }
  105. //关闭socket接口
  106. int transport_close(int sock)
  107. {
  108. int rc;
  109. rc = shutdown(sock, SHUT_WR);
  110. rc = recv(sock, NULL, (size_t)0, 0);
  111. rc = close(sock);
  112. return rc;
  113. }