#include "Limelight-internal.h"
#include "RtpReorderQueue.h"

// Puts a reorder queue into its empty starting state.
//
// queue          - queue structure to initialize (caller-owned storage)
// maxSize        - number of queued entries at which the oldest one is discarded
// maxQueueTimeMs - maximum time, in milliseconds, an entry may stay queued
//                  before it is discarded
//
// nextRtpSequenceNumber == UINT16_MAX is used throughout this file as the
// "no packet seen yet" marker, and oldestQueuedTimeMs == UINT64_MAX as the
// "no oldest entry recorded" marker.
void RtpqInitializeQueue(PRTP_REORDER_QUEUE queue, int maxSize, int maxQueueTimeMs) {
    queue->maxSize = maxSize;
    queue->maxQueueTimeMs = maxQueueTimeMs;
    queue->queueHead = NULL;
    queue->queueTail = NULL;
    // queueSize is asserted on and compared against maxSize elsewhere in this
    // file, so it must start from a known value even if the structure is being
    // reused or was not zeroed by the caller.
    queue->queueSize = 0;
    queue->nextRtpSequenceNumber = UINT16_MAX;
    queue->oldestQueuedTimeMs = UINT64_MAX;
    queue->oldestQueuedEntry = NULL;
}

// Frees every packet still held by the queue and leaves the queue empty.
//
// Each queue entry is contained within its packet buffer, so freeing
// entry->packet releases the entry as well; the next pointer is therefore
// read before the free.
void RtpqCleanupQueue(PRTP_REORDER_QUEUE queue) {
    while (queue->queueHead != NULL) {
        PRTP_QUEUE_ENTRY entry = queue->queueHead;
        queue->queueHead = entry->next;
        free(entry->packet);
    }

    // Do not leave pointers to freed entries or a stale count behind
    queue->queueTail = NULL;
    queue->queueSize = 0;
    queue->oldestQueuedTimeMs = UINT64_MAX;
    queue->oldestQueuedEntry = NULL;
}

// Links newEntry (carrying packet) into the queue, at the head when head is
// non-zero and at the tail otherwise.
//
// Returns 1 if the packet was queued, or 0 if it was refused because it is
// behind the next expected sequence number or is a duplicate of an entry
// that is already queued. Both checks are skipped while no sequence number
// has been recorded yet (nextRtpSequenceNumber == UINT16_MAX).
//
// newEntry is contained within the packet buffer so we free the whole entry
// by freeing entry->packet
//
// NOTE(review): isBeforeSignedInt is defined elsewhere; presumably it is a
// wraparound-aware "first precedes second" comparison - confirm.
static int queuePacket(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY newEntry, int head, PRTP_PACKET packet) {
    if (queue->nextRtpSequenceNumber != UINT16_MAX) {
        PRTP_QUEUE_ENTRY entry;

        // Don't queue packets we're already ahead of
        if (isBeforeSignedInt(packet->sequenceNumber, queue->nextRtpSequenceNumber, 0)) {
            return 0;
        }

        // Don't queue duplicates either
        entry = queue->queueHead;
        while (entry != NULL) {
            if (entry->packet->sequenceNumber == packet->sequenceNumber) {
                return 0;
            }

            entry = entry->next;
        }
    }

    newEntry->packet = packet;
    newEntry->queueTimeMs = PltGetMillis();
    newEntry->prev = NULL;
    newEntry->next = NULL;

    // First entry since the oldest-entry tracking was last reset
    if (queue->oldestQueuedTimeMs == UINT64_MAX) {
        queue->oldestQueuedTimeMs = newEntry->queueTimeMs;
        queue->oldestQueuedEntry = newEntry;
    }

    if (queue->queueHead == NULL) {
        LC_ASSERT(queue->queueSize == 0);
        queue->queueHead = queue->queueTail = newEntry;
    }
    else if (head) {
        LC_ASSERT(queue->queueSize > 0);
        PRTP_QUEUE_ENTRY oldHead = queue->queueHead;
        newEntry->next = oldHead;
        LC_ASSERT(oldHead->prev == NULL);
        oldHead->prev = newEntry;
        queue->queueHead = newEntry;
    }
    else {
        LC_ASSERT(queue->queueSize > 0);
        PRTP_QUEUE_ENTRY oldTail = queue->queueTail;
        newEntry->prev = oldTail;
        LC_ASSERT(oldTail->next == NULL);
        oldTail->next = newEntry;
        queue->queueTail = newEntry;
    }
    queue->queueSize++;

    return 1;
}

