/*
 * Mirror of https://github.com/moonlight-stream/moonlight-common-c.git
 * Synced 2025-08-17 17:05:50 +00:00
 * 257 lines, 6.7 KiB, C
 */
#include "Limelight-internal.h"
|
|
#include "RtpReorderQueue.h"
|
|
|
|
// Puts a reorder queue into its initial, empty state.
//
// Every field that the routines in this file read is given a defined value
// here, so the queue does not rely on the caller having zeroed the structure.
//
//   queue          - queue to initialize
//   maxSize        - entry count at which the oldest queued packet is discarded
//   maxQueueTimeMs - how long (in milliseconds) a packet may stay queued before
//                    it is discarded
void RtpqInitializeQueue(PRTP_REORDER_QUEUE queue, int maxSize, int maxQueueTimeMs) {
    queue->maxSize = maxSize;
    queue->maxQueueTimeMs = maxQueueTimeMs;

    queue->queueHead = NULL;
    queue->queueTail = NULL;

    // queuePacket() asserts on and increments queueSize, and the size
    // constraint compares it against maxSize, so it must start at zero.
    queue->queueSize = 0;

    // UINT16_MAX is the "no sequence number seen yet" marker
    queue->nextRtpSequenceNumber = UINT16_MAX;

    // UINT64_MAX / NULL is the "nothing queued" marker
    queue->oldestQueuedTimeMs = UINT64_MAX;
    queue->oldestQueuedEntry = NULL;
}
|
|
|
|
// Releases every packet still held by the queue and leaves the queue empty.
//
// Queue entries are embedded in their packet's buffer, so the allocation is
// released through entry->packet (the same way validateQueueConstraints()
// discards a packet). Passing the entry pointer itself to free() would hand
// it an address that did not come from the allocator.
void RtpqCleanupQueue(PRTP_REORDER_QUEUE queue) {
    while (queue->queueHead != NULL) {
        PRTP_QUEUE_ENTRY entry = queue->queueHead;

        // Advance first: entry is no longer valid once its packet is freed
        queue->queueHead = entry->next;
        free(entry->packet);
    }

    // Nothing is queued any more, so drop every remaining reference to the
    // freed entries and restore the "nothing queued" markers.
    queue->queueTail = NULL;
    queue->queueSize = 0;
    queue->oldestQueuedEntry = NULL;
    queue->oldestQueuedTimeMs = UINT64_MAX;
}
|
|
|
|
// newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet
|
|
static int queuePacket(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY newEntry, int head, PRTP_PACKET packet) {
|
|
if (queue->nextRtpSequenceNumber != UINT16_MAX) {
|
|
PRTP_QUEUE_ENTRY entry;
|
|
|
|
// Don't queue packets we're already ahead of
|
|
if (isBeforeSignedInt(packet->sequenceNumber, queue->nextRtpSequenceNumber, 0)) {
|
|
return 0;
|
|
}
|
|
|
|
// Don't queue duplicates either
|
|
entry = queue->queueHead;
|
|
while (entry != NULL) {
|
|
if (entry->packet->sequenceNumber == packet->sequenceNumber) {
|
|
return 0;
|
|
}
|
|
|
|
entry = entry->next;
|
|
}
|
|
}
|
|
|
|
newEntry->packet = packet;
|
|
newEntry->queueTimeMs = PltGetMillis();
|
|
newEntry->prev = NULL;
|
|
newEntry->next = NULL;
|
|
|
|
if (queue->oldestQueuedTimeMs == UINT64_MAX) {
|
|
queue->oldestQueuedTimeMs = newEntry->queueTimeMs;
|
|
}
|
|
|
|
if (queue->queueHead == NULL) {
|
|
LC_ASSERT(queue->queueSize == 0);
|
|
queue->queueHead = queue->queueTail = newEntry;
|
|
}
|
|
else if (head) {
|
|
LC_ASSERT(queue->queueSize > 0);
|
|
PRTP_QUEUE_ENTRY oldHead = queue->queueHead;
|
|
newEntry->next = oldHead;
|
|
LC_ASSERT(oldHead->prev == NULL);
|
|
oldHead->prev = newEntry;
|
|
queue->queueHead = newEntry;
|
|
}
|
|
else {
|
|
LC_ASSERT(queue->queueSize > 0);
|
|
PRTP_QUEUE_ENTRY oldTail = queue->queueTail;
|
|
newEntry->prev = oldTail;
|
|
LC_ASSERT(oldTail->next == NULL);
|
|
oldTail->next = newEntry;
|
|
queue->queueTail = newEntry;
|
|
}
|
|
queue->queueSize++;
|
|
|
|
return 1;
|
|
}
|
|
|
|
// Rescans the whole list and records the entry with the smallest queueTimeMs
// in queue->oldestQueuedEntry / queue->oldestQueuedTimeMs. With an empty list
// both fields are left at their "nothing queued" values (NULL / UINT64_MAX).
static void updateOldestQueued(PRTP_REORDER_QUEUE queue) {
    PRTP_QUEUE_ENTRY cursor;

    // Start from the "nothing queued" state
    queue->oldestQueuedEntry = NULL;
    queue->oldestQueuedTimeMs = UINT64_MAX;

    for (cursor = queue->queueHead; cursor != NULL; cursor = cursor->next) {
        // Only a strictly earlier timestamp replaces the current candidate,
        // so on a tie the entry nearer the head is kept
        if (cursor->queueTimeMs >= queue->oldestQueuedTimeMs) {
            continue;
        }

        queue->oldestQueuedTimeMs = cursor->queueTimeMs;
        queue->oldestQueuedEntry = cursor;
    }
}
|
|
|
|
// Finds the queued entry that isBeforeSignedInt() orders ahead of all others
// and makes its sequence number the queue's next expected sequence number.
//
// Returns that entry, or NULL (leaving nextRtpSequenceNumber untouched) when
// the queue is empty.
static PRTP_QUEUE_ENTRY getEntryByLowestSeq(PRTP_REORDER_QUEUE queue) {
    PRTP_QUEUE_ENTRY candidate = queue->queueHead;
    PRTP_QUEUE_ENTRY cursor;

    // Nothing queued, nothing to report
    if (candidate == NULL) {
        return NULL;
    }

    // Walk the list, keeping whichever entry sorts before the current candidate
    for (cursor = queue->queueHead; cursor != NULL; cursor = cursor->next) {
        if (isBeforeSignedInt(cursor->packet->sequenceNumber, candidate->packet->sequenceNumber, 1)) {
            candidate = cursor;
        }
    }

    // Remember the updated lowest sequence number
    queue->nextRtpSequenceNumber = candidate->packet->sequenceNumber;

    return candidate;
}
|
|
|
|
// Unlinks entry from the queue's doubly linked list and decrements queueSize.
// The entry and its packet are not freed, and the entry's own prev/next
// pointers are left as they were.
static void removeEntry(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY entry) {
    PRTP_QUEUE_ENTRY before;
    PRTP_QUEUE_ENTRY after;

    LC_ASSERT(entry != NULL);
    LC_ASSERT(queue->queueSize > 0);
    LC_ASSERT(queue->queueHead != NULL);
    LC_ASSERT(queue->queueTail != NULL);

    before = entry->prev;
    after = entry->next;

    // Bridge the neighbours over the removed entry
    if (before != NULL) {
        before->next = after;
    }
    if (after != NULL) {
        after->prev = before;
    }

    // Move the list ends if the removed entry was one of them
    if (entry == queue->queueHead) {
        queue->queueHead = after;
    }
    if (entry == queue->queueTail) {
        queue->queueTail = before;
    }

    queue->queueSize--;
}
|
|
|
|
// Enforces the queue's time and size limits by discarding (unlinking and
// freeing) at most one packet: the oldest queued one.
//
// Returns NULL if the queue is empty or no packet had to be discarded.
// Otherwise the oldest-entry bookkeeping is recomputed and the result of
// getEntryByLowestSeq() is returned, which also moves nextRtpSequenceNumber
// to the lowest sequence number still queued (NULL if the discard emptied
// the queue).
static PRTP_QUEUE_ENTRY validateQueueConstraints(PRTP_REORDER_QUEUE queue) {
    PRTP_QUEUE_ENTRY victim;
    int discard = 0;

    // Empty queue is fine
    if (queue->queueHead == NULL) {
        return NULL;
    }

    // Make sure the oldest-entry bookkeeping points at a real entry before it
    // is dereferenced below; a non-empty queue with no recorded oldest entry
    // would otherwise pass NULL to removeEntry().
    if (queue->oldestQueuedEntry == NULL) {
        updateOldestQueued(queue);
    }

    // Check that the queue's time constraint is satisfied
    if (PltGetMillis() - queue->oldestQueuedTimeMs > queue->maxQueueTimeMs) {
        Limelog("Discarding RTP packet queued for too long");
        discard = 1;
    }
    // Check that the queue's size constraint is satisfied
    else if (queue->queueSize == queue->maxSize) {
        Limelog("Discarding RTP packet after queue overgrowth");
        discard = 1;
    }

    if (!discard) {
        return NULL;
    }

    // Unlink before freeing: the entry lives inside the packet buffer, so it
    // must not be touched once the packet has been released.
    victim = queue->oldestQueuedEntry;
    removeEntry(queue, victim);
    free(victim->packet);

    // Recalculate the oldest entry now that the previous one is gone
    updateOldestQueued(queue);

    // Return the lowest seq queued
    return getEntryByLowestSeq(queue);
}
|
|
|
|
// Offers a newly received packet to the reorder queue.
//
//   queue       - reorder queue
//   packet      - received packet
//   packetEntry - entry storage used if the packet has to be queued
//
// Returns:
//   RTPQ_RET_REJECTED             - packet is behind the expected sequence
//                                   number or queuePacket() refused it
//   RTPQ_RET_HANDLE_IMMEDIATELY   - queue was empty and the packet is the first
//                                   one seen or exactly the expected one; it
//                                   was not queued
//   RTPQ_RET_QUEUED_NOTHING_READY - packet was queued, nothing to dequeue yet
//   RTPQ_RET_QUEUED_PACKETS_READY - packet was queued and RtpqGetQueuedPacket()
//                                   has something to return
int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY packetEntry) {
    PRTP_QUEUE_ENTRY lowestEntry;
    int fillsGap;

    // Reject packets behind our current sequence number
    if (queue->nextRtpSequenceNumber != UINT16_MAX &&
        isBeforeSignedInt(packet->sequenceNumber, queue->nextRtpSequenceNumber, 0)) {
        return RTPQ_RET_REJECTED;
    }

    if (queue->queueHead == NULL) {
        // Return immediately for an exact match with an empty queue
        // (or for the very first packet, when no sequence number is known yet)
        if (queue->nextRtpSequenceNumber == UINT16_MAX ||
            packet->sequenceNumber == queue->nextRtpSequenceNumber) {
            queue->nextRtpSequenceNumber = packet->sequenceNumber + 1;
            return RTPQ_RET_HANDLE_IMMEDIATELY;
        }

        // Queue is empty currently so we'll put this packet on there
        if (!queuePacket(queue, packetEntry, 0, packet)) {
            return RTPQ_RET_REJECTED;
        }
        return RTPQ_RET_QUEUED_NOTHING_READY;
    }

    // Validate that the queue remains within our constraints
    // and get the lowest element
    lowestEntry = validateQueueConstraints(queue);

    // Queue has data inside, so we need to see where this packet fits.
    // This is evaluated after constraint validation because that step may
    // have moved nextRtpSequenceNumber.
    fillsGap = (packet->sequenceNumber == queue->nextRtpSequenceNumber);

    if (!queuePacket(queue, packetEntry, 0, packet)) {
        return RTPQ_RET_REJECTED;
    }

    // Packets are ready if this one fits in the hole where we need a packet,
    // or if constraint validation changed the next sequence number to one
    // that is already queued
    if (fillsGap || lowestEntry != NULL) {
        return RTPQ_RET_QUEUED_PACKETS_READY;
    }
    return RTPQ_RET_QUEUED_NOTHING_READY;
}
|
|
|
|
// Dequeues the packet whose sequence number is the next expected one.
//
// On a match the entry is unlinked from the queue, nextRtpSequenceNumber is
// advanced by one and the packet is returned; the packet is not freed here.
// If no queued packet matches, the oldest-entry bookkeeping is refreshed and
// NULL is returned.
PRTP_PACKET RtpqGetQueuedPacket(PRTP_REORDER_QUEUE queue) {
    PRTP_QUEUE_ENTRY cursor;

    // Find the next queued packet
    for (cursor = queue->queueHead; cursor != NULL; cursor = cursor->next) {
        if (cursor->packet->sequenceNumber != queue->nextRtpSequenceNumber) {
            continue;
        }

        queue->nextRtpSequenceNumber++;
        removeEntry(queue, cursor);

        // We don't update the oldest queued entry here, because we know
        // the caller will call again until it receives null
        return cursor->packet;
    }

    // Found nothing: update the oldest queued packet time and bail
    updateOldestQueued(queue);

    return NULL;
}