Begin work to remove use of thread cancellation for NaCl

This commit is contained in:
Cameron Gutman 2016-02-15 15:58:42 -05:00
parent b2be459ff3
commit d7f40ec39b
9 changed files with 103 additions and 76 deletions

View File

@ -245,7 +245,7 @@ void stopAudioStream(void) {
}
if (rtpSocket != INVALID_SOCKET) {
closesocket(rtpSocket);
closeSocket(rtpSocket);
rtpSocket = INVALID_SOCKET;
}

View File

@ -384,7 +384,7 @@ int stopControlStream(void) {
PltInterruptThread(&invalidateRefFramesThread);
if (ctlSock != INVALID_SOCKET) {
closesocket(ctlSock);
closeSocket(ctlSock);
ctlSock = INVALID_SOCKET;
}

View File

@ -292,7 +292,7 @@ int stopInputStream(void) {
PltInterruptThread(&inputSendThread);
if (inputSock != INVALID_SOCKET) {
closesocket(inputSock);
closeSocket(inputSock);
inputSock = INVALID_SOCKET;
}

View File

@ -4,6 +4,13 @@
int initializePlatformSockets(void);
void cleanupPlatformSockets(void);
struct thread_context {
ThreadEntry entry;
void* context;
PLT_THREAD* thread;
};
#if defined(LC_WINDOWS)
void LimelogWindows(char* Format, ...) {
va_list va;
@ -17,30 +24,33 @@ void LimelogWindows(char* Format, ...) {
}
#endif
#if defined(LC_WINDOWS)
PLT_MUTEX thread_list_lock;
PLT_THREAD* thread_head;
#if defined(LC_WINDOWS)
DWORD WINAPI ThreadProc(LPVOID lpParameter) {
struct thread_context* ctx = (struct thread_context*)lpParameter;
ctx->entry(ctx->context);
free(ctx);
return 0;
}
#else
void* ThreadProc(void* context) {
struct thread_context* ctx = (struct thread_context*)context;
#endif
// Add this thread to the thread list
PltLockMutex(&thread_list_lock);
ctx->thread->next = thread_head;
thread_head = ctx->thread;
PltUnlockMutex(&thread_list_lock);
ctx->entry(ctx->context);
free(ctx);
#if defined(LC_WINDOWS)
return 0;
#else
return NULL;
}
#endif
}
void PltSleepMs(int ms) {
#if defined(LC_WINDOWS)
@ -51,6 +61,31 @@ void PltSleepMs(int ms) {
#endif
}
#if defined(LC_WINDOWS)
static PLT_THREAD* findCurrentThread(void) {
PLT_THREAD* current_thread;
PltLockMutex(&thread_list_lock);
current_thread = thread_head;
while (current_thread != NULL) {
#if defined(LC_WINDOWS)
if (current_thread->tid == GetCurrentThreadId()) {
#else
if (pthread_equal(current_thread->thread, pthread_self())) {
#endif
break;
}
current_thread = current_thread->next;
}
PltUnlockMutex(&thread_list_lock);
LC_ASSERT(current_thread != NULL);
return current_thread;
}
#endif
int PltCreateMutex(PLT_MUTEX* mutex) {
#if defined(LC_WINDOWS)
*mutex = CreateMutexEx(NULL, NULL, 0, MUTEX_ALL_ACCESS);
@ -92,15 +127,15 @@ void PltUnlockMutex(PLT_MUTEX* mutex) {
}
void PltJoinThread(PLT_THREAD* thread) {
LC_ASSERT(thread->cancelled != 0);
#if defined(LC_WINDOWS)
WaitForSingleObjectEx(thread->handle, INFINITE, FALSE);
#else
pthread_join(*thread, NULL);
pthread_join(thread->thread, NULL);
#endif
}
void PltCloseThread(PLT_THREAD* thread) {
#if defined(LC_WINDOWS)
PLT_THREAD* current_thread;
PltLockMutex(&thread_list_lock);
@ -130,28 +165,20 @@ void PltCloseThread(PLT_THREAD* thread) {
PltUnlockMutex(&thread_list_lock);
#if defined(LC_WINDOWS)
CloseHandle(thread->termRequested);
CloseHandle(thread->handle);
#else
#endif
}
int PltIsThreadInterrupted(PLT_THREAD* thread) {
#if defined(LC_WINDOWS)
return thread->cancelled;
#else
// The thread will die here if a cancellation was requested
pthread_testcancel();
return 0;
#endif
}
void PltInterruptThread(PLT_THREAD* thread) {
#if defined(LC_WINDOWS)
thread->cancelled = 1;
#if defined(LC_WINDOWS)
SetEvent(thread->termRequested);
#else
pthread_cancel(*thread);
#endif
}
@ -166,6 +193,7 @@ int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD* thread) {
ctx->entry = entry;
ctx->context = context;
ctx->thread = thread;
#if defined(LC_WINDOWS)
{
@ -177,28 +205,19 @@ int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD* thread) {
thread->cancelled = 0;
thread->handle = CreateThread(NULL, 0, ThreadProc, ctx, CREATE_SUSPENDED, &thread->tid);
thread->handle = CreateThread(NULL, 0, ThreadProc, ctx, 0, &thread->tid);
if (thread->handle == NULL) {
CloseHandle(thread->termRequested);
free(ctx);
return -1;
}
else {
// Add this thread to the thread list
PltLockMutex(&thread_list_lock);
thread->next = thread_head;
thread_head = thread;
PltUnlockMutex(&thread_list_lock);
// Now the thread can run
ResumeThread(thread->handle);
err = 0;
}
}
#else
{
err = pthread_create(thread, NULL, ThreadProc, ctx);
err = pthread_create(&thread->thread, NULL, ThreadProc, ctx);
if (err != 0) {
free(ctx);
}
@ -220,6 +239,7 @@ int PltCreateEvent(PLT_EVENT* event) {
pthread_mutex_init(&event->mutex, NULL);
pthread_cond_init(&event->cond, NULL);
event->signalled = 0;
event->cancelled = 0;
return 0;
#endif
}
@ -228,6 +248,12 @@ void PltCloseEvent(PLT_EVENT* event) {
#if defined(LC_WINDOWS)
CloseHandle(*event);
#else
// Wake up anyone going to wait on this event.
// There's an inherent race condition in this technique
// but it shouldn't be too problematic for our purposes.
event->cancelled = 1;
pthread_cond_broadcast(&event->cond);
pthread_mutex_destroy(&event->mutex);
pthread_cond_destroy(&event->cond);
#endif
@ -253,24 +279,10 @@ void PltClearEvent(PLT_EVENT* event) {
int PltWaitForEvent(PLT_EVENT* event) {
#if defined(LC_WINDOWS)
DWORD error;
PLT_THREAD* current_thread;
HANDLE objects[2];
PltLockMutex(&thread_list_lock);
current_thread = thread_head;
while (current_thread != NULL) {
if (current_thread->tid == GetCurrentThreadId()) {
break;
}
current_thread = current_thread->next;
}
PltUnlockMutex(&thread_list_lock);
LC_ASSERT(current_thread != NULL);
objects[0] = *event;
objects[1] = current_thread->termRequested;
objects[1] = findCurrentThread()->termRequested;
error = WaitForMultipleObjectsEx(2, objects, FALSE, INFINITE, FALSE);
if (error == WAIT_OBJECT_0) {
return PLT_WAIT_SUCCESS;
@ -283,12 +295,20 @@ int PltWaitForEvent(PLT_EVENT* event) {
return -1;
}
#else
int ret;
if (event->cancelled) {
return PLT_WAIT_INTERRUPTED;
}
pthread_mutex_lock(&event->mutex);
while (!event->signalled) {
while (!event->signalled && !event->cancelled) {
pthread_cond_wait(&event->cond, &event->mutex);
}
ret = event->cancelled ? PLT_WAIT_INTERRUPTED : PLT_WAIT_SUCCESS;
pthread_mutex_unlock(&event->mutex);
return PLT_WAIT_SUCCESS;
return ret;
#endif
}
@ -312,20 +332,13 @@ int initializePlatform(void) {
return err;
}
#if defined(LC_WINDOWS)
return PltCreateMutex(&thread_list_lock);
#else
return 0;
#endif
}
void cleanupPlatform(void) {
cleanupPlatformSockets();
#if defined(LC_WINDOWS)
LC_ASSERT(thread_head == NULL);
PltDeleteMutex(&thread_list_lock);
#else
#endif
}

View File

@ -21,6 +21,19 @@ void addrToUrlSafeString(struct sockaddr_storage* addr, char* string)
}
}
void closeSocket(SOCKET s) {
// Calling shutdown() prior to close wakes up callers
// blocked in connect(), recv(), and friends.
shutdown(s, SHUT_RDWR);
// Now close the socket fd
#ifdef _WIN32
closesocket(s);
#else
close(s);
#endif
}
SOCKET bindUdpSocket(int addrfamily, int bufferSize) {
SOCKET s;
struct sockaddr_storage addr;
@ -45,7 +58,7 @@ SOCKET bindUdpSocket(int addrfamily, int bufferSize) {
sizeof(struct sockaddr_in6)) == SOCKET_ERROR) {
err = LastSocketError();
Limelog("bind() failed: %d\n", err);
closesocket(s);
closeSocket(s);
SetLastSocketError(err);
return INVALID_SOCKET;
}
@ -86,7 +99,7 @@ SOCKET connectTcpSocket(struct sockaddr_storage* dstaddr, SOCKADDR_LEN addrlen,
if (connect(s, (struct sockaddr*) &addr, addrlen) == SOCKET_ERROR) {
err = LastSocketError();
Limelog("connect() failed: %d\n", err);
closesocket(s);
closeSocket(s);
SetLastSocketError(err);
return INVALID_SOCKET;
}

View File

@ -26,7 +26,6 @@ typedef int SOCKADDR_LEN;
#define SetLastSocketError(x) errno = x
#define INVALID_SOCKET -1
#define SOCKET_ERROR -1
#define closesocket(x) close(x)
typedef int SOCKET;
typedef ssize_t SOCK_RET;
@ -42,3 +41,4 @@ void addrToUrlSafeString(struct sockaddr_storage* addr, char* string);
SOCKET connectTcpSocket(struct sockaddr_storage* dstaddr, SOCKADDR_LEN addrlen, unsigned short port);
SOCKET bindUdpSocket(int addrfamily, int bufferSize);
int enableNoDelay(SOCKET s);
void closeSocket(SOCKET s);

View File

@ -5,11 +5,6 @@
typedef void(*ThreadEntry)(void* context);
struct thread_context {
ThreadEntry entry;
void* context;
};
#if defined(LC_WINDOWS)
typedef struct _PLT_THREAD {
HANDLE handle;
@ -22,12 +17,18 @@ typedef struct _PLT_THREAD {
typedef HANDLE PLT_MUTEX;
typedef HANDLE PLT_EVENT;
#elif defined (LC_POSIX)
typedef pthread_t PLT_THREAD;
typedef struct _PLT_THREAD {
pthread_t thread;
int cancelled;
struct _PLT_THREAD* next;
} PLT_THREAD;
typedef pthread_mutex_t PLT_MUTEX;
typedef struct _PLT_EVENT {
pthread_mutex_t mutex;
pthread_cond_t cond;
int signalled;
int cancelled;
} PLT_EVENT;
#else
#error Unsupported platform

View File

@ -96,7 +96,7 @@ static int transactRtspMessage(PRTSP_MESSAGE request, PRTSP_MESSAGE response, in
serializedMessage = serializeRtspMessage(request, &messageLen);
if (serializedMessage == NULL) {
closesocket(sock);
closeSocket(sock);
sock = INVALID_SOCKET;
return ret;
}
@ -139,7 +139,7 @@ Exit:
free(serializedMessage);
}
closesocket(sock);
closeSocket(sock);
sock = INVALID_SOCKET;
return ret;
}
@ -147,7 +147,7 @@ Exit:
// Terminate the RTSP Handshake process by closing the socket
void terminateRtspHandshake(void) {
if (sock != INVALID_SOCKET) {
closesocket(sock);
closeSocket(sock);
sock = INVALID_SOCKET;
}
}

View File

@ -142,7 +142,7 @@ int readFirstFrame(void) {
// All that matters is that we close this socket.
// This starts the flow of video on Gen 3 servers.
closesocket(firstFrameSocket);
closeSocket(firstFrameSocket);
firstFrameSocket = INVALID_SOCKET;
return 0;
@ -157,11 +157,11 @@ void stopVideoStream(void) {
}
if (firstFrameSocket != INVALID_SOCKET) {
closesocket(firstFrameSocket);
closeSocket(firstFrameSocket);
firstFrameSocket = INVALID_SOCKET;
}
if (rtpSocket != INVALID_SOCKET) {
closesocket(rtpSocket);
closeSocket(rtpSocket);
rtpSocket = INVALID_SOCKET;
}