Cross-platform threading, sockets updates, and control stream implementation, and various other fixes

This commit is contained in:
Cameron Gutman
2014-01-18 18:53:50 -05:00
parent f02e916f6c
commit 103c052729
11 changed files with 443 additions and 8 deletions

View File

@@ -5,10 +5,10 @@
typedef struct _BYTE_BUFFER {
char* buffer;
int offset;
int length;
int position;
int byteOrder;
unsigned int offset;
unsigned int length;
unsigned int position;
unsigned int byteOrder;
} BYTE_BUFFER, *PBYTE_BUFFER;
void BbInitializeWrappedBuffer(PBYTE_BUFFER buff, char* data, int offset, int length, int byteOrder);

View File

@@ -92,10 +92,15 @@ const int UNKNOWN_CONFIG [] = {
(int) 0xFE000000
};
const int CONFIG_SIZE = sizeof(UNKNOWN_CONFIG) +(8 * 4) + 3;
const int CONFIG_SIZE = sizeof(UNKNOWN_CONFIG) + (8 * 4) + 3;
int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig) {
return CONFIG_SIZE;
}
char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig) {
BYTE_BUFFER bb;
int i;
char* config = (char *)malloc(CONFIG_SIZE);
if (config == NULL) {
return NULL;
@@ -103,4 +108,28 @@ char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig) {
BbInitializeWrappedBuffer(&bb, config, 0, CONFIG_SIZE, BYTE_ORDER_LITTLE);
BbPutShort(&bb, 0x1204);
BbPutShort(&bb, 0x0004);
BbPutInt(&bb, streamConfig->width);
BbPutShort(&bb, 0x1205);
BbPutShort(&bb, 0x0004);
BbPutInt(&bb, streamConfig->height);
BbPutShort(&bb, 0x1206);
BbPutShort(&bb, 0x0004);
BbPutInt(&bb, 1);
BbPutShort(&bb, 0x120A);
BbPutShort(&bb, 0x0004);
BbPutInt(&bb, streamConfig->fps);
for (i = 0; i < sizeof(UNKNOWN_CONFIG) / sizeof(int); i++) {
BbPutInt(&bb, UNKNOWN_CONFIG[i]);
}
BbPutShort(&bb, 0x0013);
BbPut(&bb, 0x00);
return config;
}

View File

@@ -0,0 +1,211 @@
#include "Limelight.h"
#include "PlatformSockets.h"
#include "PlatformThreads.h"
typedef struct _CONTROL_STREAM {
SOCKET s;
STREAM_CONFIGURATION streamConfig;
PLT_THREAD heartbeatThread;
PLT_THREAD jitterThread;
PLT_THREAD resyncThread;
} CONTROL_STREAM, *PCONTROL_STREAM;
typedef struct _NVCTL_PACKET_HEADER {
unsigned short type;
unsigned short payloadLength;
} NVCTL_PACKET_HEADER, *PNVCTL_PACKET_HEADER;
const short PTYPE_KEEPALIVE = 0x13ff;
const short PPAYLEN_KEEPALIVE = 0x0000;
const short PTYPE_HEARTBEAT = 0x1401;
const short PPAYLEN_HEARTBEAT = 0x0000;
const short PTYPE_1405 = 0x1405;
const short PPAYLEN_1405 = 0x0000;
const short PTYPE_RESYNC = 0x1404;
const short PPAYLEN_RESYNC = 16;
const short PTYPE_JITTER = 0x140c;
const short PPAYLEN_JITTER = 0x10;
void* initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig) {
PCONTROL_STREAM ctx;
ctx = (PCONTROL_STREAM) malloc(sizeof(*ctx));
if (ctx == NULL) {
return NULL;
}
ctx->s = connectTcpSocket(host, 47995);
if (ctx->s == INVALID_SOCKET) {
free(ctx);
return NULL;
}
enableNoDelay(ctx->s);
memcpy(&ctx->streamConfig, streamConfig, sizeof(*streamConfig));
return ctx;
}
static PNVCTL_PACKET_HEADER readNvctlPacket(PCONTROL_STREAM stream) {
NVCTL_PACKET_HEADER staticHeader;
PNVCTL_PACKET_HEADER fullPacket;
int err;
err = recv(stream->s, (char*) &staticHeader, sizeof(staticHeader), 0);
if (err != sizeof(staticHeader)) {
return NULL;
}
fullPacket = (PNVCTL_PACKET_HEADER) malloc(staticHeader.payloadLength + sizeof(staticHeader));
if (fullPacket == NULL) {
return NULL;
}
memcpy(fullPacket, &staticHeader, sizeof(staticHeader));
err = recv(stream->s, (char*) (fullPacket + 1), staticHeader.payloadLength, 0);
if (err != staticHeader.payloadLength) {
free(fullPacket);
return NULL;
}
return fullPacket;
}
static PNVCTL_PACKET_HEADER sendNoPayloadAndReceive(PCONTROL_STREAM stream, short ptype, short paylen) {
NVCTL_PACKET_HEADER header;
int err;
header.type = ptype;
header.payloadLength = paylen;
err = send(stream->s, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
return NULL;
}
return readNvctlPacket(stream);
}
static void heartbeatThreadFunc(void* context) {
PCONTROL_STREAM stream = (PCONTROL_STREAM) context;
int err;
NVCTL_PACKET_HEADER header;
for (;;) {
header.type = PTYPE_HEARTBEAT;
header.payloadLength = PPAYLEN_HEARTBEAT;
err = send(stream->s, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
Limelog("Heartbeat thread terminating\n");
return;
}
Sleep(3000);
}
}
static void jitterThreadFunc(void* context) {
PCONTROL_STREAM stream = (PCONTROL_STREAM) context;
int payload[4];
NVCTL_PACKET_HEADER header;
int err;
header.type = PTYPE_JITTER;
header.payloadLength = PPAYLEN_JITTER;
for (;;) {
err = send(stream->s, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
Limelog("Jitter thread terminating #1\n");
return;
}
payload[0] = 0;
payload[1] = 77;
payload[2] = 888;
payload[3] = 0; // FIXME: Sequence number?
err = send(stream->s, (char*) payload, sizeof(payload), 0);
if (err != sizeof(payload)) {
Limelog("Jitter thread terminating #2\n");
return;
}
Sleep(100);
}
}
static void resyncThreadFunc(void* context) {
PCONTROL_STREAM stream = (PCONTROL_STREAM) context;
}
int stopControlStream(void* context) {
PCONTROL_STREAM stream = (PCONTROL_STREAM) context;
closesocket(stream->s);
PltJoinThread(stream->heartbeatThread);
PltJoinThread(stream->jitterThread);
PltJoinThread(stream->resyncThread);
PltCloseThread(stream->heartbeatThread);
PltCloseThread(stream->jitterThread);
PltCloseThread(stream->resyncThread);
return 0;
}
int startControlStream(void* context) {
PCONTROL_STREAM stream = (PCONTROL_STREAM) context;
int err;
char* config;
int configSize;
PNVCTL_PACKET_HEADER response;
configSize = getConfigDataSize(&stream->streamConfig);
config = allocateConfigDataForStreamConfig(&stream->streamConfig);
if (config == NULL) {
return NULL;
}
// Send config
err = send(stream->s, config, configSize, 0);
free(config);
if (err != configSize) {
return NULL;
}
// Ping pong
response = sendNoPayloadAndReceive(stream, PTYPE_HEARTBEAT, PPAYLEN_HEARTBEAT);
if (response == NULL) {
return NULL;
}
free(response);
// 1405
response = sendNoPayloadAndReceive(stream, PTYPE_1405, PPAYLEN_1405);
if (response == NULL) {
return NULL;
}
free(response);
err = PltCreateThread(heartbeatThreadFunc, context, &stream->heartbeatThread);
if (err != 0) {
return err;
}
err = PltCreateThread(jitterThreadFunc, context, &stream->jitterThread);
if (err != 0) {
return err;
}
err = PltCreateThread(resyncThreadFunc, context, &stream->resyncThread);
if (err != 0) {
return err;
}
return 0;
}

