Create a second queue for completed FEC blocks

This commit is contained in:
Cameron Gutman 2021-04-26 20:26:16 -05:00
parent b33e9fbcde
commit 9a5dbcf31c
2 changed files with 41 additions and 16 deletions

View File

@ -29,6 +29,7 @@ static void purgeListEntries(PRTPFEC_QUEUE_LIST list) {
void RtpfCleanupQueue(PRTP_FEC_QUEUE queue) {
purgeListEntries(&queue->pendingFecBlockList);
purgeListEntries(&queue->completedFecBlockList);
}
static void insertEntryIntoList(PRTPFEC_QUEUE_LIST list, PRTPFEC_QUEUE_ENTRY entry) {
@ -266,14 +267,11 @@ cleanup_packets:
LC_ASSERT(droppedDataLength <= recoveredDataLength);
LC_ASSERT(droppedDataLength == recoveredDataLength || (nvPacket->flags & FLAG_EOF));
// Check all NV_VIDEO_PACKET fields except fecInfo which differs in the recovered packet
// Check all NV_VIDEO_PACKET fields except FEC stuff which differs in the recovered packet
LC_ASSERT(nvPacket->flags == droppedNvPacket->flags);
LC_ASSERT(nvPacket->frameIndex == droppedNvPacket->frameIndex);
LC_ASSERT(nvPacket->streamPacketIndex == droppedNvPacket->streamPacketIndex);
// TODO: Investigate assertion failure here with GFE 3.20.4. The remaining fields and
// video data are still recovered successfully, so this doesn't seem critical.
//LC_ASSERT(memcmp(nvPacket->reserved, droppedNvPacket->reserved, sizeof(nvPacket->reserved)) == 0);
LC_ASSERT(nvPacket->reserved == droppedNvPacket->reserved);
// Check the data itself - use memcmp() and only loop if an error is detected
if (memcmp(nvPacket + 1, droppedNvPacket + 1, droppedDataLength)) {
@ -350,7 +348,7 @@ cleanup:
return ret;
}
static void submitCompletedFrame(PRTP_FEC_QUEUE queue) {
static void stageCompleteFecBlock(PRTP_FEC_QUEUE queue) {
unsigned int nextSeqNum = queue->bufferLowestSequenceNumber;
while (queue->pendingFecBlockList.count > 0) {
@ -389,8 +387,8 @@ static void submitCompletedFrame(PRTP_FEC_QUEUE queue) {
LC_ASSERT(queue->bufferFirstRecvTimeMs != 0);
entry->receiveTimeMs = queue->bufferFirstRecvTimeMs;
// Submit this packet for decoding. It will own freeing the entry now.
queueRtpPacket(entry);
// Move this packet to the completed FEC block list
insertEntryIntoList(&queue->completedFecBlockList, entry);
break;
}
else if (isBefore16(entry->packet->sequenceNumber, lowestRtpSequenceNumber)) {
@ -411,6 +409,19 @@ static void submitCompletedFrame(PRTP_FEC_QUEUE queue) {
}
}
static void submitCompletedFrame(PRTP_FEC_QUEUE queue) {
while (queue->completedFecBlockList.count > 0) {
PRTPFEC_QUEUE_ENTRY entry = queue->completedFecBlockList.head;
// Parity packets should have been removed by stageCompleteFecBlock()
LC_ASSERT(!entry->isParity);
// Submit this packet for decoding. It will own freeing the entry now.
removeEntryFromList(&queue->completedFecBlockList, entry);
queueRtpPacket(entry);
}
}
int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_QUEUE_ENTRY packetEntry) {
if (isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber)) {
// Reject packets behind our current buffer window
@ -449,14 +460,19 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_
queue->bufferDataPackets);
}
// Discard any pending buffers from the previous FEC block
purgeListEntries(&queue->pendingFecBlockList);
// Discard any completed FEC blocks from the previous frame
if (queue->currentFrameNumber != nvPacket->frameIndex) {
purgeListEntries(&queue->completedFecBlockList);
}
queue->currentFrameNumber = nvPacket->frameIndex;
// Tell the control stream logic about this frame, even if we don't end up
// being able to reconstruct a full frame from it.
connectionSawFrame(queue->currentFrameNumber);
// Discard any unsubmitted buffers from the previous frame
purgeListEntries(&queue->pendingFecBlockList);
queue->bufferFirstRecvTimeMs = PltGetMillis();
queue->bufferLowestSequenceNumber = U16(packet->sequenceNumber - fecIndex);
@ -489,15 +505,23 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_
// Try to submit this frame. If we haven't received enough packets,
// this will fail and we'll keep waiting.
if (reconstructFrame(queue) == 0) {
// Submit the frame data to the depacketizer
submitCompletedFrame(queue);
// Stage the complete FEC block for use once reassembly is complete
stageCompleteFecBlock(queue);
// submitCompletedFrame() should have consumed all data
// stageCompleteFecBlock() should have consumed all pending FEC data
LC_ASSERT(queue->pendingFecBlockList.head == NULL);
LC_ASSERT(queue->pendingFecBlockList.tail == NULL);
LC_ASSERT(queue->pendingFecBlockList.count == 0);
// Ignore any more packets for this frame
// Submit the completed frame
submitCompletedFrame(queue);
// submitCompletedFrame() should have consumed all completed FEC data
LC_ASSERT(queue->completedFecBlockList.head == NULL);
LC_ASSERT(queue->completedFecBlockList.tail == NULL);
LC_ASSERT(queue->completedFecBlockList.count == 0);
// Continue to the next frame
queue->currentFrameNumber++;
}

View File

@ -21,6 +21,7 @@ typedef struct _RTPFEC_QUEUE_LIST {
typedef struct _RTP_FEC_QUEUE {
RTPFEC_QUEUE_LIST pendingFecBlockList;
RTPFEC_QUEUE_LIST completedFecBlockList;
uint64_t bufferFirstRecvTimeMs;
uint32_t bufferLowestSequenceNumber;