Add timeouts for connect operations

This commit is contained in:
Cameron Gutman 2016-02-16 10:15:01 -05:00
parent 42af179770
commit 1662ce1ee8
7 changed files with 71 additions and 16 deletions

View File

@ -33,6 +33,8 @@ static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples;
#define IDX_INVALIDATE_REF_FRAMES 2 #define IDX_INVALIDATE_REF_FRAMES 2
#define IDX_LOSS_STATS 3 #define IDX_LOSS_STATS 3
#define CONTROL_STREAM_TIMEOUT_SEC 10
static const short packetTypesGen3[] = { static const short packetTypesGen3[] = {
0x140b, // Start A 0x140b, // Start A
0x1410, // Start B 0x1410, // Start B
@ -417,7 +419,8 @@ int stopControlStream(void) {
int startControlStream(void) { int startControlStream(void) {
int err; int err;
ctlSock = connectTcpSocket(&RemoteAddr, RemoteAddrLen, 47995); ctlSock = connectTcpSocket(&RemoteAddr, RemoteAddrLen,
47995, CONTROL_STREAM_TIMEOUT_SEC);
if (ctlSock == INVALID_SOCKET) { if (ctlSock == INVALID_SOCKET) {
return LastSocketFail(); return LastSocketFail();
} }

View File

@ -15,6 +15,7 @@ static PLT_THREAD inputSendThread;
static OAES_CTX* oaesContext; static OAES_CTX* oaesContext;
#define MAX_INPUT_PACKET_SIZE 128 #define MAX_INPUT_PACKET_SIZE 128
#define INPUT_STREAM_TIMEOUT_SEC 10
// Contains input stream packets // Contains input stream packets
typedef struct _PACKET_HOLDER { typedef struct _PACKET_HOLDER {
@ -272,7 +273,8 @@ static void inputSendThreadProc(void* context) {
int startInputStream(void) { int startInputStream(void) {
int err; int err;
inputSock = connectTcpSocket(&RemoteAddr, RemoteAddrLen, 35043); inputSock = connectTcpSocket(&RemoteAddr, RemoteAddrLen,
35043, INPUT_STREAM_TIMEOUT_SEC);
if (inputSock == INVALID_SOCKET) { if (inputSock == INVALID_SOCKET) {
return LastSocketFail(); return LastSocketFail();
} }

View File

@ -13,6 +13,7 @@
#include <unistd.h> #include <unistd.h>
#include <pthread.h> #include <pthread.h>
#include <sys/time.h> #include <sys/time.h>
#include <sys/ioctl.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/in.h> #include <netinet/in.h>
#endif #endif

View File

@ -110,34 +110,80 @@ SOCKET bindUdpSocket(int addrfamily, int bufferSize) {
return s; return s;
} }
SOCKET connectTcpSocket(struct sockaddr_storage* dstaddr, SOCKADDR_LEN addrlen, unsigned short port) { SOCKET connectTcpSocket(struct sockaddr_storage* dstaddr, SOCKADDR_LEN addrlen, unsigned short port, int timeoutSec) {
SOCKET s; SOCKET s;
struct sockaddr_in6 addr; struct sockaddr_in6 addr;
int err; int err;
int val;
struct fd_set writefds, exceptfds;
struct timeval tv;
s = socket(dstaddr->ss_family, SOCK_STREAM, IPPROTO_TCP); s = socket(dstaddr->ss_family, SOCK_STREAM, IPPROTO_TCP);
if (s == INVALID_SOCKET) { if (s == INVALID_SOCKET) {
Limelog("socket() failed: %d\n", (int)LastSocketError()); Limelog("socket() failed: %d\n", (int)LastSocketError());
return INVALID_SOCKET; return INVALID_SOCKET;
} }
#ifdef LC_DARWIN #ifdef LC_DARWIN
{ // Disable SIGPIPE on iOS
// Disable SIGPIPE on iOS val = 1;
int val = 1; setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (char*)&val, sizeof(val));
setsockopt(s, SOL_SOCKET, SO_NOSIGPIPE, (char*)&val, sizeof(val));
}
#endif #endif
// Enable non-blocking I/O for connect timeout support
val = 1;
ioctlsocket(s, FIONBIO, &val);
// Start async connection
memcpy(&addr, dstaddr, sizeof(addr)); memcpy(&addr, dstaddr, sizeof(addr));
addr.sin6_port = htons(port); addr.sin6_port = htons(port);
if (connect(s, (struct sockaddr*) &addr, addrlen) == SOCKET_ERROR) { connect(s, (struct sockaddr*) &addr, addrlen);
FD_ZERO(&writefds);
FD_ZERO(&exceptfds);
FD_SET(s, &writefds);
FD_SET(s, &exceptfds);
tv.tv_sec = timeoutSec;
tv.tv_usec = 0;
// Wait for the connection to complete or the timeout to elapse
err = select(s + 1, NULL, &writefds, &exceptfds, &tv);
if (err < 0) {
// select() failed
err = LastSocketError(); err = LastSocketError();
Limelog("connect() failed: %d\n", err); Limelog("select() failed: %d\n", err);
closeSocket(s); closeSocket(s);
SetLastSocketError(err); SetLastSocketError(err);
return INVALID_SOCKET; return INVALID_SOCKET;
} }
else if (err == 0) {
// select() timed out
Limelog("select() timed out after %d seconds\n", timeoutSec);
closeSocket(s);
#if defined(LC_WINDOWS)
SetLastSocketError(WSAEWOULDBLOCK);
#else
SetLastSocketError(EWOULDBLOCK);
#endif
return INVALID_SOCKET;
}
else if (FD_ISSET(s, &writefds) || FD_ISSET(s, &exceptfds)) {
// The socket was signalled
SOCKADDR_LEN len = sizeof(err);
getsockopt(s, SOL_SOCKET, SO_ERROR, (char*)&err, &len);
if (err != 0 || FD_ISSET(s, &exceptfds)) {
err = (err != 0) ? err : LastSocketFail();
Limelog("connect() failed: %d\n", err);
closeSocket(s);
SetLastSocketError(err);
return INVALID_SOCKET;
}
}
// Disable non-blocking I/O now that the connection is established
val = 0;
ioctlsocket(s, FIONBIO, &val);
return s; return s;
} }

View File

@ -25,6 +25,7 @@ typedef int SOCKADDR_LEN;
#include <errno.h> #include <errno.h>
#include <signal.h> #include <signal.h>
#define ioctlsocket ioctl
#define LastSocketError() errno #define LastSocketError() errno
#define SetLastSocketError(x) errno = x #define SetLastSocketError(x) errno = x
#define INVALID_SOCKET -1 #define INVALID_SOCKET -1
@ -41,7 +42,7 @@ typedef socklen_t SOCKADDR_LEN;
#define URLSAFESTRING_LEN (INET6_ADDRSTRLEN+2) #define URLSAFESTRING_LEN (INET6_ADDRSTRLEN+2)
void addrToUrlSafeString(struct sockaddr_storage* addr, char* string); void addrToUrlSafeString(struct sockaddr_storage* addr, char* string);
SOCKET connectTcpSocket(struct sockaddr_storage* dstaddr, SOCKADDR_LEN addrlen, unsigned short port); SOCKET connectTcpSocket(struct sockaddr_storage* dstaddr, SOCKADDR_LEN addrlen, unsigned short port, int timeoutSec);
SOCKET bindUdpSocket(int addrfamily, int bufferSize); SOCKET bindUdpSocket(int addrfamily, int bufferSize);
int enableNoDelay(SOCKET s); int enableNoDelay(SOCKET s);
int recvUdpSocket(SOCKET s, char* buffer, int size); int recvUdpSocket(SOCKET s, char* buffer, int size);

View File

@ -2,7 +2,7 @@
#include "Rtsp.h" #include "Rtsp.h"
#define RTSP_MAX_RESP_SIZE 32768 #define RTSP_MAX_RESP_SIZE 32768
#define RTSP_READ_TIMEOUT_SEC 10 #define RTSP_TIMEOUT_SEC 10
static SOCKET sock = INVALID_SOCKET; static SOCKET sock = INVALID_SOCKET;
static int currentSeqNumber; static int currentSeqNumber;
@ -88,13 +88,13 @@ static int transactRtspMessage(PRTSP_MESSAGE request, PRTSP_MESSAGE response, in
*error = -1; *error = -1;
sock = connectTcpSocket(&RemoteAddr, RemoteAddrLen, 48010); sock = connectTcpSocket(&RemoteAddr, RemoteAddrLen, 48010, RTSP_TIMEOUT_SEC);
if (sock == INVALID_SOCKET) { if (sock == INVALID_SOCKET) {
*error = LastSocketError(); *error = LastSocketError();
return ret; return ret;
} }
enableNoDelay(sock); enableNoDelay(sock);
setRecvTimeout(sock, RTSP_READ_TIMEOUT_SEC); setRecvTimeout(sock, RTSP_TIMEOUT_SEC);
serializedMessage = serializeRtspMessage(request, &messageLen); serializedMessage = serializeRtspMessage(request, &messageLen);
if (serializedMessage == NULL) { if (serializedMessage == NULL) {

View File

@ -4,6 +4,7 @@
#include "RtpReorderQueue.h" #include "RtpReorderQueue.h"
#define FIRST_FRAME_MAX 1500 #define FIRST_FRAME_MAX 1500
#define FIRST_FRAME_TIMEOUT_SEC 10
#define RTP_PORT 47998 #define RTP_PORT 47998
#define FIRST_FRAME_PORT 47996 #define FIRST_FRAME_PORT 47996
@ -218,7 +219,8 @@ int startVideoStream(void* rendererContext, int drFlags) {
if (ServerMajorVersion == 3) { if (ServerMajorVersion == 3) {
// Connect this socket to open port 47998 for our ping thread // Connect this socket to open port 47998 for our ping thread
firstFrameSocket = connectTcpSocket(&RemoteAddr, RemoteAddrLen, FIRST_FRAME_PORT); firstFrameSocket = connectTcpSocket(&RemoteAddr, RemoteAddrLen,
FIRST_FRAME_PORT, FIRST_FRAME_TIMEOUT_SEC);
if (firstFrameSocket == INVALID_SOCKET) { if (firstFrameSocket == INVALID_SOCKET) {
return LastSocketError(); return LastSocketError();
} }