Add connection listener callbacks (WIP)

This commit is contained in:
Cameron Gutman 2014-03-29 14:20:36 -04:00
parent 0f586ce05f
commit c7c929d751
6 changed files with 114 additions and 32 deletions

View File

@ -4,6 +4,7 @@
#include "LinkedBlockingQueue.h"
static AUDIO_RENDERER_CALLBACKS callbacks;
static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks;
static IP_ADDRESS remoteHost;
static SOCKET rtpSocket = INVALID_SOCKET;
@ -16,9 +17,10 @@ static PLT_THREAD decoderThread;
#define RTP_PORT 48000
void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks) {
void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks) {
memcpy(&callbacks, arCallbacks, sizeof(callbacks));
remoteHost = host;
listenerCallbacks = clCallbacks;
LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
}
@ -52,6 +54,7 @@ static void UdpPingThreadProc(void *context) {
err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, sizeof(saddr));
if (err != sizeof(pingData)) {
Limelog("UDP ping thread terminating #1\n");
listenerCallbacks->connectionTerminated(err);
return;
}
@ -62,10 +65,11 @@ static void UdpPingThreadProc(void *context) {
static void ReceiveThreadProc(void* context) {
int err;
for (;;) {
while (!PltIsThreadInterrupted(&receiveThread)) {
char* buffer = (char*) malloc(1500 + sizeof(int));
if (buffer == NULL) {
Limelog("Receive thread terminating\n");
listenerCallbacks->connectionTerminated(ERROR_OUTOFMEMORY);
return;
}
@ -73,6 +77,7 @@ static void ReceiveThreadProc(void* context) {
if (err <= 0) {
Limelog("Receive thread terminating #2\n");
free(buffer);
listenerCallbacks->connectionTerminated(err);
return;
}
@ -100,7 +105,7 @@ static void DecoderThreadProc(void* context) {
char *data;
unsigned short lastSeq = 0;
for (;;) {
while (!PltIsThreadInterrupted(&decoderThread)) {
err = LbqWaitForQueueElement(&packetQueue, (void**) &data);
if (err != LBQ_SUCCESS) {
Limelog("Decoder thread terminating\n");
@ -111,7 +116,7 @@ static void DecoderThreadProc(void* context) {
rtp = (PRTP_PACKET) &data[sizeof(int)];
if (length < sizeof(RTP_PACKET)) {
Limelog("Runt packet\n");
// Runt packet
goto freeandcontinue;
}

View File

@ -1,17 +1,24 @@
#include "Limelight-internal.h"
#include "Platform.h"
#define STAGE_NONE 0
#define STAGE_PLATFORM_INIT 1
#define STAGE_HANDSHAKE 2
#define STAGE_CONTROL_STREAM_INIT 3
#define STAGE_VIDEO_STREAM_INIT 4
#define STAGE_AUDIO_STREAM_INIT 5
#define STAGE_CONTROL_STREAM_START 6
#define STAGE_VIDEO_STREAM_START 7
#define STAGE_AUDIO_STREAM_START 8
static int stage = STAGE_NONE;
static CONNECTION_LISTENER_CALLBACKS ListenerCallbacks;
static const char* stageNames [] = {
"none",
"platform initialization",
"handshake",
"control stream initialization",
"video stream initialization",
"audio stream initialization",
"control stream establishment",
"video stream establishment",
"audio stream establishment"
};
const char* LiGetStageName(int stage) {
return stageNames[stage];
}
void LiStopConnection(void) {
if (stage == STAGE_AUDIO_STREAM_START) {
@ -65,80 +72,104 @@ void LiStopConnection(void) {
LC_ASSERT(stage == STAGE_NONE);
}
int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks,
PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags) {
int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PCONNECTION_LISTENER_CALLBACKS clCallbacks,
PDECODER_RENDERER_CALLBACKS drCallbacks, PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags) {
int err;
memcpy(&ListenerCallbacks, clCallbacks, sizeof(ListenerCallbacks));
Limelog("Initializing platform...");
ListenerCallbacks.stageStarting(STAGE_PLATFORM_INIT);
err = initializePlatformSockets();
if (err != 0) {
Limelog("failed: %d\n", err);
ListenerCallbacks.stageFailed(STAGE_PLATFORM_INIT);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_PLATFORM_INIT);
ListenerCallbacks.stageComplete(STAGE_PLATFORM_INIT);
Limelog("done\n");
Limelog("Starting handshake...");
ListenerCallbacks.stageStarting(STAGE_HANDSHAKE);
err = performHandshake(host);
if (err != 0) {
Limelog("failed: %d\n", err);
ListenerCallbacks.stageFailed(STAGE_HANDSHAKE);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_HANDSHAKE);
ListenerCallbacks.stageComplete(STAGE_HANDSHAKE);
Limelog("done\n");
Limelog("Initializing control stream...");
err = initializeControlStream(host, streamConfig);
ListenerCallbacks.stageStarting(STAGE_CONTROL_STREAM_INIT);
err = initializeControlStream(host, streamConfig, &ListenerCallbacks);
if (err != 0) {
Limelog("failed: %d\n", err);
ListenerCallbacks.stageFailed(STAGE_CONTROL_STREAM_INIT);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_CONTROL_STREAM_INIT);
ListenerCallbacks.stageComplete(STAGE_CONTROL_STREAM_INIT);
Limelog("done\n");
Limelog("Initializing video stream...");
initializeVideoStream(host, streamConfig, drCallbacks);
ListenerCallbacks.stageStarting(STAGE_VIDEO_STREAM_INIT);
initializeVideoStream(host, streamConfig, drCallbacks, &ListenerCallbacks);
stage++;
LC_ASSERT(stage == STAGE_VIDEO_STREAM_INIT);
ListenerCallbacks.stageComplete(STAGE_VIDEO_STREAM_INIT);
Limelog("done\n");
Limelog("Initializing audio stream...");
initializeAudioStream(host, arCallbacks);
ListenerCallbacks.stageStarting(STAGE_AUDIO_STREAM_INIT);
initializeAudioStream(host, arCallbacks, &ListenerCallbacks);
stage++;
LC_ASSERT(stage == STAGE_AUDIO_STREAM_INIT);
ListenerCallbacks.stageComplete(STAGE_AUDIO_STREAM_INIT);
Limelog("done\n");
Limelog("Starting control stream...");
ListenerCallbacks.stageStarting(STAGE_CONTROL_STREAM_START);
err = startControlStream();
if (err != 0) {
Limelog("failed: %d\n", err);
ListenerCallbacks.stageFailed(STAGE_CONTROL_STREAM_START);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_CONTROL_STREAM_START);
ListenerCallbacks.stageComplete(STAGE_CONTROL_STREAM_START);
Limelog("done\n");
Limelog("Starting video stream...");
ListenerCallbacks.stageStarting(STAGE_VIDEO_STREAM_START);
err = startVideoStream(renderContext, drFlags);
if (err != 0) {
Limelog("Video stream start failed: %d\n", err);
ListenerCallbacks.stageFailed(STAGE_VIDEO_STREAM_START);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_VIDEO_STREAM_START);
ListenerCallbacks.stageComplete(STAGE_VIDEO_STREAM_START);
Limelog("done\n");
Limelog("Starting audio stream...");
ListenerCallbacks.stageStarting(STAGE_AUDIO_STREAM_START);
err = startAudioStream();
if (err != 0) {
Limelog("Audio stream start failed: %d\n", err);
ListenerCallbacks.stageFailed(STAGE_AUDIO_STREAM_START);
goto Cleanup;
}
stage++;
LC_ASSERT(stage == STAGE_AUDIO_STREAM_START);
ListenerCallbacks.stageComplete(STAGE_AUDIO_STREAM_START);
Limelog("done\n");
Cleanup:

View File

@ -14,6 +14,7 @@ static PLT_THREAD heartbeatThread;
static PLT_THREAD jitterThread;
static PLT_THREAD resyncThread;
static PLT_EVENT resyncEvent;
static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks;
static const short PTYPE_KEEPALIVE = 0x13ff;
static const short PPAYLEN_KEEPALIVE = 0x0000;
@ -30,12 +31,13 @@ static const short PPAYLEN_RESYNC = 16;
static const short PTYPE_JITTER = 0x140c;
static const short PPAYLEN_JITTER = 0x10;
int initializeControlStream(IP_ADDRESS addr, PSTREAM_CONFIGURATION streamConfigPtr) {
int initializeControlStream(IP_ADDRESS addr, PSTREAM_CONFIGURATION streamConfigPtr, PCONNECTION_LISTENER_CALLBACKS clCallbacks) {
memcpy(&streamConfig, streamConfigPtr, sizeof(*streamConfigPtr));
PltCreateEvent(&resyncEvent);
host = addr;
listenerCallbacks = clCallbacks;
return 0;
}
@ -99,12 +101,13 @@ static void heartbeatThreadFunc(void* context) {
int err;
NVCTL_PACKET_HEADER header;
header.type = PTYPE_HEARTBEAT;
header.payloadLength = PPAYLEN_HEARTBEAT;
while (!PltIsThreadInterrupted(&heartbeatThread)) {
header.type = PTYPE_HEARTBEAT;
header.payloadLength = PPAYLEN_HEARTBEAT;
err = send(ctlSock, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
Limelog("Heartbeat thread terminating #1\n");
listenerCallbacks->connectionTerminated(err);
return;
}
@ -123,6 +126,7 @@ static void jitterThreadFunc(void* context) {
err = send(ctlSock, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
Limelog("Jitter thread terminating #1\n");
listenerCallbacks->connectionTerminated(err);
return;
}
@ -134,6 +138,7 @@ static void jitterThreadFunc(void* context) {
err = send(ctlSock, (char*) payload, sizeof(payload), 0);
if (err != sizeof(payload)) {
Limelog("Jitter thread terminating #2\n");
listenerCallbacks->connectionTerminated(err);
return;
}
@ -148,12 +153,13 @@ static void resyncThreadFunc(void* context) {
header.type = PTYPE_RESYNC;
header.payloadLength = PPAYLEN_RESYNC;
for (;;) {
while (!PltIsThreadInterrupted(&resyncThread)) {
PltWaitForEvent(&resyncEvent);
err = send(ctlSock, (char*) &header, sizeof(header), 0);
if (err != sizeof(header)) {
Limelog("Resync thread terminating #1\n");
listenerCallbacks->connectionTerminated(err);
return;
}
@ -164,6 +170,7 @@ static void resyncThreadFunc(void* context) {
err = send(ctlSock, (char*) payload, sizeof(payload), 0);
if (err != sizeof(payload)) {
Limelog("Resync thread terminating #2\n");
listenerCallbacks->connectionTerminated(err);
return;
}

View File

@ -8,7 +8,7 @@
char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig);
int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig);
int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig);
int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PCONNECTION_LISTENER_CALLBACKS clCallbacks);
int startControlStream(void);
int stopControlStream(void);
void destroyControlStream(void);
@ -25,12 +25,12 @@ int getNextDecodeUnit(PDECODE_UNIT *du);
void freeDecodeUnit(PDECODE_UNIT decodeUnit);
void queueRtpPacket(PRTP_PACKET rtpPacket, int length);
void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks);
void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks);
void destroyVideoStream(void);
int startVideoStream(void* rendererContext, int drFlags);
void stopVideoStream(void);
void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks);
void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks);
void destroyAudioStream(void);
int startAudioStream(void);
void stopAudioStream(void);

View File

@ -51,9 +51,42 @@ typedef struct _AUDIO_RENDERER_CALLBACKS {
AudioRendererDecodeAndPlaySample decodeAndPlaySample;
} AUDIO_RENDERER_CALLBACKS, *PAUDIO_RENDERER_CALLBACKS;
int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks,
PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags);
// Subject to change in future releases
// Use LiGetStageName() for stable stage names
#define STAGE_NONE 0
#define STAGE_PLATFORM_INIT 1
#define STAGE_HANDSHAKE 2
#define STAGE_CONTROL_STREAM_INIT 3
#define STAGE_VIDEO_STREAM_INIT 4
#define STAGE_AUDIO_STREAM_INIT 5
#define STAGE_CONTROL_STREAM_START 6
#define STAGE_VIDEO_STREAM_START 7
#define STAGE_AUDIO_STREAM_START 8
typedef void(*ConnListenerStageStarting)(int stage);
typedef void(*ConnListenerStageComplete)(int stage);
typedef void(*ConnListenerStageFailed)(int stage);
typedef void(*ConnListenerConnectionStarted)(void);
typedef void(*ConnListenerConnectionTerminated)(int errorCode);
typedef void(*ConnListenerDisplayMessage)(char* message);
typedef void(*ConnListenerDisplayTransientMessage)(char* message);
typedef struct _CONNECTION_LISTENER_CALLBACKS {
ConnListenerStageStarting stageStarting;
ConnListenerStageComplete stageComplete;
ConnListenerStageFailed stageFailed;
ConnListenerConnectionStarted connectionStarted;
ConnListenerConnectionTerminated connectionTerminated;
ConnListenerDisplayMessage displayMessage;
ConnListenerDisplayTransientMessage displayTransientMessage;
} CONNECTION_LISTENER_CALLBACKS, *PCONNECTION_LISTENER_CALLBACKS;
int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PCONNECTION_LISTENER_CALLBACKS clCallbacks,
PDECODER_RENDERER_CALLBACKS drCallbacks, PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags);
void LiStopConnection(void);
const char* LiGetStageName(int stage);
#ifdef __cplusplus
}

View File

@ -11,6 +11,7 @@
static DECODER_RENDERER_CALLBACKS callbacks;
static STREAM_CONFIGURATION configuration;
static IP_ADDRESS remoteHost;
static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks;
static SOCKET rtpSocket = INVALID_SOCKET;
static SOCKET firstFrameSocket = INVALID_SOCKET;
@ -22,10 +23,12 @@ static PLT_THREAD receiveThread;
static PLT_THREAD depacketizerThread;
static PLT_THREAD decoderThread;
void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks) {
void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks,
PCONNECTION_LISTENER_CALLBACKS clCallbacks) {
memcpy(&callbacks, drCallbacks, sizeof(callbacks));
memcpy(&configuration, streamConfig, sizeof(configuration));
remoteHost = host;
listenerCallbacks = clCallbacks;
LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
@ -63,6 +66,7 @@ static void UdpPingThreadProc(void *context) {
err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, sizeof(saddr));
if (err != sizeof(pingData)) {
Limelog("UDP ping thread terminating #1\n");
listenerCallbacks->connectionTerminated(err);
return;
}
@ -73,10 +77,11 @@ static void UdpPingThreadProc(void *context) {
static void ReceiveThreadProc(void* context) {
int err;
for (;;) {
while (!PltIsThreadInterrupted(&receiveThread)) {
char* buffer = (char*) malloc(1500 + sizeof(int));
if (buffer == NULL) {
Limelog("Receive thread terminating\n");
listenerCallbacks->connectionTerminated(ERROR_OUTOFMEMORY);
return;
}
@ -84,6 +89,7 @@ static void ReceiveThreadProc(void* context) {
if (err <= 0) {
Limelog("Receive thread terminating #2\n");
free(buffer);
listenerCallbacks->connectionTerminated(err);
return;
}
@ -109,7 +115,7 @@ static void DepacketizerThreadProc(void* context) {
int err;
char *data;
for (;;) {
while (!PltIsThreadInterrupted(&depacketizerThread)) {
err = LbqWaitForQueueElement(&packetQueue, (void**)&data);
if (err != LBQ_SUCCESS) {
Limelog("Depacketizer thread terminating\n");
@ -125,7 +131,7 @@ static void DepacketizerThreadProc(void* context) {
static void DecoderThreadProc(void* context) {
PDECODE_UNIT du;
for (;;) {
while (!PltIsThreadInterrupted(&decoderThread)) {
if (!getNextDecodeUnit(&du)) {
printf("Decoder thread terminating\n");
return;