/*******************************************************************************
 * Copyright (c) 2014 IBM Corp.
 *
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * and Eclipse Distribution License v1.0 which accompany this distribution.
 *
 * The Eclipse Public License is available at
 *    http://www.eclipse.org/legal/epl-v10.html
 * and the Eclipse Distribution License is available at
 *   http://www.eclipse.org/org/documents/edl-v10.php.
 *
 * Contributors:
 *    Ian Craggs - initial API and implementation and/or initial documentation
 *    Sergio R. Caprile - "commonalization" from prior samples and/or documentation extension
 *******************************************************************************/

#include "transport.h"
#include "lwip/opt.h"
#include "lwip/arch.h"
#include "lwip/api.h"
#include "lwip/inet.h"
#include "lwip/sockets.h"
#include "string.h"

/**
 * This simple low-level implementation assumes a single connection for a
 * single thread. Thus, static variables are used for that connection.
 *
 * In other scenarios, the user must solve this by taking into account that
 * the current implementation of MQTTPacket_read() has a function pointer for
 * a function call to get the data to a buffer, but no provisions to know the
 * caller or other indicator (the socket id):
 *     int (*getfn)(unsigned char*, int)
 */

/* Bytes delivered by the most recent successful transport_receive() call. */
static unsigned char transport_recvBuf[3000];
/* Offset in transport_recvBuf of the first byte not yet handed out. */
static int transport_dataStart;
/* Number of bytes in transport_recvBuf not yet handed out. */
static int transport_dataLength;

/**
 * Read data from the receive cache; this is the data-source callback handed
 * to the lower layer of the MQTT stack.
 *
 * Copies up to @p cnt of the bytes still pending in the cache into @p buf and
 * advances the read position. When the cache is drained the read position is
 * reset to the start of the buffer.
 *
 * @param buf destination buffer
 * @param cnt maximum number of bytes to copy
 * @return number of bytes copied; 0 when the cache is empty, or when @p buf
 *         is NULL or @p cnt is not positive
 */
int transport_getdata(unsigned char *buf, int cnt)
{
    int len;

    /* A negative count would be converted to a huge size_t by memcpy(). */
    if (buf == NULL || cnt <= 0 || transport_dataLength <= 0) {
        return 0;
    }

    len = (cnt < transport_dataLength) ? cnt : transport_dataLength;
    memcpy(buf, transport_recvBuf + transport_dataStart, (size_t)len);

    if (len < transport_dataLength) {
        /* Partial read: keep the remainder for the next call. */
        transport_dataStart += len;
        transport_dataLength -= len;
    } else {
        /* Cache fully consumed. */
        transport_dataStart = 0;
        transport_dataLength = 0;
    }

    return len;
}

/**
 * Send MQTT data on a socket.
 *
 * Issues a single write(); the result of that call is returned unchanged, so
 * callers must be prepared for a short count.
 *
 * @param sock   socket descriptor returned by transport_open()
 * @param buf    data to send
 * @param buflen number of bytes in @p buf
 * @return value returned by write(), or -1 when @p buf is NULL or
 *         @p buflen is negative
 */
int transport_send(int sock, unsigned char* buf, int buflen)
{
    /* A negative length would be converted to a huge size by write(). */
    if (buf == NULL || buflen < 0) {
        return -1;
    }

    return write(sock, buf, buflen);
}

/**
 * Receive MQTT data from a socket into the receive cache.
 *
 * Performs one read() of at most sizeof(transport_recvBuf) bytes. Any data
 * previously left in the cache is discarded. The received bytes are then
 * consumed through transport_getdata().
 *
 * @param sock socket descriptor returned by transport_open()
 * @return number of bytes received when positive; otherwise the value
 *         reported by the SO_ERROR socket option, or read()'s own return
 *         value if SO_ERROR cannot be queried
 */
int transport_receive(int sock)
{
    int rc;

    transport_dataStart = 0;

    rc = read(sock, transport_recvBuf, sizeof(transport_recvBuf));
    if (rc > 0) {
        transport_dataLength = rc;
    } else {
        int optval = 0;
        socklen_t optlen = sizeof(optval);

        transport_dataLength = 0;

        /* Only trust optval when the query itself succeeded. */
        if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &optval, &optlen) == 0) {
            rc = optval;
        }
    }

    return rc;
}

/**
 * Create a TCP socket for MQTT and connect it to the server.
 *
 * After a successful connect, a 5000 receive timeout and keep-alive are
 * requested on the socket; failures to set these options are ignored.
 *
 * @param serverip server IPv4 address in dotted-decimal notation
 * @param port     server TCP port (1..65535)
 * @return connected socket descriptor, or -1 on invalid arguments or when the
 *         socket cannot be created or connected
 */
int transport_open(char* serverip, int port)
{
    struct sockaddr_in address;
    int sock;
    int timeout = 5000;
    int keepalive = 1;

    if (serverip == NULL || port <= 0 || port > 65535) {
        return -1;
    }

    memset(&address, 0, sizeof(address));
    address.sin_len = sizeof(address);
    address.sin_family = AF_INET;
    address.sin_port = htons(port);
    address.sin_addr.s_addr = inet_addr(serverip);
    if (address.sin_addr.s_addr == INADDR_NONE) {
        /* Not a parsable dotted-decimal address. */
        return -1;
    }

    sock = socket(AF_INET, SOCK_STREAM, 0);
    if (sock < 0) {
        return -1;
    }

    if (connect(sock, (struct sockaddr*)&address, sizeof(address)) != 0) {
        close(sock);
        return -1;
    }

    /*
     * Socket options are best effort.
     * NOTE(review): the timeout is passed as an int; lwIP is understood to
     * accept that form only when LWIP_SO_SNDRCVTIMEO_NONSTANDARD is enabled
     * (struct timeval otherwise) - confirm against the project's lwipopts.h.
     */
    (void)setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
    (void)setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive));

    return sock;
}

/**
 * Close the socket.
 *
 * Shuts down the sending direction, issues a zero-length recv(), then closes
 * the descriptor. Only the result of close() is reported.
 *
 * @param sock socket descriptor returned by transport_open()
 * @return value returned by close()
 */
int transport_close(int sock)
{
    (void)shutdown(sock, SHUT_WR);
    (void)recv(sock, NULL, (size_t)0, 0);

    return close(sock);
}