// Rescans the whole queue and records the entry with the smallest
// queueTimeMs as the oldest one. On an empty queue the tracking fields are
// reset to their "none" markers (UINT64_MAX / NULL).
static void updateOldestQueued(PRTP_REORDER_QUEUE queue) {
    PRTP_QUEUE_ENTRY entry;

    queue->oldestQueuedTimeMs = UINT64_MAX;
    queue->oldestQueuedEntry = NULL;

    entry = queue->queueHead;
    while (entry != NULL) {
        if (entry->queueTimeMs < queue->oldestQueuedTimeMs) {
            queue->oldestQueuedEntry = entry;
            queue->oldestQueuedTimeMs = entry->queueTimeMs;
        }

        entry = entry->next;
    }
}

// Returns the queued entry whose sequence number compares lowest according
// to isBeforeSignedInt, or NULL when the queue is empty.
//
// Side effect: when an entry is found, nextRtpSequenceNumber is moved to that
// entry's sequence number, so the queue resumes delivery from there.
static PRTP_QUEUE_ENTRY getEntryByLowestSeq(PRTP_REORDER_QUEUE queue) {
    PRTP_QUEUE_ENTRY lowestSeqEntry, entry;

    lowestSeqEntry = queue->queueHead;
    entry = queue->queueHead;
    while (entry != NULL) {
        if (isBeforeSignedInt(entry->packet->sequenceNumber, lowestSeqEntry->packet->sequenceNumber, 1)) {
            lowestSeqEntry = entry;
        }

        entry = entry->next;
    }

    // Remember the updated lowest sequence number
    if (lowestSeqEntry != NULL) {
        queue->nextRtpSequenceNumber = lowestSeqEntry->packet->sequenceNumber;
    }

    return lowestSeqEntry;
}

// Unlinks entry from the doubly linked list and decrements the entry count.
// Nothing is freed, and the oldest-entry tracking is not touched.
static void removeEntry(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY entry) {
    LC_ASSERT(entry != NULL);
    LC_ASSERT(queue->queueSize > 0);
    LC_ASSERT(queue->queueHead != NULL);
    LC_ASSERT(queue->queueTail != NULL);

    if (queue->queueHead == entry) {
        queue->queueHead = entry->next;
    }
    if (queue->queueTail == entry) {
        queue->queueTail = entry->prev;
    }

    if (entry->prev != NULL) {
        entry->prev->next = entry->next;
    }
    if (entry->next != NULL) {
        entry->next->prev = entry->prev;
    }
    queue->queueSize--;
}

// Unlinks the entry currently recorded as oldest and frees its packet.
// The entry lives inside the packet buffer, so it is gone after the free;
// the caller must refresh the oldest-entry tracking afterwards.
static void discardOldestEntry(PRTP_REORDER_QUEUE queue) {
    PRTP_QUEUE_ENTRY oldest = queue->oldestQueuedEntry;

    removeEntry(queue, oldest);
    free(oldest->packet);
}

// Enforces the queue's time and size limits by discarding at most one entry
// (the one recorded as oldest).
//
// Returns NULL if the queue is empty or nothing had to be discarded.
// If an entry was discarded, the oldest-entry tracking is recalculated and the
// result of getEntryByLowestSeq() is returned (which also moves
// nextRtpSequenceNumber to that entry); this is NULL if the discard emptied
// the queue.
static PRTP_QUEUE_ENTRY validateQueueConstraints(PRTP_REORDER_QUEUE queue) {
    int needsUpdate = 0;

    // Empty queue is fine
    if (queue->queueHead == NULL) {
        return NULL;
    }

    // Check that the queue's time constraint is satisfied
    if (PltGetMillis() - queue->oldestQueuedTimeMs > queue->maxQueueTimeMs) {
        Limelog("Discarding RTP packet queued for too long\n");
        discardOldestEntry(queue);
        needsUpdate = 1;
    }

    // Check that the queue's size constraint is satisfied.
    // >= rather than == so that a count which somehow got past maxSize
    // still triggers eviction instead of growing without bound.
    if (!needsUpdate && queue->queueSize >= queue->maxSize) {
        Limelog("Discarding RTP packet after queue overgrowth\n");
        discardOldestEntry(queue);
        needsUpdate = 1;
    }

    if (needsUpdate) {
        // Recalculate the oldest entry if needed
        updateOldestQueued(queue);

        // Return the lowest seq queued
        return getEntryByLowestSeq(queue);
    }
    else {
        return NULL;
    }
}

