From c7c929d7512464bc5bd90693f6c8683bf8719dc5 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Sat, 29 Mar 2014 14:20:36 -0400 Subject: [PATCH] Add connection listener callbacks (WIP) --- limelight-common/AudioStream.c | 13 ++++-- limelight-common/Connection.c | 61 ++++++++++++++++++++------- limelight-common/ControlStream.c | 15 +++++-- limelight-common/Limelight-internal.h | 6 +-- limelight-common/Limelight.h | 37 +++++++++++++++- limelight-common/VideoStream.c | 14 ++++-- 6 files changed, 114 insertions(+), 32 deletions(-) diff --git a/limelight-common/AudioStream.c b/limelight-common/AudioStream.c index 980bd3d..a7cf0b1 100644 --- a/limelight-common/AudioStream.c +++ b/limelight-common/AudioStream.c @@ -4,6 +4,7 @@ #include "LinkedBlockingQueue.h" static AUDIO_RENDERER_CALLBACKS callbacks; +static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks; static IP_ADDRESS remoteHost; static SOCKET rtpSocket = INVALID_SOCKET; @@ -16,9 +17,10 @@ static PLT_THREAD decoderThread; #define RTP_PORT 48000 -void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks) { +void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks) { memcpy(&callbacks, arCallbacks, sizeof(callbacks)); remoteHost = host; + listenerCallbacks = clCallbacks; LbqInitializeLinkedBlockingQueue(&packetQueue, 30); } @@ -52,6 +54,7 @@ static void UdpPingThreadProc(void *context) { err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, sizeof(saddr)); if (err != sizeof(pingData)) { Limelog("UDP ping thread terminating #1\n"); + listenerCallbacks->connectionTerminated(err); return; } @@ -62,10 +65,11 @@ static void UdpPingThreadProc(void *context) { static void ReceiveThreadProc(void* context) { int err; - for (;;) { + while (!PltIsThreadInterrupted(&receiveThread)) { char* buffer = (char*) malloc(1500 + sizeof(int)); if (buffer == NULL) { Limelog("Receive thread terminating\n"); + listenerCallbacks->connectionTerminated(ERROR_OUTOFMEMORY); return; } @@ -73,6 +77,7 @@ static void ReceiveThreadProc(void* context) { if (err <= 0) { Limelog("Receive thread terminating #2\n"); free(buffer); + listenerCallbacks->connectionTerminated(err); return; } @@ -100,7 +105,7 @@ static void DecoderThreadProc(void* context) { char *data; unsigned short lastSeq = 0; - for (;;) { + while (!PltIsThreadInterrupted(&decoderThread)) { err = LbqWaitForQueueElement(&packetQueue, (void**) &data); if (err != LBQ_SUCCESS) { Limelog("Decoder thread terminating\n"); @@ -111,7 +116,7 @@ static void DecoderThreadProc(void* context) { rtp = (PRTP_PACKET) &data[sizeof(int)]; if (length < sizeof(RTP_PACKET)) { - Limelog("Runt packet\n"); + // Runt packet goto freeandcontinue; } diff --git a/limelight-common/Connection.c b/limelight-common/Connection.c index 3e002e7..d95521e 100644 --- a/limelight-common/Connection.c +++ b/limelight-common/Connection.c @@ -1,17 +1,24 @@ #include "Limelight-internal.h" #include "Platform.h" -#define STAGE_NONE 0 -#define STAGE_PLATFORM_INIT 1 -#define STAGE_HANDSHAKE 2 -#define STAGE_CONTROL_STREAM_INIT 3 -#define STAGE_VIDEO_STREAM_INIT 4 -#define STAGE_AUDIO_STREAM_INIT 5 -#define STAGE_CONTROL_STREAM_START 6 -#define STAGE_VIDEO_STREAM_START 7 -#define STAGE_AUDIO_STREAM_START 8 - static int stage = STAGE_NONE; +static CONNECTION_LISTENER_CALLBACKS ListenerCallbacks; + +static const char* stageNames [] = { + "none", + "platform initialization", + "handshake", + "control stream initialization", + "video stream initialization", + "audio stream initialization", + "control stream establishment", + "video stream establishment", + "audio stream establishment" +}; + +const char* LiGetStageName(int stage) { + return stageNames[stage]; +} void LiStopConnection(void) { if (stage == STAGE_AUDIO_STREAM_START) { @@ -65,80 +72,104 @@ void LiStopConnection(void) { LC_ASSERT(stage == STAGE_NONE); } -int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks, - PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags) { +int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PCONNECTION_LISTENER_CALLBACKS clCallbacks, + PDECODER_RENDERER_CALLBACKS drCallbacks, PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags) { int err; + memcpy(&ListenerCallbacks, clCallbacks, sizeof(ListenerCallbacks)); + Limelog("Initializing platform..."); + ListenerCallbacks.stageStarting(STAGE_PLATFORM_INIT); err = initializePlatformSockets(); if (err != 0) { Limelog("failed: %d\n", err); + ListenerCallbacks.stageFailed(STAGE_PLATFORM_INIT); goto Cleanup; } stage++; LC_ASSERT(stage == STAGE_PLATFORM_INIT); + ListenerCallbacks.stageComplete(STAGE_PLATFORM_INIT); Limelog("done\n"); Limelog("Starting handshake..."); + ListenerCallbacks.stageStarting(STAGE_HANDSHAKE); err = performHandshake(host); if (err != 0) { Limelog("failed: %d\n", err); + ListenerCallbacks.stageFailed(STAGE_HANDSHAKE); goto Cleanup; } stage++; LC_ASSERT(stage == STAGE_HANDSHAKE); + ListenerCallbacks.stageComplete(STAGE_HANDSHAKE); Limelog("done\n"); Limelog("Initializing control stream..."); - err = initializeControlStream(host, streamConfig); + ListenerCallbacks.stageStarting(STAGE_CONTROL_STREAM_INIT); + err = initializeControlStream(host, streamConfig, &ListenerCallbacks); if (err != 0) { Limelog("failed: %d\n", err); + ListenerCallbacks.stageFailed(STAGE_CONTROL_STREAM_INIT); goto Cleanup; } stage++; LC_ASSERT(stage == STAGE_CONTROL_STREAM_INIT); + ListenerCallbacks.stageComplete(STAGE_CONTROL_STREAM_INIT); Limelog("done\n"); Limelog("Initializing video stream..."); - initializeVideoStream(host, streamConfig, drCallbacks); + ListenerCallbacks.stageStarting(STAGE_VIDEO_STREAM_INIT); + initializeVideoStream(host, streamConfig, drCallbacks, &ListenerCallbacks); stage++; LC_ASSERT(stage == STAGE_VIDEO_STREAM_INIT); + ListenerCallbacks.stageComplete(STAGE_VIDEO_STREAM_INIT); Limelog("done\n"); Limelog("Initializing audio stream..."); - initializeAudioStream(host, arCallbacks); + ListenerCallbacks.stageStarting(STAGE_AUDIO_STREAM_INIT); + initializeAudioStream(host, arCallbacks, &ListenerCallbacks); stage++; LC_ASSERT(stage == STAGE_AUDIO_STREAM_INIT); + ListenerCallbacks.stageComplete(STAGE_AUDIO_STREAM_INIT); Limelog("done\n"); Limelog("Starting control stream..."); + ListenerCallbacks.stageStarting(STAGE_CONTROL_STREAM_START); err = startControlStream(); if (err != 0) { Limelog("failed: %d\n", err); + ListenerCallbacks.stageFailed(STAGE_CONTROL_STREAM_START); goto Cleanup; } stage++; LC_ASSERT(stage == STAGE_CONTROL_STREAM_START); + ListenerCallbacks.stageComplete(STAGE_CONTROL_STREAM_START); Limelog("done\n"); Limelog("Starting video stream..."); + ListenerCallbacks.stageStarting(STAGE_VIDEO_STREAM_START); err = startVideoStream(renderContext, drFlags); if (err != 0) { Limelog("Video stream start failed: %d\n", err); + ListenerCallbacks.stageFailed(STAGE_VIDEO_STREAM_START); goto Cleanup; } stage++; LC_ASSERT(stage == STAGE_VIDEO_STREAM_START); + ListenerCallbacks.stageComplete(STAGE_VIDEO_STREAM_START); Limelog("done\n"); Limelog("Starting audio stream..."); + ListenerCallbacks.stageStarting(STAGE_AUDIO_STREAM_START); err = startAudioStream(); if (err != 0) { Limelog("Audio stream start failed: %d\n", err); + ListenerCallbacks.stageFailed(STAGE_AUDIO_STREAM_START); goto Cleanup; } stage++; LC_ASSERT(stage == STAGE_AUDIO_STREAM_START); + ListenerCallbacks.stageComplete(STAGE_AUDIO_STREAM_START); Limelog("done\n"); Cleanup: diff --git a/limelight-common/ControlStream.c b/limelight-common/ControlStream.c index feb42fb..a327bdd 100644 --- a/limelight-common/ControlStream.c +++ b/limelight-common/ControlStream.c @@ -14,6 +14,7 @@ static PLT_THREAD heartbeatThread; static PLT_THREAD jitterThread; static PLT_THREAD resyncThread; static PLT_EVENT resyncEvent; +static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks; static const short PTYPE_KEEPALIVE = 0x13ff; static const short PPAYLEN_KEEPALIVE = 0x0000; @@ -30,12 +31,13 @@ static const short PPAYLEN_RESYNC = 16; static const short PTYPE_JITTER = 0x140c; static const short PPAYLEN_JITTER = 0x10; -int initializeControlStream(IP_ADDRESS addr, PSTREAM_CONFIGURATION streamConfigPtr) { +int initializeControlStream(IP_ADDRESS addr, PSTREAM_CONFIGURATION streamConfigPtr, PCONNECTION_LISTENER_CALLBACKS clCallbacks) { memcpy(&streamConfig, streamConfigPtr, sizeof(*streamConfigPtr)); PltCreateEvent(&resyncEvent); host = addr; + listenerCallbacks = clCallbacks; return 0; } @@ -99,12 +101,13 @@ static void heartbeatThreadFunc(void* context) { int err; NVCTL_PACKET_HEADER header; + header.type = PTYPE_HEARTBEAT; + header.payloadLength = PPAYLEN_HEARTBEAT; while (!PltIsThreadInterrupted(&heartbeatThread)) { - header.type = PTYPE_HEARTBEAT; - header.payloadLength = PPAYLEN_HEARTBEAT; err = send(ctlSock, (char*) &header, sizeof(header), 0); if (err != sizeof(header)) { Limelog("Heartbeat thread terminating #1\n"); + listenerCallbacks->connectionTerminated(err); return; } @@ -123,6 +126,7 @@ static void jitterThreadFunc(void* context) { err = send(ctlSock, (char*) &header, sizeof(header), 0); if (err != sizeof(header)) { Limelog("Jitter thread terminating #1\n"); + listenerCallbacks->connectionTerminated(err); return; } @@ -134,6 +138,7 @@ static void jitterThreadFunc(void* context) { err = send(ctlSock, (char*) payload, sizeof(payload), 0); if (err != sizeof(payload)) { Limelog("Jitter thread terminating #2\n"); + listenerCallbacks->connectionTerminated(err); return; } @@ -148,12 +153,13 @@ static void resyncThreadFunc(void* context) { header.type = PTYPE_RESYNC; header.payloadLength = PPAYLEN_RESYNC; - for (;;) { + while (!PltIsThreadInterrupted(&resyncThread)) { PltWaitForEvent(&resyncEvent); err = send(ctlSock, (char*) &header, sizeof(header), 0); if (err != sizeof(header)) { Limelog("Resync thread terminating #1\n"); + listenerCallbacks->connectionTerminated(err); return; } @@ -164,6 +170,7 @@ static void resyncThreadFunc(void* context) { err = send(ctlSock, (char*) payload, sizeof(payload), 0); if (err != sizeof(payload)) { Limelog("Resync thread terminating #2\n"); + listenerCallbacks->connectionTerminated(err); return; } diff --git a/limelight-common/Limelight-internal.h b/limelight-common/Limelight-internal.h index c858639..5f3e46b 100644 --- a/limelight-common/Limelight-internal.h +++ b/limelight-common/Limelight-internal.h @@ -8,7 +8,7 @@ char* allocateConfigDataForStreamConfig(PSTREAM_CONFIGURATION streamConfig); int getConfigDataSize(PSTREAM_CONFIGURATION streamConfig); -int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig); +int initializeControlStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PCONNECTION_LISTENER_CALLBACKS clCallbacks); int startControlStream(void); int stopControlStream(void); void destroyControlStream(void); @@ -25,12 +25,12 @@ int getNextDecodeUnit(PDECODE_UNIT *du); void freeDecodeUnit(PDECODE_UNIT decodeUnit); void queueRtpPacket(PRTP_PACKET rtpPacket, int length); -void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks); +void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks); void destroyVideoStream(void); int startVideoStream(void* rendererContext, int drFlags); void stopVideoStream(void); -void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks); +void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks); void destroyAudioStream(void); int startAudioStream(void); void stopAudioStream(void); \ No newline at end of file diff --git a/limelight-common/Limelight.h b/limelight-common/Limelight.h index 5d2dbef..20c9cf6 100644 --- a/limelight-common/Limelight.h +++ b/limelight-common/Limelight.h @@ -51,9 +51,42 @@ typedef struct _AUDIO_RENDERER_CALLBACKS { AudioRendererDecodeAndPlaySample decodeAndPlaySample; } AUDIO_RENDERER_CALLBACKS, *PAUDIO_RENDERER_CALLBACKS; -int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks, - PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags); +// Subject to change in future releases +// Use LiGetStageName() for stable stage names +#define STAGE_NONE 0 +#define STAGE_PLATFORM_INIT 1 +#define STAGE_HANDSHAKE 2 +#define STAGE_CONTROL_STREAM_INIT 3 +#define STAGE_VIDEO_STREAM_INIT 4 +#define STAGE_AUDIO_STREAM_INIT 5 +#define STAGE_CONTROL_STREAM_START 6 +#define STAGE_VIDEO_STREAM_START 7 +#define STAGE_AUDIO_STREAM_START 8 + +typedef void(*ConnListenerStageStarting)(int stage); +typedef void(*ConnListenerStageComplete)(int stage); +typedef void(*ConnListenerStageFailed)(int stage); + +typedef void(*ConnListenerConnectionStarted)(void); +typedef void(*ConnListenerConnectionTerminated)(int errorCode); + +typedef void(*ConnListenerDisplayMessage)(char* message); +typedef void(*ConnListenerDisplayTransientMessage)(char* message); + +typedef struct _CONNECTION_LISTENER_CALLBACKS { + ConnListenerStageStarting stageStarting; + ConnListenerStageComplete stageComplete; + ConnListenerStageFailed stageFailed; + ConnListenerConnectionStarted connectionStarted; + ConnListenerConnectionTerminated connectionTerminated; + ConnListenerDisplayMessage displayMessage; + ConnListenerDisplayTransientMessage displayTransientMessage; +} CONNECTION_LISTENER_CALLBACKS, *PCONNECTION_LISTENER_CALLBACKS; + +int LiStartConnection(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PCONNECTION_LISTENER_CALLBACKS clCallbacks, + PDECODER_RENDERER_CALLBACKS drCallbacks, PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags); void LiStopConnection(void); +const char* LiGetStageName(int stage); #ifdef __cplusplus } diff --git a/limelight-common/VideoStream.c b/limelight-common/VideoStream.c index afb9312..d3f31c4 100644 --- a/limelight-common/VideoStream.c +++ b/limelight-common/VideoStream.c @@ -11,6 +11,7 @@ static DECODER_RENDERER_CALLBACKS callbacks; static STREAM_CONFIGURATION configuration; static IP_ADDRESS remoteHost; +static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks; static SOCKET rtpSocket = INVALID_SOCKET; static SOCKET firstFrameSocket = INVALID_SOCKET; @@ -22,10 +23,12 @@ static PLT_THREAD receiveThread; static PLT_THREAD depacketizerThread; static PLT_THREAD decoderThread; -void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks) { +void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks, + PCONNECTION_LISTENER_CALLBACKS clCallbacks) { memcpy(&callbacks, drCallbacks, sizeof(callbacks)); memcpy(&configuration, streamConfig, sizeof(configuration)); remoteHost = host; + listenerCallbacks = clCallbacks; LbqInitializeLinkedBlockingQueue(&packetQueue, 30); @@ -63,6 +66,7 @@ static void UdpPingThreadProc(void *context) { err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, sizeof(saddr)); if (err != sizeof(pingData)) { Limelog("UDP ping thread terminating #1\n"); + listenerCallbacks->connectionTerminated(err); return; } @@ -73,10 +77,11 @@ static void UdpPingThreadProc(void *context) { static void ReceiveThreadProc(void* context) { int err; - for (;;) { + while (!PltIsThreadInterrupted(&receiveThread)) { char* buffer = (char*) malloc(1500 + sizeof(int)); if (buffer == NULL) { Limelog("Receive thread terminating\n"); + listenerCallbacks->connectionTerminated(ERROR_OUTOFMEMORY); return; } @@ -84,6 +89,7 @@ static void ReceiveThreadProc(void* context) { if (err <= 0) { Limelog("Receive thread terminating #2\n"); free(buffer); + listenerCallbacks->connectionTerminated(err); return; } @@ -109,7 +115,7 @@ static void DepacketizerThreadProc(void* context) { int err; char *data; - for (;;) { + while (!PltIsThreadInterrupted(&depacketizerThread)) { err = LbqWaitForQueueElement(&packetQueue, (void**)&data); if (err != LBQ_SUCCESS) { Limelog("Depacketizer thread terminating\n"); @@ -125,7 +131,7 @@ static void DepacketizerThreadProc(void* context) { static void DecoderThreadProc(void* context) { PDECODE_UNIT du; - for (;;) { + while (!PltIsThreadInterrupted(&decoderThread)) { if (!getNextDecodeUnit(&du)) { printf("Decoder thread terminating\n"); return;