diff --git a/src/ControlStream.c b/src/ControlStream.c index d037351..c010124 100644 --- a/src/ControlStream.c +++ b/src/ControlStream.c @@ -38,8 +38,8 @@ static bool usePeriodicPing; static PLT_THREAD lossStatsThread; static PLT_THREAD invalidateRefFramesThread; +static PLT_THREAD requestIdrFrameThread; static PLT_THREAD controlReceiveThread; -static PLT_EVENT invalidateRefFramesEvent; static int lossCountSinceLastReport; static int lastGoodFrame; static int lastSeenFrame; @@ -54,8 +54,8 @@ static int lastIntervalLossPercentage; static int lastConnectionStatusUpdate; static int currentEnetSequenceNumber; -static bool idrFrameRequired; static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples; +static PLT_EVENT idrFrameRequiredEvent; static PPLT_CRYPTO_CONTEXT encryptionCtx; static PPLT_CRYPTO_CONTEXT decryptionCtx; @@ -211,7 +211,7 @@ static bool supportsIdrFrameRequest; // Initializes the control stream int initializeControlStream(void) { stopping = false; - PltCreateEvent(&invalidateRefFramesEvent); + PltCreateEvent(&idrFrameRequiredEvent); LbqInitializeLinkedBlockingQueue(&invalidReferenceFrameTuples, 20); PltCreateMutex(&enetMutex); @@ -250,7 +250,6 @@ int initializeControlStream(void) { } } - idrFrameRequired = false; lastGoodFrame = 0; lastSeenFrame = 0; lossCountSinceLastReport = 0; @@ -283,16 +282,11 @@ void destroyControlStream(void) { LC_ASSERT(stopping); PltDestroyCryptoContext(encryptionCtx); PltDestroyCryptoContext(decryptionCtx); - PltCloseEvent(&invalidateRefFramesEvent); + PltCloseEvent(&idrFrameRequiredEvent); freeFrameInvalidationList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples)); PltDeleteMutex(&enetMutex); } -int getNextFrameInvalidationTuple(PQUEUED_FRAME_INVALIDATION_TUPLE* qfit) { - int err = LbqPollQueueElement(&invalidReferenceFrameTuples, (void**)qfit); - return (err == LBQ_SUCCESS); -} - void queueFrameInvalidationTuple(int startFrame, int endFrame) { LC_ASSERT(startFrame <= endFrame); @@ -304,25 +298,28 @@ void queueFrameInvalidationTuple(int startFrame, int endFrame) { qfit->endFrame = endFrame; if (LbqOfferQueueItem(&invalidReferenceFrameTuples, qfit, &qfit->entry) == LBQ_BOUND_EXCEEDED) { // Too many invalidation tuples, so we need an IDR frame now + Limelog("RFI range list reached maximum size limit\n"); free(qfit); - idrFrameRequired = true; + requestIdrOnDemand(); } } else { - idrFrameRequired = true; + requestIdrOnDemand(); } } else { - idrFrameRequired = true; + requestIdrOnDemand(); } - - PltSetEvent(&invalidateRefFramesEvent); } // Request an IDR frame on demand by the decoder void requestIdrOnDemand(void) { - idrFrameRequired = true; - PltSetEvent(&invalidateRefFramesEvent); + // Any reference frame invalidation requests should be dropped now. + // We require a full IDR frame to recover. + freeFrameInvalidationList(LbqFlushQueueItems(&invalidReferenceFrameTuples)); + + // Request the IDR frame + PltSetEvent(&idrFrameRequiredEvent); } // Invalidate reference frames lost by the network @@ -955,30 +952,12 @@ static void requestIdrFrame(void) { Limelog("IDR frame request sent\n"); } -static void requestInvalidateReferenceFrames(void) { +static void requestInvalidateReferenceFrames(int startFrame, int endFrame) { int64_t payload[3]; - PQUEUED_FRAME_INVALIDATION_TUPLE qfit; - int startFrame; - int endFrame; + LC_ASSERT(startFrame <= endFrame); LC_ASSERT(isReferenceFrameInvalidationEnabled()); - if (!getNextFrameInvalidationTuple(&qfit)) { - return; - } - - LC_ASSERT(qfit->startFrame <= qfit->endFrame); - - startFrame = qfit->startFrame; - endFrame = qfit->endFrame; - - // Aggregate all lost frames into one range - do { - LC_ASSERT(qfit->endFrame >= endFrame); - endFrame = qfit->endFrame; - free(qfit); - } while (getNextFrameInvalidationTuple(&qfit)); - payload[0] = LE64(startFrame); payload[1] = LE64(endFrame); payload[2] = 0; @@ -995,32 +974,49 @@ static void requestInvalidateReferenceFrames(void) { } static void invalidateRefFramesFunc(void* context) { + LC_ASSERT(isReferenceFrameInvalidationEnabled()); + while (!PltIsThreadInterrupted(&invalidateRefFramesThread)) { - // Wait for a request to invalidate reference frames - PltWaitForEvent(&invalidateRefFramesEvent); - PltClearEvent(&invalidateRefFramesEvent); - - // Bail if we've been shutdown + PQUEUED_FRAME_INVALIDATION_TUPLE qfit; + int startFrame; + int endFrame; + + // Wait for a reference frame invalidation request or a request to shutdown + if (LbqWaitForQueueElement(&invalidReferenceFrameTuples, (void**)&qfit) != LBQ_SUCCESS) { + // Bail if we're stopping + return; + } + + startFrame = qfit->startFrame; + endFrame = qfit->endFrame; + + // Aggregate all lost frames into one range + do { + LC_ASSERT(qfit->endFrame >= endFrame); + endFrame = qfit->endFrame; + free(qfit); + } while (LbqPollQueueElement(&invalidReferenceFrameTuples, (void**)&qfit) == LBQ_SUCCESS); + + // Send the reference frame invalidation request + requestInvalidateReferenceFrames(startFrame, endFrame); + } +} + +static void requestIdrFrameFunc(void* context) { + while (!PltIsThreadInterrupted(&requestIdrFrameThread)) { + PltWaitForEvent(&idrFrameRequiredEvent); + PltClearEvent(&idrFrameRequiredEvent); + if (stopping) { - break; + // Bail if we're stopping + return; } - // Sometimes we absolutely need an IDR frame - if (idrFrameRequired) { - // Empty invalidate reference frames tuples - PQUEUED_FRAME_INVALIDATION_TUPLE qfit; - while (getNextFrameInvalidationTuple(&qfit)) { - free(qfit); - } + // Any pending reference frame invalidation requests are now redundant + freeFrameInvalidationList(LbqFlushQueueItems(&invalidReferenceFrameTuples)); - // Send an IDR frame request - idrFrameRequired = false; - requestIdrFrame(); - } - else { - // Otherwise invalidate reference frames - requestInvalidateReferenceFrames(); - } + // Request the IDR frame + requestIdrFrame(); } } @@ -1028,7 +1024,7 @@ static void invalidateRefFramesFunc(void* context) { int stopControlStream(void) { stopping = true; LbqSignalQueueShutdown(&invalidReferenceFrameTuples); - PltSetEvent(&invalidateRefFramesEvent); + PltSetEvent(&idrFrameRequiredEvent); // This must be set to stop in a timely manner LC_ASSERT(ConnectionInterrupted); @@ -1038,17 +1034,24 @@ int stopControlStream(void) { } PltInterruptThread(&lossStatsThread); - PltInterruptThread(&invalidateRefFramesThread); + PltInterruptThread(&requestIdrFrameThread); PltInterruptThread(&controlReceiveThread); PltJoinThread(&lossStatsThread); - PltJoinThread(&invalidateRefFramesThread); + PltJoinThread(&requestIdrFrameThread); PltJoinThread(&controlReceiveThread); PltCloseThread(&lossStatsThread); - PltCloseThread(&invalidateRefFramesThread); + PltCloseThread(&requestIdrFrameThread); PltCloseThread(&controlReceiveThread); + // We will only have an RFI thread if RFI is enabled + if (isReferenceFrameInvalidationEnabled()) { + PltInterruptThread(&invalidateRefFramesThread); + PltJoinThread(&invalidateRefFramesThread); + PltCloseThread(&invalidateRefFramesThread); + } + if (peer != NULL) { // We use enet_peer_disconnect_now() so the host knows immediately // of our termination and can cleanup properly for reconnection. @@ -1246,7 +1249,7 @@ int startControlStream(void) { return err; } - err = PltCreateThread("InvRefFrames", invalidateRefFramesFunc, NULL, &invalidateRefFramesThread); + err = PltCreateThread("ReqIdrFrame", requestIdrFrameFunc, NULL, &requestIdrFrameThread); if (err != 0) { stopping = true; @@ -1279,5 +1282,46 @@ int startControlStream(void) { return err; } + // Only create the reference frame invalidation thread if RFI is enabled + if (isReferenceFrameInvalidationEnabled()) { + err = PltCreateThread("InvRefFrames", invalidateRefFramesFunc, NULL, &invalidateRefFramesThread); + if (err != 0) { + stopping = true; + PltSetEvent(&idrFrameRequiredEvent); + + if (ctlSock != INVALID_SOCKET) { + shutdownTcpSocket(ctlSock); + } + else { + ConnectionInterrupted = true; + } + + PltInterruptThread(&lossStatsThread); + PltJoinThread(&lossStatsThread); + PltCloseThread(&lossStatsThread); + + PltInterruptThread(&controlReceiveThread); + PltJoinThread(&controlReceiveThread); + PltCloseThread(&controlReceiveThread); + + PltInterruptThread(&requestIdrFrameThread); + PltJoinThread(&requestIdrFrameThread); + PltCloseThread(&requestIdrFrameThread); + + if (ctlSock != INVALID_SOCKET) { + closeSocket(ctlSock); + ctlSock = INVALID_SOCKET; + } + else { + enet_peer_disconnect_now(peer, 0); + peer = NULL; + enet_host_destroy(client); + client = NULL; + } + + return err; + } + } + return 0; }