View File

@@ -1,7 +1,18 @@
#include "Platform.h"
#include "PlatformSockets.h"
typedef struct _STREAM_CONFIGURATION {
int width;
int height;
int fps;
} STREAM_CONFIGURATION, *PSTREAM_CONFIGURATION;
} STREAM_CONFIGURATION, *PSTREAM_CONFIGURATION;
#include <stdio.h>
#define Limelog printf
char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig);
int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig);
void* initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig);
int startControlStream(void* context);
int stopControlStream(void* context);

View File

@@ -19,4 +19,17 @@ SOCKET connectTcpSocket(IP_ADDRESS dstaddr, unsigned short port) {
}
return s;
}
int enableNoDelay(SOCKET s) {
int err;
int val;
val = 1;
err = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (char*)&val, sizeof(val));
if (err == SOCKET_ERROR) {
return LastSocketError();
}
return 0;
}

View File

@@ -1,6 +1,6 @@
#ifdef _WIN32
#include <WinSock2.h>
#include <Windows.h>
#define LastSocketError() WSAGetLastError()
#else
#define SOCKET int
@@ -12,4 +12,5 @@
#define IP_ADDRESS unsigned int
SOCKET connectTcpSocket(IP_ADDRESS dstaddr, unsigned short port);
SOCKET connectTcpSocket(IP_ADDRESS dstaddr, unsigned short port);
int enableNoDelay(SOCKET s);

