From 207f981fd08b9e78c8ec7436b2503955041a1114 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Mon, 20 Feb 2023 16:24:48 -0600 Subject: [PATCH] Send final FEC frame status info to Sunshine This info will be used for dynamic FEC and dynamic video packet batch size. --- src/ControlStream.c | 69 +++++++++++++++++++++++++++++++++------- src/Limelight-internal.h | 1 + src/RtpVideoQueue.c | 31 ++++++++++++++++-- src/Video.h | 17 ++++++++++ 4 files changed, 105 insertions(+), 13 deletions(-) diff --git a/src/ControlStream.c b/src/ControlStream.c index b4a3b06..79aa08d 100644 --- a/src/ControlStream.c +++ b/src/ControlStream.c @@ -33,6 +33,11 @@ typedef struct _QUEUED_FRAME_INVALIDATION_TUPLE { LINKED_BLOCKING_QUEUE_ENTRY entry; } QUEUED_FRAME_INVALIDATION_TUPLE, *PQUEUED_FRAME_INVALIDATION_TUPLE; +typedef struct _QUEUED_FRAME_FEC_STATUS { + SS_FRAME_FEC_STATUS fecStatus; + LINKED_BLOCKING_QUEUE_ENTRY entry; +} QUEUED_FRAME_FEC_STATUS, *PQUEUED_FRAME_FEC_STATUS; + static SOCKET ctlSock = INVALID_SOCKET; static ENetHost* client; static ENetPeer* peer; @@ -60,6 +65,7 @@ static int lastConnectionStatusUpdate; static int currentEnetSequenceNumber; static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples; +static LINKED_BLOCKING_QUEUE frameFecStatusQueue; static PLT_EVENT idrFrameRequiredEvent; static PPLT_CRYPTO_CONTEXT encryptionCtx; @@ -225,6 +231,7 @@ int initializeControlStream(void) { stopping = false; PltCreateEvent(&idrFrameRequiredEvent); LbqInitializeLinkedBlockingQueue(&invalidReferenceFrameTuples, 20); + LbqInitializeLinkedBlockingQueue(&frameFecStatusQueue, 20); PltCreateMutex(&enetMutex); encryptedControlStream = APP_VERSION_AT_LEAST(7, 1, 431); @@ -281,7 +288,7 @@ int initializeControlStream(void) { return 0; } -void freeFrameInvalidationList(PLINKED_BLOCKING_QUEUE_ENTRY entry) { +static void freeBasicLbqList(PLINKED_BLOCKING_QUEUE_ENTRY entry) { PLINKED_BLOCKING_QUEUE_ENTRY nextEntry; while (entry != NULL) { @@ -297,7 +304,9 @@ void destroyControlStream(void) { PltDestroyCryptoContext(encryptionCtx); PltDestroyCryptoContext(decryptionCtx); PltCloseEvent(&idrFrameRequiredEvent); - freeFrameInvalidationList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples)); + freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples)); + freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&frameFecStatusQueue)); + PltDeleteMutex(&enetMutex); } @@ -330,7 +339,7 @@ void queueFrameInvalidationTuple(int startFrame, int endFrame) { void LiRequestIdrFrame(void) { // Any reference frame invalidation requests should be dropped now. // We require a full IDR frame to recover. - freeFrameInvalidationList(LbqFlushQueueItems(&invalidReferenceFrameTuples)); + freeBasicLbqList(LbqFlushQueueItems(&invalidReferenceFrameTuples)); // Request the IDR frame PltSetEvent(&idrFrameRequiredEvent); @@ -347,6 +356,22 @@ void connectionReceivedCompleteFrame(int frameIndex) { intervalGoodFrameCount++; } +void connectionSendFrameFecStatus(PSS_FRAME_FEC_STATUS fecStatus) { + // This is a Sunshine protocol extension + if (!IS_SUNSHINE()) { + return; + } + + // Queue a frame FEC status message. This is best-effort only. + PQUEUED_FRAME_FEC_STATUS queuedFecStatus = malloc(sizeof(*queuedFecStatus)); + if (queuedFecStatus != NULL) { + queuedFecStatus->fecStatus = *fecStatus; + if (LbqOfferQueueItem(&frameFecStatusQueue, queuedFecStatus, &queuedFecStatus->entry) == LBQ_BOUND_EXCEEDED) { + free(queuedFecStatus); + } + } +} + void connectionSawFrame(int frameIndex) { LC_ASSERT(!isBefore16(frameIndex, lastSeenFrame)); @@ -516,7 +541,7 @@ static bool isPacketSentWaitingForAck(ENetPacket* packet) { return false; } -static bool sendMessageEnet(short ptype, short paylen, const void* payload) { +static bool sendMessageEnet(short ptype, short paylen, const void* payload, bool reliable) { ENetPacket* enetPacket; int err; @@ -529,7 +554,7 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload) { enetPacket = enet_packet_create(NULL, sizeof(*encPacket) + AES_GCM_TAG_LENGTH + sizeof(*packet) + paylen, - ENET_PACKET_FLAG_RELIABLE); + reliable ? ENET_PACKET_FLAG_RELIABLE : ENET_PACKET_FLAG_UNSEQUENCED); if (enetPacket == NULL) { return false; } @@ -562,7 +587,8 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload) { } else { PNVCTL_ENET_PACKET_HEADER_V1 packet; - enetPacket = enet_packet_create(NULL, sizeof(*packet) + paylen, ENET_PACKET_FLAG_RELIABLE); + enetPacket = enet_packet_create(NULL, sizeof(*packet) + paylen, + reliable ? ENET_PACKET_FLAG_RELIABLE : ENET_PACKET_FLAG_UNSEQUENCED); if (enetPacket == NULL) { return false; } @@ -578,14 +604,14 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload) { // Set a callback to use to let us know if the packet has been freed. // Freeing can only happen when the packet is acked or send fails. - enetPacket->userData = &packetFreed; + enetPacket->userData = (void*)&packetFreed; enetPacket->freeCallback = enetPacketFreeCb; // Queue the packet to be sent err = enet_peer_send(peer, 0, enetPacket); // Wait until the packet is actually sent to provide backpressure on senders - if (err == 0) { + if (err == 0 && reliable) { // Try to send the packet enet_host_service(client, NULL, 0); @@ -659,7 +685,7 @@ static bool sendMessageAndForget(short ptype, short paylen, const void* payload) // Unlike regular sockets, ENet sockets aren't safe to invoke from multiple // threads at once. We have to synchronize them with a lock. if (AppVersionQuad[0] >= 5) { - ret = sendMessageEnet(ptype, paylen, payload); + ret = sendMessageEnet(ptype, paylen, payload, true); } else { ret = sendMessageTcp(ptype, paylen, payload); @@ -670,7 +696,7 @@ static bool sendMessageAndForget(short ptype, short paylen, const void* payload) static bool sendMessageAndDiscardReply(short ptype, short paylen, const void* payload) { if (AppVersionQuad[0] >= 5) { - if (!sendMessageEnet(ptype, paylen, payload)) { + if (!sendMessageEnet(ptype, paylen, payload, true)) { return false; } } @@ -1001,6 +1027,23 @@ static void lossStatsThreadFunc(void* context) { BbPut32(&byteBuffer, 0); // Timestamp? while (!PltIsThreadInterrupted(&lossStatsThread)) { + // For Sunshine servers, send the more detailed per-frame FEC messages + if (IS_SUNSHINE()) { + PQUEUED_FRAME_FEC_STATUS queuedFrameStatus; + + // Sunshine should always use ENet for control messages + LC_ASSERT(peer != NULL); + + while (LbqPollQueueElement(&frameFecStatusQueue, (void**)&queuedFrameStatus) == LBQ_SUCCESS) { + // Send as an unreliable packet, since it's not a critical message + if (!sendMessageEnet(SS_FRAME_FEC_PTYPE, sizeof(queuedFrameStatus->fecStatus), &queuedFrameStatus->fecStatus, false)) { + Limelog("Loss Stats: Sending frame FEC status message failed: %d\n", (int)LastSocketError()); + ListenerCallbacks.connectionTerminated(LastSocketFail()); + return; + } + } + } + // Send the message (and don't expect a response) if (!sendMessageAndForget(0x0200, sizeof(periodicPingPayload), periodicPingPayload)) { Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError()); @@ -1014,6 +1057,9 @@ static void lossStatsThreadFunc(void* context) { } else { char* lossStatsPayload; + + // Sunshine should use the newer codepath above + LC_ASSERT(!IS_SUNSHINE()); lossStatsPayload = malloc(payloadLengths[IDX_LOSS_STATS]); if (lossStatsPayload == NULL) { @@ -1154,7 +1200,7 @@ static void requestIdrFrameFunc(void* context) { } // Any pending reference frame invalidation requests are now redundant - freeFrameInvalidationList(LbqFlushQueueItems(&invalidReferenceFrameTuples)); + freeBasicLbqList(LbqFlushQueueItems(&invalidReferenceFrameTuples)); // Request the IDR frame requestIdrFrame(); @@ -1165,6 +1211,7 @@ static void requestIdrFrameFunc(void* context) { int stopControlStream(void) { stopping = true; LbqSignalQueueShutdown(&invalidReferenceFrameTuples); + LbqSignalQueueShutdown(&frameFecStatusQueue); PltSetEvent(&idrFrameRequiredEvent); // This must be set to stop in a timely manner diff --git a/src/Limelight-internal.h b/src/Limelight-internal.h index d186b46..40443fc 100644 --- a/src/Limelight-internal.h +++ b/src/Limelight-internal.h @@ -93,6 +93,7 @@ void connectionDetectedFrameLoss(int startFrame, int endFrame); void connectionReceivedCompleteFrame(int frameIndex); void connectionSawFrame(int frameIndex); void connectionLostPackets(int lastReceivedPacket, int nextReceivedPacket); +void connectionSendFrameFecStatus(PSS_FRAME_FEC_STATUS fecStatus); int sendInputPacketOnControlStream(unsigned char* data, int length); bool isControlDataInTransit(void); diff --git a/src/RtpVideoQueue.c b/src/RtpVideoQueue.c index 7169799..1423ca5 100644 --- a/src/RtpVideoQueue.c +++ b/src/RtpVideoQueue.c @@ -89,6 +89,24 @@ static void removeEntryFromList(PRTPV_QUEUE_LIST list, PRTPV_QUEUE_ENTRY entry) list->count--; } +static void reportFinalFrameFecStatus(PRTP_VIDEO_QUEUE queue) { + SS_FRAME_FEC_STATUS fecStatus; + + fecStatus.frameIndex = BE32(queue->currentFrameNumber); + fecStatus.highestReceivedSequenceNumber = BE16(queue->receivedHighestSequenceNumber); + fecStatus.nextContiguousSequenceNumber = BE16(queue->nextContiguousSequenceNumber); + fecStatus.missingPacketsBeforeHighestReceived = (uint8_t)queue->missingPackets; + fecStatus.totalDataPackets = (uint8_t)queue->bufferDataPackets; + fecStatus.totalParityPackets = (uint8_t)queue->bufferParityPackets; + fecStatus.receivedDataPackets = (uint8_t)queue->receivedDataPackets; + fecStatus.receivedParityPackets = (uint8_t)queue->receivedParityPackets; + fecStatus.fecPercentage = (uint8_t)queue->fecPercentage; + fecStatus.multiFecBlockIndex = (uint8_t)queue->multiFecCurrentBlockNumber; + fecStatus.multiFecBlockCount = (uint8_t)(queue->multiFecLastBlockNumber + 1); + + connectionSendFrameFecStatus(&fecStatus); +} + // newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet static bool queuePacket(PRTP_VIDEO_QUEUE queue, PRTPV_QUEUE_ENTRY newEntry, PRTP_PACKET packet, int length, bool isParity, bool isFecRecovery) { PRTPV_QUEUE_ENTRY entry; @@ -305,13 +323,16 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { // If this fails, something is probably wrong with our FEC state. LC_ASSERT(ret == 0); -#ifdef FEC_VERBOSE if (queue->bufferDataPackets != queue->receivedDataPackets) { +#ifdef FEC_VERBOSE Limelog("Recovered %d video data shards from frame %d\n", queue->bufferDataPackets - queue->receivedDataPackets, queue->currentFrameNumber); - } #endif + + // Report the final FEC status if we needed to perform a recovery + reportFinalFrameFecStatus(queue); + } cleanup_packets: for (i = 0; i < totalPackets; i++) { @@ -550,6 +571,9 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ if (queue->pendingFecBlockList.count == 0 || queue->currentFrameNumber != nvPacket->frameIndex || queue->multiFecCurrentBlockNumber != fecCurrentBlockNumber) { if (queue->pendingFecBlockList.count != 0) { + // Report the final status of the FEC queue before dropping this frame + reportFinalFrameFecStatus(queue); + if (queue->multiFecLastBlockNumber != 0) { Limelog("Unrecoverable frame %d (block %d of %d): %d+%d=%d received < %d needed\n", queue->currentFrameNumber, queue->multiFecCurrentBlockNumber+1, @@ -591,6 +615,9 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ // or block 0 of a new frame. uint8_t expectedFecBlockNumber = (queue->currentFrameNumber == nvPacket->frameIndex ? queue->multiFecCurrentBlockNumber : 0); if (fecCurrentBlockNumber != expectedFecBlockNumber) { + // Report the final status of the FEC queue before dropping this frame + reportFinalFrameFecStatus(queue); + Limelog("Unrecoverable frame %d: lost FEC blocks %d to %d\n", nvPacket->frameIndex, expectedFecBlockNumber + 1, diff --git a/src/Video.h b/src/Video.h index 69ebe5a..c8c0883 100644 --- a/src/Video.h +++ b/src/Video.h @@ -36,9 +36,26 @@ typedef struct _RTP_PACKET { uint32_t ssrc; } RTP_PACKET, *PRTP_PACKET; +// Fields are big-endian typedef struct _SS_PING { char payload[16]; uint32_t sequenceNumber; } SS_PING, *PSS_PING; +// Fields are big-endian +#define SS_FRAME_FEC_PTYPE 0x5501 +typedef struct _SS_FRAME_FEC_STATUS { + uint32_t frameIndex; + uint16_t highestReceivedSequenceNumber; + uint16_t nextContiguousSequenceNumber; + uint8_t missingPacketsBeforeHighestReceived; + uint8_t totalDataPackets; + uint8_t totalParityPackets; + uint8_t receivedDataPackets; + uint8_t receivedParityPackets; + uint8_t fecPercentage; + uint8_t multiFecBlockIndex; + uint8_t multiFecBlockCount; +} SS_FRAME_FEC_STATUS, *PSS_FRAME_FEC_STATUS; + #pragma pack(pop)