Cleanup a bunch of the code and the interface itself

This commit is contained in:
Cameron Gutman 2014-01-20 19:11:25 -05:00
parent 3b057e4a6b
commit d6c77b0323
18 changed files with 473 additions and 126 deletions

View File

@ -1,4 +1,4 @@
#include "Limelight.h" #include "Limelight-internal.h"
#include "ByteBuffer.h" #include "ByteBuffer.h"

View File

@ -0,0 +1,111 @@
#include "Limelight-internal.h"
#include "Platform.h"
#define STAGE_NONE 0
#define STAGE_PLATFORM_INIT 1
#define STAGE_HANDSHAKE 2
#define STAGE_CONTROL_STREAM_INIT 3
#define STAGE_VIDEO_STREAM_INIT 4
#define STAGE_CONTROL_STREAM_START 5
#define STAGE_VIDEO_STREAM_START 6
int stage = STAGE_NONE;
void LiStopConnection(void) {
if (stage == STAGE_VIDEO_STREAM_START) {
Limelog("Stopping video stream...");
stopVideoStream();
stage--;
Limelog("done\n");
}
if (stage == STAGE_CONTROL_STREAM_START) {
Limelog("Stopping control stream...");
stopControlStream();
stage--;
Limelog("done\n");
}
if (stage == STAGE_VIDEO_STREAM_INIT) {
Limelog("Cleaning up video stream...");
destroyVideoStream();
stage--;
Limelog("done\n");
}
if (stage == STAGE_CONTROL_STREAM_INIT) {
Limelog("Cleaning up control stream...");
destroyControlStream();
stage--;
Limelog("done\n");
}
if (stage == STAGE_HANDSHAKE) {
stage--;
}
if (stage == STAGE_PLATFORM_INIT) {
Limelog("Cleaning up platform...");
cleanupPlatformSockets();
stage--;
}
LC_ASSERT(stage == STAGE_NONE);
}
int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks) {
int err;
Limelog("Initializing platform...");
err = initializePlatformSockets();
if (err != 0) {
Limelog("failed: %d\n", err);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_PLATFORM_INIT);
Limelog("done\n");
Limelog("Starting handshake...");
err = performHandshake(host);
if (err != 0) {
Limelog("failed: %d\n", err);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_HANDSHAKE);
Limelog("done\n");
Limelog("Initializing video stream...");
initializeVideoStream(host, streamConfig, drCallbacks);
stage++;
LC_ASSERT(stage == STAGE_VIDEO_STREAM_INIT);
Limelog("done\n");
Limelog("Initializing control stream...");
err = initializeControlStream(host, streamConfig);
if (err != 0) {
Limelog("failed: %d\n", err);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_CONTROL_STREAM_INIT);
Limelog("done\n");
Limelog("Starting control stream...");
err = startControlStream();
if (err != 0) {
Limelog("failed: %d\n", err);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_CONTROL_STREAM_START);
Limelog("done\n");
Limelog("Starting video stream...");
err = startVideoStream(NULL, 0);
if (err != 0) {
Limelog("Video stream start failed: %d\n", err);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_VIDEO_STREAM_START);
Limelog("done\n");
Cleanup:
return err;
}

View File

