diff --git a/src/RtpFecQueue.c b/src/RtpFecQueue.c index c729427..05da659 100644 --- a/src/RtpFecQueue.c +++ b/src/RtpFecQueue.c @@ -16,16 +16,65 @@ void RtpfInitializeQueue(PRTP_FEC_QUEUE queue) { queue->currentFrameNumber = UINT16_MAX; } -void RtpfCleanupQueue(PRTP_FEC_QUEUE queue) { - while (queue->bufferHead != NULL) { - PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead; - queue->bufferHead = entry->next; +static void purgeListEntries(PRTPFEC_QUEUE_LIST list) { + while (list->head != NULL) { + PRTPFEC_QUEUE_ENTRY entry = list->head; + list->head = entry->next; free(entry->packet); } + + list->tail = NULL; + list->count = 0; +} + +void RtpfCleanupQueue(PRTP_FEC_QUEUE queue) { + purgeListEntries(&queue->pendingFecBlockList); +} + +static void insertEntryIntoList(PRTPFEC_QUEUE_LIST list, PRTPFEC_QUEUE_ENTRY entry) { + if (list->head == NULL) { + LC_ASSERT(list->count == 0); + LC_ASSERT(list->tail == NULL); + list->head = list->tail = entry; + } + else { + LC_ASSERT(list->count != 0); + PRTPFEC_QUEUE_ENTRY oldTail = list->tail; + entry->prev = oldTail; + LC_ASSERT(oldTail->next == NULL); + oldTail->next = entry; + list->tail = entry; + } + + list->count++; +} + +static void removeEntryFromList(PRTPFEC_QUEUE_LIST list, PRTPFEC_QUEUE_ENTRY entry) { + LC_ASSERT(entry != NULL); + LC_ASSERT(list->head != NULL); + LC_ASSERT(list->tail != NULL); + + if (list->head == entry) { + list->head = entry->next; + } + if (list->tail == entry) { + list->tail = entry->prev; + } + + if (entry->prev != NULL) { + entry->prev->next = entry->next; + entry->prev = NULL; + } + if (entry->next != NULL) { + entry->next->prev = entry->prev; + entry->next = NULL; + } + + list->count--; } // newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet -static bool queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, bool head, PRTP_PACKET packet, int length, bool isParity) { +static bool queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, PRTP_PACKET packet, int length, bool isParity) { PRTPFEC_QUEUE_ENTRY entry; LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber)); @@ -38,7 +87,7 @@ static bool queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, bool } else { // Check for duplicates - entry = queue->bufferHead; + entry = queue->pendingFecBlockList.head; while (entry != NULL) { if (entry->packet->sequenceNumber == packet->sequenceNumber) { return false; @@ -57,27 +106,7 @@ static bool queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, bool // 90 KHz video clock newEntry->presentationTimeMs = packet->timestamp / 90; - if (queue->bufferHead == NULL) { - LC_ASSERT(queue->bufferSize == 0); - queue->bufferHead = queue->bufferTail = newEntry; - } - else if (head) { - LC_ASSERT(queue->bufferSize > 0); - PRTPFEC_QUEUE_ENTRY oldHead = queue->bufferHead; - newEntry->next = oldHead; - LC_ASSERT(oldHead->prev == NULL); - oldHead->prev = newEntry; - queue->bufferHead = newEntry; - } - else { - LC_ASSERT(queue->bufferSize > 0); - PRTPFEC_QUEUE_ENTRY oldTail = queue->bufferTail; - newEntry->prev = oldTail; - LC_ASSERT(oldTail->next == NULL); - oldTail->next = newEntry; - queue->bufferTail = newEntry; - } - queue->bufferSize++; + insertEntryIntoList(&queue->pendingFecBlockList, newEntry); return true; } @@ -99,9 +128,9 @@ static int reconstructFrame(PRTP_FEC_QUEUE queue) { // We'll need an extra packet to run in FEC validation mode, because we will // be "dropping" one below and recovering it using parity. However, some frames // are so large that FEC is disabled entirely, so don't wait for parity on those. - if (queue->bufferSize < queue->bufferDataPackets + (queue->fecPercentage ? 1 : 0)) { + if (queue->pendingFecBlockList.count < queue->bufferDataPackets + (queue->fecPercentage ? 1 : 0)) { #else - if (queue->bufferSize < queue->bufferDataPackets) { + if (queue->pendingFecBlockList.count < queue->bufferDataPackets) { #endif // Not enough data to recover yet return -1; @@ -155,7 +184,7 @@ static int reconstructFrame(PRTP_FEC_QUEUE queue) { int droppedRtpPacketLength = 0; #endif - PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead; + PRTPFEC_QUEUE_ENTRY entry = queue->pendingFecBlockList.head; while (entry != NULL) { unsigned int index = U16(entry->packet->sequenceNumber - queue->bufferLowestSequenceNumber); @@ -206,9 +235,9 @@ cleanup_packets: PRTPFEC_QUEUE_ENTRY queueEntry = (PRTPFEC_QUEUE_ENTRY)&packets[i][receiveSize]; PRTP_PACKET rtpPacket = (PRTP_PACKET) packets[i]; rtpPacket->sequenceNumber = U16(i + queue->bufferLowestSequenceNumber); - rtpPacket->header = queue->bufferHead->packet->header; - rtpPacket->timestamp = queue->bufferHead->packet->timestamp; - rtpPacket->ssrc = queue->bufferHead->packet->ssrc; + rtpPacket->header = queue->pendingFecBlockList.head->packet->header; + rtpPacket->timestamp = queue->pendingFecBlockList.head->packet->timestamp; + rtpPacket->ssrc = queue->pendingFecBlockList.head->packet->ssrc; int dataOffset = sizeof(*rtpPacket); if (rtpPacket->header & FLAG_EXTENSION) { @@ -295,7 +324,7 @@ cleanup_packets: // it may be a legitimate part of the H.264 bytestream. LC_ASSERT(isBefore16(rtpPacket->sequenceNumber, queue->bufferFirstParitySequenceNumber)); - queuePacket(queue, queueEntry, false, rtpPacket, StreamConfig.packetSize + dataOffset, false); + queuePacket(queue, queueEntry, rtpPacket, StreamConfig.packetSize + dataOffset, false); } else if (packets[i] != NULL) { free(packets[i]); } @@ -314,33 +343,11 @@ cleanup: return ret; } -static void removeEntry(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY entry) { - LC_ASSERT(entry != NULL); - LC_ASSERT(queue->bufferSize > 0); - LC_ASSERT(queue->bufferHead != NULL); - LC_ASSERT(queue->bufferTail != NULL); - - if (queue->bufferHead == entry) { - queue->bufferHead = entry->next; - } - if (queue->bufferTail == entry) { - queue->bufferTail = entry->prev; - } - - if (entry->prev != NULL) { - entry->prev->next = entry->next; - } - if (entry->next != NULL) { - entry->next->prev = entry->prev; - } - queue->bufferSize--; -} - static void submitCompletedFrame(PRTP_FEC_QUEUE queue) { unsigned int nextSeqNum = queue->bufferLowestSequenceNumber; - while (queue->bufferSize > 0) { - PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead; + while (queue->pendingFecBlockList.count > 0) { + PRTPFEC_QUEUE_ENTRY entry = queue->pendingFecBlockList.head; unsigned int lowestRtpSequenceNumber = entry->packet->sequenceNumber; @@ -356,7 +363,7 @@ static void submitCompletedFrame(PRTP_FEC_QUEUE queue) { entry = parityEntry->next; // Remove this entry - removeEntry(queue, parityEntry); + removeEntryFromList(&queue->pendingFecBlockList, parityEntry); // Free the entry and packet free(parityEntry->packet); @@ -366,8 +373,7 @@ static void submitCompletedFrame(PRTP_FEC_QUEUE queue) { // Check for the next packet in sequence. This will be O(1) for non-reordered packet streams. if (entry->packet->sequenceNumber == nextSeqNum) { - removeEntry(queue, entry); - entry->prev = entry->next = NULL; + removeEntryFromList(&queue->pendingFecBlockList, entry); // To avoid having to sample the system time for each packet, we cheat // and use the first packet's receive time for all packets. This ends up @@ -427,12 +433,12 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_ // Reinitialize the queue if it's empty after a frame delivery or // if we can't finish a frame before receiving the next one. - if (queue->bufferSize == 0 || queue->currentFrameNumber != nvPacket->frameIndex) { - if (queue->currentFrameNumber != nvPacket->frameIndex && queue->bufferSize != 0) { + if (queue->pendingFecBlockList.count == 0 || queue->currentFrameNumber != nvPacket->frameIndex) { + if (queue->currentFrameNumber != nvPacket->frameIndex && queue->pendingFecBlockList.count != 0) { Limelog("Unrecoverable frame %d: %d+%d=%d received < %d needed\n", queue->currentFrameNumber, queue->receivedBufferDataPackets, - queue->bufferSize - queue->receivedBufferDataPackets, - queue->bufferSize, + queue->pendingFecBlockList.count - queue->receivedBufferDataPackets, + queue->pendingFecBlockList.count, queue->bufferDataPackets); } @@ -443,14 +449,7 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_ connectionSawFrame(queue->currentFrameNumber); // Discard any unsubmitted buffers from the previous frame - while (queue->bufferHead != NULL) { - PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead; - queue->bufferHead = entry->next; - free(entry->packet); - } - - queue->bufferTail = NULL; - queue->bufferSize = 0; + purgeListEntries(&queue->pendingFecBlockList); queue->bufferFirstRecvTimeMs = PltGetMillis(); queue->bufferLowestSequenceNumber = U16(packet->sequenceNumber - fecIndex); @@ -472,7 +471,7 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_ LC_ASSERT((nvPacket->fecInfo & 0xFFC00000) >> 22 == queue->bufferDataPackets); LC_ASSERT((nvPacket->flags & FLAG_EOF) || length - dataOffset == StreamConfig.packetSize); - if (!queuePacket(queue, packetEntry, false, packet, length, !isBefore16(packet->sequenceNumber, queue->bufferFirstParitySequenceNumber))) { + if (!queuePacket(queue, packetEntry, packet, length, !isBefore16(packet->sequenceNumber, queue->bufferFirstParitySequenceNumber))) { return RTPF_RET_REJECTED; } else { @@ -487,9 +486,9 @@ int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, int length, PRTPFEC_ submitCompletedFrame(queue); // submitCompletedFrame() should have consumed all data - LC_ASSERT(queue->bufferHead == NULL); - LC_ASSERT(queue->bufferTail == NULL); - LC_ASSERT(queue->bufferSize == 0); + LC_ASSERT(queue->pendingFecBlockList.head == NULL); + LC_ASSERT(queue->pendingFecBlockList.tail == NULL); + LC_ASSERT(queue->pendingFecBlockList.count == 0); // Ignore any more packets for this frame queue->currentFrameNumber++; diff --git a/src/RtpFecQueue.h b/src/RtpFecQueue.h index 92ca3e1..d4a561d 100644 --- a/src/RtpFecQueue.h +++ b/src/RtpFecQueue.h @@ -13,11 +13,16 @@ typedef struct _RTPFEC_QUEUE_ENTRY { struct _RTPFEC_QUEUE_ENTRY* prev; } RTPFEC_QUEUE_ENTRY, *PRTPFEC_QUEUE_ENTRY; +typedef struct _RTPFEC_QUEUE_LIST { + PRTPFEC_QUEUE_ENTRY head; + PRTPFEC_QUEUE_ENTRY tail; + uint32_t count; +} RTPFEC_QUEUE_LIST, *PRTPFEC_QUEUE_LIST; + typedef struct _RTP_FEC_QUEUE { - PRTPFEC_QUEUE_ENTRY bufferHead; - PRTPFEC_QUEUE_ENTRY bufferTail; + RTPFEC_QUEUE_LIST pendingFecBlockList; + uint64_t bufferFirstRecvTimeMs; - uint32_t bufferSize; uint32_t bufferLowestSequenceNumber; uint32_t bufferHighestSequenceNumber; uint32_t bufferFirstParitySequenceNumber;