Fix queue state error (and subsequent assert) when the queue advances after a constraint violation upon reception of a packet behind the new lowest entry

This commit is contained in:
Cameron Gutman
2018-08-29 08:24:58 -07:00
parent feb46b978e
commit b9290449b4
3 changed files with 28 additions and 19 deletions
+3 -3
View File
@@ -200,7 +200,7 @@ static void ReceiveThreadProc(void* context) {
rtp->sequenceNumber = htons(rtp->sequenceNumber); rtp->sequenceNumber = htons(rtp->sequenceNumber);
queueStatus = RtpqAddPacket(&rtpReorderQueue, (PRTP_PACKET)packet, &packet->q.rentry); queueStatus = RtpqAddPacket(&rtpReorderQueue, (PRTP_PACKET)packet, &packet->q.rentry);
if (queueStatus == RTPQ_RET_HANDLE_IMMEDIATELY) { if (RTPQ_HANDLE_NOW(queueStatus)) {
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
if (!queuePacketToLbq(&packet)) { if (!queuePacketToLbq(&packet)) {
// An exit signal was received // An exit signal was received
@@ -212,12 +212,12 @@ static void ReceiveThreadProc(void* context) {
} }
} }
else { else {
if (queueStatus != RTPQ_RET_REJECTED) { if (RTPQ_PACKET_CONSUMED(queueStatus)) {
// The queue consumed our packet, so we must allocate a new one // The queue consumed our packet, so we must allocate a new one
packet = NULL; packet = NULL;
} }
if (queueStatus == RTPQ_RET_QUEUED_PACKETS_READY) { if (RTPQ_PACKET_READY(queueStatus)) {
// If packets are ready, pull them and send them to the decoder // If packets are ready, pull them and send them to the decoder
while ((packet = (PQUEUED_AUDIO_PACKET)RtpqGetQueuedPacket(&rtpReorderQueue)) != NULL) { while ((packet = (PQUEUED_AUDIO_PACKET)RtpqGetQueuedPacket(&rtpReorderQueue)) != NULL) {
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
+18 -12
View File
@@ -125,7 +125,7 @@ static void removeEntry(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY entry) {
queue->queueSize--; queue->queueSize--;
} }
static PRTP_QUEUE_ENTRY validateQueueConstraints(PRTP_REORDER_QUEUE queue) { static PRTP_QUEUE_ENTRY enforceQueueConstraints(PRTP_REORDER_QUEUE queue) {
int dequeuePacket = 0; int dequeuePacket = 0;
// Empty queue is fine // Empty queue is fine
@@ -160,7 +160,7 @@ int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY
if (queue->nextRtpSequenceNumber != UINT16_MAX && if (queue->nextRtpSequenceNumber != UINT16_MAX &&
isBefore16(packet->sequenceNumber, queue->nextRtpSequenceNumber)) { isBefore16(packet->sequenceNumber, queue->nextRtpSequenceNumber)) {
// Reject packets behind our current sequence number // Reject packets behind our current sequence number
return RTPQ_RET_REJECTED; return 0;
} }
if (queue->queueHead == NULL) { if (queue->queueHead == NULL) {
@@ -168,15 +168,15 @@ int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY
if (queue->nextRtpSequenceNumber == UINT16_MAX || if (queue->nextRtpSequenceNumber == UINT16_MAX ||
packet->sequenceNumber == queue->nextRtpSequenceNumber) { packet->sequenceNumber == queue->nextRtpSequenceNumber) {
queue->nextRtpSequenceNumber = packet->sequenceNumber + 1; queue->nextRtpSequenceNumber = packet->sequenceNumber + 1;
return RTPQ_RET_HANDLE_IMMEDIATELY; return RTPQ_RET_HANDLE_NOW;
} }
else { else {
// Queue is empty currently so we'll put this packet on there // Queue is empty currently so we'll put this packet on there
if (!queuePacket(queue, packetEntry, 0, packet)) { if (!queuePacket(queue, packetEntry, 0, packet)) {
return RTPQ_RET_REJECTED; return 0;
} }
else { else {
return RTPQ_RET_QUEUED_NOTHING_READY; return RTPQ_RET_PACKET_CONSUMED;
} }
} }
} }
@@ -185,34 +185,40 @@ int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY
// Validate that the queue remains within our contraints // Validate that the queue remains within our contraints
// and get the lowest element // and get the lowest element
lowestEntry = validateQueueConstraints(queue); lowestEntry = enforceQueueConstraints(queue);
// If the queue is now empty after validating queue constraints, // If the queue is now empty after validating queue constraints,
// this packet can be returned immediately // this packet can be returned immediately
if (lowestEntry == NULL && queue->queueHead == NULL) { if (lowestEntry == NULL && queue->queueHead == NULL) {
queue->nextRtpSequenceNumber = packet->sequenceNumber + 1; queue->nextRtpSequenceNumber = packet->sequenceNumber + 1;
return RTPQ_RET_HANDLE_IMMEDIATELY; return RTPQ_RET_HANDLE_NOW;
}
else if (lowestEntry != NULL && queue->nextRtpSequenceNumber != UINT16_MAX &&
isBefore16(packet->sequenceNumber, queue->nextRtpSequenceNumber)) {
// The queue constraints were enforced and a new lowest entry was
// made available for retrieval. This packet was behind the new lowest
// so it will not be consumed by the queue.
return RTPQ_RET_PACKET_READY;
} }
// Queue has data inside, so we need to see where this packet fits // Queue has data inside, so we need to see where this packet fits
if (packet->sequenceNumber == queue->nextRtpSequenceNumber) { if (packet->sequenceNumber == queue->nextRtpSequenceNumber) {
// It fits in a hole where we need a packet, now we have some ready // It fits in a hole where we need a packet, now we have some ready
if (!queuePacket(queue, packetEntry, 0, packet)) { if (!queuePacket(queue, packetEntry, 0, packet)) {
return RTPQ_RET_REJECTED; return 0;
} }
else { else {
return RTPQ_RET_QUEUED_PACKETS_READY; return RTPQ_RET_PACKET_READY | RTPQ_RET_PACKET_CONSUMED;
} }
} }
else { else {
if (!queuePacket(queue, packetEntry, 0, packet)) { if (!queuePacket(queue, packetEntry, 0, packet)) {
return RTPQ_RET_REJECTED; return 0;
} }
else { else {
// Constraint validation may have changed the oldest packet to one that // Constraint validation may have changed the oldest packet to one that
// matches the next sequence number // matches the next sequence number
return (lowestEntry != NULL) ? RTPQ_RET_QUEUED_PACKETS_READY : return RTPQ_RET_PACKET_CONSUMED | ((lowestEntry != NULL) ? RTPQ_RET_PACKET_READY : 0);
RTPQ_RET_QUEUED_NOTHING_READY;
} }
} }
} }
+7 -4
View File
@@ -27,10 +27,13 @@ typedef struct _RTP_REORDER_QUEUE {
uint64_t oldestQueuedTimeMs; uint64_t oldestQueuedTimeMs;
} RTP_REORDER_QUEUE, *PRTP_REORDER_QUEUE; } RTP_REORDER_QUEUE, *PRTP_REORDER_QUEUE;
#define RTPQ_RET_HANDLE_IMMEDIATELY 0 #define RTPQ_RET_PACKET_CONSUMED 0x1
#define RTPQ_RET_QUEUED_NOTHING_READY 1 #define RTPQ_RET_PACKET_READY 0x2
#define RTPQ_RET_QUEUED_PACKETS_READY 2 #define RTPQ_RET_HANDLE_NOW 0x4
#define RTPQ_RET_REJECTED 3
#define RTPQ_PACKET_CONSUMED(x) ((x) & RTPQ_RET_PACKET_CONSUMED)
#define RTPQ_PACKET_READY(x) ((x) & RTPQ_RET_PACKET_READY)
#define RTPQ_HANDLE_NOW(x) ((x) == RTPQ_RET_HANDLE_NOW)
void RtpqInitializeQueue(PRTP_REORDER_QUEUE queue, int maxSize, int maxQueueTimeMs); void RtpqInitializeQueue(PRTP_REORDER_QUEUE queue, int maxSize, int maxQueueTimeMs);
void RtpqCleanupQueue(PRTP_REORDER_QUEUE queue); void RtpqCleanupQueue(PRTP_REORDER_QUEUE queue);