@ -1,4 +1,4 @@
#include "Limelight.h" #include "Limelight-internal.h"
#include "PlatformSockets.h" #include "PlatformSockets.h"
#include "PlatformThreads.h" #include "PlatformThreads.h"
@ -7,6 +7,7 @@ typedef struct _NVCTL_PACKET_HEADER {
unsigned short payloadLength; unsigned short payloadLength;
} NVCTL_PACKET_HEADER, *PNVCTL_PACKET_HEADER; } NVCTL_PACKET_HEADER, *PNVCTL_PACKET_HEADER;
IP_ADDRESS host;
SOCKET ctlSock; SOCKET ctlSock;
STREAM_CONFIGURATION streamConfig; STREAM_CONFIGURATION streamConfig;
PLT_THREAD heartbeatThread; PLT_THREAD heartbeatThread;
@ -29,21 +30,20 @@ const short PPAYLEN_RESYNC = 16;
const short PTYPE_JITTER = 0x140c; const short PTYPE_JITTER = 0x140c;
const short PPAYLEN_JITTER = 0x10; const short PPAYLEN_JITTER = 0x10;
int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfigPtr) { int initializeControlStream(IP_ADDRESS addr, PSTREAM_CONFIGURATION streamConfigPtr) {
ctlSock = connectTcpSocket(host, 47995);
if (ctlSock == INVALID_SOCKET) {
return LastSocketError();
}
enableNoDelay(ctlSock);
memcpy(&streamConfig, streamConfigPtr, sizeof(*streamConfigPtr)); memcpy(&streamConfig, streamConfigPtr, sizeof(*streamConfigPtr));
PltCreateEvent(&resyncEvent); PltCreateEvent(&resyncEvent);
host = addr;
return 0; return 0;
} }
void destroyControlStream(void) {
PltCloseEvent(&resyncEvent);
}
void requestIdrFrame(void) { void requestIdrFrame(void) {
PltSetEvent(&resyncEvent); PltSetEvent(&resyncEvent);
} }
@ -166,15 +166,43 @@ static void resyncThreadFunc(void* context) {
} }
int stopControlStream(void) { int stopControlStream(void) {
closesocket(ctlSock); if (heartbeatThread != NULL) {
PltInterruptThread(&heartbeatThread);
}
if (jitterThread != NULL) {
PltInterruptThread(&jitterThread);
}
if (resyncThread != NULL) {
PltInterruptThread(&resyncThread);
}
PltJoinThread(&heartbeatThread); if (ctlSock != INVALID_SOCKET) {
PltJoinThread(&jitterThread); closesocket(ctlSock);
PltJoinThread(&resyncThread); ctlSock = INVALID_SOCKET;
}
PltCloseThread(&heartbeatThread); if (heartbeatThread != NULL) {
PltCloseThread(&jitterThread); PltJoinThread(&heartbeatThread);
PltCloseThread(&resyncThread); }
if (jitterThread != NULL) {
PltJoinThread(&jitterThread);
}
if (resyncThread != NULL) {
PltJoinThread(&resyncThread) ;
}
if (heartbeatThread != NULL) {
PltCloseThread(&heartbeatThread);
heartbeatThread = NULL;
}
if (jitterThread != NULL) {
PltCloseThread(&jitterThread);
jitterThread = NULL;
}
if (resyncThread != NULL) {
PltCloseThread(&resyncThread);
resyncThread = NULL;
}
return 0; return 0;
} }
@ -185,6 +213,13 @@ int startControlStream(void) {
int configSize; int configSize;
PNVCTL_PACKET_HEADER response; PNVCTL_PACKET_HEADER response;
ctlSock = connectTcpSocket(host, 47995);
if (ctlSock == INVALID_SOCKET) {
return LastSocketError();
}
enableNoDelay(ctlSock);
configSize = getConfigDataSize(&streamConfig); configSize = getConfigDataSize(&streamConfig);
config = allocateConfigDataForStreamConfig(&streamConfig); config = allocateConfigDataForStreamConfig(&streamConfig);
if (config == NULL) { if (config == NULL) {

View File

@ -1,4 +1,4 @@
#include "Limelight.h"
#include "PlatformSockets.h" #include "PlatformSockets.h"
#include "PlatformThreads.h" #include "PlatformThreads.h"

View File

@ -0,0 +1,32 @@
#pragma once
#include "Limelight.h"
#include "Platform.h"
#include "PlatformSockets.h"
#include "Video.h"
#include <stdio.h>
#define Limelog printf
char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig);
int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig);
int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig);
int startControlStream(void);
int stopControlStream(void);
void destroyControlStream(void);
void requestIdrFrame(void);
int performHandshake(IP_ADDRESS host);
void initializeVideoDepacketizer(void);
void destroyVideoDepacketizer(void);
void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length);
int getNextDecodeUnit(PDECODE_UNIT *du);
void freeDecodeUnit(PDECODE_UNIT decodeUnit);
void queueRtpPacket(PRTP_PACKET rtpPacket, int length);
void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks);
void destroyVideoStream(void);
int startVideoStream(void* rendererContext, int drFlags);
void stopVideoStream(void);

View File

