diff --git a/src/RtpFecQueue.c b/src/RtpFecQueue.c index d3782ab..6eb0048 100644 --- a/src/RtpFecQueue.c +++ b/src/RtpFecQueue.c @@ -15,28 +15,30 @@ void RtpfCleanupQueue(PRTP_FEC_QUEUE queue) { queue->bufferHead = entry->next; free(entry->packet); } - - while (queue->queueHead != NULL) { - PRTPFEC_QUEUE_ENTRY entry = queue->queueHead; - queue->queueHead = entry->next; - free(entry->packet); - } } // newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet static int queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, int head, PRTP_PACKET packet, int length, int isParity) { PRTPFEC_QUEUE_ENTRY entry; - LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->bufferLowestSequenceNumber)); + LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber)); - // Don't queue duplicates either - entry = queue->bufferHead; - while (entry != NULL) { - if (entry->packet->sequenceNumber == packet->sequenceNumber) { - return 0; + // If the packet is in order, we can take the fast path and avoid having + // to loop through the whole list. If we get an out of order or missing + // packet, the fast path will stop working and we'll use the loop instead. + if (packet->sequenceNumber == queue->nextContiguousSequenceNumber) { + queue->nextContiguousSequenceNumber = U16(packet->sequenceNumber + 1); + } + else { + // Check for duplicates + entry = queue->bufferHead; + while (entry != NULL) { + if (entry->packet->sequenceNumber == packet->sequenceNumber) { + return 0; + } + + entry = entry->next; } - - entry = entry->next; } newEntry->packet = packet; @@ -211,15 +213,15 @@ cleanup: static void removeEntry(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY entry) { LC_ASSERT(entry != NULL); - LC_ASSERT(queue->queueSize > 0); - LC_ASSERT(queue->queueHead != NULL); - LC_ASSERT(queue->queueTail != NULL); + LC_ASSERT(queue->bufferSize > 0); + LC_ASSERT(queue->bufferHead != NULL); + LC_ASSERT(queue->bufferTail != NULL); - if (queue->queueHead == entry) { - queue->queueHead = entry->next; + if (queue->bufferHead == entry) { + queue->bufferHead = entry->next; } - if (queue->queueTail == entry) { - queue->queueTail = entry->prev; + if (queue->bufferTail == entry) { + queue->bufferTail = entry->prev; } if (entry->prev != NULL) { @@ -228,11 +230,66 @@ static void removeEntry(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY entry) { if (entry->next != NULL) { entry->next->prev = entry->prev; } - queue->queueSize--; + queue->bufferSize--; +} + +static void submitCompletedFrame(PRTP_FEC_QUEUE queue) { + unsigned int nextSeqNum = queue->bufferLowestSequenceNumber; + + while (queue->bufferSize > 0) { + PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead; + + unsigned int lowestRtpSequenceNumber = entry->packet->sequenceNumber; + + while (entry != NULL) { + // We should never encounter a packet that's lower than our next seq num + LC_ASSERT(!isBefore16(entry->packet->sequenceNumber, nextSeqNum)); + + // Never return parity packets + if (entry->isParity) { + PRTPFEC_QUEUE_ENTRY parityEntry = entry; + + // Skip this entry + entry = parityEntry->next; + + // Remove this entry + removeEntry(queue, parityEntry); + + // Free the entry and packet + free(parityEntry->packet); + + continue; + } + + // Check for the next packet in sequence. This will be O(1) for non-reordered packet streams. + if (entry->packet->sequenceNumber == nextSeqNum) { + removeEntry(queue, entry); + entry->prev = entry->next = NULL; + + // Submit this packet for decoding. It will own freeing the entry now. + queueRtpPacket(entry); + break; + } + else if (isBefore16(entry->packet->sequenceNumber, lowestRtpSequenceNumber)) { + lowestRtpSequenceNumber = entry->packet->sequenceNumber; + } + + entry = entry->next; + } + + if (entry == NULL) { + // Start at the lowest we found last enumeration + nextSeqNum = lowestRtpSequenceNumber; + } + else { + // We found this packet so move on to the next one in sequence + nextSeqNum = U16(nextSeqNum + 1); + } + } } int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_QUEUE_ENTRY packetEntry) { - if (isBefore16(packet->sequenceNumber, queue->bufferLowestSequenceNumber)) { + if (isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber)) { // Reject packets behind our current buffer window return RTPF_RET_REJECTED; } @@ -275,6 +332,7 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_ queue->bufferSize = 0; queue->bufferLowestSequenceNumber = U16(packet->sequenceNumber - fecIndex); + queue->nextContiguousSequenceNumber = queue->bufferLowestSequenceNumber; queue->receivedBufferDataPackets = 0; queue->bufferDataPackets = (nvPacket->fecInfo & 0xFFC00000) >> 22; queue->fecPercentage = (nvPacket->fecInfo & 0xFF0) >> 4; @@ -302,67 +360,19 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_ // Try to submit this frame. If we haven't received enough packets, // this will fail and we'll keep waiting. if (reconstructFrame(queue) == 0) { - // Queue the pending frame data - if (queue->queueTail == NULL) { - queue->queueHead = queue->bufferHead; - queue->queueTail = queue->bufferTail; - } else { - queue->queueTail->next = queue->bufferHead; - queue->queueTail = queue->bufferTail; - } - queue->queueSize += queue->bufferSize; + // Submit the frame data to the depacketizer + submitCompletedFrame(queue); - // Clear the buffer list - queue->bufferHead = NULL; - queue->bufferTail = NULL; - queue->bufferSize = 0; + // submitCompletedFrame() should have consumed all data + LC_ASSERT(queue->bufferHead == NULL); + LC_ASSERT(queue->bufferTail == NULL); + LC_ASSERT(queue->bufferSize == 0); // Ignore any more packets for this frame queue->currentFrameNumber++; } - return (queue->queueHead != NULL) ? RTPF_RET_QUEUED_PACKETS_READY : RTPF_RET_QUEUED_NOTHING_READY; + return RTPF_RET_QUEUED; } } -PRTPFEC_QUEUE_ENTRY RtpfGetQueuedPacket(PRTP_FEC_QUEUE queue) { - PRTPFEC_QUEUE_ENTRY queuedEntry, entry; - - // Find the next queued packet - queuedEntry = NULL; - entry = queue->queueHead; - unsigned int lowestRtpSequenceNumber = UINT16_MAX; - - while (entry != NULL) { - // Never return parity packets - if (entry->isParity) { - PRTPFEC_QUEUE_ENTRY parityEntry = entry; - - // Skip this entry - entry = parityEntry->next; - - // Remove this entry - removeEntry(queue, parityEntry); - - // Free the entry and packet - free(parityEntry->packet); - - continue; - } - - if (queuedEntry == NULL || isBefore16(entry->packet->sequenceNumber, lowestRtpSequenceNumber)) { - lowestRtpSequenceNumber = entry->packet->sequenceNumber; - queuedEntry = entry; - } - - entry = entry->next; - } - - if (queuedEntry != NULL) { - removeEntry(queue, queuedEntry); - queuedEntry->prev = queuedEntry->next = NULL; - return queuedEntry; - } else { - return NULL; - } -} diff --git a/src/RtpFecQueue.h b/src/RtpFecQueue.h index 1e6df04..1fb8444 100644 --- a/src/RtpFecQueue.h +++ b/src/RtpFecQueue.h @@ -13,10 +13,6 @@ typedef struct _RTPFEC_QUEUE_ENTRY { } RTPFEC_QUEUE_ENTRY, *PRTPFEC_QUEUE_ENTRY; typedef struct _RTP_FEC_QUEUE { - PRTPFEC_QUEUE_ENTRY queueHead; - PRTPFEC_QUEUE_ENTRY queueTail; - int queueSize; - PRTPFEC_QUEUE_ENTRY bufferHead; PRTPFEC_QUEUE_ENTRY bufferTail; int bufferSize; @@ -27,15 +23,15 @@ typedef struct _RTP_FEC_QUEUE { int bufferParityPackets; int receivedBufferDataPackets; int fecPercentage; + int nextContiguousSequenceNumber; int currentFrameNumber; } RTP_FEC_QUEUE, *PRTP_FEC_QUEUE; -#define RTPF_RET_QUEUED_NOTHING_READY 0 -#define RTPF_RET_QUEUED_PACKETS_READY 1 -#define RTPF_RET_REJECTED 2 +#define RTPF_RET_QUEUED 0 +#define RTPF_RET_REJECTED 1 void RtpfInitializeQueue(PRTP_FEC_QUEUE queue); void RtpfCleanupQueue(PRTP_FEC_QUEUE queue); int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_QUEUE_ENTRY packetEntry); -PRTPFEC_QUEUE_ENTRY RtpfGetQueuedPacket(PRTP_FEC_QUEUE queue); +void RtpfSubmitQueuedPackets(PRTP_FEC_QUEUE queue); diff --git a/src/VideoStream.c b/src/VideoStream.c index 2337ed8..3f91167 100644 --- a/src/VideoStream.c +++ b/src/VideoStream.c @@ -122,15 +122,8 @@ static void ReceiveThreadProc(void* context) { packet->sequenceNumber = htons(packet->sequenceNumber); queueStatus = RtpfAddPacket(&rtpQueue, packet, err, (PRTPFEC_QUEUE_ENTRY)&buffer[receiveSize]); - if (queueStatus == RTPF_RET_QUEUED_PACKETS_READY) { - // The packet queue now has packets ready - buffer = NULL; - while ((queueEntry = RtpfGetQueuedPacket(&rtpQueue)) != NULL) { - // queueRtpPacket takes ownership of the packet - queueRtpPacket(queueEntry); - } - } - else if (queueStatus == RTPF_RET_QUEUED_NOTHING_READY) { + + if (queueStatus == RTPF_RET_QUEUED) { // The queue owns the buffer buffer = NULL; }