123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- /*******************************************************************************
- * Copyright (c) 2014 IBM Corp.
- *
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Eclipse Distribution License v1.0 which accompany this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- * and the Eclipse Distribution License is available at
- * http://www.eclipse.org/org/documents/edl-v10.php.
- *
- * Contributors:
- * Ian Craggs - initial API and implementation and/or initial documentation
- * Sergio R. Caprile - "commonalization" from prior samples and/or documentation extension
- *******************************************************************************/
- #include "transport.h"
- #include "lwip/opt.h"
- #include "lwip/arch.h"
- #include "lwip/api.h"
- #include "lwip/inet.h"
- #include "lwip/sockets.h"
- #include "string.h"
- /**
- This simple low-level implementation assumes a single connection for a single thread. Thus, a static
- variable is used for that connection.
- On other scenarios, the user must solve this by taking into account that the current implementation of
- MQTTPacket_read() has a function pointer for a function call to get the data to a buffer, but no provisions
- to know the caller or other indicator (the socket id): int (*getfn)(unsigned char*, int)
- */
- static unsigned char transport_recvBuf[3000];
- static int transport_dataStart;
- static int transport_dataLength;
- //从缓存中读取数据,为mqtt协议栈底层提供数据接口
- int transport_getdata(unsigned char *buf, int cnt)
- {
- int len = 0;
-
- if(cnt < transport_dataLength)
- {
- len = cnt;
- memcpy(buf, transport_recvBuf + transport_dataStart, len);
- transport_dataStart += len;
- transport_dataLength -= len;
- }
- else
- {
- if(transport_dataLength == 0) len = 0;
- else
- {
- len = transport_dataLength;
- memcpy(buf, transport_recvBuf + transport_dataStart, len);
- transport_dataStart = 0;
- transport_dataLength = 0;
- }
- }
-
- return len;
- }
- //mqtt发送数据
- int transport_send(int sock, unsigned char* buf, int buflen)
- {
- return write(sock, buf, buflen);
- }
- //mqtt接收数据
- int transport_receive(int sock)
- {
- int rc;
- socklen_t optlen = 4;
- int optval;
-
- rc = read(sock, transport_recvBuf, sizeof(transport_recvBuf));
- if(rc > 0) transport_dataLength = rc;
- else
- {
- transport_dataLength = 0;
- getsockopt(sock, SOL_SOCKET, SO_ERROR, &optval, &optlen);
- rc = optval;
- }
- transport_dataStart = 0;
-
- return rc;
- }
- //为mqtt创建一个socket
- int transport_open(char* serverip, int port)
- {
- struct sockaddr_in address;
- int sock = -1;
- int rc = -1;
- int timeout;
-
- memset(&address, 0, sizeof(address));
- address.sin_len = sizeof(address);
- address.sin_family = AF_INET;
- address.sin_port = htons(port);
- address.sin_addr.s_addr = inet_addr((char*)serverip);
- sock = socket(AF_INET, SOCK_STREAM, 0);
- if(sock < 0) return -1;
-
- rc = connect(sock, (struct sockaddr*)&address, sizeof(address));
- if(rc != 0)
- {
- close(sock);
- return -1;
- }
-
- timeout = 5000;
- setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(int));
- setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &timeout, sizeof(int));
-
- return sock;
- }
- //关闭socket接口
- int transport_close(int sock)
- {
- int rc;
- rc = shutdown(sock, SHUT_WR);
- rc = recv(sock, NULL, (size_t)0, 0);
- rc = close(sock);
- return rc;
- }
|