@ -1,8 +1,6 @@
#pragma once #pragma once
#include "Platform.h" #define IP_ADDRESS unsigned int
#include "PlatformSockets.h"
#include "Video.h"
typedef struct _STREAM_CONFIGURATION { typedef struct _STREAM_CONFIGURATION {
int width; int width;
@ -10,11 +8,22 @@ typedef struct _STREAM_CONFIGURATION {
int fps; int fps;
} STREAM_CONFIGURATION, *PSTREAM_CONFIGURATION; } STREAM_CONFIGURATION, *PSTREAM_CONFIGURATION;
typedef void (*DecoderRendererSetup)(int width, int height, int redrawRate, void* context, int drFlags); typedef struct _LENTRY {
typedef void (*DecoderRendererStart)(void); struct _LENTRY *next;
typedef void (*DecoderRendererStop)(void); char* data;
typedef void (*DecoderRendererRelease)(void); int length;
typedef void (*DecoderRendererSubmitDecodeUnit)(PDECODE_UNIT decodeUnit); } LENTRY, *PLENTRY;
typedef struct _DECODE_UNIT {
int fullLength;
PLENTRY bufferList;
} DECODE_UNIT, *PDECODE_UNIT;
typedef void(*DecoderRendererSetup)(int width, int height, int redrawRate, void* context, int drFlags);
typedef void(*DecoderRendererStart)(void);
typedef void(*DecoderRendererStop)(void);
typedef void(*DecoderRendererRelease)(void);
typedef void(*DecoderRendererSubmitDecodeUnit)(PDECODE_UNIT decodeUnit);
typedef struct _DECODER_RENDERER_CALLBACKS { typedef struct _DECODER_RENDERER_CALLBACKS {
DecoderRendererSetup setup; DecoderRendererSetup setup;
@ -24,24 +33,5 @@ typedef struct _DECODER_RENDERER_CALLBACKS {
DecoderRendererSubmitDecodeUnit submitDecodeUnit; DecoderRendererSubmitDecodeUnit submitDecodeUnit;
} DECODER_RENDERER_CALLBACKS, *PDECODER_RENDERER_CALLBACKS; } DECODER_RENDERER_CALLBACKS, *PDECODER_RENDERER_CALLBACKS;
#include <stdio.h> int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks);
#define Limelog printf void LiStopConnection(void);
char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig);
int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig);
int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig);
int startControlStream(void);
int stopControlStream(void);
void requestIdrFrame(void);
int performHandshake(IP_ADDRESS host);
void initializeVideoDepacketizer(void);
void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length);
PDECODE_UNIT getNextDecodeUnit(void);
void freeDecodeUnit(PDECODE_UNIT decodeUnit);
void queueRtpPacket(PRTP_PACKET rtpPacket, int length);
void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks);
int startVideoStream(void* rendererContext, int drFlags);

View File

@ -1,6 +1,13 @@
#include "LinkedBlockingQueue.h" #include "LinkedBlockingQueue.h"
int initializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound) { PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead) {
PltDeleteMutex(&queueHead->mutex);
PltCloseEvent(&queueHead->containsDataEvent);
return queueHead->head;
}
int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound) {
int err; int err;
err = PltCreateEvent(&queueHead->containsDataEvent); err = PltCreateEvent(&queueHead->containsDataEvent);
@ -15,16 +22,17 @@ int initializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBoun
queueHead->head = NULL; queueHead->head = NULL;
queueHead->sizeBound = sizeBound; queueHead->sizeBound = sizeBound;
queueHead->currentSize = 0;
return 0; return 0;
} }
int offerQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) { int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) {
PLINKED_BLOCKING_QUEUE_ENTRY entry, lastEntry; PLINKED_BLOCKING_QUEUE_ENTRY entry, lastEntry;
entry = (PLINKED_BLOCKING_QUEUE_ENTRY) malloc(sizeof(*entry)); entry = (PLINKED_BLOCKING_QUEUE_ENTRY) malloc(sizeof(*entry));
if (entry == NULL) { if (entry == NULL) {
return 0; return LBQ_NO_MEMORY;
} }
entry->next = NULL; entry->next = NULL;
@ -32,10 +40,18 @@ int offerQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) {
PltLockMutex(&queueHead->mutex); PltLockMutex(&queueHead->mutex);
if (queueHead->currentSize == queueHead->sizeBound) {
PltUnlockMutex(&queueHead->mutex);
free(entry);
return LBQ_BOUND_EXCEEDED;
}
if (queueHead->head == NULL) { if (queueHead->head == NULL) {
LC_ASSERT(queueHead->currentSize == 0);
queueHead->head = entry; queueHead->head = entry;
} }
else { else {
LC_ASSERT(queueHead->currentSize >= 1);
lastEntry = queueHead->head; lastEntry = queueHead->head;
while (lastEntry->next != NULL) { while (lastEntry->next != NULL) {
lastEntry = lastEntry->next; lastEntry = lastEntry->next;
@ -43,19 +59,24 @@ int offerQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) {
lastEntry->next = entry; lastEntry->next = entry;
} }
queueHead->currentSize++;
PltUnlockMutex(&queueHead->mutex); PltUnlockMutex(&queueHead->mutex);
PltSetEvent(&queueHead->containsDataEvent); PltSetEvent(&queueHead->containsDataEvent);
return 1; return LBQ_SUCCESS;
} }
void* waitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead) { int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) {
PLINKED_BLOCKING_QUEUE_ENTRY entry; PLINKED_BLOCKING_QUEUE_ENTRY entry;
void* data; int err;
for (;;) { for (;;) {
PltWaitForEvent(&queueHead->containsDataEvent); err = PltWaitForEvent(&queueHead->containsDataEvent);
if (err != PLT_WAIT_SUCCESS) {
return LBQ_INTERRUPTED;
}
PltLockMutex(&queueHead->mutex); PltLockMutex(&queueHead->mutex);
@ -67,7 +88,7 @@ void* waitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead) {
entry = queueHead->head; entry = queueHead->head;
queueHead->head = entry->next; queueHead->head = entry->next;
data = entry->data; *data = entry->data;
free(entry); free(entry);
@ -80,5 +101,5 @@ void* waitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead) {
break; break;
} }
return data; return LBQ_SUCCESS;
} }

