Refactor FEC queue in preparation for multi-block frame support

This commit is contained in:
Cameron Gutman 2021-04-26 17:09:50 -05:00
parent 8dab1ee300
commit ca4019c09f
2 changed files with 84 additions and 80 deletions

View File

@ -16,16 +16,65 @@ void RtpfInitializeQueue(PRTP_FEC_QUEUE queue) {
queue->currentFrameNumber = UINT16_MAX; queue->currentFrameNumber = UINT16_MAX;
} }
void RtpfCleanupQueue(PRTP_FEC_QUEUE queue) { static void purgeListEntries(PRTPFEC_QUEUE_LIST list) {
while (queue->bufferHead != NULL) { while (list->head != NULL) {
PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead; PRTPFEC_QUEUE_ENTRY entry = list->head;
queue->bufferHead = entry->next; list->head = entry->next;
free(entry->packet); free(entry->packet);
} }
list->tail = NULL;
list->count = 0;
}
void RtpfCleanupQueue(PRTP_FEC_QUEUE queue) {
purgeListEntries(&queue->pendingFecBlockList);
}
static void insertEntryIntoList(PRTPFEC_QUEUE_LIST list, PRTPFEC_QUEUE_ENTRY entry) {
if (list->head == NULL) {
LC_ASSERT(list->count == 0);
LC_ASSERT(list->tail == NULL);
list->head = list->tail = entry;
}
else {
LC_ASSERT(list->count != 0);
PRTPFEC_QUEUE_ENTRY oldTail = list->tail;
entry->prev = oldTail;
LC_ASSERT(oldTail->next == NULL);
oldTail->next = entry;
list->tail = entry;
}
list->count++;
}
static void removeEntryFromList(PRTPFEC_QUEUE_LIST list, PRTPFEC_QUEUE_ENTRY entry) {
LC_ASSERT(entry != NULL);
LC_ASSERT(list->head != NULL);
LC_ASSERT(list->tail != NULL);
if (list->head == entry) {
list->head = entry->next;
}
if (list->tail == entry) {
list->tail = entry->prev;
}
if (entry->prev != NULL) {
entry->prev->next = entry->next;
entry->prev = NULL;
}
if (entry->next != NULL) {
entry->next->prev = entry->prev;
entry->next = NULL;
}
list->count--;
} }
// newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet // newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet
static bool queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, bool head, PRTP_PACKET packet, int length, bool isParity) { static bool queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, PRTP_PACKET packet, int length, bool isParity) {
PRTPFEC_QUEUE_ENTRY entry; PRTPFEC_QUEUE_ENTRY entry;
LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber)); LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber));
@ -38,7 +87,7 @@ static bool queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, bool
} }
else { else {
// Check for duplicates // Check for duplicates
entry = queue->bufferHead; entry = queue->pendingFecBlockList.head;
while (entry != NULL) { while (entry != NULL) {
if (entry->packet->sequenceNumber == packet->sequenceNumber) { if (entry->packet->sequenceNumber == packet->sequenceNumber) {
return false; return false;
@ -57,27 +106,7 @@ static bool queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, bool
// 90 KHz video clock // 90 KHz video clock
newEntry->presentationTimeMs = packet->timestamp / 90; newEntry->presentationTimeMs = packet->timestamp / 90;
if (queue->bufferHead == NULL) { insertEntryIntoList(&queue->pendingFecBlockList, newEntry);
LC_ASSERT(queue->bufferSize == 0);
queue->bufferHead = queue->bufferTail = newEntry;
}
else if (head) {
LC_ASSERT(queue->bufferSize > 0);
PRTPFEC_QUEUE_ENTRY oldHead = queue->bufferHead;
newEntry->next = oldHead;
LC_ASSERT(oldHead->prev == NULL);
oldHead->prev = newEntry;
queue->bufferHead = newEntry;
}
else {
LC_ASSERT(queue->bufferSize > 0);
PRTPFEC_QUEUE_ENTRY oldTail = queue->bufferTail;
newEntry->prev = oldTail;
LC_ASSERT(oldTail->next == NULL);
oldTail->next = newEntry;
queue->bufferTail = newEntry;
}
queue->bufferSize++;
return true; return true;
} }
@ -99,9 +128,9 @@ static int reconstructFrame(PRTP_FEC_QUEUE queue) {
// We'll need an extra packet to run in FEC validation mode, because we will // We'll need an extra packet to run in FEC validation mode, because we will
// be "dropping" one below and recovering it using parity. However, some frames // be "dropping" one below and recovering it using parity. However, some frames
// are so large that FEC is disabled entirely, so don't wait for parity on those. // are so large that FEC is disabled entirely, so don't wait for parity on those.
if (queue->bufferSize < queue->bufferDataPackets + (queue->fecPercentage ? 1 : 0)) { if (queue->pendingFecBlockList.count < queue->bufferDataPackets + (queue->fecPercentage ? 1 : 0)) {
#else #else
if (queue->bufferSize < queue->bufferDataPackets) { if (queue->pendingFecBlockList.count < queue->bufferDataPackets) {
#endif #endif
// Not enough data to recover yet // Not enough data to recover yet
return -1; return -1;
@ -155,7 +184,7 @@ static int reconstructFrame(PRTP_FEC_QUEUE queue) {
int droppedRtpPacketLength = 0; int droppedRtpPacketLength = 0;
#endif #endif
PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead; PRTPFEC_QUEUE_ENTRY entry = queue->pendingFecBlockList.head;
while (entry != NULL) { while (entry != NULL) {
unsigned int index = U16(entry->packet->sequenceNumber - queue->bufferLowestSequenceNumber); unsigned int index = U16(entry->packet->sequenceNumber - queue->bufferLowestSequenceNumber);
@ -206,9 +235,9 @@ cleanup_packets:
PRTPFEC_QUEUE_ENTRY queueEntry = (PRTPFEC_QUEUE_ENTRY)&packets[i][receiveSize]; PRTPFEC_QUEUE_ENTRY queueEntry = (PRTPFEC_QUEUE_ENTRY)&packets[i][receiveSize];
PRTP_PACKET rtpPacket = (PRTP_PACKET) packets[i]; PRTP_PACKET rtpPacket = (PRTP_PACKET) packets[i];
rtpPacket->sequenceNumber = U16(i + queue->bufferLowestSequenceNumber); rtpPacket->sequenceNumber = U16(i + queue->bufferLowestSequenceNumber);
rtpPacket->header = queue->bufferHead->packet->header; rtpPacket->header = queue->pendingFecBlockList.head->packet->header;
rtpPacket->timestamp = queue->bufferHead->packet->timestamp; rtpPacket->timestamp = queue->pendingFecBlockList.head->packet->timestamp;
rtpPacket->ssrc = queue->bufferHead->packet->ssrc; rtpPacket->ssrc = queue->pendingFecBlockList.head->packet->ssrc;
int dataOffset = sizeof(*rtpPacket); int dataOffset = sizeof(*rtpPacket);
if (rtpPacket->header & FLAG_EXTENSION) { if (rtpPacket->header & FLAG_EXTENSION) {
@ -295,7 +324,7 @@ cleanup_packets:
// it may be a legitimate part of the H.264 bytestream. // it may be a legitimate part of the H.264 bytestream.
LC_ASSERT(isBefore16(rtpPacket->sequenceNumber, queue->bufferFirstParitySequenceNumber)); LC_ASSERT(isBefore16(rtpPacket->sequenceNumber, queue->bufferFirstParitySequenceNumber));
queuePacket(queue, queueEntry, false, rtpPacket, StreamConfig.packetSize + dataOffset, false); queuePacket(queue, queueEntry, rtpPacket, StreamConfig.packetSize + dataOffset, false);
} else if (packets[i] != NULL) { } else if (packets[i] != NULL) {
free(packets[i]); free(packets[i]);
} }
@ -314,33 +343,11 @@ cleanup:
return ret; return ret;
} }
static void removeEntry(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY entry) {
LC_ASSERT(entry != NULL);
LC_ASSERT(queue->bufferSize > 0);
LC_ASSERT(queue->bufferHead != NULL);
LC_ASSERT(queue->bufferTail != NULL);
if (queue->bufferHead == entry) {
queue->bufferHead = entry->next;
}
if (queue->bufferTail == entry) {
queue->bufferTail = entry->prev;
}
if (entry->prev != NULL) {
entry->prev->next = entry->next;
}
if (entry->next != NULL) {
entry->next->prev = entry->prev;
}
queue->bufferSize--;
}
static void submitCompletedFrame(PRTP_FEC_QUEUE queue) { static void submitCompletedFrame(PRTP_FEC_QUEUE queue) {
unsigned int nextSeqNum = queue->bufferLowestSequenceNumber; unsigned int nextSeqNum = queue->bufferLowestSequenceNumber;
while (queue->bufferSize > 0) { while (queue->pendingFecBlockList.count > 0) {
PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead; PRTPFEC_QUEUE_ENTRY entry = queue->pendingFecBlockList.head;
unsigned int lowestRtpSequenceNumber = entry->packet->sequenceNumber; unsigned int lowestRtpSequenceNumber = entry->packet->sequenceNumber;
@ -356,7 +363,7 @@ static void submitCompletedFrame(PRTP_FEC_QUEUE queue) {
entry = parityEntry->next; entry = parityEntry->next;
// Remove this entry // Remove this entry
removeEntry(queue, parityEntry); removeEntryFromList(&queue->pendingFecBlockList, parityEntry);
// Free the entry and packet // Free the entry and packet
free(parityEntry->packet); free(parityEntry->packet);
@ -366,8 +373,7 @@ static void submitCompletedFrame(PRTP_FEC_QUEUE queue) {
// Check for the next packet in sequence. This will be O(1) for non-reordered packet streams. // Check for the next packet in sequence. This will be O(1) for non-reordered packet streams.
if (entry->packet->sequenceNumber == nextSeqNum) { if (entry->packet->sequenceNumber == nextSeqNum) {
removeEntry(queue, entry); removeEntryFromList(&queue->pendingFecBlockList, entry);
entry->prev = entry->next = NULL;
// To avoid having to sample the system time for each packet, we cheat // To avoid having to sample the system time for each packet, we cheat
// and use the first packet's receive time for all packets. This ends up // and use the first packet's receive time for all packets. This ends up
@ -427,12 +433,12 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_
// Reinitialize the queue if it's empty after a frame delivery or // Reinitialize the queue if it's empty after a frame delivery or
// if we can't finish a frame before receiving the next one. // if we can't finish a frame before receiving the next one.
if (queue->bufferSize == 0 || queue->currentFrameNumber != nvPacket->frameIndex) { if (queue->pendingFecBlockList.count == 0 || queue->currentFrameNumber != nvPacket->frameIndex) {
if (queue->currentFrameNumber != nvPacket->frameIndex && queue->bufferSize != 0) { if (queue->currentFrameNumber != nvPacket->frameIndex && queue->pendingFecBlockList.count != 0) {
Limelog("Unrecoverable frame %d: %d+%d=%d received < %d needed\n", Limelog("Unrecoverable frame %d: %d+%d=%d received < %d needed\n",
queue->currentFrameNumber, queue->receivedBufferDataPackets, queue->currentFrameNumber, queue->receivedBufferDataPackets,
queue->bufferSize - queue->receivedBufferDataPackets, queue->pendingFecBlockList.count - queue->receivedBufferDataPackets,
queue->bufferSize, queue->pendingFecBlockList.count,
queue->bufferDataPackets); queue->bufferDataPackets);
} }
@ -443,14 +449,7 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_
connectionSawFrame(queue->currentFrameNumber); connectionSawFrame(queue->currentFrameNumber);
// Discard any unsubmitted buffers from the previous frame // Discard any unsubmitted buffers from the previous frame
while (queue->bufferHead != NULL) { purgeListEntries(&queue->pendingFecBlockList);
PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead;
queue->bufferHead = entry->next;
free(entry->packet);
}
queue->bufferTail = NULL;
queue->bufferSize = 0;
queue->bufferFirstRecvTimeMs = PltGetMillis(); queue->bufferFirstRecvTimeMs = PltGetMillis();
queue->bufferLowestSequenceNumber = U16(packet->sequenceNumber - fecIndex); queue->bufferLowestSequenceNumber = U16(packet->sequenceNumber - fecIndex);
@ -472,7 +471,7 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_
LC_ASSERT((nvPacket->fecInfo & 0xFFC00000) >> 22 == queue->bufferDataPackets); LC_ASSERT((nvPacket->fecInfo & 0xFFC00000) >> 22 == queue->bufferDataPackets);
LC_ASSERT((nvPacket->flags & FLAG_EOF) || length - dataOffset == StreamConfig.packetSize); LC_ASSERT((nvPacket->flags & FLAG_EOF) || length - dataOffset == StreamConfig.packetSize);
if (!queuePacket(queue, packetEntry, false, packet, length, !isBefore16(packet->sequenceNumber, queue->bufferFirstParitySequenceNumber))) { if (!queuePacket(queue, packetEntry, packet, length, !isBefore16(packet->sequenceNumber, queue->bufferFirstParitySequenceNumber))) {
return RTPF_RET_REJECTED; return RTPF_RET_REJECTED;
} }
else { else {
@ -487,9 +486,9 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_
submitCompletedFrame(queue); submitCompletedFrame(queue);
// submitCompletedFrame() should have consumed all data // submitCompletedFrame() should have consumed all data
LC_ASSERT(queue->bufferHead == NULL); LC_ASSERT(queue->pendingFecBlockList.head == NULL);
LC_ASSERT(queue->bufferTail == NULL); LC_ASSERT(queue->pendingFecBlockList.tail == NULL);
LC_ASSERT(queue->bufferSize == 0); LC_ASSERT(queue->pendingFecBlockList.count == 0);
// Ignore any more packets for this frame // Ignore any more packets for this frame
queue->currentFrameNumber++; queue->currentFrameNumber++;

View File

@ -13,11 +13,16 @@ typedef struct _RTPFEC_QUEUE_ENTRY {
struct _RTPFEC_QUEUE_ENTRY* prev; struct _RTPFEC_QUEUE_ENTRY* prev;
} RTPFEC_QUEUE_ENTRY, *PRTPFEC_QUEUE_ENTRY; } RTPFEC_QUEUE_ENTRY, *PRTPFEC_QUEUE_ENTRY;
typedef struct _RTPFEC_QUEUE_LIST {
PRTPFEC_QUEUE_ENTRY head;
PRTPFEC_QUEUE_ENTRY tail;
uint32_t count;
} RTPFEC_QUEUE_LIST, *PRTPFEC_QUEUE_LIST;
typedef struct _RTP_FEC_QUEUE { typedef struct _RTP_FEC_QUEUE {
PRTPFEC_QUEUE_ENTRY bufferHead; RTPFEC_QUEUE_LIST pendingFecBlockList;
PRTPFEC_QUEUE_ENTRY bufferTail;
uint64_t bufferFirstRecvTimeMs; uint64_t bufferFirstRecvTimeMs;
uint32_t bufferSize;
uint32_t bufferLowestSequenceNumber; uint32_t bufferLowestSequenceNumber;
uint32_t bufferHighestSequenceNumber; uint32_t bufferHighestSequenceNumber;
uint32_t bufferFirstParitySequenceNumber; uint32_t bufferFirstParitySequenceNumber;