From 103c05272919392d8ba3ef406feaae7f9007c108 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Sat, 18 Jan 2014 18:53:50 -0500 Subject: [PATCH] Cross-platform threading, sockets updates, and control stream implementation, and various other fixes --- limelight-common/ByteBuffer.h | 8 +- limelight-common/Config.cpp | 31 ++- limelight-common/ControlStream.cpp | 211 ++++++++++++++++++ limelight-common/Limelight.h | 13 +- limelight-common/PlatformSockets.cpp | 13 ++ limelight-common/PlatformSockets.h | 5 +- limelight-common/PlatformThreads.cpp | 96 ++++++++ limelight-common/PlatformThreads.h | 18 ++ limelight-common/VideoDepacketizer.cpp | 0 limelight-common/limelight-common.vcxproj | 16 ++ .../limelight-common.vcxproj.filters | 40 ++++ 11 files changed, 443 insertions(+), 8 deletions(-) create mode 100644 limelight-common/ControlStream.cpp create mode 100644 limelight-common/PlatformThreads.cpp create mode 100644 limelight-common/PlatformThreads.h create mode 100644 limelight-common/VideoDepacketizer.cpp diff --git a/limelight-common/ByteBuffer.h b/limelight-common/ByteBuffer.h index 9b40de3..5ccebe0 100644 --- a/limelight-common/ByteBuffer.h +++ b/limelight-common/ByteBuffer.h @@ -5,10 +5,10 @@ typedef struct _BYTE_BUFFER { char* buffer; - int offset; - int length; - int position; - int byteOrder; + unsigned int offset; + unsigned int length; + unsigned int position; + unsigned int byteOrder; } BYTE_BUFFER, *PBYTE_BUFFER; void BbInitializeWrappedBuffer(PBYTE_BUFFER buff, char* data, int offset, int length, int byteOrder); diff --git a/limelight-common/Config.cpp b/limelight-common/Config.cpp index a01a830..da69e2a 100644 --- a/limelight-common/Config.cpp +++ b/limelight-common/Config.cpp @@ -92,10 +92,15 @@ const int UNKNOWN_CONFIG [] = { (int) 0xFE000000 }; -const int CONFIG_SIZE = sizeof(UNKNOWN_CONFIG) +(8 * 4) + 3; +const int CONFIG_SIZE = sizeof(UNKNOWN_CONFIG) + (8 * 4) + 3; + +int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig) { + return CONFIG_SIZE; +} char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig) { BYTE_BUFFER bb; + int i; char* config = (char *)malloc(CONFIG_SIZE); if (config == NULL) { return NULL; @@ -103,4 +108,28 @@ char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig) { BbInitializeWrappedBuffer(&bb, config, 0, CONFIG_SIZE, BYTE_ORDER_LITTLE); + BbPutShort(&bb, 0x1204); + BbPutShort(&bb, 0x0004); + BbPutInt(&bb, streamConfig->width); + + BbPutShort(&bb, 0x1205); + BbPutShort(&bb, 0x0004); + BbPutInt(&bb, streamConfig->height); + + BbPutShort(&bb, 0x1206); + BbPutShort(&bb, 0x0004); + BbPutInt(&bb, 1); + + BbPutShort(&bb, 0x120A); + BbPutShort(&bb, 0x0004); + BbPutInt(&bb, streamConfig->fps); + + for (i = 0; i < sizeof(UNKNOWN_CONFIG) / sizeof(int); i++) { + BbPutInt(&bb, UNKNOWN_CONFIG[i]); + } + + BbPutShort(&bb, 0x0013); + BbPut(&bb, 0x00); + + return config; } diff --git a/limelight-common/ControlStream.cpp b/limelight-common/ControlStream.cpp new file mode 100644 index 0000000..7a42f3d --- /dev/null +++ b/limelight-common/ControlStream.cpp @@ -0,0 +1,211 @@ +#include "Limelight.h" +#include "PlatformSockets.h" +#include "PlatformThreads.h" + +typedef struct _CONTROL_STREAM { + SOCKET s; + STREAM_CONFIGURATION streamConfig; + PLT_THREAD heartbeatThread; + PLT_THREAD jitterThread; + PLT_THREAD resyncThread; +} CONTROL_STREAM, *PCONTROL_STREAM; + +typedef struct _NVCTL_PACKET_HEADER { + unsigned short type; + unsigned short payloadLength; +} NVCTL_PACKET_HEADER, *PNVCTL_PACKET_HEADER; + +const short PTYPE_KEEPALIVE = 0x13ff; +const short PPAYLEN_KEEPALIVE = 0x0000; + +const short PTYPE_HEARTBEAT = 0x1401; +const short PPAYLEN_HEARTBEAT = 0x0000; + +const short PTYPE_1405 = 0x1405; +const short PPAYLEN_1405 = 0x0000; + +const short PTYPE_RESYNC = 0x1404; +const short PPAYLEN_RESYNC = 16; + +const short PTYPE_JITTER = 0x140c; +const short PPAYLEN_JITTER = 0x10; + +void* initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig) { + PCONTROL_STREAM ctx; + + ctx = (PCONTROL_STREAM) malloc(sizeof(*ctx)); + if (ctx == NULL) { + return NULL; + } + + ctx->s = connectTcpSocket(host, 47995); + if (ctx->s == INVALID_SOCKET) { + free(ctx); + return NULL; + } + + enableNoDelay(ctx->s); + + memcpy(&ctx->streamConfig, streamConfig, sizeof(*streamConfig)); + + return ctx; +} + +static PNVCTL_PACKET_HEADER readNvctlPacket(PCONTROL_STREAM stream) { + NVCTL_PACKET_HEADER staticHeader; + PNVCTL_PACKET_HEADER fullPacket; + int err; + + err = recv(stream->s, (char*) &staticHeader, sizeof(staticHeader), 0); + if (err != sizeof(staticHeader)) { + return NULL; + } + + fullPacket = (PNVCTL_PACKET_HEADER) malloc(staticHeader.payloadLength + sizeof(staticHeader)); + if (fullPacket == NULL) { + return NULL; + } + + memcpy(fullPacket, &staticHeader, sizeof(staticHeader)); + err = recv(stream->s, (char*) (fullPacket + 1), staticHeader.payloadLength, 0); + if (err != staticHeader.payloadLength) { + free(fullPacket); + return NULL; + } + + return fullPacket; +} + +static PNVCTL_PACKET_HEADER sendNoPayloadAndReceive(PCONTROL_STREAM stream, short ptype, short paylen) { + NVCTL_PACKET_HEADER header; + int err; + + header.type = ptype; + header.payloadLength = paylen; + err = send(stream->s, (char*) &header, sizeof(header), 0); + if (err != sizeof(header)) { + return NULL; + } + + return readNvctlPacket(stream); +} + +static void heartbeatThreadFunc(void* context) { + PCONTROL_STREAM stream = (PCONTROL_STREAM) context; + int err; + NVCTL_PACKET_HEADER header; + + for (;;) { + header.type = PTYPE_HEARTBEAT; + header.payloadLength = PPAYLEN_HEARTBEAT; + err = send(stream->s, (char*) &header, sizeof(header), 0); + if (err != sizeof(header)) { + Limelog("Heartbeat thread terminating\n"); + return; + } + + Sleep(3000); + } +} + +static void jitterThreadFunc(void* context) { + PCONTROL_STREAM stream = (PCONTROL_STREAM) context; + int payload[4]; + NVCTL_PACKET_HEADER header; + int err; + + header.type = PTYPE_JITTER; + header.payloadLength = PPAYLEN_JITTER; + for (;;) { + err = send(stream->s, (char*) &header, sizeof(header), 0); + if (err != sizeof(header)) { + Limelog("Jitter thread terminating #1\n"); + return; + } + + payload[0] = 0; + payload[1] = 77; + payload[2] = 888; + payload[3] = 0; // FIXME: Sequence number? + + err = send(stream->s, (char*) payload, sizeof(payload), 0); + if (err != sizeof(payload)) { + Limelog("Jitter thread terminating #2\n"); + return; + } + + Sleep(100); + } +} + +static void resyncThreadFunc(void* context) { + PCONTROL_STREAM stream = (PCONTROL_STREAM) context; +} + +int stopControlStream(void* context) { + PCONTROL_STREAM stream = (PCONTROL_STREAM) context; + + closesocket(stream->s); + + PltJoinThread(stream->heartbeatThread); + PltJoinThread(stream->jitterThread); + PltJoinThread(stream->resyncThread); + + PltCloseThread(stream->heartbeatThread); + PltCloseThread(stream->jitterThread); + PltCloseThread(stream->resyncThread); + + return 0; +} + +int startControlStream(void* context) { + PCONTROL_STREAM stream = (PCONTROL_STREAM) context; + int err; + char* config; + int configSize; + PNVCTL_PACKET_HEADER response; + + configSize = getConfigDataSize(&stream->streamConfig); + config = allocateConfigDataForStreamConfig(&stream->streamConfig); + if (config == NULL) { + return NULL; + } + + // Send config + err = send(stream->s, config, configSize, 0); + free(config); + if (err != configSize) { + return NULL; + } + + // Ping pong + response = sendNoPayloadAndReceive(stream, PTYPE_HEARTBEAT, PPAYLEN_HEARTBEAT); + if (response == NULL) { + return NULL; + } + free(response); + + // 1405 + response = sendNoPayloadAndReceive(stream, PTYPE_1405, PPAYLEN_1405); + if (response == NULL) { + return NULL; + } + free(response); + + err = PltCreateThread(heartbeatThreadFunc, context, &stream->heartbeatThread); + if (err != 0) { + return err; + } + + err = PltCreateThread(jitterThreadFunc, context, &stream->jitterThread); + if (err != 0) { + return err; + } + + err = PltCreateThread(resyncThreadFunc, context, &stream->resyncThread); + if (err != 0) { + return err; + } + + return 0; +} \ No newline at end of file diff --git a/limelight-common/Limelight.h b/limelight-common/Limelight.h index b95a4b1..545b2b5 100644 --- a/limelight-common/Limelight.h +++ b/limelight-common/Limelight.h @@ -1,7 +1,18 @@ #include "Platform.h" +#include "PlatformSockets.h" typedef struct _STREAM_CONFIGURATION { int width; int height; int fps; -} STREAM_CONFIGURATION, *PSTREAM_CONFIGURATION; \ No newline at end of file +} STREAM_CONFIGURATION, *PSTREAM_CONFIGURATION; + +#include +#define Limelog printf + +char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig); +int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig); + +void* initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig); +int startControlStream(void* context); +int stopControlStream(void* context); \ No newline at end of file diff --git a/limelight-common/PlatformSockets.cpp b/limelight-common/PlatformSockets.cpp index d1205f4..4cbc91a 100644 --- a/limelight-common/PlatformSockets.cpp +++ b/limelight-common/PlatformSockets.cpp @@ -19,4 +19,17 @@ SOCKET connectTcpSocket(IP_ADDRESS dstaddr, unsigned short port) { } return s; +} + +int enableNoDelay(SOCKET s) { + int err; + int val; + + val = 1; + err = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&val, sizeof(val)); + if (err == SOCKET_ERROR) { + return LastSocketError(); + } + + return 0; } \ No newline at end of file diff --git a/limelight-common/PlatformSockets.h b/limelight-common/PlatformSockets.h index 10c7c0a..89c2689 100644 --- a/limelight-common/PlatformSockets.h +++ b/limelight-common/PlatformSockets.h @@ -1,6 +1,6 @@ #ifdef _WIN32 -#include +#include #define LastSocketError() WSAGetLastError() #else #define SOCKET int @@ -12,4 +12,5 @@ #define IP_ADDRESS unsigned int -SOCKET connectTcpSocket(IP_ADDRESS dstaddr, unsigned short port); \ No newline at end of file +SOCKET connectTcpSocket(IP_ADDRESS dstaddr, unsigned short port); +int enableNoDelay(SOCKET s); \ No newline at end of file diff --git a/limelight-common/PlatformThreads.cpp b/limelight-common/PlatformThreads.cpp new file mode 100644 index 0000000..9b81208 --- /dev/null +++ b/limelight-common/PlatformThreads.cpp @@ -0,0 +1,96 @@ +#include "PlatformThreads.h" + +struct thread_context { + ThreadEntry entry; + void* context; +}; + +#ifdef _WIN32 +DWORD WINAPI ThreadProc(LPVOID lpParameter) { + struct thread_context *ctx = (struct thread_context *)lpParameter; + + ctx->entry(ctx->context); + + free(ctx); + + return 0; +} +#else +#error POSIX threads not implemented +#endif + +int PltCreateMutex(PLT_MUTEX *mutex) { +#ifdef _WIN32 + *mutex = CreateMutex(NULL, FALSE, NULL); + if (!*mutex) { + return -1; + } + return 0; +#else +#endif +} + +void PltDeleteMutex(PLT_MUTEX *mutex) { +#ifdef _WIN32 + CloseHandle(*mutex); +#else +#endif +} + +void PltLockMutex(PLT_MUTEX *mutex) { +#ifdef _WIN32 + WaitForSingleObject(*mutex, INFINITE); +#else +#endif +} + +void PltUnlockMutex(PLT_MUTEX *mutex) { +#ifdef _WIN32 + ReleaseMutex(*mutex); +#else +#endif +} + +void PltJoinThread(PLT_THREAD thread) { +#ifdef _WIN32 + WaitForSingleObject(thread, INFINITE); +#else +#endif +} + +void PltCloseThread(PLT_THREAD thread) { +#ifdef _WIN32 + CloseHandle(thread); +#else +#endif +} + +int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread) { + struct thread_context *ctx; + int err; + + ctx = (struct thread_context *)malloc(sizeof(*ctx)); + if (ctx == NULL) { + return -1; + } + + ctx->entry = entry; + ctx->context = context; + +#ifdef _WIN32 + { + HANDLE hThread = CreateThread(NULL, 0, ThreadProc, ctx, 0, NULL); + if (hThread == NULL) { + free(ctx); + return -1; + } + else { + CloseHandle(hThread); + err = 0; + } + } +#else +#endif + + return err; +} \ No newline at end of file diff --git a/limelight-common/PlatformThreads.h b/limelight-common/PlatformThreads.h new file mode 100644 index 0000000..269f1da --- /dev/null +++ b/limelight-common/PlatformThreads.h @@ -0,0 +1,18 @@ +#include "Platform.h" + +typedef void (*ThreadEntry)(void *context); + +#ifdef _WIN32 +typedef HANDLE PLT_THREAD; +typedef HANDLE PLT_MUTEX; +#else +#endif + +int PltCreateMutex(PLT_MUTEX *mutex); +void PltDeleteMutex(PLT_MUTEX *mutex); +void PltLockMutex(PLT_MUTEX *mutex); +void PltUnlockMutex(PLT_MUTEX *mutex); + +int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread); +void PltCloseThread(PLT_THREAD thread); +void PltJoinThread(PLT_THREAD thread); diff --git a/limelight-common/VideoDepacketizer.cpp b/limelight-common/VideoDepacketizer.cpp new file mode 100644 index 0000000..e69de29 diff --git a/limelight-common/limelight-common.vcxproj b/limelight-common/limelight-common.vcxproj index c69be09..a44955e 100644 --- a/limelight-common/limelight-common.vcxproj +++ b/limelight-common/limelight-common.vcxproj @@ -73,6 +73,22 @@ + + + + + + + + + + + + + + + + diff --git a/limelight-common/limelight-common.vcxproj.filters b/limelight-common/limelight-common.vcxproj.filters index 34c1b07..226b809 100644 --- a/limelight-common/limelight-common.vcxproj.filters +++ b/limelight-common/limelight-common.vcxproj.filters @@ -17,4 +17,44 @@ + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + + Source Files + + \ No newline at end of file