Send final FEC frame status info to Sunshine

This info will be used for dynamic FEC and dynamic video packet batch size.
This commit is contained in:
Cameron Gutman
2023-02-20 16:24:48 -06:00
parent 82105f2f8f
commit 207f981fd0
4 changed files with 105 additions and 13 deletions

View File

@@ -33,6 +33,11 @@ typedef struct _QUEUED_FRAME_INVALIDATION_TUPLE {
LINKED_BLOCKING_QUEUE_ENTRY entry;
} QUEUED_FRAME_INVALIDATION_TUPLE, *PQUEUED_FRAME_INVALIDATION_TUPLE;
typedef struct _QUEUED_FRAME_FEC_STATUS {
SS_FRAME_FEC_STATUS fecStatus;
LINKED_BLOCKING_QUEUE_ENTRY entry;
} QUEUED_FRAME_FEC_STATUS, *PQUEUED_FRAME_FEC_STATUS;
static SOCKET ctlSock = INVALID_SOCKET;
static ENetHost* client;
static ENetPeer* peer;
@@ -60,6 +65,7 @@ static int lastConnectionStatusUpdate;
static int currentEnetSequenceNumber;
static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples;
static LINKED_BLOCKING_QUEUE frameFecStatusQueue;
static PLT_EVENT idrFrameRequiredEvent;
static PPLT_CRYPTO_CONTEXT encryptionCtx;
@@ -225,6 +231,7 @@ int initializeControlStream(void) {
stopping = false;
PltCreateEvent(&idrFrameRequiredEvent);
LbqInitializeLinkedBlockingQueue(&invalidReferenceFrameTuples, 20);
LbqInitializeLinkedBlockingQueue(&frameFecStatusQueue, 20);
PltCreateMutex(&enetMutex);
encryptedControlStream = APP_VERSION_AT_LEAST(7, 1, 431);
@@ -281,7 +288,7 @@ int initializeControlStream(void) {
return 0;
}
void freeFrameInvalidationList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
static void freeBasicLbqList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
PLINKED_BLOCKING_QUEUE_ENTRY nextEntry;
while (entry != NULL) {
@@ -297,7 +304,9 @@ void destroyControlStream(void) {
PltDestroyCryptoContext(encryptionCtx);
PltDestroyCryptoContext(decryptionCtx);
PltCloseEvent(&idrFrameRequiredEvent);
freeFrameInvalidationList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples));
freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples));
freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&frameFecStatusQueue));
PltDeleteMutex(&enetMutex);
}
@@ -330,7 +339,7 @@ void queueFrameInvalidationTuple(int startFrame, int endFrame) {
void LiRequestIdrFrame(void) {
// Any reference frame invalidation requests should be dropped now.
// We require a full IDR frame to recover.
freeFrameInvalidationList(LbqFlushQueueItems(&invalidReferenceFrameTuples));
freeBasicLbqList(LbqFlushQueueItems(&invalidReferenceFrameTuples));
// Request the IDR frame
PltSetEvent(&idrFrameRequiredEvent);
@@ -347,6 +356,22 @@ void connectionReceivedCompleteFrame(int frameIndex) {
intervalGoodFrameCount++;
}
void connectionSendFrameFecStatus(PSS_FRAME_FEC_STATUS fecStatus) {
// This is a Sunshine protocol extension
if (!IS_SUNSHINE()) {
return;
}
// Queue a frame FEC status message. This is best-effort only.
PQUEUED_FRAME_FEC_STATUS queuedFecStatus = malloc(sizeof(*queuedFecStatus));
if (queuedFecStatus != NULL) {
queuedFecStatus->fecStatus = *fecStatus;
if (LbqOfferQueueItem(&frameFecStatusQueue, queuedFecStatus, &queuedFecStatus->entry) == LBQ_BOUND_EXCEEDED) {
free(queuedFecStatus);
}
}
}
void connectionSawFrame(int frameIndex) {
LC_ASSERT(!isBefore16(frameIndex, lastSeenFrame));
@@ -516,7 +541,7 @@ static bool isPacketSentWaitingForAck(ENetPacket* packet) {
return false;
}
static bool sendMessageEnet(short ptype, short paylen, const void* payload) {
static bool sendMessageEnet(short ptype, short paylen, const void* payload, bool reliable) {
ENetPacket* enetPacket;
int err;
@@ -529,7 +554,7 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload) {
enetPacket = enet_packet_create(NULL,
sizeof(*encPacket) + AES_GCM_TAG_LENGTH + sizeof(*packet) + paylen,
ENET_PACKET_FLAG_RELIABLE);
reliable ? ENET_PACKET_FLAG_RELIABLE : ENET_PACKET_FLAG_UNSEQUENCED);
if (enetPacket == NULL) {
return false;
}
@@ -562,7 +587,8 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload) {
}
else {
PNVCTL_ENET_PACKET_HEADER_V1 packet;
enetPacket = enet_packet_create(NULL, sizeof(*packet) + paylen, ENET_PACKET_FLAG_RELIABLE);
enetPacket = enet_packet_create(NULL, sizeof(*packet) + paylen,
reliable ? ENET_PACKET_FLAG_RELIABLE : ENET_PACKET_FLAG_UNSEQUENCED);
if (enetPacket == NULL) {
return false;
}
@@ -578,14 +604,14 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload) {
// Set a callback to use to let us know if the packet has been freed.
// Freeing can only happen when the packet is acked or send fails.
enetPacket->userData = &packetFreed;
enetPacket->userData = (void*)&packetFreed;
enetPacket->freeCallback = enetPacketFreeCb;
// Queue the packet to be sent
err = enet_peer_send(peer, 0, enetPacket);
// Wait until the packet is actually sent to provide backpressure on senders
if (err == 0) {
if (err == 0 && reliable) {
// Try to send the packet
enet_host_service(client, NULL, 0);
@@ -659,7 +685,7 @@ static bool sendMessageAndForget(short ptype, short paylen, const void* payload)
// Unlike regular sockets, ENet sockets aren't safe to invoke from multiple
// threads at once. We have to synchronize them with a lock.
if (AppVersionQuad[0] >= 5) {
ret = sendMessageEnet(ptype, paylen, payload);
ret = sendMessageEnet(ptype, paylen, payload, true);
}
else {
ret = sendMessageTcp(ptype, paylen, payload);
@@ -670,7 +696,7 @@ static bool sendMessageAndForget(short ptype, short paylen, const void* payload)
static bool sendMessageAndDiscardReply(short ptype, short paylen, const void* payload) {
if (AppVersionQuad[0] >= 5) {
if (!sendMessageEnet(ptype, paylen, payload)) {
if (!sendMessageEnet(ptype, paylen, payload, true)) {
return false;
}
}
@@ -1001,6 +1027,23 @@ static void lossStatsThreadFunc(void* context) {
BbPut32(&byteBuffer, 0); // Timestamp?
while (!PltIsThreadInterrupted(&lossStatsThread)) {
// For Sunshine servers, send the more detailed per-frame FEC messages
if (IS_SUNSHINE()) {
PQUEUED_FRAME_FEC_STATUS queuedFrameStatus;
// Sunshine should always use ENet for control messages
LC_ASSERT(peer != NULL);
while (LbqPollQueueElement(&frameFecStatusQueue, (void**)&queuedFrameStatus) == LBQ_SUCCESS) {
// Send as an unreliable packet, since it's not a critical message
if (!sendMessageEnet(SS_FRAME_FEC_PTYPE, sizeof(queuedFrameStatus->fecStatus), &queuedFrameStatus->fecStatus, false)) {
Limelog("Loss Stats: Sending frame FEC status message failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail());
return;
}
}
}
// Send the message (and don't expect a response)
if (!sendMessageAndForget(0x0200, sizeof(periodicPingPayload), periodicPingPayload)) {
Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError());
@@ -1014,6 +1057,9 @@ static void lossStatsThreadFunc(void* context) {
}
else {
char* lossStatsPayload;
// Sunshine should use the newer codepath above
LC_ASSERT(!IS_SUNSHINE());
lossStatsPayload = malloc(payloadLengths[IDX_LOSS_STATS]);
if (lossStatsPayload == NULL) {
@@ -1154,7 +1200,7 @@ static void requestIdrFrameFunc(void* context) {
}
// Any pending reference frame invalidation requests are now redundant
freeFrameInvalidationList(LbqFlushQueueItems(&invalidReferenceFrameTuples));
freeBasicLbqList(LbqFlushQueueItems(&invalidReferenceFrameTuples));
// Request the IDR frame
requestIdrFrame();
@@ -1165,6 +1211,7 @@ static void requestIdrFrameFunc(void* context) {
int stopControlStream(void) {
stopping = true;
LbqSignalQueueShutdown(&invalidReferenceFrameTuples);
LbqSignalQueueShutdown(&frameFecStatusQueue);
PltSetEvent(&idrFrameRequiredEvent);
// This must be set to stop in a timely manner

View File

@@ -93,6 +93,7 @@ void connectionDetectedFrameLoss(int startFrame, int endFrame);
void connectionReceivedCompleteFrame(int frameIndex);
void connectionSawFrame(int frameIndex);
void connectionLostPackets(int lastReceivedPacket, int nextReceivedPacket);
void connectionSendFrameFecStatus(PSS_FRAME_FEC_STATUS fecStatus);
int sendInputPacketOnControlStream(unsigned char* data, int length);
bool isControlDataInTransit(void);

View File

@@ -89,6 +89,24 @@ static void removeEntryFromList(PRTPV_QUEUE_LIST list, PRTPV_QUEUE_ENTRY entry)
list->count--;
}
static void reportFinalFrameFecStatus(PRTP_VIDEO_QUEUE queue) {
SS_FRAME_FEC_STATUS fecStatus;
fecStatus.frameIndex = BE32(queue->currentFrameNumber);
fecStatus.highestReceivedSequenceNumber = BE16(queue->receivedHighestSequenceNumber);
fecStatus.nextContiguousSequenceNumber = BE16(queue->nextContiguousSequenceNumber);
fecStatus.missingPacketsBeforeHighestReceived = (uint8_t)queue->missingPackets;
fecStatus.totalDataPackets = (uint8_t)queue->bufferDataPackets;
fecStatus.totalParityPackets = (uint8_t)queue->bufferParityPackets;
fecStatus.receivedDataPackets = (uint8_t)queue->receivedDataPackets;
fecStatus.receivedParityPackets = (uint8_t)queue->receivedParityPackets;
fecStatus.fecPercentage = (uint8_t)queue->fecPercentage;
fecStatus.multiFecBlockIndex = (uint8_t)queue->multiFecCurrentBlockNumber;
fecStatus.multiFecBlockCount = (uint8_t)(queue->multiFecLastBlockNumber + 1);
connectionSendFrameFecStatus(&fecStatus);
}
// newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet
static bool queuePacket(PRTP_VIDEO_QUEUE queue, PRTPV_QUEUE_ENTRY newEntry, PRTP_PACKET packet, int length, bool isParity, bool isFecRecovery) {
PRTPV_QUEUE_ENTRY entry;
@@ -305,13 +323,16 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) {
// If this fails, something is probably wrong with our FEC state.
LC_ASSERT(ret == 0);
#ifdef FEC_VERBOSE
if (queue->bufferDataPackets != queue->receivedDataPackets) {
#ifdef FEC_VERBOSE
Limelog("Recovered %d video data shards from frame %d\n",
queue->bufferDataPackets - queue->receivedDataPackets,
queue->currentFrameNumber);
}
#endif
// Report the final FEC status if we needed to perform a recovery
reportFinalFrameFecStatus(queue);
}
cleanup_packets:
for (i = 0; i < totalPackets; i++) {
@@ -550,6 +571,9 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_
if (queue->pendingFecBlockList.count == 0 || queue->currentFrameNumber != nvPacket->frameIndex ||
queue->multiFecCurrentBlockNumber != fecCurrentBlockNumber) {
if (queue->pendingFecBlockList.count != 0) {
// Report the final status of the FEC queue before dropping this frame
reportFinalFrameFecStatus(queue);
if (queue->multiFecLastBlockNumber != 0) {
Limelog("Unrecoverable frame %d (block %d of %d): %d+%d=%d received < %d needed\n",
queue->currentFrameNumber, queue->multiFecCurrentBlockNumber+1,
@@ -591,6 +615,9 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_
// or block 0 of a new frame.
uint8_t expectedFecBlockNumber = (queue->currentFrameNumber == nvPacket->frameIndex ? queue->multiFecCurrentBlockNumber : 0);
if (fecCurrentBlockNumber != expectedFecBlockNumber) {
// Report the final status of the FEC queue before dropping this frame
reportFinalFrameFecStatus(queue);
Limelog("Unrecoverable frame %d: lost FEC blocks %d to %d\n",
nvPacket->frameIndex,
expectedFecBlockNumber + 1,

View File

@@ -36,9 +36,26 @@ typedef struct _RTP_PACKET {
uint32_t ssrc;
} RTP_PACKET, *PRTP_PACKET;
// Fields are big-endian
typedef struct _SS_PING {
char payload[16];
uint32_t sequenceNumber;
} SS_PING, *PSS_PING;
// Fields are big-endian
#define SS_FRAME_FEC_PTYPE 0x5501
typedef struct _SS_FRAME_FEC_STATUS {
uint32_t frameIndex;
uint16_t highestReceivedSequenceNumber;
uint16_t nextContiguousSequenceNumber;
uint8_t missingPacketsBeforeHighestReceived;
uint8_t totalDataPackets;
uint8_t totalParityPackets;
uint8_t receivedDataPackets;
uint8_t receivedParityPackets;
uint8_t fecPercentage;
uint8_t multiFecBlockIndex;
uint8_t multiFecBlockCount;
} SS_FRAME_FEC_STATUS, *PSS_FRAME_FEC_STATUS;
#pragma pack(pop)