View File

@ -3,6 +3,11 @@
#include "platform.h" #include "platform.h"
#include "PlatformThreads.h" #include "PlatformThreads.h"
#define LBQ_SUCCESS 0
#define LBQ_INTERRUPTED 1
#define LBQ_BOUND_EXCEEDED 2
#define LBQ_NO_MEMORY 3
typedef struct _LINKED_BLOCKING_QUEUE_ENTRY { typedef struct _LINKED_BLOCKING_QUEUE_ENTRY {
struct _LINKED_BLOCKING_QUEUE_ENTRY *next; struct _LINKED_BLOCKING_QUEUE_ENTRY *next;
void* data; void* data;
@ -12,9 +17,11 @@ typedef struct _LINKED_BLOCKING_QUEUE {
PLT_MUTEX mutex; PLT_MUTEX mutex;
PLT_EVENT containsDataEvent; PLT_EVENT containsDataEvent;
int sizeBound; int sizeBound;
int currentSize;
PLINKED_BLOCKING_QUEUE_ENTRY head; PLINKED_BLOCKING_QUEUE_ENTRY head;
} LINKED_BLOCKING_QUEUE, *PLINKED_BLOCKING_QUEUE; } LINKED_BLOCKING_QUEUE, *PLINKED_BLOCKING_QUEUE;
int initializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound); int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound);
int offerQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data); int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data);
void* waitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead); int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data);
PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead);

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#ifdef _WIN32 #ifdef _WIN32
#include <Windows.h> #include <Windows.h>
#else #else
@ -8,3 +9,21 @@
#include <string.h> #include <string.h>
#include <pthread.h> #include <pthread.h>
#endif #endif
#ifdef _WIN32
# if WINAPI_FAMILY == WINAPI_FAMILY_PHONE_APP
# define LC_WINDOWS_PHONE
# elif WINAPI_FAMILY == WINAPI_FAMILY_DESKTOP_APP
# define LC_WINDOWS
# endif
#else
# define LC_POSIX
#endif
#if defined(LC_WINDOWS_PHONE) || defined(LC_WINDOWS)
#include <crtdbg.h>
#define LC_ASSERT _ASSERTE
#else
#define LC_ASSERT
#endif

View File

