diff --git a/src/AudioStream.c b/src/AudioStream.c index c42ab5d..9400ef5 100644 --- a/src/AudioStream.c +++ b/src/AudioStream.c @@ -200,7 +200,7 @@ static void ReceiveThreadProc(void* context) { rtp->sequenceNumber = htons(rtp->sequenceNumber); queueStatus = RtpqAddPacket(&rtpReorderQueue, (PRTP_PACKET)packet, &packet->q.rentry); - if (queueStatus == RTPQ_RET_HANDLE_IMMEDIATELY) { + if (RTPQ_HANDLE_NOW(queueStatus)) { if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if (!queuePacketToLbq(&packet)) { // An exit signal was received @@ -212,12 +212,12 @@ static void ReceiveThreadProc(void* context) { } } else { - if (queueStatus != RTPQ_RET_REJECTED) { + if (RTPQ_PACKET_CONSUMED(queueStatus)) { // The queue consumed our packet, so we must allocate a new one packet = NULL; } - if (queueStatus == RTPQ_RET_QUEUED_PACKETS_READY) { + if (RTPQ_PACKET_READY(queueStatus)) { // If packets are ready, pull them and send them to the decoder while ((packet = (PQUEUED_AUDIO_PACKET)RtpqGetQueuedPacket(&rtpReorderQueue)) != NULL) { if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { diff --git a/src/RtpReorderQueue.c b/src/RtpReorderQueue.c index 5bd0a8a..d0267ec 100644 --- a/src/RtpReorderQueue.c +++ b/src/RtpReorderQueue.c @@ -125,7 +125,7 @@ static void removeEntry(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY entry) { queue->queueSize--; } -static PRTP_QUEUE_ENTRY validateQueueConstraints(PRTP_REORDER_QUEUE queue) { +static PRTP_QUEUE_ENTRY enforceQueueConstraints(PRTP_REORDER_QUEUE queue) { int dequeuePacket = 0; // Empty queue is fine @@ -160,7 +160,7 @@ int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY if (queue->nextRtpSequenceNumber != UINT16_MAX && isBefore16(packet->sequenceNumber, queue->nextRtpSequenceNumber)) { // Reject packets behind our current sequence number - return RTPQ_RET_REJECTED; + return 0; } if (queue->queueHead == NULL) { @@ -168,15 +168,15 @@ int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY if (queue->nextRtpSequenceNumber == UINT16_MAX || packet->sequenceNumber == queue->nextRtpSequenceNumber) { queue->nextRtpSequenceNumber = packet->sequenceNumber + 1; - return RTPQ_RET_HANDLE_IMMEDIATELY; + return RTPQ_RET_HANDLE_NOW; } else { // Queue is empty currently so we'll put this packet on there if (!queuePacket(queue, packetEntry, 0, packet)) { - return RTPQ_RET_REJECTED; + return 0; } else { - return RTPQ_RET_QUEUED_NOTHING_READY; + return RTPQ_RET_PACKET_CONSUMED; } } } @@ -185,34 +185,40 @@ int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY // Validate that the queue remains within our contraints // and get the lowest element - lowestEntry = validateQueueConstraints(queue); + lowestEntry = enforceQueueConstraints(queue); // If the queue is now empty after validating queue constraints, // this packet can be returned immediately if (lowestEntry == NULL && queue->queueHead == NULL) { queue->nextRtpSequenceNumber = packet->sequenceNumber + 1; - return RTPQ_RET_HANDLE_IMMEDIATELY; + return RTPQ_RET_HANDLE_NOW; + } + else if (lowestEntry != NULL && queue->nextRtpSequenceNumber != UINT16_MAX && + isBefore16(packet->sequenceNumber, queue->nextRtpSequenceNumber)) { + // The queue constraints were enforced and a new lowest entry was + // made available for retrieval. This packet was behind the new lowest + // so it will not be consumed by the queue. + return RTPQ_RET_PACKET_READY; } // Queue has data inside, so we need to see where this packet fits if (packet->sequenceNumber == queue->nextRtpSequenceNumber) { // It fits in a hole where we need a packet, now we have some ready if (!queuePacket(queue, packetEntry, 0, packet)) { - return RTPQ_RET_REJECTED; + return 0; } else { - return RTPQ_RET_QUEUED_PACKETS_READY; + return RTPQ_RET_PACKET_READY | RTPQ_RET_PACKET_CONSUMED; } } else { if (!queuePacket(queue, packetEntry, 0, packet)) { - return RTPQ_RET_REJECTED; + return 0; } else { // Constraint validation may have changed the oldest packet to one that // matches the next sequence number - return (lowestEntry != NULL) ? RTPQ_RET_QUEUED_PACKETS_READY : - RTPQ_RET_QUEUED_NOTHING_READY; + return RTPQ_RET_PACKET_CONSUMED | ((lowestEntry != NULL) ? RTPQ_RET_PACKET_READY : 0); } } } diff --git a/src/RtpReorderQueue.h b/src/RtpReorderQueue.h index 10624aa..a24eee8 100644 --- a/src/RtpReorderQueue.h +++ b/src/RtpReorderQueue.h @@ -27,10 +27,13 @@ typedef struct _RTP_REORDER_QUEUE { uint64_t oldestQueuedTimeMs; } RTP_REORDER_QUEUE, *PRTP_REORDER_QUEUE; -#define RTPQ_RET_HANDLE_IMMEDIATELY 0 -#define RTPQ_RET_QUEUED_NOTHING_READY 1 -#define RTPQ_RET_QUEUED_PACKETS_READY 2 -#define RTPQ_RET_REJECTED 3 +#define RTPQ_RET_PACKET_CONSUMED 0x1 +#define RTPQ_RET_PACKET_READY 0x2 +#define RTPQ_RET_HANDLE_NOW 0x4 + +#define RTPQ_PACKET_CONSUMED(x) ((x) & RTPQ_RET_PACKET_CONSUMED) +#define RTPQ_PACKET_READY(x) ((x) & RTPQ_RET_PACKET_READY) +#define RTPQ_HANDLE_NOW(x) ((x) == RTPQ_RET_HANDLE_NOW) void RtpqInitializeQueue(PRTP_REORDER_QUEUE queue, int maxSize, int maxQueueTimeMs); void RtpqCleanupQueue(PRTP_REORDER_QUEUE queue);