View File

@@ -0,0 +1,96 @@
#include "PlatformThreads.h"
struct thread_context {
ThreadEntry entry;
void* context;
};
#ifdef _WIN32
DWORD WINAPI ThreadProc(LPVOID lpParameter) {
struct thread_context *ctx = (struct thread_context *)lpParameter;
ctx->entry(ctx->context);
free(ctx);
return 0;
}
#else
#error POSIX threads not implemented
#endif
int PltCreateMutex(PLT_MUTEX *mutex) {
#ifdef _WIN32
*mutex = CreateMutex(NULL, FALSE, NULL);
if (!*mutex) {
return -1;
}
return 0;
#else
#endif
}
void PltDeleteMutex(PLT_MUTEX *mutex) {
#ifdef _WIN32
CloseHandle(*mutex);
#else
#endif
}
void PltLockMutex(PLT_MUTEX *mutex) {
#ifdef _WIN32
WaitForSingleObject(*mutex, INFINITE);
#else
#endif
}
void PltUnlockMutex(PLT_MUTEX *mutex) {
#ifdef _WIN32
ReleaseMutex(*mutex);
#else
#endif
}
void PltJoinThread(PLT_THREAD thread) {
#ifdef _WIN32
WaitForSingleObject(thread, INFINITE);
#else
#endif
}
void PltCloseThread(PLT_THREAD thread) {
#ifdef _WIN32
CloseHandle(thread);
#else
#endif
}
int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread) {
struct thread_context *ctx;
int err;
ctx = (struct thread_context *)malloc(sizeof(*ctx));
if (ctx == NULL) {
return -1;
}
ctx->entry = entry;
ctx->context = context;
#ifdef _WIN32
{
HANDLE hThread = CreateThread(NULL, 0, ThreadProc, ctx, 0, NULL);
if (hThread == NULL) {
free(ctx);
return -1;
}
else {
CloseHandle(hThread);
err = 0;
}
}
#else
#endif
return err;
}

View File

@@ -0,0 +1,18 @@
#include "Platform.h"
typedef void (*ThreadEntry)(void *context);
#ifdef _WIN32
typedef HANDLE PLT_THREAD;
typedef HANDLE PLT_MUTEX;
#else
#endif
int PltCreateMutex(PLT_MUTEX *mutex);
void PltDeleteMutex(PLT_MUTEX *mutex);
void PltLockMutex(PLT_MUTEX *mutex);
void PltUnlockMutex(PLT_MUTEX *mutex);
int PltCreateThread(ThreadEntry entry, void* context, PLT_THREAD *thread);
void PltCloseThread(PLT_THREAD thread);
void PltJoinThread(PLT_THREAD thread);

View File

View File

@@ -73,6 +73,22 @@
<ItemGroup>
<Text Include="ReadMe.txt" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="ByteBuffer.cpp" />
<ClCompile Include="Config.cpp" />
<ClCompile Include="ControlStream.cpp" />
<ClCompile Include="Handshake.cpp" />
<ClCompile Include="PlatformSockets.cpp" />
<ClCompile Include="PlatformThreads.cpp" />
<ClCompile Include="VideoDepacketizer.cpp" />
</ItemGroup>
<ItemGroup>
<ClInclude Include="ByteBuffer.h" />
<ClInclude Include="Limelight.h" />
<ClInclude Include="Platform.h" />
<ClInclude Include="PlatformSockets.h" />
<ClInclude Include="PlatformThreads.h" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>

View File

@@ -17,4 +17,44 @@
<ItemGroup>
<Text Include="ReadMe.txt" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="Handshake.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="PlatformSockets.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="Config.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="ByteBuffer.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="PlatformThreads.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="ControlStream.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="VideoDepacketizer.cpp">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="PlatformSockets.h">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="Platform.h">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="Limelight.h">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="ByteBuffer.h">
<Filter>Source Files</Filter>
</ClInclude>
<ClInclude Include="PlatformThreads.h">
<Filter>Source Files</Filter>
</ClInclude>
</ItemGroup>
</Project>