diff --git a/limelight-common/AudioStream.c b/limelight-common/AudioStream.c index de34df7..748d06f 100644 --- a/limelight-common/AudioStream.c +++ b/limelight-common/AudioStream.c @@ -245,7 +245,7 @@ void stopAudioStream(void) { } if (rtpSocket != INVALID_SOCKET) { - closesocket(rtpSocket); + closeSocket(rtpSocket); rtpSocket = INVALID_SOCKET; } diff --git a/limelight-common/ControlStream.c b/limelight-common/ControlStream.c index 0d43c75..2fda738 100644 --- a/limelight-common/ControlStream.c +++ b/limelight-common/ControlStream.c @@ -384,7 +384,7 @@ int stopControlStream(void) { PltInterruptThread(&invalidateRefFramesThread); if (ctlSock != INVALID_SOCKET) { - closesocket(ctlSock); + closeSocket(ctlSock); ctlSock = INVALID_SOCKET; } diff --git a/limelight-common/InputStream.c b/limelight-common/InputStream.c index b0d05ff..60e431c 100644 --- a/limelight-common/InputStream.c +++ b/limelight-common/InputStream.c @@ -292,7 +292,7 @@ int stopInputStream(void) { PltInterruptThread(&inputSendThread); if (inputSock != INVALID_SOCKET) { - closesocket(inputSock); + closeSocket(inputSock); inputSock = INVALID_SOCKET; } diff --git a/limelight-common/Platform.c b/limelight-common/Platform.c index d9bde67..a6fe891 100644 --- a/limelight-common/Platform.c +++ b/limelight-common/Platform.c @@ -4,6 +4,13 @@ int initializePlatformSockets(void); void cleanupPlatformSockets(void); + +struct thread_context { + ThreadEntry entry; + void* context; + PLT_THREAD* thread; +}; + #if defined(LC_WINDOWS) void LimelogWindows(char* Format, ...) { va_list va; @@ -17,30 +24,33 @@ void LimelogWindows(char* Format, ...) { } #endif -#if defined(LC_WINDOWS) PLT_MUTEX thread_list_lock; PLT_THREAD* thread_head; +#if defined(LC_WINDOWS) DWORD WINAPI ThreadProc(LPVOID lpParameter) { struct thread_context* ctx = (struct thread_context*)lpParameter; - - ctx->entry(ctx->context); - - free(ctx); - - return 0; -} #else void* ThreadProc(void* context) { struct thread_context* ctx = (struct thread_context*)context; +#endif + + // Add this thread to the thread list + PltLockMutex(&thread_list_lock); + ctx->thread->next = thread_head; + thread_head = ctx->thread; + PltUnlockMutex(&thread_list_lock); ctx->entry(ctx->context); free(ctx); +#if defined(LC_WINDOWS) + return 0; +#else return NULL; -} #endif +} void PltSleepMs(int ms) { #if defined(LC_WINDOWS) @@ -51,6 +61,31 @@ void PltSleepMs(int ms) { #endif } +#if defined(LC_WINDOWS) +static PLT_THREAD* findCurrentThread(void) { + PLT_THREAD* current_thread; + + PltLockMutex(&thread_list_lock); + current_thread = thread_head; + while (current_thread != NULL) { +#if defined(LC_WINDOWS) + if (current_thread->tid == GetCurrentThreadId()) { +#else + if (pthread_equal(current_thread->thread, pthread_self())) { +#endif + break; + } + + current_thread = current_thread->next; + } + PltUnlockMutex(&thread_list_lock); + + LC_ASSERT(current_thread != NULL); + + return current_thread; +} +#endif + int PltCreateMutex(PLT_MUTEX* mutex) { #if defined(LC_WINDOWS) *mutex = CreateMutexEx(NULL, NULL, 0, MUTEX_ALL_ACCESS); @@ -92,15 +127,15 @@ void PltUnlockMutex(PLT_MUTEX* mutex) { } void PltJoinThread(PLT_THREAD* thread) { + LC_ASSERT(thread->cancelled != 0); #if defined(LC_WINDOWS) WaitForSingleObjectEx(thread->handle, INFINITE, FALSE); #else - pthread_join(*thread, NULL); + pthread_join(thread->thread, NULL); #endif } void PltCloseThread(PLT_THREAD* thread) { -#if defined(LC_WINDOWS) PLT_THREAD* current_thread; PltLockMutex(&thread_list_lock); @@ -130,28 +165,20 @@ void PltCloseThread(PLT_THREAD* thread) { PltUnlockMutex(&thread_list_lock); +#if defined(LC_WINDOWS) CloseHandle(thread->termRequested); CloseHandle(thread->handle); -#else #endif } int PltIsThreadInterrupted(PLT_THREAD* thread) { -#if defined(LC_WINDOWS) return thread->cancelled; -#else - // The thread will die here if a cancellation was requested - pthread_testcancel(); - return 0; -#endif } void PltInterruptThread(PLT_THREAD* thread) { -#if defined(LC_WINDOWS) thread->cancelled = 1; +#if defined(LC_WINDOWS) SetEvent(thread->termRequested); -#else - pthread_cancel(*thread); #endif } @@ -166,6 +193,7 @@ int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD* thread) { ctx->entry = entry; ctx->context = context; + ctx->thread = thread; #if defined(LC_WINDOWS) { @@ -177,28 +205,19 @@ int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD* thread) { thread->cancelled = 0; - thread->handle = CreateThread(NULL, 0, ThreadProc, ctx, CREATE_SUSPENDED, &thread->tid); + thread->handle = CreateThread(NULL, 0, ThreadProc, ctx, 0, &thread->tid); if (thread->handle == NULL) { CloseHandle(thread->termRequested); free(ctx); return -1; } - else { - // Add this thread to the thread list - PltLockMutex(&thread_list_lock); - thread->next = thread_head; - thread_head = thread; - PltUnlockMutex(&thread_list_lock); - - // Now the thread can run - ResumeThread(thread->handle); - + else { err = 0; } } #else { - err = pthread_create(thread, NULL, ThreadProc, ctx); + err = pthread_create(&thread->thread, NULL, ThreadProc, ctx); if (err != 0) { free(ctx); } @@ -220,6 +239,7 @@ int PltCreateEvent(PLT_EVENT* event) { pthread_mutex_init(&event->mutex, NULL); pthread_cond_init(&event->cond, NULL); event->signalled = 0; + event->cancelled = 0; return 0; #endif } @@ -228,6 +248,12 @@ void PltCloseEvent(PLT_EVENT* event) { #if defined(LC_WINDOWS) CloseHandle(*event); #else + // Wake up anyone going to wait on this event. + // There's an inherent race condition in this technique + // but it shouldn't be too problematic for our purposes. + event->cancelled = 1; + pthread_cond_broadcast(&event->cond); + pthread_mutex_destroy(&event->mutex); pthread_cond_destroy(&event->cond); #endif @@ -253,24 +279,10 @@ void PltClearEvent(PLT_EVENT* event) { int PltWaitForEvent(PLT_EVENT* event) { #if defined(LC_WINDOWS) DWORD error; - PLT_THREAD* current_thread; HANDLE objects[2]; - PltLockMutex(&thread_list_lock); - current_thread = thread_head; - while (current_thread != NULL) { - if (current_thread->tid == GetCurrentThreadId()) { - break; - } - - current_thread = current_thread->next; - } - PltUnlockMutex(&thread_list_lock); - - LC_ASSERT(current_thread != NULL); - objects[0] = *event; - objects[1] = current_thread->termRequested; + objects[1] = findCurrentThread()->termRequested; error = WaitForMultipleObjectsEx(2, objects, FALSE, INFINITE, FALSE); if (error == WAIT_OBJECT_0) { return PLT_WAIT_SUCCESS; @@ -283,12 +295,20 @@ int PltWaitForEvent(PLT_EVENT* event) { return -1; } #else + int ret; + + if (event->cancelled) { + return PLT_WAIT_INTERRUPTED; + } + pthread_mutex_lock(&event->mutex); - while (!event->signalled) { + while (!event->signalled && !event->cancelled) { pthread_cond_wait(&event->cond, &event->mutex); } + ret = event->cancelled ? PLT_WAIT_INTERRUPTED : PLT_WAIT_SUCCESS; pthread_mutex_unlock(&event->mutex); - return PLT_WAIT_SUCCESS; + + return ret; #endif } @@ -312,20 +332,13 @@ int initializePlatform(void) { return err; } -#if defined(LC_WINDOWS) return PltCreateMutex(&thread_list_lock); -#else - return 0; -#endif } void cleanupPlatform(void) { cleanupPlatformSockets(); -#if defined(LC_WINDOWS) LC_ASSERT(thread_head == NULL); PltDeleteMutex(&thread_list_lock); -#else -#endif } \ No newline at end of file diff --git a/limelight-common/PlatformSockets.c b/limelight-common/PlatformSockets.c index e960e5f..2b6fc87 100644 --- a/limelight-common/PlatformSockets.c +++ b/limelight-common/PlatformSockets.c @@ -21,6 +21,19 @@ void addrToUrlSafeString(struct sockaddr_storage* addr, char* string) } } +void closeSocket(SOCKET s) { + // Calling shutdown() prior to close wakes up callers + // blocked in connect(), recv(), and friends. + shutdown(s, SHUT_RDWR); + + // Now close the socket fd +#ifdef _WIN32 + closesocket(s); +#else + close(s); +#endif +} + SOCKET bindUdpSocket(int addrfamily, int bufferSize) { SOCKET s; struct sockaddr_storage addr; @@ -45,7 +58,7 @@ SOCKET bindUdpSocket(int addrfamily, int bufferSize) { sizeof(struct sockaddr_in6)) == SOCKET_ERROR) { err = LastSocketError(); Limelog("bind() failed: %d\n", err); - closesocket(s); + closeSocket(s); SetLastSocketError(err); return INVALID_SOCKET; } @@ -86,7 +99,7 @@ SOCKET connectTcpSocket(struct sockaddr_storage* dstaddr, SOCKADDR_LEN addrlen, if (connect(s, (struct sockaddr*) &addr, addrlen) == SOCKET_ERROR) { err = LastSocketError(); Limelog("connect() failed: %d\n", err); - closesocket(s); + closeSocket(s); SetLastSocketError(err); return INVALID_SOCKET; } diff --git a/limelight-common/PlatformSockets.h b/limelight-common/PlatformSockets.h index 9912074..8157f77 100644 --- a/limelight-common/PlatformSockets.h +++ b/limelight-common/PlatformSockets.h @@ -26,7 +26,6 @@ typedef int SOCKADDR_LEN; #define SetLastSocketError(x) errno = x #define INVALID_SOCKET -1 #define SOCKET_ERROR -1 -#define closesocket(x) close(x) typedef int SOCKET; typedef ssize_t SOCK_RET; @@ -41,4 +40,5 @@ void addrToUrlSafeString(struct sockaddr_storage* addr, char* string); SOCKET connectTcpSocket(struct sockaddr_storage* dstaddr, SOCKADDR_LEN addrlen, unsigned short port); SOCKET bindUdpSocket(int addrfamily, int bufferSize); -int enableNoDelay(SOCKET s); \ No newline at end of file +int enableNoDelay(SOCKET s); +void closeSocket(SOCKET s); \ No newline at end of file diff --git a/limelight-common/PlatformThreads.h b/limelight-common/PlatformThreads.h index a99c3f7..1222886 100644 --- a/limelight-common/PlatformThreads.h +++ b/limelight-common/PlatformThreads.h @@ -5,11 +5,6 @@ typedef void(*ThreadEntry)(void* context); -struct thread_context { - ThreadEntry entry; - void* context; -}; - #if defined(LC_WINDOWS) typedef struct _PLT_THREAD { HANDLE handle; @@ -22,12 +17,18 @@ typedef struct _PLT_THREAD { typedef HANDLE PLT_MUTEX; typedef HANDLE PLT_EVENT; #elif defined (LC_POSIX) -typedef pthread_t PLT_THREAD; +typedef struct _PLT_THREAD { + pthread_t thread; + int cancelled; + + struct _PLT_THREAD* next; +} PLT_THREAD; typedef pthread_mutex_t PLT_MUTEX; typedef struct _PLT_EVENT { pthread_mutex_t mutex; pthread_cond_t cond; int signalled; + int cancelled; } PLT_EVENT; #else #error Unsupported platform diff --git a/limelight-common/RtspConnection.c b/limelight-common/RtspConnection.c index 85235c6..cbfa1f9 100644 --- a/limelight-common/RtspConnection.c +++ b/limelight-common/RtspConnection.c @@ -96,7 +96,7 @@ static int transactRtspMessage(PRTSP_MESSAGE request, PRTSP_MESSAGE response, in serializedMessage = serializeRtspMessage(request, &messageLen); if (serializedMessage == NULL) { - closesocket(sock); + closeSocket(sock); sock = INVALID_SOCKET; return ret; } @@ -139,7 +139,7 @@ Exit: free(serializedMessage); } - closesocket(sock); + closeSocket(sock); sock = INVALID_SOCKET; return ret; } @@ -147,7 +147,7 @@ Exit: // Terminate the RTSP Handshake process by closing the socket void terminateRtspHandshake(void) { if (sock != INVALID_SOCKET) { - closesocket(sock); + closeSocket(sock); sock = INVALID_SOCKET; } } diff --git a/limelight-common/VideoStream.c b/limelight-common/VideoStream.c index 226366f..f52f3df 100644 --- a/limelight-common/VideoStream.c +++ b/limelight-common/VideoStream.c @@ -142,7 +142,7 @@ int readFirstFrame(void) { // All that matters is that we close this socket. // This starts the flow of video on Gen 3 servers. - closesocket(firstFrameSocket); + closeSocket(firstFrameSocket); firstFrameSocket = INVALID_SOCKET; return 0; @@ -157,11 +157,11 @@ void stopVideoStream(void) { } if (firstFrameSocket != INVALID_SOCKET) { - closesocket(firstFrameSocket); + closeSocket(firstFrameSocket); firstFrameSocket = INVALID_SOCKET; } if (rtpSocket != INVALID_SOCKET) { - closesocket(rtpSocket); + closeSocket(rtpSocket); rtpSocket = INVALID_SOCKET; }