Fix thread termination issues

This commit is contained in:
Cameron Gutman 2014-01-20 20:33:43 -05:00
parent 2f83dba24c
commit 3740fd295c
4 changed files with 55 additions and 81 deletions

View File

@ -93,12 +93,12 @@ static void heartbeatThreadFunc(void* context) {
int err; int err;
NVCTL_PACKET_HEADER header; NVCTL_PACKET_HEADER header;
for (;;) { while (!PltIsThreadInterrupted(&heartbeatThread)) {
header.type = PTYPE_HEARTBEAT; header.type = PTYPE_HEARTBEAT;
header.payloadLength = PPAYLEN_HEARTBEAT; header.payloadLength = PPAYLEN_HEARTBEAT;
err = send(ctlSock, (char*) &header, sizeof(header), 0); err = send(ctlSock, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) { if (err != sizeof(header)) {
Limelog("Heartbeat thread terminating\n"); Limelog("Heartbeat thread terminating #1\n");
return; return;
} }
@ -113,7 +113,7 @@ static void jitterThreadFunc(void* context) {
header.type = PTYPE_JITTER; header.type = PTYPE_JITTER;
header.payloadLength = PPAYLEN_JITTER; header.payloadLength = PPAYLEN_JITTER;
for (;;) { while (!PltIsThreadInterrupted(&jitterThread)) {
err = send(ctlSock, (char*) &header, sizeof(header), 0); err = send(ctlSock, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) { if (err != sizeof(header)) {
Limelog("Jitter thread terminating #1\n"); Limelog("Jitter thread terminating #1\n");
@ -166,43 +166,22 @@ static void resyncThreadFunc(void* context) {
} }
int stopControlStream(void) { int stopControlStream(void) {
if (heartbeatThread != NULL) {
PltInterruptThread(&heartbeatThread); PltInterruptThread(&heartbeatThread);
}
if (jitterThread != NULL) {
PltInterruptThread(&jitterThread); PltInterruptThread(&jitterThread);
}
if (resyncThread != NULL) {
PltInterruptThread(&resyncThread); PltInterruptThread(&resyncThread);
}
if (ctlSock != INVALID_SOCKET) { if (ctlSock != INVALID_SOCKET) {
closesocket(ctlSock); closesocket(ctlSock);
ctlSock = INVALID_SOCKET; ctlSock = INVALID_SOCKET;
} }
if (heartbeatThread != NULL) {
PltJoinThread(&heartbeatThread); PltJoinThread(&heartbeatThread);
}
if (jitterThread != NULL) {
PltJoinThread(&jitterThread); PltJoinThread(&jitterThread);
}
if (resyncThread != NULL) {
PltJoinThread(&resyncThread); PltJoinThread(&resyncThread);
}
if (heartbeatThread != NULL) {
PltCloseThread(&heartbeatThread); PltCloseThread(&heartbeatThread);
heartbeatThread = NULL;
}
if (jitterThread != NULL) {
PltCloseThread(&jitterThread); PltCloseThread(&jitterThread);
jitterThread = NULL;
}
if (resyncThread != NULL) {
PltCloseThread(&resyncThread); PltCloseThread(&resyncThread);
resyncThread = NULL;
}
return 0; return 0;
} }

View File

@ -6,6 +6,12 @@ struct thread_context {
void* context; void* context;
}; };
#if defined(LC_WINDOWS)
VOID WINAPI ApcFunc(ULONG_PTR parameter) {
return;
}
#endif
#ifdef LC_WINDOWS #ifdef LC_WINDOWS
DWORD WINAPI ThreadProc(LPVOID lpParameter) { DWORD WINAPI ThreadProc(LPVOID lpParameter) {
struct thread_context *ctx = (struct thread_context *)lpParameter; struct thread_context *ctx = (struct thread_context *)lpParameter;
@ -75,7 +81,7 @@ void PltUnlockMutex(PLT_MUTEX *mutex) {
void PltJoinThread(PLT_THREAD *thread) { void PltJoinThread(PLT_THREAD *thread) {
#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
WaitForSingleObjectEx(*thread, INFINITE, FALSE); WaitForSingleObjectEx(thread->handle, INFINITE, FALSE);
#else #else
pthread_join(*thread, NULL); pthread_join(*thread, NULL);
#endif #endif
@ -83,14 +89,23 @@ void PltJoinThread(PLT_THREAD *thread) {
void PltCloseThread(PLT_THREAD *thread) { void PltCloseThread(PLT_THREAD *thread) {
#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
CloseHandle(*thread); CloseHandle(thread->handle);
#else
#endif
}
int PltIsThreadInterrupted(PLT_THREAD *thread) {
#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
return thread->cancelled;
#else #else
#endif #endif
} }
void PltInterruptThread(PLT_THREAD *thread) { void PltInterruptThread(PLT_THREAD *thread) {
#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) #if defined(LC_WINDOWS)
CloseHandle(*thread); thread->cancelled = 1;
QueueUserAPC(ApcFunc, thread->handle, 0);
#elif defined(LC_WINDOWS_PHONE)
#else #else
pthread_cancel(*thread); pthread_cancel(*thread);
#endif #endif
@ -110,13 +125,13 @@ int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread) {
#ifdef _WIN32 #ifdef _WIN32
{ {
HANDLE hThread = CreateThread(NULL, 0, ThreadProc, ctx, 0, NULL); thread->cancelled = 0;
if (hThread == NULL) { thread->handle = CreateThread(NULL, 0, ThreadProc, ctx, 0, NULL);
if (thread->handle == NULL) {
free(ctx); free(ctx);
return -1; return -1;
} }
else { else {
CloseHandle(hThread);
err = 0; err = 0;
} }
} }
@ -176,7 +191,7 @@ void PltClearEvent(PLT_EVENT *event) {
int PltWaitForEvent(PLT_EVENT *event) { int PltWaitForEvent(PLT_EVENT *event) {
#ifdef _WIN32 #ifdef _WIN32
DWORD error = WaitForSingleObjectEx(*event, INFINITE, FALSE); DWORD error = WaitForSingleObjectEx(*event, INFINITE, TRUE);
if (error == STATUS_WAIT_0) { if (error == STATUS_WAIT_0) {
return PLT_WAIT_SUCCESS; return PLT_WAIT_SUCCESS;
} }

View File

@ -5,7 +5,10 @@
typedef void (*ThreadEntry)(void *context); typedef void (*ThreadEntry)(void *context);
#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
typedef HANDLE PLT_THREAD; typedef struct _PLT_THREAD {
HANDLE handle;
int cancelled;
} PLT_THREAD;
typedef HANDLE PLT_MUTEX; typedef HANDLE PLT_MUTEX;
typedef HANDLE PLT_EVENT; typedef HANDLE PLT_EVENT;
#elif defined (LC_POSIX) #elif defined (LC_POSIX)
@ -28,6 +31,7 @@ void PltUnlockMutex(PLT_MUTEX *mutex);
int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread); int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread);
void PltCloseThread(PLT_THREAD *thread); void PltCloseThread(PLT_THREAD *thread);
void PltInterruptThread(PLT_THREAD *thread); void PltInterruptThread(PLT_THREAD *thread);
int PltIsThreadInterrupted(PLT_THREAD *thread);
void PltJoinThread(PLT_THREAD *thread); void PltJoinThread(PLT_THREAD *thread);
int PltCreateEvent(PLT_EVENT *event); int PltCreateEvent(PLT_EVENT *event);
@ -39,4 +43,4 @@ int PltWaitForEvent(PLT_EVENT *event);
#define PLT_WAIT_SUCCESS 0 #define PLT_WAIT_SUCCESS 0
#define PLT_WAIT_INTERRUPTED 1 #define PLT_WAIT_INTERRUPTED 1
void PltSleepMs(int ms); int PltSleepMs(int ms);

View File

@ -52,10 +52,10 @@ static void UdpPingThreadProc(void *context) {
saddr.sin_port = htons(47998); saddr.sin_port = htons(47998);
memcpy(&saddr.sin_addr, &remoteHost, sizeof(remoteHost)); memcpy(&saddr.sin_addr, &remoteHost, sizeof(remoteHost));
for (;;) { while (!PltIsThreadInterrupted(&udpPingThread)) {
err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, sizeof(saddr)); err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, sizeof(saddr));
if (err != sizeof(pingData)) { if (err != sizeof(pingData)) {
Limelog("UDP ping thread terminating\n"); Limelog("UDP ping thread terminating #1\n");
return; return;
} }
@ -157,18 +157,10 @@ int readFirstFrame(void) {
} }
void stopVideoStream(void) { void stopVideoStream(void) {
if (udpPingThread != NULL) {
PltInterruptThread(&udpPingThread); PltInterruptThread(&udpPingThread);
}
if (receiveThread != NULL) {
PltInterruptThread(&receiveThread); PltInterruptThread(&receiveThread);
}
if (depacketizerThread != NULL) {
PltInterruptThread(&depacketizerThread); PltInterruptThread(&depacketizerThread);
}
if (decoderThread != NULL) {
PltInterruptThread(&decoderThread); PltInterruptThread(&decoderThread);
}
if (firstFrameSocket != INVALID_SOCKET) { if (firstFrameSocket != INVALID_SOCKET) {
closesocket(firstFrameSocket); closesocket(firstFrameSocket);
@ -179,32 +171,16 @@ void stopVideoStream(void) {
rtpSocket = INVALID_SOCKET; rtpSocket = INVALID_SOCKET;
} }
if (udpPingThread != NULL) {
PltJoinThread(&udpPingThread); PltJoinThread(&udpPingThread);
}
if (receiveThread != NULL) {
PltJoinThread(&receiveThread); PltJoinThread(&receiveThread);
}
if (depacketizerThread != NULL) {
PltJoinThread(&depacketizerThread); PltJoinThread(&depacketizerThread);
}
if (decoderThread != NULL) {
PltJoinThread(&decoderThread); PltJoinThread(&decoderThread);
}
if (udpPingThread != NULL) {
PltCloseThread(&udpPingThread); PltCloseThread(&udpPingThread);
}
if (receiveThread != NULL) {
PltCloseThread(&receiveThread); PltCloseThread(&receiveThread);
}
if (depacketizerThread != NULL) {
PltCloseThread(&depacketizerThread); PltCloseThread(&depacketizerThread);
}
if (decoderThread != NULL) {
PltCloseThread(&decoderThread); PltCloseThread(&decoderThread);
} }
}
int startVideoStream(void* rendererContext, int drFlags) { int startVideoStream(void* rendererContext, int drFlags) {
int err; int err;