From 3740fd295c347b116dbee8216701386570bbe69a Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Mon, 20 Jan 2014 20:33:43 -0500 Subject: [PATCH] Fix thread termination issues --- limelight-common/ControlStream.cpp | 45 +++++++----------------- limelight-common/PlatformThreads.cpp | 31 ++++++++++++----- limelight-common/PlatformThreads.h | 8 +++-- limelight-common/VideoStream.cpp | 52 ++++++++-------------------- 4 files changed, 55 insertions(+), 81 deletions(-) diff --git a/limelight-common/ControlStream.cpp b/limelight-common/ControlStream.cpp index 6b3bf12..f88982e 100644 --- a/limelight-common/ControlStream.cpp +++ b/limelight-common/ControlStream.cpp @@ -93,12 +93,12 @@ static void heartbeatThreadFunc(void* context) { int err; NVCTL_PACKET_HEADER header; - for (;;) { + while (!PltIsThreadInterrupted(&heartbeatThread)) { header.type = PTYPE_HEARTBEAT; header.payloadLength = PPAYLEN_HEARTBEAT; err = send(ctlSock, (char*) &header, sizeof(header), 0); if (err != sizeof(header)) { - Limelog("Heartbeat thread terminating\n"); + Limelog("Heartbeat thread terminating #1\n"); return; } @@ -113,7 +113,7 @@ static void jitterThreadFunc(void* context) { header.type = PTYPE_JITTER; header.payloadLength = PPAYLEN_JITTER; - for (;;) { + while (!PltIsThreadInterrupted(&jitterThread)) { err = send(ctlSock, (char*) &header, sizeof(header), 0); if (err != sizeof(header)) { Limelog("Jitter thread terminating #1\n"); @@ -166,43 +166,22 @@ static void resyncThreadFunc(void* context) { } int stopControlStream(void) { - if (heartbeatThread != NULL) { - PltInterruptThread(&heartbeatThread); - } - if (jitterThread != NULL) { - PltInterruptThread(&jitterThread); - } - if (resyncThread != NULL) { - PltInterruptThread(&resyncThread); - } + PltInterruptThread(&heartbeatThread); + PltInterruptThread(&jitterThread); + PltInterruptThread(&resyncThread); if (ctlSock != INVALID_SOCKET) { closesocket(ctlSock); ctlSock = INVALID_SOCKET; } - if (heartbeatThread != NULL) { - PltJoinThread(&heartbeatThread); - } - if (jitterThread != NULL) { - PltJoinThread(&jitterThread); - } - if (resyncThread != NULL) { - PltJoinThread(&resyncThread) ; - } + PltJoinThread(&heartbeatThread); + PltJoinThread(&jitterThread); + PltJoinThread(&resyncThread); - if (heartbeatThread != NULL) { - PltCloseThread(&heartbeatThread); - heartbeatThread = NULL; - } - if (jitterThread != NULL) { - PltCloseThread(&jitterThread); - jitterThread = NULL; - } - if (resyncThread != NULL) { - PltCloseThread(&resyncThread); - resyncThread = NULL; - } + PltCloseThread(&heartbeatThread); + PltCloseThread(&jitterThread); + PltCloseThread(&resyncThread); return 0; } diff --git a/limelight-common/PlatformThreads.cpp b/limelight-common/PlatformThreads.cpp index 351c67b..7e2de08 100644 --- a/limelight-common/PlatformThreads.cpp +++ b/limelight-common/PlatformThreads.cpp @@ -6,6 +6,12 @@ struct thread_context { void* context; }; +#if defined(LC_WINDOWS) +VOID WINAPI ApcFunc(ULONG_PTR parameter) { + return; +} +#endif + #ifdef LC_WINDOWS DWORD WINAPI ThreadProc(LPVOID lpParameter) { struct thread_context *ctx = (struct thread_context *)lpParameter; @@ -75,7 +81,7 @@ void PltUnlockMutex(PLT_MUTEX *mutex) { void PltJoinThread(PLT_THREAD *thread) { #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) - WaitForSingleObjectEx(*thread, INFINITE, FALSE); + WaitForSingleObjectEx(thread->handle, INFINITE, FALSE); #else pthread_join(*thread, NULL); #endif @@ -83,14 +89,23 @@ void PltJoinThread(PLT_THREAD *thread) { void PltCloseThread(PLT_THREAD *thread) { #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) - CloseHandle(*thread); + CloseHandle(thread->handle); +#else +#endif +} + +int PltIsThreadInterrupted(PLT_THREAD *thread) { +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) + return thread->cancelled; #else #endif } void PltInterruptThread(PLT_THREAD *thread) { -#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) - CloseHandle(*thread); +#if defined(LC_WINDOWS) + thread->cancelled = 1; + QueueUserAPC(ApcFunc, thread->handle, 0); +#elif defined(LC_WINDOWS_PHONE) #else pthread_cancel(*thread); #endif @@ -110,13 +125,13 @@ int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread) { #ifdef _WIN32 { - HANDLE hThread = CreateThread(NULL, 0, ThreadProc, ctx, 0, NULL); - if (hThread == NULL) { + thread->cancelled = 0; + thread->handle = CreateThread(NULL, 0, ThreadProc, ctx, 0, NULL); + if (thread->handle == NULL) { free(ctx); return -1; } else { - CloseHandle(hThread); err = 0; } } @@ -176,7 +191,7 @@ void PltClearEvent(PLT_EVENT *event) { int PltWaitForEvent(PLT_EVENT *event) { #ifdef _WIN32 - DWORD error = WaitForSingleObjectEx(*event, INFINITE, FALSE); + DWORD error = WaitForSingleObjectEx(*event, INFINITE, TRUE); if (error == STATUS_WAIT_0) { return PLT_WAIT_SUCCESS; } diff --git a/limelight-common/PlatformThreads.h b/limelight-common/PlatformThreads.h index 0d3461e..d700d56 100644 --- a/limelight-common/PlatformThreads.h +++ b/limelight-common/PlatformThreads.h @@ -5,7 +5,10 @@ typedef void (*ThreadEntry)(void *context); #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) -typedef HANDLE PLT_THREAD; +typedef struct _PLT_THREAD { + HANDLE handle; + int cancelled; +} PLT_THREAD; typedef HANDLE PLT_MUTEX; typedef HANDLE PLT_EVENT; #elif defined (LC_POSIX) @@ -28,6 +31,7 @@ void PltUnlockMutex(PLT_MUTEX *mutex); int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread); void PltCloseThread(PLT_THREAD *thread); void PltInterruptThread(PLT_THREAD *thread); +int PltIsThreadInterrupted(PLT_THREAD *thread); void PltJoinThread(PLT_THREAD *thread); int PltCreateEvent(PLT_EVENT *event); @@ -39,4 +43,4 @@ int PltWaitForEvent(PLT_EVENT *event); #define PLT_WAIT_SUCCESS 0 #define PLT_WAIT_INTERRUPTED 1 -void PltSleepMs(int ms); \ No newline at end of file +int PltSleepMs(int ms); \ No newline at end of file diff --git a/limelight-common/VideoStream.cpp b/limelight-common/VideoStream.cpp index 731ac19..fc0b367 100644 --- a/limelight-common/VideoStream.cpp +++ b/limelight-common/VideoStream.cpp @@ -52,10 +52,10 @@ static void UdpPingThreadProc(void *context) { saddr.sin_port = htons(47998); memcpy(&saddr.sin_addr, &remoteHost, sizeof(remoteHost)); - for (;;) { + while (!PltIsThreadInterrupted(&udpPingThread)) { err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, sizeof(saddr)); if (err != sizeof(pingData)) { - Limelog("UDP ping thread terminating\n"); + Limelog("UDP ping thread terminating #1\n"); return; } @@ -157,18 +157,10 @@ int readFirstFrame(void) { } void stopVideoStream(void) { - if (udpPingThread != NULL) { - PltInterruptThread(&udpPingThread); - } - if (receiveThread != NULL) { - PltInterruptThread(&receiveThread); - } - if (depacketizerThread != NULL) { - PltInterruptThread(&depacketizerThread); - } - if (decoderThread != NULL) { - PltInterruptThread(&decoderThread); - } + PltInterruptThread(&udpPingThread); + PltInterruptThread(&receiveThread); + PltInterruptThread(&depacketizerThread); + PltInterruptThread(&decoderThread); if (firstFrameSocket != INVALID_SOCKET) { closesocket(firstFrameSocket); @@ -179,31 +171,15 @@ void stopVideoStream(void) { rtpSocket = INVALID_SOCKET; } - if (udpPingThread != NULL) { - PltJoinThread(&udpPingThread); - } - if (receiveThread != NULL) { - PltJoinThread(&receiveThread); - } - if (depacketizerThread != NULL) { - PltJoinThread(&depacketizerThread); - } - if (decoderThread != NULL) { - PltJoinThread(&decoderThread); - } + PltJoinThread(&udpPingThread); + PltJoinThread(&receiveThread); + PltJoinThread(&depacketizerThread); + PltJoinThread(&decoderThread); - if (udpPingThread != NULL) { - PltCloseThread(&udpPingThread); - } - if (receiveThread != NULL) { - PltCloseThread(&receiveThread); - } - if (depacketizerThread != NULL) { - PltCloseThread(&depacketizerThread); - } - if (decoderThread != NULL) { - PltCloseThread(&decoderThread); - } + PltCloseThread(&udpPingThread); + PltCloseThread(&receiveThread); + PltCloseThread(&depacketizerThread); + PltCloseThread(&decoderThread); } int startVideoStream(void* rendererContext, int drFlags) {