Refactor IDR/RFI requests to simplify code and fix race conditions

This commit is contained in:
Cameron Gutman 2021-05-14 20:41:20 -05:00
parent cd62147cdf
commit a1a150c300

View File

@ -38,8 +38,8 @@ static bool usePeriodicPing;
static PLT_THREAD lossStatsThread; static PLT_THREAD lossStatsThread;
static PLT_THREAD invalidateRefFramesThread; static PLT_THREAD invalidateRefFramesThread;
static PLT_THREAD requestIdrFrameThread;
static PLT_THREAD controlReceiveThread; static PLT_THREAD controlReceiveThread;
static PLT_EVENT invalidateRefFramesEvent;
static int lossCountSinceLastReport; static int lossCountSinceLastReport;
static int lastGoodFrame; static int lastGoodFrame;
static int lastSeenFrame; static int lastSeenFrame;
@ -54,8 +54,8 @@ static int lastIntervalLossPercentage;
static int lastConnectionStatusUpdate; static int lastConnectionStatusUpdate;
static int currentEnetSequenceNumber; static int currentEnetSequenceNumber;
static bool idrFrameRequired;
static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples; static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples;
static PLT_EVENT idrFrameRequiredEvent;
static PPLT_CRYPTO_CONTEXT encryptionCtx; static PPLT_CRYPTO_CONTEXT encryptionCtx;
static PPLT_CRYPTO_CONTEXT decryptionCtx; static PPLT_CRYPTO_CONTEXT decryptionCtx;
@ -211,7 +211,7 @@ static bool supportsIdrFrameRequest;
// Initializes the control stream // Initializes the control stream
int initializeControlStream(void) { int initializeControlStream(void) {
stopping = false; stopping = false;
PltCreateEvent(&invalidateRefFramesEvent); PltCreateEvent(&idrFrameRequiredEvent);
LbqInitializeLinkedBlockingQueue(&invalidReferenceFrameTuples, 20); LbqInitializeLinkedBlockingQueue(&invalidReferenceFrameTuples, 20);
PltCreateMutex(&enetMutex); PltCreateMutex(&enetMutex);
@ -250,7 +250,6 @@ int initializeControlStream(void) {
} }
} }
idrFrameRequired = false;
lastGoodFrame = 0; lastGoodFrame = 0;
lastSeenFrame = 0; lastSeenFrame = 0;
lossCountSinceLastReport = 0; lossCountSinceLastReport = 0;
@ -283,16 +282,11 @@ void destroyControlStream(void) {
LC_ASSERT(stopping); LC_ASSERT(stopping);
PltDestroyCryptoContext(encryptionCtx); PltDestroyCryptoContext(encryptionCtx);
PltDestroyCryptoContext(decryptionCtx); PltDestroyCryptoContext(decryptionCtx);
PltCloseEvent(&invalidateRefFramesEvent); PltCloseEvent(&idrFrameRequiredEvent);
freeFrameInvalidationList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples)); freeFrameInvalidationList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples));
PltDeleteMutex(&enetMutex); PltDeleteMutex(&enetMutex);
} }
int getNextFrameInvalidationTuple(PQUEUED_FRAME_INVALIDATION_TUPLE* qfit) {
int err = LbqPollQueueElement(&invalidReferenceFrameTuples, (void**)qfit);
return (err == LBQ_SUCCESS);
}
void queueFrameInvalidationTuple(int startFrame, int endFrame) { void queueFrameInvalidationTuple(int startFrame, int endFrame) {
LC_ASSERT(startFrame <= endFrame); LC_ASSERT(startFrame <= endFrame);
@ -304,25 +298,28 @@ void queueFrameInvalidationTuple(int startFrame, int endFrame) {
qfit->endFrame = endFrame; qfit->endFrame = endFrame;
if (LbqOfferQueueItem(&invalidReferenceFrameTuples, qfit, &qfit->entry) == LBQ_BOUND_EXCEEDED) { if (LbqOfferQueueItem(&invalidReferenceFrameTuples, qfit, &qfit->entry) == LBQ_BOUND_EXCEEDED) {
// Too many invalidation tuples, so we need an IDR frame now // Too many invalidation tuples, so we need an IDR frame now
Limelog("RFI range list reached maximum size limit\n");
free(qfit); free(qfit);
idrFrameRequired = true; requestIdrOnDemand();
} }
} }
else { else {
idrFrameRequired = true; requestIdrOnDemand();
} }
} }
else { else {
idrFrameRequired = true; requestIdrOnDemand();
} }
PltSetEvent(&invalidateRefFramesEvent);
} }
// Request an IDR frame on demand by the decoder // Request an IDR frame on demand by the decoder
void requestIdrOnDemand(void) { void requestIdrOnDemand(void) {
idrFrameRequired = true; // Any reference frame invalidation requests should be dropped now.
PltSetEvent(&invalidateRefFramesEvent); // We require a full IDR frame to recover.
freeFrameInvalidationList(LbqFlushQueueItems(&invalidReferenceFrameTuples));
// Request the IDR frame
PltSetEvent(&idrFrameRequiredEvent);
} }
// Invalidate reference frames lost by the network // Invalidate reference frames lost by the network
@ -955,30 +952,12 @@ static void requestIdrFrame(void) {
Limelog("IDR frame request sent\n"); Limelog("IDR frame request sent\n");
} }
static void requestInvalidateReferenceFrames(void) { static void requestInvalidateReferenceFrames(int startFrame, int endFrame) {
int64_t payload[3]; int64_t payload[3];
PQUEUED_FRAME_INVALIDATION_TUPLE qfit;
int startFrame;
int endFrame;
LC_ASSERT(startFrame <= endFrame);
LC_ASSERT(isReferenceFrameInvalidationEnabled()); LC_ASSERT(isReferenceFrameInvalidationEnabled());
if (!getNextFrameInvalidationTuple(&qfit)) {
return;
}
LC_ASSERT(qfit->startFrame <= qfit->endFrame);
startFrame = qfit->startFrame;
endFrame = qfit->endFrame;
// Aggregate all lost frames into one range
do {
LC_ASSERT(qfit->endFrame >= endFrame);
endFrame = qfit->endFrame;
free(qfit);
} while (getNextFrameInvalidationTuple(&qfit));
payload[0] = LE64(startFrame); payload[0] = LE64(startFrame);
payload[1] = LE64(endFrame); payload[1] = LE64(endFrame);
payload[2] = 0; payload[2] = 0;
@ -995,32 +974,49 @@ static void requestInvalidateReferenceFrames(void) {
} }
static void invalidateRefFramesFunc(void* context) { static void invalidateRefFramesFunc(void* context) {
LC_ASSERT(isReferenceFrameInvalidationEnabled());
while (!PltIsThreadInterrupted(&invalidateRefFramesThread)) { while (!PltIsThreadInterrupted(&invalidateRefFramesThread)) {
// Wait for a request to invalidate reference frames PQUEUED_FRAME_INVALIDATION_TUPLE qfit;
PltWaitForEvent(&invalidateRefFramesEvent); int startFrame;
PltClearEvent(&invalidateRefFramesEvent); int endFrame;
// Wait for a reference frame invalidation request or a request to shutdown
if (LbqWaitForQueueElement(&invalidReferenceFrameTuples, (void**)&qfit) != LBQ_SUCCESS) {
// Bail if we're stopping
return;
}
startFrame = qfit->startFrame;
endFrame = qfit->endFrame;
// Aggregate all lost frames into one range
do {
LC_ASSERT(qfit->endFrame >= endFrame);
endFrame = qfit->endFrame;
free(qfit);
} while (LbqPollQueueElement(&invalidReferenceFrameTuples, (void**)&qfit) == LBQ_SUCCESS);
// Send the reference frame invalidation request
requestInvalidateReferenceFrames(startFrame, endFrame);
}
}
static void requestIdrFrameFunc(void* context) {
while (!PltIsThreadInterrupted(&requestIdrFrameThread)) {
PltWaitForEvent(&idrFrameRequiredEvent);
PltClearEvent(&idrFrameRequiredEvent);
// Bail if we've been shutdown
if (stopping) { if (stopping) {
break; // Bail if we're stopping
return;
} }
// Sometimes we absolutely need an IDR frame // Any pending reference frame invalidation requests are now redundant
if (idrFrameRequired) { freeFrameInvalidationList(LbqFlushQueueItems(&invalidReferenceFrameTuples));
// Empty invalidate reference frames tuples
PQUEUED_FRAME_INVALIDATION_TUPLE qfit;
while (getNextFrameInvalidationTuple(&qfit)) {
free(qfit);
}
// Send an IDR frame request // Request the IDR frame
idrFrameRequired = false; requestIdrFrame();
requestIdrFrame();
}
else {
// Otherwise invalidate reference frames
requestInvalidateReferenceFrames();
}
} }
} }
@ -1028,7 +1024,7 @@ static void invalidateRefFramesFunc(void* context) {
int stopControlStream(void) { int stopControlStream(void) {
stopping = true; stopping = true;
LbqSignalQueueShutdown(&invalidReferenceFrameTuples); LbqSignalQueueShutdown(&invalidReferenceFrameTuples);
PltSetEvent(&invalidateRefFramesEvent); PltSetEvent(&idrFrameRequiredEvent);
// This must be set to stop in a timely manner // This must be set to stop in a timely manner
LC_ASSERT(ConnectionInterrupted); LC_ASSERT(ConnectionInterrupted);
@ -1038,17 +1034,24 @@ int stopControlStream(void) {
} }
PltInterruptThread(&lossStatsThread); PltInterruptThread(&lossStatsThread);
PltInterruptThread(&invalidateRefFramesThread); PltInterruptThread(&requestIdrFrameThread);
PltInterruptThread(&controlReceiveThread); PltInterruptThread(&controlReceiveThread);
PltJoinThread(&lossStatsThread); PltJoinThread(&lossStatsThread);
PltJoinThread(&invalidateRefFramesThread); PltJoinThread(&requestIdrFrameThread);
PltJoinThread(&controlReceiveThread); PltJoinThread(&controlReceiveThread);
PltCloseThread(&lossStatsThread); PltCloseThread(&lossStatsThread);
PltCloseThread(&invalidateRefFramesThread); PltCloseThread(&requestIdrFrameThread);
PltCloseThread(&controlReceiveThread); PltCloseThread(&controlReceiveThread);
// We will only have an RFI thread if RFI is enabled
if (isReferenceFrameInvalidationEnabled()) {
PltInterruptThread(&invalidateRefFramesThread);
PltJoinThread(&invalidateRefFramesThread);
PltCloseThread(&invalidateRefFramesThread);
}
if (peer != NULL) { if (peer != NULL) {
// We use enet_peer_disconnect_now() so the host knows immediately // We use enet_peer_disconnect_now() so the host knows immediately
// of our termination and can cleanup properly for reconnection. // of our termination and can cleanup properly for reconnection.
@ -1246,7 +1249,7 @@ int startControlStream(void) {
return err; return err;
} }
err = PltCreateThread("InvRefFrames", invalidateRefFramesFunc, NULL, &invalidateRefFramesThread); err = PltCreateThread("ReqIdrFrame", requestIdrFrameFunc, NULL, &requestIdrFrameThread);
if (err != 0) { if (err != 0) {
stopping = true; stopping = true;
@ -1279,5 +1282,46 @@ int startControlStream(void) {
return err; return err;
} }
// Only create the reference frame invalidation thread if RFI is enabled
if (isReferenceFrameInvalidationEnabled()) {
err = PltCreateThread("InvRefFrames", invalidateRefFramesFunc, NULL, &invalidateRefFramesThread);
if (err != 0) {
stopping = true;
PltSetEvent(&idrFrameRequiredEvent);
if (ctlSock != INVALID_SOCKET) {
shutdownTcpSocket(ctlSock);
}
else {
ConnectionInterrupted = true;
}
PltInterruptThread(&lossStatsThread);
PltJoinThread(&lossStatsThread);
PltCloseThread(&lossStatsThread);
PltInterruptThread(&controlReceiveThread);
PltJoinThread(&controlReceiveThread);
PltCloseThread(&controlReceiveThread);
PltInterruptThread(&requestIdrFrameThread);
PltJoinThread(&requestIdrFrameThread);
PltCloseThread(&requestIdrFrameThread);
if (ctlSock != INVALID_SOCKET) {
closeSocket(ctlSock);
ctlSock = INVALID_SOCKET;
}
else {
enet_peer_disconnect_now(peer, 0);
peer = NULL;
enet_host_destroy(client);
client = NULL;
}
return err;
}
}
return 0; return 0;
} }