Update control connection for GFE 2.0.1+. Remove config and handshake code that isn't used on GFE 2.0.1+

This commit is contained in:
Cameron Gutman
2014-06-28 21:19:39 -07:00
parent 929487249d
commit ee96cccb51
10 changed files with 147 additions and 358 deletions

View File

@@ -2,6 +2,8 @@
#include "PlatformSockets.h"
#include "PlatformThreads.h"
#include "ByteBuffer.h"
typedef struct _NVCTL_PACKET_HEADER {
unsigned short type;
unsigned short payloadLength;
@@ -10,26 +12,31 @@ typedef struct _NVCTL_PACKET_HEADER {
static IP_ADDRESS host;
static SOCKET ctlSock = INVALID_SOCKET;
static STREAM_CONFIGURATION streamConfig;
static PLT_THREAD heartbeatThread;
static PLT_THREAD jitterThread;
static PLT_THREAD lossStatsThread;
static PLT_THREAD resyncThread;
static PLT_EVENT resyncEvent;
static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks;
static int lossCountSinceLastReport = 0;
static long currentFrame = 0;
static const short PTYPE_KEEPALIVE = 0x13ff;
static const short PPAYLEN_KEEPALIVE = 0x0000;
#define PTYPE_START_STREAM_A 0x140b
#define PPAYLEN_START_STREAM_A 1
static const char PPAYLOAD_START_STREAM_A[1] = { 0 };
static const short PTYPE_HEARTBEAT = 0x1401;
static const short PPAYLEN_HEARTBEAT = 0x0000;
#define PTYPE_START_STREAM_B 0x1410
#define PPAYLEN_START_STREAM_B 16
static const int PPAYLOAD_START_STREAM_B[4] = { 0, 0, 0, 0xa }; // FIXME: Little endian
static const short PTYPE_1405 = 0x1405;
static const short PPAYLEN_1405 = 0x0000;
#define PTYPE_RESYNC 0x1404
#define PPAYLEN_RESYNC 24
static const short PTYPE_RESYNC = 0x1404;
static const short PPAYLEN_RESYNC = 16;
#define PTYPE_LOSS_STATS 0x140c
#define PPAYLEN_LOSS_STATS 20
static const short PTYPE_JITTER = 0x140c;
static const short PPAYLEN_JITTER = 0x10;
#define PTYPE_FRAME_STATS 0x1417
#define PPAYLEN_FRAME_STATS 64
#define LOSS_REPORT_INTERVAL_MS 50
int initializeControlStream(IP_ADDRESS addr, PSTREAM_CONFIGURATION streamConfigPtr, PCONNECTION_LISTENER_CALLBACKS clCallbacks) {
memcpy(&streamConfig, streamConfigPtr, sizeof(*streamConfigPtr));
@@ -56,6 +63,14 @@ void connectionDetectedFrameLoss(int startFrame, int endFrame) {
PltSetEvent(&resyncEvent);
}
void connectionReceivedFrame(int frameIndex) {
currentFrame = frameIndex;
}
void connectionLostPackets(int lastReceivedPacket, int nextReceivedPacket) {
lossCountSinceLastReport += (nextReceivedPacket - lastReceivedPacket) - 1;
}
static PNVCTL_PACKET_HEADER readNvctlPacket(void) {
NVCTL_PACKET_HEADER staticHeader;
PNVCTL_PACKET_HEADER fullPacket;
@@ -83,7 +98,7 @@ static PNVCTL_PACKET_HEADER readNvctlPacket(void) {
return fullPacket;
}
static PNVCTL_PACKET_HEADER sendNoPayloadAndReceive(short ptype, short paylen) {
static int sendMessageAndForget(short ptype, short paylen, const void* payload) {
NVCTL_PACKET_HEADER header;
int err;
@@ -91,105 +106,86 @@ static PNVCTL_PACKET_HEADER sendNoPayloadAndReceive(short ptype, short paylen) {
header.payloadLength = paylen;
err = send(ctlSock, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
return 0;
}
if (payload != NULL) {
err = send(ctlSock, payload, paylen, 0);
if (err != paylen) {
return 0;
}
}
return 1;
}
static PNVCTL_PACKET_HEADER sendMessage(short ptype, short paylen, const void* payload) {
int success;
success = sendMessageAndForget(ptype, paylen, payload);
if (!success) {
return NULL;
}
return readNvctlPacket();
}
static void heartbeatThreadFunc(void* context) {
int err;
NVCTL_PACKET_HEADER header;
static void lossStatsThreadFunc(void* context) {
char lossStatsPayload[PPAYLEN_LOSS_STATS];
BYTE_BUFFER byteBuffer;
header.type = PTYPE_HEARTBEAT;
header.payloadLength = PPAYLEN_HEARTBEAT;
while (!PltIsThreadInterrupted(&heartbeatThread)) {
err = send(ctlSock, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
Limelog("Heartbeat thread terminating #1\n");
listenerCallbacks->connectionTerminated(err);
while (!PltIsThreadInterrupted(&lossStatsThread)) {
// Construct the payload
BbInitializeWrappedBuffer(&byteBuffer, lossStatsPayload, 0, PPAYLEN_LOSS_STATS, BYTE_ORDER_LITTLE);
BbPutInt(&byteBuffer, lossCountSinceLastReport);
BbPutInt(&byteBuffer, LOSS_REPORT_INTERVAL_MS);
BbPutInt(&byteBuffer, 1000);
BbPutLong(&byteBuffer, currentFrame);
// Send the message (and don't expect a response)
if (!sendMessageAndForget(PTYPE_LOSS_STATS,
PPAYLEN_LOSS_STATS, lossStatsPayload)) {
Limelog("Loss stats thread terminating #1\n");
return;
}
PltSleepMs(3000);
}
}
// Clear the transient state
lossCountSinceLastReport = 0;
static void jitterThreadFunc(void* context) {
int payload[4];
NVCTL_PACKET_HEADER header;
int err;
header.type = PTYPE_JITTER;
header.payloadLength = PPAYLEN_JITTER;
while (!PltIsThreadInterrupted(&jitterThread)) {
err = send(ctlSock, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
Limelog("Jitter thread terminating #1\n");
listenerCallbacks->connectionTerminated(err);
return;
}
payload[0] = 0;
payload[1] = 77;
payload[2] = 888;
payload[3] = 0; // FIXME: Sequence number?
err = send(ctlSock, (char*) payload, sizeof(payload), 0);
if (err != sizeof(payload)) {
Limelog("Jitter thread terminating #2\n");
listenerCallbacks->connectionTerminated(err);
return;
}
PltSleepMs(100);
// Wait a bit
PltSleepMs(LOSS_REPORT_INTERVAL_MS);
}
}
static void resyncThreadFunc(void* context) {
long long payload[2];
NVCTL_PACKET_HEADER header;
long long payload[3];
PNVCTL_PACKET_HEADER response;
int err;
header.type = PTYPE_RESYNC;
header.payloadLength = PPAYLEN_RESYNC;
while (!PltIsThreadInterrupted(&resyncThread)) {
// Wait for a resync request
PltWaitForEvent(&resyncEvent);
err = send(ctlSock, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
Limelog("Resync thread terminating #1\n");
listenerCallbacks->connectionTerminated(err);
return;
}
// Form the payload
payload[0] = 0;
payload[1] = 0xFFFFF;
payload[2] = 0;
Limelog("Sending resync packet\n");
err = send(ctlSock, (char*) payload, sizeof(payload), 0);
if (err != sizeof(payload)) {
Limelog("Resync thread terminating #2\n");
listenerCallbacks->connectionTerminated(err);
return;
}
// Done capturing the parameters
PltClearEvent(&resyncEvent);
response = readNvctlPacket();
// Send the resync request and read the response
response = sendMessage(PTYPE_RESYNC, PPAYLEN_RESYNC, payload);
if (response == NULL) {
Limelog("Resync thread terminating #3\n");
Limelog("Resync thread terminating #1\n");
listenerCallbacks->connectionTerminated(LastSocketError());
return;
}
Limelog("Resync complete\n");
PltClearEvent(&resyncEvent);
}
}
int stopControlStream(void) {
PltInterruptThread(&heartbeatThread);
PltInterruptThread(&jitterThread);
PltInterruptThread(&lossStatsThread);
PltInterruptThread(&resyncThread);
if (ctlSock != INVALID_SOCKET) {
@@ -197,12 +193,10 @@ int stopControlStream(void) {
ctlSock = INVALID_SOCKET;
}
PltJoinThread(&heartbeatThread);
PltJoinThread(&jitterThread);
PltJoinThread(&lossStatsThread);
PltJoinThread(&resyncThread);
PltCloseThread(&heartbeatThread);
PltCloseThread(&jitterThread);
PltCloseThread(&lossStatsThread);
PltCloseThread(&resyncThread);
return 0;
@@ -210,8 +204,6 @@ int stopControlStream(void) {
int startControlStream(void) {
int err;
char* config;
int configSize;
PNVCTL_PACKET_HEADER response;
ctlSock = connectTcpSocket(host, 47995);
@@ -221,39 +213,21 @@ int startControlStream(void) {
enableNoDelay(ctlSock);
configSize = getConfigDataSize(&streamConfig);
config = allocateConfigDataForStreamConfig(&streamConfig);
if (config == NULL) {
return -1;
}
// Send config
err = send(ctlSock, config, configSize, 0);
free(config);
if (err != configSize) {
return LastSocketError();
}
// Ping pong
response = sendNoPayloadAndReceive(PTYPE_KEEPALIVE, PPAYLEN_KEEPALIVE);
// Send START A
response = sendMessage(PTYPE_START_STREAM_A,
PPAYLEN_START_STREAM_A, PPAYLOAD_START_STREAM_A);
if (response == NULL) {
return LastSocketError();
}
free(response);
// 1405
response = sendNoPayloadAndReceive(PTYPE_1405, PPAYLEN_1405);
// Send START B
response = sendMessage(PTYPE_START_STREAM_B,
PPAYLEN_START_STREAM_B, PPAYLOAD_START_STREAM_B);
if (response == NULL) {
return LastSocketError();
}
free(response);
err = PltCreateThread(heartbeatThreadFunc, NULL, &heartbeatThread);
if (err != 0) {
return err;
}
err = PltCreateThread(jitterThreadFunc, NULL, &jitterThread);
err = PltCreateThread(lossStatsThreadFunc, NULL, &lossStatsThread);
if (err != 0) {
return err;
}