diff --git a/src/AudioStream.c b/src/AudioStream.c index 38fdbea..a86cf6b 100644 --- a/src/AudioStream.c +++ b/src/AudioStream.c @@ -3,7 +3,7 @@ static SOCKET rtpSocket = INVALID_SOCKET; static LINKED_BLOCKING_QUEUE packetQueue; -static RTP_REORDER_QUEUE rtpReorderQueue; +static RTP_AUDIO_QUEUE rtpAudioQueue; static PLT_THREAD udpPingThread; static PLT_THREAD receiveThread; @@ -26,15 +26,14 @@ static uint64_t firstReceiveTime; // for longer than normal. #define RTP_RECV_BUFFER (64 * 1024) -typedef struct _QUEUED_AUDIO_PACKET { - // data must remain at the front - char data[MAX_PACKET_SIZE]; - +typedef struct _QUEUE_AUDIO_PACKET_HEADER { + LINKED_BLOCKING_QUEUE_ENTRY lentry; int size; - union { - RTP_QUEUE_ENTRY rentry; - LINKED_BLOCKING_QUEUE_ENTRY lentry; - } q; +} QUEUED_AUDIO_PACKET_HEADER, *PQUEUED_AUDIO_PACKET_HEADER; + +typedef struct _QUEUED_AUDIO_PACKET { + QUEUED_AUDIO_PACKET_HEADER header; + char data[MAX_PACKET_SIZE]; } QUEUED_AUDIO_PACKET, *PQUEUED_AUDIO_PACKET; static void UdpPingThreadProc(void* context) { @@ -67,7 +66,7 @@ static void UdpPingThreadProc(void* context) { // Initialize the audio stream and start int initializeAudioStream(void) { LbqInitializeLinkedBlockingQueue(&packetQueue, 30); - RtpqInitializeQueue(&rtpReorderQueue, RTPQ_DEFAULT_MAX_SIZE, RTPQ_DEFAULT_QUEUE_TIME); + RtpaInitializeQueue(&rtpAudioQueue); lastSeq = 0; receivedDataFromPeer = false; firstReceiveTime = 0; @@ -122,13 +121,13 @@ void destroyAudioStream(void) { PltDestroyCryptoContext(audioDecryptionCtx); freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue)); - RtpqCleanupQueue(&rtpReorderQueue); + RtpaCleanupQueue(&rtpAudioQueue); } static bool queuePacketToLbq(PQUEUED_AUDIO_PACKET* packet) { int err; - err = LbqOfferQueueItem(&packetQueue, *packet, &(*packet)->q.lentry); + err = LbqOfferQueueItem(&packetQueue, *packet, &(*packet)->header.lentry); if (err == LBQ_SUCCESS) { // The LBQ owns the buffer now *packet = NULL; @@ -160,7 +159,7 @@ static void decodeInputData(PQUEUED_AUDIO_PACKET packet) { // We must have room for the AES padding which may be written to the buffer unsigned char decryptedOpusData[ROUND_TO_PKCS7_PADDED_LEN(MAX_PACKET_SIZE)]; unsigned char iv[16] = { 0 }; - int dataLength = packet->size - sizeof(*rtp); + int dataLength = packet->header.size - sizeof(*rtp); LC_ASSERT(dataLength <= MAX_PACKET_SIZE); @@ -182,7 +181,7 @@ static void decodeInputData(PQUEUED_AUDIO_PACKET packet) { AudioCallbacks.decodeAndPlaySample((char*)decryptedOpusData, dataLength); } else { - AudioCallbacks.decodeAndPlaySample((char*)(rtp + 1), packet->size - sizeof(*rtp)); + AudioCallbacks.decodeAndPlaySample((char*)(rtp + 1), packet->header.size - sizeof(*rtp)); } } @@ -217,13 +216,13 @@ static void ReceiveThreadProc(void* context) { } } - packet->size = recvUdpSocket(rtpSocket, &packet->data[0], MAX_PACKET_SIZE, useSelect); - if (packet->size < 0) { + packet->header.size = recvUdpSocket(rtpSocket, &packet->data[0], MAX_PACKET_SIZE, useSelect); + if (packet->header.size < 0) { Limelog("Audio Receive: recvUdpSocket() failed: %d\n", (int)LastSocketError()); ListenerCallbacks.connectionTerminated(LastSocketFail()); break; } - else if (packet->size == 0) { + else if (packet->header.size == 0) { // Receive timed out; try again if (!receivedDataFromPeer) { @@ -236,16 +235,12 @@ static void ReceiveThreadProc(void* context) { continue; } - if (packet->size < (int)sizeof(RTP_PACKET)) { + if (packet->header.size < (int)sizeof(RTP_PACKET)) { // Runt packet continue; } rtp = (PRTP_PACKET)&packet->data[0]; - if (rtp->packetType != 97) { - // Not audio - continue; - } if (!receivedDataFromPeer) { receivedDataFromPeer = true; @@ -260,7 +255,10 @@ static void ReceiveThreadProc(void* context) { // GFE accumulates audio samples before we are ready to receive them, so // we will drop the ones that arrived before the receive thread was ready. if (packetsToDrop > 0) { - packetsToDrop--; + // Only count actual audio data (not FEC) in the packets to drop calculation + if (rtp->packetType == 97) { + packetsToDrop--; + } continue; } @@ -269,7 +267,7 @@ static void ReceiveThreadProc(void* context) { rtp->timestamp = BE32(rtp->timestamp); rtp->ssrc = BE32(rtp->ssrc); - queueStatus = RtpqAddPacket(&rtpReorderQueue, (PRTP_PACKET)packet, &packet->q.rentry); + queueStatus = RtpaAddPacket(&rtpAudioQueue, (PRTP_PACKET)&packet->data[0], (uint16_t)packet->header.size); if (RTPQ_HANDLE_NOW(queueStatus)) { if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if (!queuePacketToLbq(&packet)) { @@ -289,7 +287,11 @@ static void ReceiveThreadProc(void* context) { if (RTPQ_PACKET_READY(queueStatus)) { // If packets are ready, pull them and send them to the decoder - while ((packet = (PQUEUED_AUDIO_PACKET)RtpqGetQueuedPacket(&rtpReorderQueue)) != NULL) { + uint16_t length; + while ((packet = (PQUEUED_AUDIO_PACKET)RtpaGetQueuedPacket(&rtpAudioQueue, sizeof(QUEUED_AUDIO_PACKET_HEADER), &length)) != NULL) { + // Populate header data (not preserved in queued packets) + packet->header.size = length; + if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if (!queuePacketToLbq(&packet)) { // An exit signal was received diff --git a/src/Limelight-internal.h b/src/Limelight-internal.h index 14f1116..f3150b4 100644 --- a/src/Limelight-internal.h +++ b/src/Limelight-internal.h @@ -8,7 +8,7 @@ #include "Video.h" #include "Input.h" #include "RtpFecQueue.h" -#include "RtpReorderQueue.h" +#include "RtpAudioQueue.h" #include "ByteBuffer.h" #include diff --git a/src/RtpAudioQueue.c b/src/RtpAudioQueue.c new file mode 100644 index 0000000..f730c59 --- /dev/null +++ b/src/RtpAudioQueue.c @@ -0,0 +1,391 @@ +#include "Limelight-internal.h" + +void RtpaInitializeQueue(PRTP_AUDIO_QUEUE queue) { + memset(queue, 0, sizeof(*queue)); + queue->maxQueueTimeMs = RTPQ_DEFAULT_QUEUE_TIME; + queue->nextRtpSequenceNumber = UINT16_MAX; + + reed_solomon_init(); + + // The number of data and parity shards is constant, so we can reuse + // the same RS matrices for all traffic. + queue->rs = reed_solomon_new(RTPA_DATA_SHARDS, RTPA_FEC_SHARDS); + + // For unknown reasons, the RS parity matrix computed by our RS implementation + // doesn't match the one Nvidia uses for audio data. I'm not exactly sure why, + // but we can simply replace it with the matrix generated by OpenFEC which + // works correctly. This is possible because the data and FEC shard count is + // constant and known in advance. + const unsigned char parity[] = { 0x77, 0x40, 0x38, 0x0e, 0xc7, 0xa7, 0x0d, 0x6c }; + memcpy(&queue->rs->m[16], parity, sizeof(parity)); + memcpy(queue->rs->parity, parity, sizeof(parity)); +} + +static void freeFecBlockHead(PRTP_AUDIO_QUEUE queue) { + PRTPA_FEC_BLOCK blockHead = queue->blockHead; + + queue->blockHead = queue->blockHead->next; + if (queue->blockHead != NULL) { + queue->blockHead->prev = NULL; + } + else { + LC_ASSERT(queue->blockTail == blockHead); + queue->blockTail = NULL; + } + + queue->oldestRtpBaseSequenceNumber = blockHead->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS; + + free(blockHead); +} + +void RtpaCleanupQueue(PRTP_AUDIO_QUEUE queue) { + while (queue->blockHead != NULL) { + freeFecBlockHead(queue); + } + + LC_ASSERT(queue->blockTail == NULL); + + reed_solomon_release(queue->rs); + queue->rs = NULL; +} + +static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACKET packet, uint16_t length) { + uint32_t fecBlockSsrc; + uint16_t fecBlockBaseSeqNum; + uint32_t fecBlockBaseTs; + uint16_t blockSize; + uint8_t fecBlockPayloadType; + + if (packet->packetType == 97) { + if (length < sizeof(RTP_PACKET)) { + Limelog("RTP audio data packet too small: %u\n", length); + LC_ASSERT(false); + return NULL; + } + + // This is a data packet, so we will need to synthesize an FEC header + fecBlockPayloadType = packet->packetType; + fecBlockBaseSeqNum = (packet->sequenceNumber / RTPA_DATA_SHARDS) * RTPA_DATA_SHARDS; + fecBlockBaseTs = packet->timestamp - ((packet->sequenceNumber - fecBlockBaseSeqNum) * AudioPacketDuration); + fecBlockSsrc = packet->ssrc; + + blockSize = length - sizeof(RTP_PACKET); + } + else if (packet->packetType == 127) { + PAUDIO_FEC_HEADER fecHeader = (PAUDIO_FEC_HEADER)(packet + 1); + + if (length < sizeof(RTP_PACKET) + sizeof(AUDIO_FEC_HEADER)) { + Limelog("RTP audio FEC packet too small: %u\n", length); + LC_ASSERT(false); + return NULL; + } + + // This is an FEC packet, so we can just copy (and byteswap) the FEC header + fecBlockPayloadType = fecHeader->payloadType; + fecBlockBaseSeqNum = BE16(fecHeader->baseSequenceNumber); + fecBlockBaseTs = BE32(fecHeader->baseTimestamp); + fecBlockSsrc = BE32(fecHeader->ssrc); + + // Ensure the FEC shard index is valid to prevent OOB access + // later during recovery. + if (fecHeader->fecShardIndex >= RTPA_FEC_SHARDS) { + Limelog("Too many audio FEC shards: %u\n", fecHeader->fecShardIndex); + LC_ASSERT(false); + return NULL; + } + + blockSize = length - sizeof(RTP_PACKET) - sizeof(AUDIO_FEC_HEADER); + } + else { + LC_ASSERT(false); + return NULL; + } + + // Drop packets from FEC blocks that have already been completed + if (isBefore16(fecBlockBaseSeqNum, queue->oldestRtpBaseSequenceNumber)) { + return NULL; + } + + // Look for an existing FEC block + PRTPA_FEC_BLOCK existingBlock = queue->blockHead; + while (existingBlock != NULL) { + if (existingBlock->fecHeader.baseSequenceNumber == fecBlockBaseSeqNum) { + // The FEC header data should match for all packets + LC_ASSERT(existingBlock->fecHeader.payloadType == fecBlockPayloadType); + LC_ASSERT(existingBlock->fecHeader.baseTimestamp == fecBlockBaseTs); + LC_ASSERT(existingBlock->fecHeader.ssrc == fecBlockSsrc); + LC_ASSERT(existingBlock->blockSize == blockSize); + + // If the block is completed, don't return it + return existingBlock->fullyReassembled ? NULL : existingBlock; + } + else if (existingBlock->fecHeader.baseSequenceNumber > fecBlockBaseSeqNum) { + // The new block goes right before this one + break; + } + + existingBlock = existingBlock->next; + } + + // We didn't find an existing FEC block, so we'll have to make one + uint16_t dataPacketSize = blockSize + sizeof(RTP_PACKET); + PRTPA_FEC_BLOCK block = malloc(sizeof(*block) + (RTPA_DATA_SHARDS * dataPacketSize) + (RTPA_FEC_SHARDS * blockSize)); + if (block == NULL) { + return NULL; + } + + memset(block, 0, sizeof(*block)); + + block->queueTimeMs = PltGetMillis(); + block->blockSize = blockSize; + memset(block->marks, 1, sizeof(block->marks)); + + // Set up the FEC header + block->fecHeader.payloadType = fecBlockPayloadType; + block->fecHeader.baseSequenceNumber = fecBlockBaseSeqNum; + block->fecHeader.baseTimestamp = fecBlockBaseTs; + block->fecHeader.ssrc = fecBlockSsrc; + + // Set up packet buffers pointing into the slab we allocated + uint8_t* data = (uint8_t*)(block + 1); + for (int i = 0; i < RTPA_DATA_SHARDS; i++) { + block->dataPackets[i] = (PRTP_PACKET)data; + data += dataPacketSize; + } + for (int i = 0; i < RTPA_FEC_SHARDS; i++) { + block->fecPackets[i] = data; + data += blockSize; + } + + // Place this block into the list in order + if (existingBlock != NULL) { + // This new block comes right before existingBlock + PRTPA_FEC_BLOCK prevBlock = existingBlock->prev; + + existingBlock->prev = block; + + if (prevBlock == NULL) { + LC_ASSERT(queue->blockHead == existingBlock); + queue->blockHead = block; + } + else { + prevBlock->next = block; + } + + block->prev = prevBlock; + block->next = existingBlock; + } + else { + // This block goes at the tail of the list + block->prev = queue->blockTail; + if (queue->blockTail != NULL) { + queue->blockTail->next = block; + } + queue->blockTail = block; + if (queue->blockHead == NULL) { + queue->blockHead = block; + } + } + + return block; +} + +static bool completeFecBlock(PRTP_AUDIO_QUEUE queue, PRTPA_FEC_BLOCK block) { + uint8_t* shards[RTPA_TOTAL_SHARDS]; + + // If we don't have enough shards, we can't do anything + if (block->dataShardsReceived + block->fecShardsReceived < RTPA_DATA_SHARDS) { + return false; + } + + // If we have all data shards, don't bother with any recovery + LC_ASSERT(block->dataShardsReceived <= RTPA_DATA_SHARDS); + if (block->dataShardsReceived == RTPA_DATA_SHARDS) { + return true; + } + + // We have recovery to do. Let's build the array. + for (int i = 0; i < RTPA_DATA_SHARDS; i++) { + shards[i] = (uint8_t*)(block->dataPackets[i] + 1); + } + for (int i = 0; i < RTPA_FEC_SHARDS; i++) { + shards[RTPA_DATA_SHARDS + i] = block->fecPackets[i]; + } + + int res = reed_solomon_reconstruct(queue->rs, shards, block->marks, RTPA_TOTAL_SHARDS, block->blockSize); + + // We should always have enough data to recover the entire block since we checked above. + LC_ASSERT(res == 0); + + // We will need to recover the RTP packet using the FEC header + for (int i = 0; i < RTPA_DATA_SHARDS; i++) { + if (block->marks[i]) { + block->dataPackets[i]->header = 0x80; // RTPv2 + block->dataPackets[i]->packetType = block->fecHeader.payloadType; + block->dataPackets[i]->sequenceNumber = block->fecHeader.baseSequenceNumber + i; + block->dataPackets[i]->timestamp = block->fecHeader.baseTimestamp + (i * AudioPacketDuration); + block->dataPackets[i]->ssrc = block->fecHeader.ssrc; + + block->marks[i] = 0; + } + } + + return true; +} + +static bool queueHasPacketReady(PRTP_AUDIO_QUEUE queue) { + return queue->blockHead != NULL && + queue->blockHead->marks[queue->blockHead->nextDataPacketIndex] == 0 && + queue->blockHead->fecHeader.baseSequenceNumber + queue->blockHead->nextDataPacketIndex == queue->nextRtpSequenceNumber; +} + +static bool enforceQueueConstraints(PRTP_AUDIO_QUEUE queue) { + // Empty queue is fine + if (queue->blockHead == NULL) { + return false; + } + + // Check that the queue's time constraint is satisfied + if (PltGetMillis() - queue->blockHead->queueTimeMs > queue->maxQueueTimeMs) { + Limelog("Unable to recover audio data block %u to %u (%u+%u=%u received < %u needed)\n", + queue->blockHead->fecHeader.baseSequenceNumber, + queue->blockHead->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS - 1, + queue->blockHead->dataShardsReceived, + queue->blockHead->fecShardsReceived, + queue->blockHead->dataShardsReceived + queue->blockHead->fecShardsReceived, + RTPA_DATA_SHARDS); + return true; + } + + return false; +} + +int RtpaAddPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACKET packet, uint16_t length) { + LC_ASSERT(!queue->blockHead || queue->nextRtpSequenceNumber < queue->blockHead->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS); + + PRTPA_FEC_BLOCK fecBlock = getFecBlockForRtpPacket(queue, packet, length); + if (fecBlock == NULL) { + // Reject the packet + return 0; + } + + if (packet->packetType == 97) { + uint16_t pos = packet->sequenceNumber - fecBlock->fecHeader.baseSequenceNumber; + + // This is validated in getFecBlockForRtpPacket() + LC_ASSERT(pos < RTPA_DATA_SHARDS); + + if (fecBlock->marks[pos]) { + // If there was a missing data shard, copy the RTP header and packet data into it + memcpy(fecBlock->dataPackets[pos], packet, length); + fecBlock->marks[pos] = 0; + fecBlock->dataShardsReceived++; + } + else { + // This is a duplicate packet - reject it + return 0; + } + } + else if (packet->packetType == 127) { + PAUDIO_FEC_HEADER fecHeader = (PAUDIO_FEC_HEADER)(packet + 1); + + // This is validated in getFecBlockForRtpPacket() + LC_ASSERT(fecHeader->fecShardIndex < RTPA_FEC_SHARDS); + + if (fecBlock->marks[RTPA_DATA_SHARDS + fecHeader->fecShardIndex]) { + // If there was a missing FEC shard, copy just the FEC data into it + memcpy(fecBlock->fecPackets[fecHeader->fecShardIndex], fecHeader + 1, length - sizeof(RTP_PACKET) - sizeof(AUDIO_FEC_HEADER)); + fecBlock->marks[RTPA_DATA_SHARDS + fecHeader->fecShardIndex] = 0; + fecBlock->fecShardsReceived++; + } + else { + // This is a duplicate packet - reject it + return 0; + } + } + else { + // getFecBlockForRtpPacket() would have already failed + LC_ASSERT(false); + return 0; + } + + if ((queue->nextRtpSequenceNumber == UINT16_MAX && queue->oldestRtpBaseSequenceNumber == 0) && + packet->sequenceNumber != fecBlock->fecHeader.baseSequenceNumber) { + // Our first packet was not the start of an FEC block, so go ahead and queue it + // but ensure nextRtpSequenceNumber is set to the start of the FEC block. + queue->nextRtpSequenceNumber = fecBlock->fecHeader.baseSequenceNumber; + } + else if ((queue->nextRtpSequenceNumber == UINT16_MAX && queue->oldestRtpBaseSequenceNumber == 0) || + packet->sequenceNumber == queue->nextRtpSequenceNumber) { + queue->nextRtpSequenceNumber = packet->sequenceNumber + 1; + + // We are going to return this entry, so update the FEC block + // state to indicate that the caller has already received it. + fecBlock->nextDataPacketIndex++; + + // If we've returned all packets in this FEC block, free it. + if (queue->nextRtpSequenceNumber == fecBlock->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS) { + LC_ASSERT(fecBlock == queue->blockHead); + LC_ASSERT(fecBlock->nextDataPacketIndex == RTPA_DATA_SHARDS); + freeFecBlockHead(queue); + } + + return RTPQ_RET_HANDLE_NOW; + } + + // Try to complete the FEC block via data shards or data+FEC shards + if (completeFecBlock(queue, fecBlock)) { + // We completed a FEC block + fecBlock->fullyReassembled = true; + } + + if (queueHasPacketReady(queue)) { + return RTPQ_RET_PACKET_READY; + } + + // We don't have enough to proceed. Let's ensure we haven't + // violated queue constraints with this FEC block. + if (enforceQueueConstraints(queue)) { + // We need to discard this FEC block and point the next RTP sequence number to the next block + queue->nextRtpSequenceNumber = queue->blockHead->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS; + + // NOTE: Here we elect to just throw away the entire FEC block. We could play back the source + // data that we have, but this is easier. It's also unclear whether playback of partial data + // after a significant delay is actually worse than dropping it due to causing additional + // latency to accumulate in the audio pipeline. + freeFecBlockHead(queue); + } + + return queueHasPacketReady(queue) ? RTPQ_RET_PACKET_READY : 0; +} + +PRTP_PACKET RtpaGetQueuedPacket(PRTP_AUDIO_QUEUE queue, uint16_t customHeaderLength, uint16_t* length) { + PRTPA_FEC_BLOCK nextBlock = queue->blockHead; + + if (nextBlock == NULL) { + return NULL; + } + + // Return the next RTP sequence number by indexing into the most recent FEC block + if (queueHasPacketReady(queue)) { + PRTP_PACKET packet = malloc(customHeaderLength + sizeof(RTP_PACKET) + nextBlock->blockSize); + if (packet == NULL) { + return NULL; + } + + *length = nextBlock->blockSize + sizeof(RTP_PACKET); + memcpy((uint8_t*)packet + customHeaderLength, nextBlock->dataPackets[nextBlock->nextDataPacketIndex], *length); + nextBlock->nextDataPacketIndex++; + + queue->nextRtpSequenceNumber++; + + // If we've read everything from this FEC block, remove and free it + if (nextBlock->nextDataPacketIndex == RTPA_DATA_SHARDS) { + freeFecBlockHead(queue); + } + + return packet; + } + + return NULL; +} diff --git a/src/RtpAudioQueue.h b/src/RtpAudioQueue.h new file mode 100644 index 0000000..2b73792 --- /dev/null +++ b/src/RtpAudioQueue.h @@ -0,0 +1,67 @@ +#pragma once + +#include "Video.h" + +#include "rs.h" + +#define RTPQ_DEFAULT_QUEUE_TIME 40 + +#define RTPA_DATA_SHARDS 4 +#define RTPA_FEC_SHARDS 2 +#define RTPA_TOTAL_SHARDS (RTPA_DATA_SHARDS + RTPA_FEC_SHARDS) + +typedef struct _AUDIO_FEC_HEADER { + uint8_t fecShardIndex; + uint8_t payloadType; + uint16_t baseSequenceNumber; + uint32_t baseTimestamp; + uint32_t ssrc; +} AUDIO_FEC_HEADER, *PAUDIO_FEC_HEADER; + +typedef struct _RTPA_FEC_BLOCK { + struct _RTPA_FEC_BLOCK* prev; + struct _RTPA_FEC_BLOCK* next; + + PRTP_PACKET dataPackets[RTPA_DATA_SHARDS]; + uint8_t* fecPackets[RTPA_FEC_SHARDS]; + uint8_t marks[RTPA_TOTAL_SHARDS]; + + AUDIO_FEC_HEADER fecHeader; + + uint64_t queueTimeMs; + uint8_t dataShardsReceived; + uint8_t fecShardsReceived; + bool fullyReassembled; + + // Used when dequeuing data from FEC blocks for the caller + uint8_t nextDataPacketIndex; + + uint16_t blockSize; + + // Data for shards comes here +} RTPA_FEC_BLOCK, *PRTPA_FEC_BLOCK; + +typedef struct _RTP_AUDIO_QUEUE { + PRTPA_FEC_BLOCK blockHead; + PRTPA_FEC_BLOCK blockTail; + + reed_solomon* rs; + + uint32_t maxQueueTimeMs; + + uint16_t nextRtpSequenceNumber; + uint16_t oldestRtpBaseSequenceNumber; +} RTP_AUDIO_QUEUE, *PRTP_AUDIO_QUEUE; + +#define RTPQ_RET_PACKET_CONSUMED 0x1 +#define RTPQ_RET_PACKET_READY 0x2 +#define RTPQ_RET_HANDLE_NOW 0x4 + +#define RTPQ_PACKET_CONSUMED(x) ((x) & RTPQ_RET_PACKET_CONSUMED) +#define RTPQ_PACKET_READY(x) ((x) & RTPQ_RET_PACKET_READY) +#define RTPQ_HANDLE_NOW(x) ((x) == RTPQ_RET_HANDLE_NOW) + +void RtpaInitializeQueue(PRTP_AUDIO_QUEUE queue); +void RtpaCleanupQueue(PRTP_AUDIO_QUEUE queue); +int RtpaAddPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACKET packet, uint16_t length); +PRTP_PACKET RtpaGetQueuedPacket(PRTP_AUDIO_QUEUE queue, uint16_t customHeaderLength, uint16_t* length); diff --git a/src/RtpReorderQueue.c b/src/RtpReorderQueue.c deleted file mode 100644 index 26e21c9..0000000 --- a/src/RtpReorderQueue.c +++ /dev/null @@ -1,255 +0,0 @@ -#include "Limelight-internal.h" - -void RtpqInitializeQueue(PRTP_REORDER_QUEUE queue, int maxSize, int maxQueueTimeMs) { - memset(queue, 0, sizeof(*queue)); - queue->maxSize = maxSize; - queue->maxQueueTimeMs = maxQueueTimeMs; - queue->nextRtpSequenceNumber = UINT16_MAX; - queue->oldestQueuedTimeMs = UINT64_MAX; -} - -void RtpqCleanupQueue(PRTP_REORDER_QUEUE queue) { - while (queue->queueHead != NULL) { - PRTP_QUEUE_ENTRY entry = queue->queueHead; - queue->queueHead = entry->next; - free(entry->packet); - } -} - -// newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet -static bool queuePacket(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY newEntry, bool head, PRTP_PACKET packet) { - PRTP_QUEUE_ENTRY entry; - - LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->nextRtpSequenceNumber)); - - // Don't queue duplicates - entry = queue->queueHead; - while (entry != NULL) { - if (entry->packet->sequenceNumber == packet->sequenceNumber) { - return false; - } - - entry = entry->next; - } - - newEntry->packet = packet; - newEntry->queueTimeMs = PltGetMillis(); - newEntry->prev = NULL; - newEntry->next = NULL; - - if (queue->oldestQueuedTimeMs == UINT64_MAX) { - queue->oldestQueuedTimeMs = newEntry->queueTimeMs; - } - - if (queue->queueHead == NULL) { - LC_ASSERT(queue->queueSize == 0); - queue->queueHead = queue->queueTail = newEntry; - } - else if (head) { - LC_ASSERT(queue->queueSize > 0); - PRTP_QUEUE_ENTRY oldHead = queue->queueHead; - newEntry->next = oldHead; - LC_ASSERT(oldHead->prev == NULL); - oldHead->prev = newEntry; - queue->queueHead = newEntry; - } - else { - LC_ASSERT(queue->queueSize > 0); - PRTP_QUEUE_ENTRY oldTail = queue->queueTail; - newEntry->prev = oldTail; - LC_ASSERT(oldTail->next == NULL); - oldTail->next = newEntry; - queue->queueTail = newEntry; - } - queue->queueSize++; - - return true; -} - -static void updateOldestQueued(PRTP_REORDER_QUEUE queue) { - PRTP_QUEUE_ENTRY entry; - - queue->oldestQueuedTimeMs = UINT64_MAX; - - entry = queue->queueHead; - while (entry != NULL) { - if (entry->queueTimeMs < queue->oldestQueuedTimeMs) { - queue->oldestQueuedTimeMs = entry->queueTimeMs; - } - - entry = entry->next; - } -} - -static PRTP_QUEUE_ENTRY getEntryByLowestSeq(PRTP_REORDER_QUEUE queue) { - PRTP_QUEUE_ENTRY lowestSeqEntry, entry; - - lowestSeqEntry = queue->queueHead; - entry = queue->queueHead; - while (entry != NULL) { - if (isBefore16(entry->packet->sequenceNumber, lowestSeqEntry->packet->sequenceNumber)) { - lowestSeqEntry = entry; - } - - entry = entry->next; - } - - // Remember the updated lowest sequence number - if (lowestSeqEntry != NULL) { - queue->nextRtpSequenceNumber = lowestSeqEntry->packet->sequenceNumber; - } - - return lowestSeqEntry; -} - -static void removeEntry(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY entry) { - LC_ASSERT(entry != NULL); - LC_ASSERT(queue->queueSize > 0); - LC_ASSERT(queue->queueHead != NULL); - LC_ASSERT(queue->queueTail != NULL); - - if (queue->queueHead == entry) { - queue->queueHead = entry->next; - } - if (queue->queueTail == entry) { - queue->queueTail = entry->prev; - } - - if (entry->prev != NULL) { - entry->prev->next = entry->next; - } - if (entry->next != NULL) { - entry->next->prev = entry->prev; - } - queue->queueSize--; -} - -static PRTP_QUEUE_ENTRY enforceQueueConstraints(PRTP_REORDER_QUEUE queue) { - bool dequeuePacket = false; - - // Empty queue is fine - if (queue->queueHead == NULL) { - return NULL; - } - - // Check that the queue's time constraint is satisfied - if (PltGetMillis() - queue->oldestQueuedTimeMs > queue->maxQueueTimeMs) { - Limelog("Returning RTP packet queued for too long\n"); - dequeuePacket = true; - } - - // Check that the queue's size constraint is satisfied. We subtract one - // because this is validating that the queue will meet constraints _after_ - // the current packet is enqueued. - if (!dequeuePacket && queue->queueSize == queue->maxSize - 1) { - Limelog("Returning RTP packet after queue overgrowth\n"); - dequeuePacket = true; - } - - if (dequeuePacket) { - // Return the lowest seq queued - return getEntryByLowestSeq(queue); - } - else { - return NULL; - } -} - -int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY packetEntry) { - if (queue->nextRtpSequenceNumber != UINT16_MAX && - isBefore16(packet->sequenceNumber, queue->nextRtpSequenceNumber)) { - // Reject packets behind our current sequence number - return 0; - } - - if (queue->queueHead == NULL) { - // Return immediately for an exact match with an empty queue - if (queue->nextRtpSequenceNumber == UINT16_MAX || - packet->sequenceNumber == queue->nextRtpSequenceNumber) { - queue->nextRtpSequenceNumber = packet->sequenceNumber + 1; - return RTPQ_RET_HANDLE_NOW; - } - else { - // Queue is empty currently so we'll put this packet on there - if (!queuePacket(queue, packetEntry, false, packet)) { - return 0; - } - else { - return RTPQ_RET_PACKET_CONSUMED; - } - } - } - else { - PRTP_QUEUE_ENTRY lowestEntry; - - // Validate that the queue remains within our contraints - // and get the lowest element - lowestEntry = enforceQueueConstraints(queue); - - // If the queue is now empty after validating queue constraints, - // this packet can be returned immediately - if (lowestEntry == NULL && queue->queueHead == NULL) { - queue->nextRtpSequenceNumber = packet->sequenceNumber + 1; - return RTPQ_RET_HANDLE_NOW; - } - else if (lowestEntry != NULL && queue->nextRtpSequenceNumber != UINT16_MAX && - isBefore16(packet->sequenceNumber, queue->nextRtpSequenceNumber)) { - // The queue constraints were enforced and a new lowest entry was - // made available for retrieval. This packet was behind the new lowest - // so it will not be consumed by the queue. - return RTPQ_RET_PACKET_READY; - } - - // Queue has data inside, so we need to see where this packet fits - if (packet->sequenceNumber == queue->nextRtpSequenceNumber) { - // It fits in a hole where we need a packet, now we have some ready - if (!queuePacket(queue, packetEntry, false, packet)) { - return 0; - } - else { - return RTPQ_RET_PACKET_READY | RTPQ_RET_PACKET_CONSUMED; - } - } - else { - if (!queuePacket(queue, packetEntry, false, packet)) { - return 0; - } - else { - // Constraint validation may have changed the oldest packet to one that - // matches the next sequence number - return RTPQ_RET_PACKET_CONSUMED | ((lowestEntry != NULL) ? RTPQ_RET_PACKET_READY : 0); - } - } - } -} - -PRTP_PACKET RtpqGetQueuedPacket(PRTP_REORDER_QUEUE queue) { - PRTP_QUEUE_ENTRY queuedEntry, entry; - - // Find the next queued packet - queuedEntry = NULL; - entry = queue->queueHead; - while (entry != NULL) { - if (entry->packet->sequenceNumber == queue->nextRtpSequenceNumber) { - queue->nextRtpSequenceNumber++; - queuedEntry = entry; - removeEntry(queue, entry); - break; - } - - entry = entry->next; - } - - // Bail if we found nothing - if (queuedEntry == NULL) { - // Update the oldest queued packet time - updateOldestQueued(queue); - - return NULL; - } - - // We don't update the oldest queued entry here, because we know - // the caller will call again until it receives null - - return queuedEntry->packet; -} diff --git a/src/RtpReorderQueue.h b/src/RtpReorderQueue.h deleted file mode 100644 index 947104a..0000000 --- a/src/RtpReorderQueue.h +++ /dev/null @@ -1,41 +0,0 @@ -#pragma once - -#include "Video.h" - -#define RTPQ_DEFAULT_MAX_SIZE 16 -#define RTPQ_DEFAULT_QUEUE_TIME 40 - -typedef struct _RTP_QUEUE_ENTRY { - PRTP_PACKET packet; - - uint64_t queueTimeMs; - - struct _RTP_QUEUE_ENTRY* next; - struct _RTP_QUEUE_ENTRY* prev; -} RTP_QUEUE_ENTRY, *PRTP_QUEUE_ENTRY; - -typedef struct _RTP_REORDER_QUEUE { - PRTP_QUEUE_ENTRY queueHead; - PRTP_QUEUE_ENTRY queueTail; - - uint64_t oldestQueuedTimeMs; - - uint32_t maxQueueTimeMs; - int maxSize; - int queueSize; - - uint16_t nextRtpSequenceNumber; -} RTP_REORDER_QUEUE, *PRTP_REORDER_QUEUE; - -#define RTPQ_RET_PACKET_CONSUMED 0x1 -#define RTPQ_RET_PACKET_READY 0x2 -#define RTPQ_RET_HANDLE_NOW 0x4 - -#define RTPQ_PACKET_CONSUMED(x) ((x) & RTPQ_RET_PACKET_CONSUMED) -#define RTPQ_PACKET_READY(x) ((x) & RTPQ_RET_PACKET_READY) -#define RTPQ_HANDLE_NOW(x) ((x) == RTPQ_RET_HANDLE_NOW) - -void RtpqInitializeQueue(PRTP_REORDER_QUEUE queue, int maxSize, int maxQueueTimeMs); -void RtpqCleanupQueue(PRTP_REORDER_QUEUE queue); -int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY packetEntry); -PRTP_PACKET RtpqGetQueuedPacket(PRTP_REORDER_QUEUE queue);