diff --git a/src/RtpFecQueue.c b/src/RtpFecQueue.c index 22dcf24..d6a2255 100644 --- a/src/RtpFecQueue.c +++ b/src/RtpFecQueue.c @@ -29,6 +29,7 @@ static void purgeListEntries(PRTPFEC_QUEUE_LIST list) { void RtpfCleanupQueue(PRTP_FEC_QUEUE queue) { purgeListEntries(&queue->pendingFecBlockList); + purgeListEntries(&queue->completedFecBlockList); } static void insertEntryIntoList(PRTPFEC_QUEUE_LIST list, PRTPFEC_QUEUE_ENTRY entry) { @@ -266,14 +267,11 @@ cleanup_packets: LC_ASSERT(droppedDataLength <= recoveredDataLength); LC_ASSERT(droppedDataLength == recoveredDataLength || (nvPacket->flags & FLAG_EOF)); - // Check all NV_VIDEO_PACKET fields except fecInfo which differs in the recovered packet + // Check all NV_VIDEO_PACKET fields except FEC stuff which differs in the recovered packet LC_ASSERT(nvPacket->flags == droppedNvPacket->flags); LC_ASSERT(nvPacket->frameIndex == droppedNvPacket->frameIndex); LC_ASSERT(nvPacket->streamPacketIndex == droppedNvPacket->streamPacketIndex); - - // TODO: Investigate assertion failure here with GFE 3.20.4. The remaining fields and - // video data are still recovered successfully, so this doesn't seem critical. - //LC_ASSERT(memcmp(nvPacket->reserved, droppedNvPacket->reserved, sizeof(nvPacket->reserved)) == 0); + LC_ASSERT(nvPacket->reserved == droppedNvPacket->reserved); // Check the data itself - use memcmp() and only loop if an error is detected if (memcmp(nvPacket + 1, droppedNvPacket + 1, droppedDataLength)) { @@ -350,7 +348,7 @@ cleanup: return ret; } -static void submitCompletedFrame(PRTP_FEC_QUEUE queue) { +static void stageCompleteFecBlock(PRTP_FEC_QUEUE queue) { unsigned int nextSeqNum = queue->bufferLowestSequenceNumber; while (queue->pendingFecBlockList.count > 0) { @@ -389,8 +387,8 @@ static void submitCompletedFrame(PRTP_FEC_QUEUE queue) { LC_ASSERT(queue->bufferFirstRecvTimeMs != 0); entry->receiveTimeMs = queue->bufferFirstRecvTimeMs; - // Submit this packet for decoding. It will own freeing the entry now. - queueRtpPacket(entry); + // Move this packet to the completed FEC block list + insertEntryIntoList(&queue->completedFecBlockList, entry); break; } else if (isBefore16(entry->packet->sequenceNumber, lowestRtpSequenceNumber)) { @@ -411,6 +409,19 @@ static void submitCompletedFrame(PRTP_FEC_QUEUE queue) { } } +static void submitCompletedFrame(PRTP_FEC_QUEUE queue) { + while (queue->completedFecBlockList.count > 0) { + PRTPFEC_QUEUE_ENTRY entry = queue->completedFecBlockList.head; + + // Parity packets should have been removed by stageCompleteFecBlock() + LC_ASSERT(!entry->isParity); + + // Submit this packet for decoding. It will own freeing the entry now. + removeEntryFromList(&queue->completedFecBlockList, entry); + queueRtpPacket(entry); + } +} + int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_QUEUE_ENTRY packetEntry) { if (isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber)) { // Reject packets behind our current buffer window @@ -449,14 +460,19 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_ queue->bufferDataPackets); } + // Discard any pending buffers from the previous FEC block + purgeListEntries(&queue->pendingFecBlockList); + + // Discard any completed FEC blocks from the previous frame + if (queue->currentFrameNumber != nvPacket->frameIndex) { + purgeListEntries(&queue->completedFecBlockList); + } + queue->currentFrameNumber = nvPacket->frameIndex; // Tell the control stream logic about this frame, even if we don't end up // being able to reconstruct a full frame from it. connectionSawFrame(queue->currentFrameNumber); - - // Discard any unsubmitted buffers from the previous frame - purgeListEntries(&queue->pendingFecBlockList); queue->bufferFirstRecvTimeMs = PltGetMillis(); queue->bufferLowestSequenceNumber = U16(packet->sequenceNumber - fecIndex); @@ -489,15 +505,23 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_ // Try to submit this frame. If we haven't received enough packets, // this will fail and we'll keep waiting. if (reconstructFrame(queue) == 0) { - // Submit the frame data to the depacketizer - submitCompletedFrame(queue); + // Stage the complete FEC block for use once reassembly is complete + stageCompleteFecBlock(queue); - // submitCompletedFrame() should have consumed all data + // stageCompleteFecBlock() should have consumed all pending FEC data LC_ASSERT(queue->pendingFecBlockList.head == NULL); LC_ASSERT(queue->pendingFecBlockList.tail == NULL); LC_ASSERT(queue->pendingFecBlockList.count == 0); - - // Ignore any more packets for this frame + + // Submit the completed frame + submitCompletedFrame(queue); + + // submitCompletedFrame() should have consumed all completed FEC data + LC_ASSERT(queue->completedFecBlockList.head == NULL); + LC_ASSERT(queue->completedFecBlockList.tail == NULL); + LC_ASSERT(queue->completedFecBlockList.count == 0); + + // Continue to the next frame queue->currentFrameNumber++; } diff --git a/src/RtpFecQueue.h b/src/RtpFecQueue.h index d4a561d..84eb006 100644 --- a/src/RtpFecQueue.h +++ b/src/RtpFecQueue.h @@ -21,6 +21,7 @@ typedef struct _RTPFEC_QUEUE_LIST { typedef struct _RTP_FEC_QUEUE { RTPFEC_QUEUE_LIST pendingFecBlockList; + RTPFEC_QUEUE_LIST completedFecBlockList; uint64_t bufferFirstRecvTimeMs; uint32_t bufferLowestSequenceNumber;