From d6c77b0323bb0c8c294d632d3931b7f4b5fc7547 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Mon, 20 Jan 2014 19:11:25 -0500 Subject: [PATCH] Cleanup a bunch of the code and the interface itself --- limelight-common/Config.cpp | 2 +- limelight-common/Connection.cpp | 111 ++++++++++++++++++ limelight-common/ControlStream.cpp | 67 ++++++++--- limelight-common/Handshake.cpp | 2 +- limelight-common/Limelight-internal.h | 32 +++++ limelight-common/Limelight.h | 48 +++----- limelight-common/LinkedBlockingQueue.cpp | 39 ++++-- limelight-common/LinkedBlockingQueue.h | 13 +- limelight-common/Platform.h | 19 +++ limelight-common/PlatformSockets.cpp | 18 ++- limelight-common/PlatformSockets.h | 8 +- limelight-common/PlatformThreads.cpp | 62 +++++----- limelight-common/PlatformThreads.h | 13 +- limelight-common/Video.h | 13 +- limelight-common/VideoDepacketizer.cpp | 30 ++++- limelight-common/VideoStream.cpp | 108 +++++++++++++++-- limelight-common/limelight-common.vcxproj | 2 + .../limelight-common.vcxproj.filters | 12 +- 18 files changed, 473 insertions(+), 126 deletions(-) create mode 100644 limelight-common/Connection.cpp create mode 100644 limelight-common/Limelight-internal.h diff --git a/limelight-common/Config.cpp b/limelight-common/Config.cpp index da69e2a..1e41b37 100644 --- a/limelight-common/Config.cpp +++ b/limelight-common/Config.cpp @@ -1,4 +1,4 @@ -#include "Limelight.h" +#include "Limelight-internal.h" #include "ByteBuffer.h" diff --git a/limelight-common/Connection.cpp b/limelight-common/Connection.cpp new file mode 100644 index 0000000..b1a4f72 --- /dev/null +++ b/limelight-common/Connection.cpp @@ -0,0 +1,111 @@ +#include "Limelight-internal.h" +#include "Platform.h" + +#define STAGE_NONE 0 +#define STAGE_PLATFORM_INIT 1 +#define STAGE_HANDSHAKE 2 +#define STAGE_CONTROL_STREAM_INIT 3 +#define STAGE_VIDEO_STREAM_INIT 4 +#define STAGE_CONTROL_STREAM_START 5 +#define STAGE_VIDEO_STREAM_START 6 + +int stage = STAGE_NONE; + +void LiStopConnection(void) { + if (stage == STAGE_VIDEO_STREAM_START) { + Limelog("Stopping video stream..."); + stopVideoStream(); + stage--; + Limelog("done\n"); + } + if (stage == STAGE_CONTROL_STREAM_START) { + Limelog("Stopping control stream..."); + stopControlStream(); + stage--; + Limelog("done\n"); + } + if (stage == STAGE_VIDEO_STREAM_INIT) { + Limelog("Cleaning up video stream..."); + destroyVideoStream(); + stage--; + Limelog("done\n"); + } + if (stage == STAGE_CONTROL_STREAM_INIT) { + Limelog("Cleaning up control stream..."); + destroyControlStream(); + stage--; + Limelog("done\n"); + } + if (stage == STAGE_HANDSHAKE) { + stage--; + } + if (stage == STAGE_PLATFORM_INIT) { + Limelog("Cleaning up platform..."); + cleanupPlatformSockets(); + stage--; + } + LC_ASSERT(stage == STAGE_NONE); +} + +int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks) { + int err; + + Limelog("Initializing platform..."); + err = initializePlatformSockets(); + if (err != 0) { + Limelog("failed: %d\n", err); + goto Cleanup; + } + stage++; + LC_ASSERT(stage == STAGE_PLATFORM_INIT); + Limelog("done\n"); + + Limelog("Starting handshake..."); + err = performHandshake(host); + if (err != 0) { + Limelog("failed: %d\n", err); + goto Cleanup; + } + stage++; + LC_ASSERT(stage == STAGE_HANDSHAKE); + Limelog("done\n"); + + Limelog("Initializing video stream..."); + initializeVideoStream(host, streamConfig, drCallbacks); + stage++; + LC_ASSERT(stage == STAGE_VIDEO_STREAM_INIT); + Limelog("done\n"); + + Limelog("Initializing control stream..."); + err = initializeControlStream(host, streamConfig); + if (err != 0) { + Limelog("failed: %d\n", err); + goto Cleanup; + } + stage++; + LC_ASSERT(stage == STAGE_CONTROL_STREAM_INIT); + Limelog("done\n"); + + Limelog("Starting control stream..."); + err = startControlStream(); + if (err != 0) { + Limelog("failed: %d\n", err); + goto Cleanup; + } + stage++; + LC_ASSERT(stage == STAGE_CONTROL_STREAM_START); + Limelog("done\n"); + + Limelog("Starting video stream..."); + err = startVideoStream(NULL, 0); + if (err != 0) { + Limelog("Video stream start failed: %d\n", err); + goto Cleanup; + } + stage++; + LC_ASSERT(stage == STAGE_VIDEO_STREAM_START); + Limelog("done\n"); + +Cleanup: + return err; +} \ No newline at end of file diff --git a/limelight-common/ControlStream.cpp b/limelight-common/ControlStream.cpp index 7b8685d..1c1e84b 100644 --- a/limelight-common/ControlStream.cpp +++ b/limelight-common/ControlStream.cpp @@ -1,4 +1,4 @@ -#include "Limelight.h" +#include "Limelight-internal.h" #include "PlatformSockets.h" #include "PlatformThreads.h" @@ -7,6 +7,7 @@ typedef struct _NVCTL_PACKET_HEADER { unsigned short payloadLength; } NVCTL_PACKET_HEADER, *PNVCTL_PACKET_HEADER; +IP_ADDRESS host; SOCKET ctlSock; STREAM_CONFIGURATION streamConfig; PLT_THREAD heartbeatThread; @@ -29,21 +30,20 @@ const short PPAYLEN_RESYNC = 16; const short PTYPE_JITTER = 0x140c; const short PPAYLEN_JITTER = 0x10; -int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfigPtr) { - ctlSock = connectTcpSocket(host, 47995); - if (ctlSock == INVALID_SOCKET) { - return LastSocketError(); - } - - enableNoDelay(ctlSock); - +int initializeControlStream(IP_ADDRESS addr, PSTREAM_CONFIGURATION streamConfigPtr) { memcpy(&streamConfig, streamConfigPtr, sizeof(*streamConfigPtr)); PltCreateEvent(&resyncEvent); + host = addr; + return 0; } +void destroyControlStream(void) { + PltCloseEvent(&resyncEvent); +} + void requestIdrFrame(void) { PltSetEvent(&resyncEvent); } @@ -166,15 +166,43 @@ static void resyncThreadFunc(void* context) { } int stopControlStream(void) { - closesocket(ctlSock); + if (heartbeatThread != NULL) { + PltInterruptThread(&heartbeatThread); + } + if (jitterThread != NULL) { + PltInterruptThread(&jitterThread); + } + if (resyncThread != NULL) { + PltInterruptThread(&resyncThread); + } - PltJoinThread(&heartbeatThread); - PltJoinThread(&jitterThread); - PltJoinThread(&resyncThread); + if (ctlSock != INVALID_SOCKET) { + closesocket(ctlSock); + ctlSock = INVALID_SOCKET; + } - PltCloseThread(&heartbeatThread); - PltCloseThread(&jitterThread); - PltCloseThread(&resyncThread); + if (heartbeatThread != NULL) { + PltJoinThread(&heartbeatThread); + } + if (jitterThread != NULL) { + PltJoinThread(&jitterThread); + } + if (resyncThread != NULL) { + PltJoinThread(&resyncThread) ; + } + + if (heartbeatThread != NULL) { + PltCloseThread(&heartbeatThread); + heartbeatThread = NULL; + } + if (jitterThread != NULL) { + PltCloseThread(&jitterThread); + jitterThread = NULL; + } + if (resyncThread != NULL) { + PltCloseThread(&resyncThread); + resyncThread = NULL; + } return 0; } @@ -185,6 +213,13 @@ int startControlStream(void) { int configSize; PNVCTL_PACKET_HEADER response; + ctlSock = connectTcpSocket(host, 47995); + if (ctlSock == INVALID_SOCKET) { + return LastSocketError(); + } + + enableNoDelay(ctlSock); + configSize = getConfigDataSize(&streamConfig); config = allocateConfigDataForStreamConfig(&streamConfig); if (config == NULL) { diff --git a/limelight-common/Handshake.cpp b/limelight-common/Handshake.cpp index f41075c..36f0d32 100644 --- a/limelight-common/Handshake.cpp +++ b/limelight-common/Handshake.cpp @@ -1,4 +1,4 @@ - +#include "Limelight.h" #include "PlatformSockets.h" #include "PlatformThreads.h" diff --git a/limelight-common/Limelight-internal.h b/limelight-common/Limelight-internal.h new file mode 100644 index 0000000..c4b70eb --- /dev/null +++ b/limelight-common/Limelight-internal.h @@ -0,0 +1,32 @@ +#pragma once + +#include "Limelight.h" +#include "Platform.h" +#include "PlatformSockets.h" +#include "Video.h" + +#include +#define Limelog printf + +char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig); +int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig); + +int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig); +int startControlStream(void); +int stopControlStream(void); +void destroyControlStream(void); +void requestIdrFrame(void); + +int performHandshake(IP_ADDRESS host); + +void initializeVideoDepacketizer(void); +void destroyVideoDepacketizer(void); +void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length); +int getNextDecodeUnit(PDECODE_UNIT *du); +void freeDecodeUnit(PDECODE_UNIT decodeUnit); +void queueRtpPacket(PRTP_PACKET rtpPacket, int length); + +void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks); +void destroyVideoStream(void); +int startVideoStream(void* rendererContext, int drFlags); +void stopVideoStream(void); \ No newline at end of file diff --git a/limelight-common/Limelight.h b/limelight-common/Limelight.h index c5f0f7f..aac10da 100644 --- a/limelight-common/Limelight.h +++ b/limelight-common/Limelight.h @@ -1,8 +1,6 @@ #pragma once -#include "Platform.h" -#include "PlatformSockets.h" -#include "Video.h" +#define IP_ADDRESS unsigned int typedef struct _STREAM_CONFIGURATION { int width; @@ -10,11 +8,22 @@ typedef struct _STREAM_CONFIGURATION { int fps; } STREAM_CONFIGURATION, *PSTREAM_CONFIGURATION; -typedef void (*DecoderRendererSetup)(int width, int height, int redrawRate, void* context, int drFlags); -typedef void (*DecoderRendererStart)(void); -typedef void (*DecoderRendererStop)(void); -typedef void (*DecoderRendererRelease)(void); -typedef void (*DecoderRendererSubmitDecodeUnit)(PDECODE_UNIT decodeUnit); +typedef struct _LENTRY { + struct _LENTRY *next; + char* data; + int length; +} LENTRY, *PLENTRY; + +typedef struct _DECODE_UNIT { + int fullLength; + PLENTRY bufferList; +} DECODE_UNIT, *PDECODE_UNIT; + +typedef void(*DecoderRendererSetup)(int width, int height, int redrawRate, void* context, int drFlags); +typedef void(*DecoderRendererStart)(void); +typedef void(*DecoderRendererStop)(void); +typedef void(*DecoderRendererRelease)(void); +typedef void(*DecoderRendererSubmitDecodeUnit)(PDECODE_UNIT decodeUnit); typedef struct _DECODER_RENDERER_CALLBACKS { DecoderRendererSetup setup; @@ -24,24 +33,5 @@ typedef struct _DECODER_RENDERER_CALLBACKS { DecoderRendererSubmitDecodeUnit submitDecodeUnit; } DECODER_RENDERER_CALLBACKS, *PDECODER_RENDERER_CALLBACKS; -#include -#define Limelog printf - -char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig); -int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig); - -int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig); -int startControlStream(void); -int stopControlStream(void); -void requestIdrFrame(void); - -int performHandshake(IP_ADDRESS host); - -void initializeVideoDepacketizer(void); -void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length); -PDECODE_UNIT getNextDecodeUnit(void); -void freeDecodeUnit(PDECODE_UNIT decodeUnit); -void queueRtpPacket(PRTP_PACKET rtpPacket, int length); - -void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks); -int startVideoStream(void* rendererContext, int drFlags); \ No newline at end of file +int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks); +void LiStopConnection(void); \ No newline at end of file diff --git a/limelight-common/LinkedBlockingQueue.cpp b/limelight-common/LinkedBlockingQueue.cpp index a69dabc..8a6400a 100644 --- a/limelight-common/LinkedBlockingQueue.cpp +++ b/limelight-common/LinkedBlockingQueue.cpp @@ -1,6 +1,13 @@ #include "LinkedBlockingQueue.h" -int initializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound) { +PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead) { + PltDeleteMutex(&queueHead->mutex); + PltCloseEvent(&queueHead->containsDataEvent); + + return queueHead->head; +} + +int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound) { int err; err = PltCreateEvent(&queueHead->containsDataEvent); @@ -15,16 +22,17 @@ int initializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBoun queueHead->head = NULL; queueHead->sizeBound = sizeBound; + queueHead->currentSize = 0; return 0; } -int offerQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) { +int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) { PLINKED_BLOCKING_QUEUE_ENTRY entry, lastEntry; entry = (PLINKED_BLOCKING_QUEUE_ENTRY) malloc(sizeof(*entry)); if (entry == NULL) { - return 0; + return LBQ_NO_MEMORY; } entry->next = NULL; @@ -32,10 +40,18 @@ int offerQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) { PltLockMutex(&queueHead->mutex); + if (queueHead->currentSize == queueHead->sizeBound) { + PltUnlockMutex(&queueHead->mutex); + free(entry); + return LBQ_BOUND_EXCEEDED; + } + if (queueHead->head == NULL) { + LC_ASSERT(queueHead->currentSize == 0); queueHead->head = entry; } else { + LC_ASSERT(queueHead->currentSize >= 1); lastEntry = queueHead->head; while (lastEntry->next != NULL) { lastEntry = lastEntry->next; @@ -43,19 +59,24 @@ int offerQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) { lastEntry->next = entry; } + queueHead->currentSize++; + PltUnlockMutex(&queueHead->mutex); PltSetEvent(&queueHead->containsDataEvent); - return 1; + return LBQ_SUCCESS; } -void* waitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead) { +int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { PLINKED_BLOCKING_QUEUE_ENTRY entry; - void* data; + int err; for (;;) { - PltWaitForEvent(&queueHead->containsDataEvent); + err = PltWaitForEvent(&queueHead->containsDataEvent); + if (err != PLT_WAIT_SUCCESS) { + return LBQ_INTERRUPTED; + } PltLockMutex(&queueHead->mutex); @@ -67,7 +88,7 @@ void* waitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead) { entry = queueHead->head; queueHead->head = entry->next; - data = entry->data; + *data = entry->data; free(entry); @@ -80,5 +101,5 @@ void* waitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead) { break; } - return data; + return LBQ_SUCCESS; } diff --git a/limelight-common/LinkedBlockingQueue.h b/limelight-common/LinkedBlockingQueue.h index 15a1076..142addb 100644 --- a/limelight-common/LinkedBlockingQueue.h +++ b/limelight-common/LinkedBlockingQueue.h @@ -3,6 +3,11 @@ #include "platform.h" #include "PlatformThreads.h" +#define LBQ_SUCCESS 0 +#define LBQ_INTERRUPTED 1 +#define LBQ_BOUND_EXCEEDED 2 +#define LBQ_NO_MEMORY 3 + typedef struct _LINKED_BLOCKING_QUEUE_ENTRY { struct _LINKED_BLOCKING_QUEUE_ENTRY *next; void* data; @@ -12,9 +17,11 @@ typedef struct _LINKED_BLOCKING_QUEUE { PLT_MUTEX mutex; PLT_EVENT containsDataEvent; int sizeBound; + int currentSize; PLINKED_BLOCKING_QUEUE_ENTRY head; } LINKED_BLOCKING_QUEUE, *PLINKED_BLOCKING_QUEUE; -int initializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound); -int offerQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data); -void* waitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead); \ No newline at end of file +int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound); +int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data); +int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data); +PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead); \ No newline at end of file diff --git a/limelight-common/Platform.h b/limelight-common/Platform.h index 00de7b2..02df517 100644 --- a/limelight-common/Platform.h +++ b/limelight-common/Platform.h @@ -1,5 +1,6 @@ #pragma once + #ifdef _WIN32 #include #else @@ -7,4 +8,22 @@ #include #include #include +#endif + + +#ifdef _WIN32 +# if WINAPI_FAMILY == WINAPI_FAMILY_PHONE_APP +# define LC_WINDOWS_PHONE +# elif WINAPI_FAMILY == WINAPI_FAMILY_DESKTOP_APP +# define LC_WINDOWS +# endif +#else +# define LC_POSIX +#endif + +#if defined(LC_WINDOWS_PHONE) || defined(LC_WINDOWS) +#include +#define LC_ASSERT _ASSERTE +#else +#define LC_ASSERT #endif \ No newline at end of file diff --git a/limelight-common/PlatformSockets.cpp b/limelight-common/PlatformSockets.cpp index f0af9ca..3188dc6 100644 --- a/limelight-common/PlatformSockets.cpp +++ b/limelight-common/PlatformSockets.cpp @@ -1,5 +1,5 @@ #include "PlatformSockets.h" -#include "Limelight.h" +#include "Limelight-internal.h" SOCKET bindUdpSocket(unsigned short port) { SOCKET s; @@ -57,4 +57,20 @@ int enableNoDelay(SOCKET s) { } return 0; +} + +int initializePlatformSockets(void) { +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) + WSADATA data; + return WSAStartup(MAKEWORD(2, 0), &data); +#else + return 0; +#endif +} + +void cleanupPlatformSockets(void) { +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) + WSACleanup(); +#else +#endif } \ No newline at end of file diff --git a/limelight-common/PlatformSockets.h b/limelight-common/PlatformSockets.h index 340d5a5..5cea1dc 100644 --- a/limelight-common/PlatformSockets.h +++ b/limelight-common/PlatformSockets.h @@ -1,5 +1,7 @@ #pragma once +#include "Limelight.h" + #ifdef _WIN32 #include #define LastSocketError() WSAGetLastError() @@ -16,8 +18,8 @@ #define closesocket(x) close(x) #endif -#define IP_ADDRESS unsigned int - SOCKET connectTcpSocket(IP_ADDRESS dstaddr, unsigned short port); SOCKET bindUdpSocket(unsigned short port); -int enableNoDelay(SOCKET s); \ No newline at end of file +int enableNoDelay(SOCKET s); +int initializePlatformSockets(void); +void cleanupPlatformSockets(void); \ No newline at end of file diff --git a/limelight-common/PlatformThreads.cpp b/limelight-common/PlatformThreads.cpp index 1016b62..351c67b 100644 --- a/limelight-common/PlatformThreads.cpp +++ b/limelight-common/PlatformThreads.cpp @@ -6,7 +6,7 @@ struct thread_context { void* context; }; -#ifdef _WIN32 +#ifdef LC_WINDOWS DWORD WINAPI ThreadProc(LPVOID lpParameter) { struct thread_context *ctx = (struct thread_context *)lpParameter; @@ -29,7 +29,7 @@ void* ThreadProc(void* context) { #endif void PltSleepMs(int ms) { -#ifdef _WIN32 +#ifdef LC_WINDOWS Sleep(ms); #else long usecs = (long)ms * 1000; @@ -38,8 +38,8 @@ void PltSleepMs(int ms) { } int PltCreateMutex(PLT_MUTEX *mutex) { -#ifdef _WIN32 - *mutex = CreateMutex(NULL, FALSE, NULL); +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) + *mutex = CreateMutexEx(NULL, NULL, 0, 0); if (!*mutex) { return -1; } @@ -50,7 +50,7 @@ int PltCreateMutex(PLT_MUTEX *mutex) { } void PltDeleteMutex(PLT_MUTEX *mutex) { -#ifdef _WIN32 +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) CloseHandle(*mutex); #else pthread_mutex_destroy(mutex); @@ -58,15 +58,15 @@ void PltDeleteMutex(PLT_MUTEX *mutex) { } void PltLockMutex(PLT_MUTEX *mutex) { -#ifdef _WIN32 - WaitForSingleObject(*mutex, INFINITE); +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) + WaitForSingleObjectEx(*mutex, INFINITE, FALSE); #else pthread_mutex_lock(mutex); #endif } void PltUnlockMutex(PLT_MUTEX *mutex) { -#ifdef _WIN32 +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) ReleaseMutex(*mutex); #else pthread_mutex_unlock(mutex); @@ -74,20 +74,28 @@ void PltUnlockMutex(PLT_MUTEX *mutex) { } void PltJoinThread(PLT_THREAD *thread) { -#ifdef _WIN32 - WaitForSingleObject(*thread, INFINITE); +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) + WaitForSingleObjectEx(*thread, INFINITE, FALSE); #else pthread_join(*thread, NULL); #endif } void PltCloseThread(PLT_THREAD *thread) { -#ifdef _WIN32 +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) CloseHandle(*thread); #else #endif } +void PltInterruptThread(PLT_THREAD *thread) { +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) + CloseHandle(*thread); +#else + pthread_cancel(*thread); +#endif +} + int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread) { struct thread_context *ctx; int err; @@ -125,8 +133,8 @@ int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread) { } int PltCreateEvent(PLT_EVENT *event) { -#ifdef _WIN32 - *event = CreateEvent(NULL, TRUE, FALSE, NULL); +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) + *event = CreateEventEx(NULL, NULL, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS); if (!*event) { return -1; } @@ -166,25 +174,25 @@ void PltClearEvent(PLT_EVENT *event) { #endif } -void PltPulseEvent(PLT_EVENT *event) { +int PltWaitForEvent(PLT_EVENT *event) { #ifdef _WIN32 - PulseEvent(*event); -#else - pthread_mutex_lock(&event->mutex); - event->signalled = 1; - pthread_cond_broadcast(&event->cond); - event->signalled = 0; - pthread_mutex_unlock(&event->mutex); -#endif -} - -void PltWaitForEvent(PLT_EVENT *event) { -#ifdef _WIN32 - WaitForSingleObject(*event, INFINITE); + DWORD error = WaitForSingleObjectEx(*event, INFINITE, FALSE); + if (error == STATUS_WAIT_0) { + return PLT_WAIT_SUCCESS; + } + else if (error == WAIT_IO_COMPLETION) { + return PLT_WAIT_INTERRUPTED; + } + else { + LC_ASSERT(0); + return -1; + } #else + pthread_mutex_lock(&event->mutex); while (!event->signalled) { pthread_cond_wait(&event->cond, &event->mutex); } + pthread_mutex_unlock(&event->mutex); #endif } diff --git a/limelight-common/PlatformThreads.h b/limelight-common/PlatformThreads.h index 9326cf8..0d3461e 100644 --- a/limelight-common/PlatformThreads.h +++ b/limelight-common/PlatformThreads.h @@ -4,11 +4,11 @@ typedef void (*ThreadEntry)(void *context); -#ifdef _WIN32 +#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE) typedef HANDLE PLT_THREAD; typedef HANDLE PLT_MUTEX; typedef HANDLE PLT_EVENT; -#else +#elif defined (LC_POSIX) typedef pthread_t PLT_THREAD; typedef pthread_mutex_t PLT_MUTEX; typedef struct _PLT_EVENT { @@ -16,6 +16,8 @@ typedef struct _PLT_EVENT { pthread_cond_t cond; int signalled; } PLT_EVENT; +#else +#error Unsupported platform #endif int PltCreateMutex(PLT_MUTEX *mutex); @@ -25,13 +27,16 @@ void PltUnlockMutex(PLT_MUTEX *mutex); int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread); void PltCloseThread(PLT_THREAD *thread); +void PltInterruptThread(PLT_THREAD *thread); void PltJoinThread(PLT_THREAD *thread); int PltCreateEvent(PLT_EVENT *event); void PltCloseEvent(PLT_EVENT *event); void PltSetEvent(PLT_EVENT *event); void PltClearEvent(PLT_EVENT *event); -void PltPulseEvent(PLT_EVENT *event); -void PltWaitForEvent(PLT_EVENT *event); +int PltWaitForEvent(PLT_EVENT *event); + +#define PLT_WAIT_SUCCESS 0 +#define PLT_WAIT_INTERRUPTED 1 void PltSleepMs(int ms); \ No newline at end of file diff --git a/limelight-common/Video.h b/limelight-common/Video.h index 57eabe3..368fc99 100644 --- a/limelight-common/Video.h +++ b/limelight-common/Video.h @@ -1,11 +1,5 @@ #pragma once -typedef struct _LENTRY { - struct _LENTRY *next; - char* data; - int length; -} LENTRY, *PLENTRY; - typedef struct _NV_VIDEO_PACKET { int frameIndex; int packetIndex; @@ -20,9 +14,4 @@ typedef struct _RTP_PACKET { char packetType; unsigned short sequenceNumber; char reserved[8]; -} RTP_PACKET, *PRTP_PACKET; - -typedef struct _DECODE_UNIT { - int fullLength; - PLENTRY bufferList; -} DECODE_UNIT, *PDECODE_UNIT; \ No newline at end of file +} RTP_PACKET, *PRTP_PACKET; \ No newline at end of file diff --git a/limelight-common/VideoDepacketizer.cpp b/limelight-common/VideoDepacketizer.cpp index 58e0a13..49c607d 100644 --- a/limelight-common/VideoDepacketizer.cpp +++ b/limelight-common/VideoDepacketizer.cpp @@ -1,5 +1,5 @@ #include "Platform.h" -#include "Limelight.h" +#include "Limelight-internal.h" #include "LinkedBlockingQueue.h" #include "Video.h" @@ -18,7 +18,7 @@ typedef struct _BUFFER_DESC { } BUFFER_DESC, *PBUFFER_DESC; void initializeVideoDepacketizer(void) { - initializeLinkedBlockingQueue(&decodeUnitQueue, 15); + LbqInitializeLinkedBlockingQueue(&decodeUnitQueue, 15); } static void clearAvcNalState(void) { @@ -34,6 +34,20 @@ static void clearAvcNalState(void) { nalChainDataLength = 0; } +void destroyVideoDepacketizer(void) { + PLINKED_BLOCKING_QUEUE_ENTRY entry, nextEntry; + + entry = LbqDestroyLinkedBlockingQueue(&decodeUnitQueue); + while (entry != NULL) { + nextEntry = entry->next; + free(entry->data); + free(entry); + entry = nextEntry; + } + + clearAvcNalState(); +} + static int isSeqFrameStart(PBUFFER_DESC candidate) { return (candidate->length == 4 && candidate->data[candidate->offset + candidate->length - 1] == 1); } @@ -92,7 +106,7 @@ static void reassembleFrame(void) { nalChainHead = NULL; nalChainDataLength = 0; - if (!offerQueueItem(&decodeUnitQueue, du)) { + if (!LbqOfferQueueItem(&decodeUnitQueue, du)) { nalChainHead = du->bufferList; nalChainDataLength = du->fullLength; free(du); @@ -105,8 +119,14 @@ static void reassembleFrame(void) { } } -PDECODE_UNIT getNextDecodeUnit(void) { - return (PDECODE_UNIT) waitForQueueElement(&decodeUnitQueue); +int getNextDecodeUnit(PDECODE_UNIT *du) { + int err = LbqWaitForQueueElement(&decodeUnitQueue, (void**)du); + if (err == LBQ_SUCCESS) { + return 1; + } + else { + return 0; + } } void freeDecodeUnit(PDECODE_UNIT decodeUnit) { diff --git a/limelight-common/VideoStream.cpp b/limelight-common/VideoStream.cpp index 436a1b2..98abf40 100644 --- a/limelight-common/VideoStream.cpp +++ b/limelight-common/VideoStream.cpp @@ -1,4 +1,4 @@ -#include "Limelight.h" +#include "Limelight-internal.h" #include "PlatformSockets.h" #include "PlatformThreads.h" #include "LinkedBlockingQueue.h" @@ -8,6 +8,7 @@ PSTREAM_CONFIGURATION configuration; IP_ADDRESS remoteHost; SOCKET rtpSocket; +SOCKET firstFrameSocket; LINKED_BLOCKING_QUEUE packetQueue; @@ -21,7 +22,24 @@ void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, configuration = streamConfig; remoteHost = host; - initializeLinkedBlockingQueue(&packetQueue, 15); + LbqInitializeLinkedBlockingQueue(&packetQueue, 30); + + initializeVideoDepacketizer(); +} + +void destroyVideoStream(void) { + PLINKED_BLOCKING_QUEUE_ENTRY entry, nextEntry; + + destroyVideoDepacketizer(); + + entry = LbqDestroyLinkedBlockingQueue(&packetQueue); + + while (entry != NULL) { + nextEntry = entry->next; + free(entry->data); + free(entry); + entry = nextEntry; + } } static void UdpPingThreadProc(void *context) { @@ -58,23 +76,38 @@ static void ReceiveThreadProc(void* context) { err = recv(rtpSocket, &buffer[sizeof(int)], 1500, 0); if (err <= 0) { Limelog("Receive thread terminating #2\n"); + free(buffer); return; } memcpy(buffer, &err, sizeof(err)); - if (!offerQueueItem(&packetQueue, buffer)) { + err = LbqOfferQueueItem(&packetQueue, buffer); + if (err != LBQ_SUCCESS) { free(buffer); - Limelog("Packet queue overflow\n"); + } + + if (err == LBQ_BOUND_EXCEEDED) { + Limelog("Video packet queue overflow\n"); + } + else if (err == LBQ_INTERRUPTED) { + Limelog("Receive thread terminating #2\n"); + return; } } } static void DepacketizerThreadProc(void* context) { int length; + int err; + char *data; for (;;) { - char* data = (char*) waitForQueueElement(&packetQueue); + err = LbqWaitForQueueElement(&packetQueue, (void**)&data); + if (err != LBQ_SUCCESS) { + Limelog("Depacketizer thread terminating\n"); + return; + } memcpy(&length, data, sizeof(int)); queueRtpPacket((PRTP_PACKET) &data[sizeof(int)], length); @@ -84,8 +117,12 @@ static void DepacketizerThreadProc(void* context) { } static void DecoderThreadProc(void* context) { + PDECODE_UNIT du; for (;;) { - PDECODE_UNIT du = getNextDecodeUnit(); + if (!getNextDecodeUnit(&du)) { + printf("Decoder thread terminating\n"); + return; + } callbacks->submitDecodeUnit(du); @@ -97,16 +134,15 @@ int readFirstFrame(void) { char firstFrame[1000]; int err; int offset = 0; - SOCKET s; - s = connectTcpSocket(remoteHost, 47996); - if (s == INVALID_SOCKET) { + firstFrameSocket = connectTcpSocket(remoteHost, 47996); + if (firstFrameSocket == INVALID_SOCKET) { return LastSocketError(); } Limelog("Waiting for first frame\n"); for (;;) { - err = recv(s, &firstFrame[offset], sizeof(firstFrame) - offset, 0); + err = recv(firstFrameSocket, &firstFrame[offset], sizeof(firstFrame) - offset, 0); if (err <= 0) { break; } @@ -120,6 +156,56 @@ int readFirstFrame(void) { return 0; } +void stopVideoStream(void) { + if (udpPingThread != NULL) { + PltInterruptThread(&udpPingThread); + } + if (receiveThread != NULL) { + PltInterruptThread(&receiveThread); + } + if (depacketizerThread != NULL) { + PltInterruptThread(&depacketizerThread); + } + if (decoderThread != NULL) { + PltInterruptThread(&decoderThread); + } + + if (firstFrameSocket != INVALID_SOCKET) { + closesocket(firstFrameSocket); + firstFrameSocket = INVALID_SOCKET; + } + if (rtpSocket != INVALID_SOCKET) { + closesocket(rtpSocket); + rtpSocket = INVALID_SOCKET; + } + + if (udpPingThread != NULL) { + PltJoinThread(&udpPingThread); + } + if (receiveThread != NULL) { + PltJoinThread(&receiveThread); + } + if (depacketizerThread != NULL) { + PltJoinThread(&depacketizerThread); + } + if (decoderThread != NULL) { + PltJoinThread(&decoderThread); + } + + if (udpPingThread != NULL) { + PltCloseThread(&udpPingThread); + } + if (receiveThread != NULL) { + PltCloseThread(&receiveThread); + } + if (depacketizerThread != NULL) { + PltCloseThread(&depacketizerThread); + } + if (decoderThread != NULL) { + PltCloseThread(&decoderThread); + } +} + int startVideoStream(void* rendererContext, int drFlags) { int err; @@ -128,8 +214,6 @@ int startVideoStream(void* rendererContext, int drFlags) { configuration->height, 60, rendererContext, drFlags); } - initializeVideoDepacketizer(); - // FIXME: Set socket options here rtpSocket = bindUdpSocket(47998); diff --git a/limelight-common/limelight-common.vcxproj b/limelight-common/limelight-common.vcxproj index e7943ad..5afd4dc 100644 --- a/limelight-common/limelight-common.vcxproj +++ b/limelight-common/limelight-common.vcxproj @@ -76,6 +76,7 @@ + @@ -86,6 +87,7 @@ + diff --git a/limelight-common/limelight-common.vcxproj.filters b/limelight-common/limelight-common.vcxproj.filters index cb39ece..892f58e 100644 --- a/limelight-common/limelight-common.vcxproj.filters +++ b/limelight-common/limelight-common.vcxproj.filters @@ -45,6 +45,9 @@ Source Files + + Source Files + @@ -53,9 +56,6 @@ Source Files - - Source Files - Source Files @@ -68,5 +68,11 @@ Source Files + + Source Files + + + Source Files + \ No newline at end of file