diff --git a/src/RtpAudioQueue.c b/src/RtpAudioQueue.c index 55a7783..95485a6 100644 --- a/src/RtpAudioQueue.c +++ b/src/RtpAudioQueue.c @@ -35,6 +35,52 @@ void RtpaInitializeQueue(PRTP_AUDIO_QUEUE queue) { memcpy(queue->rs->parity, parity, sizeof(parity)); } +static void validateFecBlockState(PRTP_AUDIO_QUEUE queue) { +#ifdef LC_DEBUG + PRTPA_FEC_BLOCK lastBlock = queue->blockHead; + + // The next sequence number must not be less than the oldest BSN unless we're in the + // starting state (prior to us setting nextRtpSequenceNumber and oldestRtpBaseSequenceNumber). + LC_ASSERT(!isBefore16(queue->nextRtpSequenceNumber, queue->oldestRtpBaseSequenceNumber) || + (queue->nextRtpSequenceNumber == UINT16_MAX && queue->oldestRtpBaseSequenceNumber == 0)); + + if (lastBlock == NULL) { + return; + } + + uint16_t lastSeqNum = lastBlock->fecHeader.baseSequenceNumber; + uint32_t lastTs = lastBlock->fecHeader.baseTimestamp; + + // The head should not have a previous entry + LC_ASSERT(lastBlock->prev == NULL); + + // The next sequence number must not exceed the first FEC block (otherwise it should have been dequeued and freed) + LC_ASSERT(isBefore16(queue->nextRtpSequenceNumber, queue->blockHead->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS)); + + // The first FEC block should not be before the oldest BSN (or we will drop packets that belong in that FEC block). + LC_ASSERT(!isBefore16(queue->blockHead->fecHeader.baseSequenceNumber, queue->oldestRtpBaseSequenceNumber)); + + PRTPA_FEC_BLOCK block = lastBlock->next; + while (block != NULL) { + // Ensure the list is sorted correctly + LC_ASSERT(isBefore16(lastSeqNum, block->fecHeader.baseSequenceNumber)); + LC_ASSERT(isBefore32(lastTs, block->fecHeader.baseTimestamp)); + + // Ensure entry invariants are satisfied + LC_ASSERT(block->blockSize == lastBlock->blockSize); + LC_ASSERT(block->fecHeader.payloadType == lastBlock->fecHeader.payloadType); + LC_ASSERT(block->fecHeader.ssrc == lastBlock->fecHeader.ssrc); + + // Ensure the list itself is consistent + LC_ASSERT(block->prev == lastBlock); + LC_ASSERT(block->next != NULL || queue->blockTail == block); + + lastBlock = block; + block = block->next; + } +#endif +} + static void freeFecBlockHead(PRTP_AUDIO_QUEUE queue) { PRTPA_FEC_BLOCK blockHead = queue->blockHead; @@ -49,6 +95,8 @@ static void freeFecBlockHead(PRTP_AUDIO_QUEUE queue) { queue->oldestRtpBaseSequenceNumber = blockHead->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS; + validateFecBlockState(queue); + free(blockHead); } @@ -70,6 +118,8 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK uint16_t blockSize; uint8_t fecBlockPayloadType; + validateFecBlockState(queue); + if (packet->packetType == 97) { if (length < sizeof(RTP_PACKET)) { Limelog("RTP audio data packet too small: %u\n", length); @@ -201,6 +251,8 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK } } + validateFecBlockState(queue); + return block; } @@ -295,6 +347,7 @@ static bool completeFecBlock(PRTP_AUDIO_QUEUE queue, PRTPA_FEC_BLOCK block) { LC_ASSERT(recoveryErrors == 0); } + free(droppedRtpPacket); #endif @@ -329,8 +382,6 @@ static bool enforceQueueConstraints(PRTP_AUDIO_QUEUE queue) { } int RtpaAddPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACKET packet, uint16_t length) { - LC_ASSERT(!queue->blockHead || isBefore16(queue->nextRtpSequenceNumber, queue->blockHead->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS)); - PRTPA_FEC_BLOCK fecBlock = getFecBlockForRtpPacket(queue, packet, length); if (fecBlock == NULL) { // Reject the packet @@ -397,6 +448,9 @@ int RtpaAddPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACKET packet, uint16_t length) { LC_ASSERT(fecBlock->nextDataPacketIndex == RTPA_DATA_SHARDS); freeFecBlockHead(queue); } + else { + validateFecBlockState(queue); + } return RTPQ_RET_HANDLE_NOW; } @@ -430,6 +484,8 @@ int RtpaAddPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACKET packet, uint16_t length) { queue->nextRtpSequenceNumber = queue->blockHead->fecHeader.baseSequenceNumber; } + validateFecBlockState(queue); + return RTPQ_RET_PACKET_READY; } @@ -437,6 +493,8 @@ int RtpaAddPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACKET packet, uint16_t length) { } PRTP_PACKET RtpaGetQueuedPacket(PRTP_AUDIO_QUEUE queue, uint16_t customHeaderLength, uint16_t* length) { + validateFecBlockState(queue); + // If we're returning audio data even with discontinuities, find the next data packet if (queue->blockHead != NULL && queue->blockHead->allowDiscontinuity) { PRTPA_FEC_BLOCK nextBlock = queue->blockHead; @@ -458,6 +516,9 @@ PRTP_PACKET RtpaGetQueuedPacket(PRTP_AUDIO_QUEUE queue, uint16_t customHeaderLen if (nextBlock->nextDataPacketIndex == RTPA_DATA_SHARDS) { freeFecBlockHead(queue); } + else { + validateFecBlockState(queue); + } } // Return the next RTP sequence number by indexing into the most recent FEC block @@ -478,6 +539,9 @@ PRTP_PACKET RtpaGetQueuedPacket(PRTP_AUDIO_QUEUE queue, uint16_t customHeaderLen if (nextBlock->nextDataPacketIndex == RTPA_DATA_SHARDS) { freeFecBlockHead(queue); } + else { + validateFecBlockState(queue); + } return packet; }