// Offers a newly received packet to the reorder queue.
//
// queue       - the reorder queue
// packet      - the received packet; its sequenceNumber field is examined
// packetEntry - queue entry storage for this packet; only used if the packet
//               ends up being queued
//
// Returns:
//   RTPQ_RET_REJECTED             - packet is behind the expected sequence
//                                   number or duplicates a queued packet;
//                                   it was not queued
//   RTPQ_RET_HANDLE_IMMEDIATELY   - packet is the next one in order and the
//                                   queue holds nothing; it was not queued and
//                                   the caller should process it now
//   RTPQ_RET_QUEUED_NOTHING_READY - packet was queued; nothing to dequeue yet
//   RTPQ_RET_QUEUED_PACKETS_READY - packet was queued and the caller should
//                                   drain the queue with RtpqGetQueuedPacket()
int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY packetEntry) {
    if (queue->nextRtpSequenceNumber != UINT16_MAX &&
        isBeforeSignedInt(packet->sequenceNumber, queue->nextRtpSequenceNumber, 0)) {
        // Reject packets behind our current sequence number
        return RTPQ_RET_REJECTED;
    }

    if (queue->queueHead == NULL) {
        // Return immediately for an exact match with an empty queue
        if (queue->nextRtpSequenceNumber == UINT16_MAX ||
            packet->sequenceNumber == queue->nextRtpSequenceNumber) {
            queue->nextRtpSequenceNumber = packet->sequenceNumber + 1;
            return RTPQ_RET_HANDLE_IMMEDIATELY;
        }
        else {
            // Queue is empty currently so we'll put this packet on there
            if (!queuePacket(queue, packetEntry, 0, packet)) {
                return RTPQ_RET_REJECTED;
            }
            else {
                return RTPQ_RET_QUEUED_NOTHING_READY;
            }
        }
    }
    else {
        PRTP_QUEUE_ENTRY lowestEntry;

        // Validate that the queue remains within our constraints
        // and get the lowest element
        lowestEntry = validateQueueConstraints(queue);

        // If the queue is now empty after validating queue constraints,
        // this packet can be returned immediately
        if (lowestEntry == NULL && queue->queueHead == NULL) {
            queue->nextRtpSequenceNumber = packet->sequenceNumber + 1;
            return RTPQ_RET_HANDLE_IMMEDIATELY;
        }

        // Queue has data inside, so we need to see where this packet fits
        if (packet->sequenceNumber == queue->nextRtpSequenceNumber) {
            // It fits in a hole where we need a packet, now we have some ready
            if (!queuePacket(queue, packetEntry, 0, packet)) {
                return RTPQ_RET_REJECTED;
            }
            else {
                return RTPQ_RET_QUEUED_PACKETS_READY;
            }
        }
        else {
            if (!queuePacket(queue, packetEntry, 0, packet)) {
                return RTPQ_RET_REJECTED;
            }
            else {
                // Constraint validation may have changed the oldest packet to one that
                // matches the next sequence number
                return (lowestEntry != NULL) ? RTPQ_RET_QUEUED_PACKETS_READY :
                    RTPQ_RET_QUEUED_NOTHING_READY;
            }
        }
    }
}

// Removes and returns the queued packet whose sequence number equals
// nextRtpSequenceNumber, advancing nextRtpSequenceNumber by one. Returns NULL
// if no such packet is queued.
//
// The returned packet is unlinked but not freed here.
//
// Caller contract: keep calling until NULL is returned. The oldest-entry
// tracking is only refreshed on the NULL-returning call, so stopping early can
// leave oldestQueuedEntry referring to an entry that was already handed out.
PRTP_PACKET RtpqGetQueuedPacket(PRTP_REORDER_QUEUE queue) {
    PRTP_QUEUE_ENTRY queuedEntry, entry;

    // Find the next queued packet
    queuedEntry = NULL;
    entry = queue->queueHead;
    while (entry != NULL) {
        if (entry->packet->sequenceNumber == queue->nextRtpSequenceNumber) {
            queue->nextRtpSequenceNumber++;
            queuedEntry = entry;
            removeEntry(queue, entry);
            break;
        }

        entry = entry->next;
    }

    // Bail if we found nothing
    if (queuedEntry == NULL) {
        // Update the oldest queued packet time
        updateOldestQueued(queue);

        return NULL;
    }

    // We don't update the oldest queued entry here, because we know
    // the caller will call again until it receives null

    return queuedEntry->packet;
}