Improve FEC queue packet submission and retrieval from O(n) to O(1)

This commit is contained in:
Cameron Gutman 2019-04-25 23:02:01 -07:00
parent f395d3056a
commit fe7cb006da
3 changed files with 94 additions and 95 deletions

View File

@ -15,28 +15,30 @@ void RtpfCleanupQueue(PRTP_FEC_QUEUE queue) {
queue->bufferHead = entry->next;
free(entry->packet);
}
while (queue->queueHead != NULL) {
PRTPFEC_QUEUE_ENTRY entry = queue->queueHead;
queue->queueHead = entry->next;
free(entry->packet);
}
}
// newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet
static int queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, int head, PRTP_PACKET packet, int length, int isParity) {
PRTPFEC_QUEUE_ENTRY entry;
LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->bufferLowestSequenceNumber));
LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber));
// Don't queue duplicates either
entry = queue->bufferHead;
while (entry != NULL) {
if (entry->packet->sequenceNumber == packet->sequenceNumber) {
return 0;
// If the packet is in order, we can take the fast path and avoid having
// to loop through the whole list. If we get an out of order or missing
// packet, the fast path will stop working and we'll use the loop instead.
if (packet->sequenceNumber == queue->nextContiguousSequenceNumber) {
queue->nextContiguousSequenceNumber = U16(packet->sequenceNumber + 1);
}
else {
// Check for duplicates
entry = queue->bufferHead;
while (entry != NULL) {
if (entry->packet->sequenceNumber == packet->sequenceNumber) {
return 0;
}
entry = entry->next;
}
entry = entry->next;
}
newEntry->packet = packet;
@ -211,15 +213,15 @@ cleanup:
static void removeEntry(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY entry) {
LC_ASSERT(entry != NULL);
LC_ASSERT(queue->queueSize > 0);
LC_ASSERT(queue->queueHead != NULL);
LC_ASSERT(queue->queueTail != NULL);
LC_ASSERT(queue->bufferSize > 0);
LC_ASSERT(queue->bufferHead != NULL);
LC_ASSERT(queue->bufferTail != NULL);
if (queue->queueHead == entry) {
queue->queueHead = entry->next;
if (queue->bufferHead == entry) {
queue->bufferHead = entry->next;
}
if (queue->queueTail == entry) {
queue->queueTail = entry->prev;
if (queue->bufferTail == entry) {
queue->bufferTail = entry->prev;
}
if (entry->prev != NULL) {
@ -228,11 +230,66 @@ static void removeEntry(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY entry) {
if (entry->next != NULL) {
entry->next->prev = entry->prev;
}
queue->queueSize--;
queue->bufferSize--;
}
static void submitCompletedFrame(PRTP_FEC_QUEUE queue) {
unsigned int nextSeqNum = queue->bufferLowestSequenceNumber;
while (queue->bufferSize > 0) {
PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead;
unsigned int lowestRtpSequenceNumber = entry->packet->sequenceNumber;
while (entry != NULL) {
// We should never encounter a packet that's lower than our next seq num
LC_ASSERT(!isBefore16(entry->packet->sequenceNumber, nextSeqNum));
// Never return parity packets
if (entry->isParity) {
PRTPFEC_QUEUE_ENTRY parityEntry = entry;
// Skip this entry
entry = parityEntry->next;
// Remove this entry
removeEntry(queue, parityEntry);
// Free the entry and packet
free(parityEntry->packet);
continue;
}
// Check for the next packet in sequence. This will be O(1) for non-reordered packet streams.
if (entry->packet->sequenceNumber == nextSeqNum) {
removeEntry(queue, entry);
entry->prev = entry->next = NULL;
// Submit this packet for decoding. It will own freeing the entry now.
queueRtpPacket(entry);
break;
}
else if (isBefore16(entry->packet->sequenceNumber, lowestRtpSequenceNumber)) {
lowestRtpSequenceNumber = entry->packet->sequenceNumber;
}
entry = entry->next;
}
if (entry == NULL) {
// Start at the lowest we found last enumeration
nextSeqNum = lowestRtpSequenceNumber;
}
else {
// We found this packet so move on to the next one in sequence
nextSeqNum = U16(nextSeqNum + 1);
}
}
}
int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_QUEUE_ENTRY packetEntry) {
if (isBefore16(packet->sequenceNumber, queue->bufferLowestSequenceNumber)) {
if (isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber)) {
// Reject packets behind our current buffer window
return RTPF_RET_REJECTED;
}
@ -275,6 +332,7 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_
queue->bufferSize = 0;
queue->bufferLowestSequenceNumber = U16(packet->sequenceNumber - fecIndex);
queue->nextContiguousSequenceNumber = queue->bufferLowestSequenceNumber;
queue->receivedBufferDataPackets = 0;
queue->bufferDataPackets = (nvPacket->fecInfo & 0xFFC00000) >> 22;
queue->fecPercentage = (nvPacket->fecInfo & 0xFF0) >> 4;
@ -302,67 +360,19 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_
// Try to submit this frame. If we haven't received enough packets,
// this will fail and we'll keep waiting.
if (reconstructFrame(queue) == 0) {
// Queue the pending frame data
if (queue->queueTail == NULL) {
queue->queueHead = queue->bufferHead;
queue->queueTail = queue->bufferTail;
} else {
queue->queueTail->next = queue->bufferHead;
queue->queueTail = queue->bufferTail;
}
queue->queueSize += queue->bufferSize;
// Submit the frame data to the depacketizer
submitCompletedFrame(queue);
// Clear the buffer list
queue->bufferHead = NULL;
queue->bufferTail = NULL;
queue->bufferSize = 0;
// submitCompletedFrame() should have consumed all data
LC_ASSERT(queue->bufferHead == NULL);
LC_ASSERT(queue->bufferTail == NULL);
LC_ASSERT(queue->bufferSize == 0);
// Ignore any more packets for this frame
queue->currentFrameNumber++;
}
return (queue->queueHead != NULL) ? RTPF_RET_QUEUED_PACKETS_READY : RTPF_RET_QUEUED_NOTHING_READY;
return RTPF_RET_QUEUED;
}
}
PRTPFEC_QUEUE_ENTRY RtpfGetQueuedPacket(PRTP_FEC_QUEUE queue) {
PRTPFEC_QUEUE_ENTRY queuedEntry, entry;
// Find the next queued packet
queuedEntry = NULL;
entry = queue->queueHead;
unsigned int lowestRtpSequenceNumber = UINT16_MAX;
while (entry != NULL) {
// Never return parity packets
if (entry->isParity) {
PRTPFEC_QUEUE_ENTRY parityEntry = entry;
// Skip this entry
entry = parityEntry->next;
// Remove this entry
removeEntry(queue, parityEntry);
// Free the entry and packet
free(parityEntry->packet);
continue;
}
if (queuedEntry == NULL || isBefore16(entry->packet->sequenceNumber, lowestRtpSequenceNumber)) {
lowestRtpSequenceNumber = entry->packet->sequenceNumber;
queuedEntry = entry;
}
entry = entry->next;
}
if (queuedEntry != NULL) {
removeEntry(queue, queuedEntry);
queuedEntry->prev = queuedEntry->next = NULL;
return queuedEntry;
} else {
return NULL;
}
}

View File

@ -13,10 +13,6 @@ typedef struct _RTPFEC_QUEUE_ENTRY {
} RTPFEC_QUEUE_ENTRY, *PRTPFEC_QUEUE_ENTRY;
typedef struct _RTP_FEC_QUEUE {
PRTPFEC_QUEUE_ENTRY queueHead;
PRTPFEC_QUEUE_ENTRY queueTail;
int queueSize;
PRTPFEC_QUEUE_ENTRY bufferHead;
PRTPFEC_QUEUE_ENTRY bufferTail;
int bufferSize;
@ -27,15 +23,15 @@ typedef struct _RTP_FEC_QUEUE {
int bufferParityPackets;
int receivedBufferDataPackets;
int fecPercentage;
int nextContiguousSequenceNumber;
int currentFrameNumber;
} RTP_FEC_QUEUE, *PRTP_FEC_QUEUE;
#define RTPF_RET_QUEUED_NOTHING_READY 0
#define RTPF_RET_QUEUED_PACKETS_READY 1
#define RTPF_RET_REJECTED 2
#define RTPF_RET_QUEUED 0
#define RTPF_RET_REJECTED 1
void RtpfInitializeQueue(PRTP_FEC_QUEUE queue);
void RtpfCleanupQueue(PRTP_FEC_QUEUE queue);
int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_QUEUE_ENTRY packetEntry);
PRTPFEC_QUEUE_ENTRY RtpfGetQueuedPacket(PRTP_FEC_QUEUE queue);
void RtpfSubmitQueuedPackets(PRTP_FEC_QUEUE queue);

View File

@ -122,15 +122,8 @@ static void ReceiveThreadProc(void* context) {
packet->sequenceNumber = htons(packet->sequenceNumber);
queueStatus = RtpfAddPacket(&rtpQueue, packet, err, (PRTPFEC_QUEUE_ENTRY)&buffer[receiveSize]);
if (queueStatus == RTPF_RET_QUEUED_PACKETS_READY) {
// The packet queue now has packets ready
buffer = NULL;
while ((queueEntry = RtpfGetQueuedPacket(&rtpQueue)) != NULL) {
// queueRtpPacket takes ownership of the packet
queueRtpPacket(queueEntry);
}
}
else if (queueStatus == RTPF_RET_QUEUED_NOTHING_READY) {
if (queueStatus == RTPF_RET_QUEUED) {
// The queue owns the buffer
buffer = NULL;
}