@ -1,5 +1,5 @@
#include "PlatformSockets.h" #include "PlatformSockets.h"
#include "Limelight.h" #include "Limelight-internal.h"
SOCKET bindUdpSocket(unsigned short port) { SOCKET bindUdpSocket(unsigned short port) {
SOCKET s; SOCKET s;
@ -58,3 +58,19 @@ int enableNoDelay(SOCKET s) {
return 0; return 0;
} }
int initializePlatformSockets(void) {
#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
WSADATA data;
return WSAStartup(MAKEWORD(2, 0), &data);
#else
return 0;
#endif
}
void cleanupPlatformSockets(void) {
#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
WSACleanup();
#else
#endif
}

View File

@ -1,5 +1,7 @@
#pragma once #pragma once
#include "Limelight.h"
#ifdef _WIN32 #ifdef _WIN32
#include <Windows.h> #include <Windows.h>
#define LastSocketError() WSAGetLastError() #define LastSocketError() WSAGetLastError()
@ -16,8 +18,8 @@
#define closesocket(x) close(x) #define closesocket(x) close(x)
#endif #endif
#define IP_ADDRESS unsigned int
SOCKET connectTcpSocket(IP_ADDRESS dstaddr, unsigned short port); SOCKET connectTcpSocket(IP_ADDRESS dstaddr, unsigned short port);
SOCKET bindUdpSocket(unsigned short port); SOCKET bindUdpSocket(unsigned short port);
int enableNoDelay(SOCKET s); int enableNoDelay(SOCKET s);
int initializePlatformSockets(void);
void cleanupPlatformSockets(void);

View File

@ -6,7 +6,7 @@ struct thread_context {
void* context; void* context;
}; };
#ifdef _WIN32 #ifdef LC_WINDOWS
DWORD WINAPI ThreadProc(LPVOID lpParameter) { DWORD WINAPI ThreadProc(LPVOID lpParameter) {
struct thread_context *ctx = (struct thread_context *)lpParameter; struct thread_context *ctx = (struct thread_context *)lpParameter;
@ -29,7 +29,7 @@ void* ThreadProc(void* context) {
#endif #endif
void PltSleepMs(int ms) { void PltSleepMs(int ms) {
#ifdef _WIN32 #ifdef LC_WINDOWS
Sleep(ms); Sleep(ms);
#else #else
long usecs = (long)ms * 1000; long usecs = (long)ms * 1000;
@ -38,8 +38,8 @@ void PltSleepMs(int ms) {
} }
int PltCreateMutex(PLT_MUTEX *mutex) { int PltCreateMutex(PLT_MUTEX *mutex) {
#ifdef _WIN32 #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
*mutex = CreateMutex(NULL, FALSE, NULL); *mutex = CreateMutexEx(NULL, NULL, 0, 0);
if (!*mutex) { if (!*mutex) {
return -1; return -1;
} }
@ -50,7 +50,7 @@ int PltCreateMutex(PLT_MUTEX *mutex) {
} }
void PltDeleteMutex(PLT_MUTEX *mutex) { void PltDeleteMutex(PLT_MUTEX *mutex) {
#ifdef _WIN32 #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
CloseHandle(*mutex); CloseHandle(*mutex);
#else #else
pthread_mutex_destroy(mutex); pthread_mutex_destroy(mutex);
@ -58,15 +58,15 @@ void PltDeleteMutex(PLT_MUTEX *mutex) {
} }
void PltLockMutex(PLT_MUTEX *mutex) { void PltLockMutex(PLT_MUTEX *mutex) {
#ifdef _WIN32 #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
WaitForSingleObject(*mutex, INFINITE); WaitForSingleObjectEx(*mutex, INFINITE, FALSE);
#else #else
pthread_mutex_lock(mutex); pthread_mutex_lock(mutex);
#endif #endif
} }
void PltUnlockMutex(PLT_MUTEX *mutex) { void PltUnlockMutex(PLT_MUTEX *mutex) {
#ifdef _WIN32 #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
ReleaseMutex(*mutex); ReleaseMutex(*mutex);
#else #else
pthread_mutex_unlock(mutex); pthread_mutex_unlock(mutex);
@ -74,20 +74,28 @@ void PltUnlockMutex(PLT_MUTEX *mutex) {
} }
void PltJoinThread(PLT_THREAD *thread) { void PltJoinThread(PLT_THREAD *thread) {
#ifdef _WIN32 #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
WaitForSingleObject(*thread, INFINITE); WaitForSingleObjectEx(*thread, INFINITE, FALSE);
#else #else
pthread_join(*thread, NULL); pthread_join(*thread, NULL);
#endif #endif
} }
void PltCloseThread(PLT_THREAD *thread) { void PltCloseThread(PLT_THREAD *thread) {
#ifdef _WIN32 #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
CloseHandle(*thread); CloseHandle(*thread);
#else #else
#endif #endif
} }
void PltInterruptThread(PLT_THREAD *thread) {
#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
CloseHandle(*thread);
#else
pthread_cancel(*thread);
#endif
}
int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread) { int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread) {
struct thread_context *ctx; struct thread_context *ctx;
int err; int err;
@ -125,8 +133,8 @@ int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread) {
} }
int PltCreateEvent(PLT_EVENT *event) { int PltCreateEvent(PLT_EVENT *event) {
#ifdef _WIN32 #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
*event = CreateEvent(NULL, TRUE, FALSE, NULL); *event = CreateEventEx(NULL, NULL, CREATE_EVENT_MANUAL_RESET, EVENT_ALL_ACCESS);
if (!*event) { if (!*event) {
return -1; return -1;
} }
@ -166,25 +174,25 @@ void PltClearEvent(PLT_EVENT *event) {
#endif #endif
} }
void PltPulseEvent(PLT_EVENT *event) { int PltWaitForEvent(PLT_EVENT *event) {
#ifdef _WIN32 #ifdef _WIN32
PulseEvent(*event); DWORD error = WaitForSingleObjectEx(*event, INFINITE, FALSE);
#else if (error == STATUS_WAIT_0) {
pthread_mutex_lock(&event->mutex); return PLT_WAIT_SUCCESS;
event->signalled = 1; }
pthread_cond_broadcast(&event->cond); else if (error == WAIT_IO_COMPLETION) {
event->signalled = 0; return PLT_WAIT_INTERRUPTED;
pthread_mutex_unlock(&event->mutex); }
#endif else {
} LC_ASSERT(0);
return -1;
void PltWaitForEvent(PLT_EVENT *event) { }
#ifdef _WIN32
WaitForSingleObject(*event, INFINITE);
#else #else
pthread_mutex_lock(&event->mutex);
while (!event->signalled) { while (!event->signalled) {
pthread_cond_wait(&event->cond, &event->mutex); pthread_cond_wait(&event->cond, &event->mutex);
} }
pthread_mutex_unlock(&event->mutex);
#endif #endif
} }

View File

@ -4,11 +4,11 @@
typedef void (*ThreadEntry)(void *context); typedef void (*ThreadEntry)(void *context);
#ifdef _WIN32 #if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
typedef HANDLE PLT_THREAD; typedef HANDLE PLT_THREAD;
typedef HANDLE PLT_MUTEX; typedef HANDLE PLT_MUTEX;
typedef HANDLE PLT_EVENT; typedef HANDLE PLT_EVENT;
#else #elif defined (LC_POSIX)
typedef pthread_t PLT_THREAD; typedef pthread_t PLT_THREAD;
typedef pthread_mutex_t PLT_MUTEX; typedef pthread_mutex_t PLT_MUTEX;
typedef struct _PLT_EVENT { typedef struct _PLT_EVENT {
@ -16,6 +16,8 @@ typedef struct _PLT_EVENT {
pthread_cond_t cond; pthread_cond_t cond;
int signalled; int signalled;
} PLT_EVENT; } PLT_EVENT;
#else
#error Unsupported platform
#endif #endif
int PltCreateMutex(PLT_MUTEX *mutex); int PltCreateMutex(PLT_MUTEX *mutex);
@ -25,13 +27,16 @@ void PltUnlockMutex(PLT_MUTEX *mutex);
int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread); int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread);
void PltCloseThread(PLT_THREAD *thread); void PltCloseThread(PLT_THREAD *thread);
void PltInterruptThread(PLT_THREAD *thread);
void PltJoinThread(PLT_THREAD *thread); void PltJoinThread(PLT_THREAD *thread);
int PltCreateEvent(PLT_EVENT *event); int PltCreateEvent(PLT_EVENT *event);
void PltCloseEvent(PLT_EVENT *event); void PltCloseEvent(PLT_EVENT *event);
void PltSetEvent(PLT_EVENT *event); void PltSetEvent(PLT_EVENT *event);
void PltClearEvent(PLT_EVENT *event); void PltClearEvent(PLT_EVENT *event);
void PltPulseEvent(PLT_EVENT *event); int PltWaitForEvent(PLT_EVENT *event);
void PltWaitForEvent(PLT_EVENT *event);
#define PLT_WAIT_SUCCESS 0
#define PLT_WAIT_INTERRUPTED 1
void PltSleepMs(int ms); void PltSleepMs(int ms);

View File

@ -1,11 +1,5 @@
#pragma once #pragma once
typedef struct _LENTRY {
struct _LENTRY *next;
char* data;
int length;
} LENTRY, *PLENTRY;
typedef struct _NV_VIDEO_PACKET { typedef struct _NV_VIDEO_PACKET {
int frameIndex; int frameIndex;
int packetIndex; int packetIndex;
@ -21,8 +15,3 @@ typedef struct _RTP_PACKET {
unsigned short sequenceNumber; unsigned short sequenceNumber;
char reserved[8]; char reserved[8];
} RTP_PACKET, *PRTP_PACKET; } RTP_PACKET, *PRTP_PACKET;
typedef struct _DECODE_UNIT {
int fullLength;
PLENTRY bufferList;
} DECODE_UNIT, *PDECODE_UNIT;

View File

@ -1,5 +1,5 @@
#include "Platform.h" #include "Platform.h"
#include "Limelight.h" #include "Limelight-internal.h"
#include "LinkedBlockingQueue.h" #include "LinkedBlockingQueue.h"
#include "Video.h" #include "Video.h"
@ -18,7 +18,7 @@ typedef struct _BUFFER_DESC {
} BUFFER_DESC, *PBUFFER_DESC; } BUFFER_DESC, *PBUFFER_DESC;
void initializeVideoDepacketizer(void) { void initializeVideoDepacketizer(void) {
initializeLinkedBlockingQueue(&decodeUnitQueue, 15); LbqInitializeLinkedBlockingQueue(&decodeUnitQueue, 15);
} }
static void clearAvcNalState(void) { static void clearAvcNalState(void) {
@ -34,6 +34,20 @@ static void clearAvcNalState(void) {
nalChainDataLength = 0; nalChainDataLength = 0;
} }
void destroyVideoDepacketizer(void) {
PLINKED_BLOCKING_QUEUE_ENTRY entry, nextEntry;
entry = LbqDestroyLinkedBlockingQueue(&decodeUnitQueue);
while (entry != NULL) {
nextEntry = entry->next;
free(entry->data);
free(entry);
entry = nextEntry;
}
clearAvcNalState();
}
static int isSeqFrameStart(PBUFFER_DESC candidate) { static int isSeqFrameStart(PBUFFER_DESC candidate) {
return (candidate->length == 4 && candidate->data[candidate->offset + candidate->length - 1] == 1); return (candidate->length == 4 && candidate->data[candidate->offset + candidate->length - 1] == 1);
} }
@ -92,7 +106,7 @@ static void reassembleFrame(void) {
nalChainHead = NULL; nalChainHead = NULL;
nalChainDataLength = 0; nalChainDataLength = 0;
if (!offerQueueItem(&decodeUnitQueue, du)) { if (!LbqOfferQueueItem(&decodeUnitQueue, du)) {
nalChainHead = du->bufferList; nalChainHead = du->bufferList;
nalChainDataLength = du->fullLength; nalChainDataLength = du->fullLength;
free(du); free(du);
@ -105,8 +119,14 @@ static void reassembleFrame(void) {
} }
} }
PDECODE_UNIT getNextDecodeUnit(void) { int getNextDecodeUnit(PDECODE_UNIT *du) {
return (PDECODE_UNIT) waitForQueueElement(&decodeUnitQueue); int err = LbqWaitForQueueElement(&decodeUnitQueue, (void**)du);
if (err == LBQ_SUCCESS) {
return 1;
}
else {
return 0;
}
} }
void freeDecodeUnit(PDECODE_UNIT decodeUnit) { void freeDecodeUnit(PDECODE_UNIT decodeUnit) {

View File

@ -1,4 +1,4 @@
#include "Limelight.h" #include "Limelight-internal.h"
#include "PlatformSockets.h" #include "PlatformSockets.h"
#include "PlatformThreads.h" #include "PlatformThreads.h"
#include "LinkedBlockingQueue.h" #include "LinkedBlockingQueue.h"
@ -8,6 +8,7 @@ PSTREAM_CONFIGURATION configuration;
IP_ADDRESS remoteHost; IP_ADDRESS remoteHost;
SOCKET rtpSocket; SOCKET rtpSocket;
SOCKET firstFrameSocket;
LINKED_BLOCKING_QUEUE packetQueue; LINKED_BLOCKING_QUEUE packetQueue;
@ -21,7 +22,24 @@ void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig,
configuration = streamConfig; configuration = streamConfig;
remoteHost = host; remoteHost = host;
initializeLinkedBlockingQueue(&packetQueue, 15); LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
initializeVideoDepacketizer();
}
void destroyVideoStream(void) {
PLINKED_BLOCKING_QUEUE_ENTRY entry, nextEntry;
destroyVideoDepacketizer();
entry = LbqDestroyLinkedBlockingQueue(&packetQueue);
while (entry != NULL) {
nextEntry = entry->next;
free(entry->data);
free(entry);
entry = nextEntry;
}
} }
static void UdpPingThreadProc(void *context) { static void UdpPingThreadProc(void *context) {
@ -58,23 +76,38 @@ static void ReceiveThreadProc(void* context) {
err = recv(rtpSocket, &buffer[sizeof(int)], 1500, 0); err = recv(rtpSocket, &buffer[sizeof(int)], 1500, 0);
if (err <= 0) { if (err <= 0) {
Limelog("Receive thread terminating #2\n"); Limelog("Receive thread terminating #2\n");
free(buffer);
return; return;
} }
memcpy(buffer, &err, sizeof(err)); memcpy(buffer, &err, sizeof(err));
if (!offerQueueItem(&packetQueue, buffer)) { err = LbqOfferQueueItem(&packetQueue, buffer);
if (err != LBQ_SUCCESS) {
free(buffer); free(buffer);
Limelog("Packet queue overflow\n"); }
if (err == LBQ_BOUND_EXCEEDED) {
Limelog("Video packet queue overflow\n");
}
else if (err == LBQ_INTERRUPTED) {
Limelog("Receive thread terminating #2\n");
return;
} }
} }
} }
static void DepacketizerThreadProc(void* context) { static void DepacketizerThreadProc(void* context) {
int length; int length;
int err;
char *data;
for (;;) { for (;;) {
char* data = (char*) waitForQueueElement(&packetQueue); err = LbqWaitForQueueElement(&packetQueue, (void**)&data);
if (err != LBQ_SUCCESS) {
Limelog("Depacketizer thread terminating\n");
return;
}
memcpy(&length, data, sizeof(int)); memcpy(&length, data, sizeof(int));
queueRtpPacket((PRTP_PACKET) &data[sizeof(int)], length); queueRtpPacket((PRTP_PACKET) &data[sizeof(int)], length);
@ -84,8 +117,12 @@ static void DepacketizerThreadProc(void* context) {
} }
static void DecoderThreadProc(void* context) { static void DecoderThreadProc(void* context) {
PDECODE_UNIT du;
for (;;) { for (;;) {
PDECODE_UNIT du = getNextDecodeUnit(); if (!getNextDecodeUnit(&du)) {
printf("Decoder thread terminating\n");
return;
}
callbacks->submitDecodeUnit(du); callbacks->submitDecodeUnit(du);
@ -97,16 +134,15 @@ int readFirstFrame(void) {
char firstFrame[1000]; char firstFrame[1000];
int err; int err;
int offset = 0; int offset = 0;
SOCKET s;
s = connectTcpSocket(remoteHost, 47996); firstFrameSocket = connectTcpSocket(remoteHost, 47996);
if (s == INVALID_SOCKET) { if (firstFrameSocket == INVALID_SOCKET) {
return LastSocketError(); return LastSocketError();
} }
Limelog("Waiting for first frame\n"); Limelog("Waiting for first frame\n");
for (;;) { for (;;) {
err = recv(s, &firstFrame[offset], sizeof(firstFrame) - offset, 0); err = recv(firstFrameSocket, &firstFrame[offset], sizeof(firstFrame) - offset, 0);
if (err <= 0) { if (err <= 0) {
break; break;
} }
@ -120,6 +156,56 @@ int readFirstFrame(void) {
return 0; return 0;
} }
void stopVideoStream(void) {
if (udpPingThread != NULL) {
PltInterruptThread(&udpPingThread);
}
if (receiveThread != NULL) {
PltInterruptThread(&receiveThread);
}
if (depacketizerThread != NULL) {
PltInterruptThread(&depacketizerThread);
}
if (decoderThread != NULL) {
PltInterruptThread(&decoderThread);
}
if (firstFrameSocket != INVALID_SOCKET) {
closesocket(firstFrameSocket);
firstFrameSocket = INVALID_SOCKET;
}
if (rtpSocket != INVALID_SOCKET) {
closesocket(rtpSocket);
rtpSocket = INVALID_SOCKET;
}
if (udpPingThread != NULL) {
PltJoinThread(&udpPingThread);
}
if (receiveThread != NULL) {
PltJoinThread(&receiveThread);
}
if (depacketizerThread != NULL) {
PltJoinThread(&depacketizerThread);
}
if (decoderThread != NULL) {
PltJoinThread(&decoderThread);
}
if (udpPingThread != NULL) {
PltCloseThread(&udpPingThread);
}
if (receiveThread != NULL) {
PltCloseThread(&receiveThread);
}
if (depacketizerThread != NULL) {
PltCloseThread(&depacketizerThread);
}
if (decoderThread != NULL) {
PltCloseThread(&decoderThread);
}
}
int startVideoStream(void* rendererContext, int drFlags) { int startVideoStream(void* rendererContext, int drFlags) {
int err; int err;
@ -128,8 +214,6 @@ int startVideoStream(void* rendererContext, int drFlags) {
configuration->height, 60, rendererContext, drFlags); configuration->height, 60, rendererContext, drFlags);
} }
initializeVideoDepacketizer();
// FIXME: Set socket options here // FIXME: Set socket options here
rtpSocket = bindUdpSocket(47998); rtpSocket = bindUdpSocket(47998);

View File

@ -76,6 +76,7 @@
<ItemGroup> <ItemGroup>
<ClCompile Include="ByteBuffer.cpp" /> <ClCompile Include="ByteBuffer.cpp" />
<ClCompile Include="Config.cpp" /> <ClCompile Include="Config.cpp" />
<ClCompile Include="Connection.cpp" />
<ClCompile Include="ControlStream.cpp" /> <ClCompile Include="ControlStream.cpp" />
<ClCompile Include="Handshake.cpp" /> <ClCompile Include="Handshake.cpp" />
<ClCompile Include="LinkedBlockingQueue.cpp" /> <ClCompile Include="LinkedBlockingQueue.cpp" />
@ -86,6 +87,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="ByteBuffer.h" /> <ClInclude Include="ByteBuffer.h" />
<ClInclude Include="Limelight-internal.h" />
<ClInclude Include="Limelight.h" /> <ClInclude Include="Limelight.h" />
<ClInclude Include="LinkedBlockingQueue.h" /> <ClInclude Include="LinkedBlockingQueue.h" />
<ClInclude Include="Platform.h" /> <ClInclude Include="Platform.h" />

View File

@ -45,6 +45,9 @@
<ClCompile Include="VideoStream.cpp"> <ClCompile Include="VideoStream.cpp">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="Connection.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="PlatformSockets.h"> <ClInclude Include="PlatformSockets.h">
@ -53,9 +56,6 @@
<ClInclude Include="Platform.h"> <ClInclude Include="Platform.h">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="Limelight.h">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="ByteBuffer.h"> <ClInclude Include="ByteBuffer.h">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClInclude> </ClInclude>
@ -68,5 +68,11 @@
<ClInclude Include="Video.h"> <ClInclude Include="Video.h">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="Limelight-internal.h">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="Limelight.h">
<Filter>Source Files</Filter>
</ClInclude>
</ItemGroup> </ItemGroup>
</Project> </Project>