Add an RTP reordering queue to handle out of order packets

This commit is contained in:
Cameron Gutman 2015-05-28 08:32:51 -05:00
parent e50df23a0a
commit 956d6bb217
11 changed files with 443 additions and 48 deletions

View File

@ -2,6 +2,7 @@
#include "PlatformSockets.h"
#include "PlatformThreads.h"
#include "LinkedBlockingQueue.h"
#include "RtpReorderQueue.h"
static AUDIO_RENDERER_CALLBACKS callbacks;
static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks;
@ -10,6 +11,7 @@ static IP_ADDRESS remoteHost;
static SOCKET rtpSocket = INVALID_SOCKET;
static LINKED_BLOCKING_QUEUE packetQueue;
static RTP_REORDER_QUEUE rtpReorderQueue;
static PLT_THREAD udpPingThread;
static PLT_THREAD receiveThread;
@ -20,9 +22,14 @@ static PLT_THREAD decoderThread;
#define MAX_PACKET_SIZE 100
typedef struct _QUEUED_AUDIO_PACKET {
// data must remain at the front
char data[MAX_PACKET_SIZE];
int size;
LINKED_BLOCKING_QUEUE_ENTRY entry;
union {
RTP_QUEUE_ENTRY rentry;
LINKED_BLOCKING_QUEUE_ENTRY lentry;
} q;
} QUEUED_AUDIO_PACKET, *PQUEUED_AUDIO_PACKET;
/* Initialize the audio stream */
@ -32,6 +39,7 @@ void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallback
listenerCallbacks = clCallbacks;
LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
RtpqInitializeQueue(&rtpReorderQueue, RTPQ_DEFAULT_MAX_SIZE, RTPQ_DEFUALT_QUEUE_TIME);
}
static void freePacketList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
@ -52,6 +60,7 @@ void destroyAudioStream(void) {
callbacks.release();
freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue));
RtpqCleanupQueue(&rtpReorderQueue);
}
static void UdpPingThreadProc(void *context) {
@ -78,10 +87,31 @@ static void UdpPingThreadProc(void *context) {
}
}
static int queuePacketToLbq(PQUEUED_AUDIO_PACKET *packet) {
int err;
err = LbqOfferQueueItem(&packetQueue, packet, &(*packet)->q.lentry);
if (err == LBQ_SUCCESS) {
// The LBQ owns the buffer now
*packet = NULL;
}
else if (err == LBQ_BOUND_EXCEEDED) {
Limelog("Audio packet queue overflow\n");
freePacketList(LbqFlushQueueItems(&packetQueue));
}
else if (err == LBQ_INTERRUPTED) {
Limelog("Receive thread terminating #3\n");
free(*packet);
return 0;
}
return 1;
}
static void ReceiveThreadProc(void* context) {
PRTP_PACKET rtp;
PQUEUED_AUDIO_PACKET packet;
int err;
int queueStatus;
packet = NULL;
@ -114,20 +144,28 @@ static void ReceiveThreadProc(void* context) {
continue;
}
err = LbqOfferQueueItem(&packetQueue, packet, &packet->entry);
if (err == LBQ_SUCCESS) {
// The queue owns the buffer now
packet = NULL;
queueStatus = RtpqAddPacket(&rtpReorderQueue, (PRTP_PACKET) packet, &packet->q.rentry);
if (queueStatus == RTPQ_RET_HANDLE_IMMEDIATELY) {
if (!queuePacketToLbq(&packet)) {
// An exit signal was received
return;
}
}
else {
if (queueStatus != RTPQ_RET_REJECTED) {
// The queue consumed our packet, so we must allocate a new one
packet = NULL;
}
if (err == LBQ_BOUND_EXCEEDED) {
Limelog("Audio packet queue overflow\n");
freePacketList(LbqFlushQueueItems(&packetQueue));
}
else if (err == LBQ_INTERRUPTED) {
Limelog("Receive thread terminating #2\n");
free(packet);
return;
if (queueStatus == RTPQ_RET_QUEUED_PACKETS_READY) {
// If packets are ready, pull them and send them to the decoder
while ((packet = (PQUEUED_AUDIO_PACKET) RtpqGetQueuedPacket(&rtpReorderQueue)) != NULL) {
if (!queuePacketToLbq(&packet)) {
// An exit signal was received
return;
}
}
}
}
}
}

View File

@ -8,6 +8,8 @@
extern int serverMajorVersion;
int isBeforeSignedInt(int numA, int numB, int ambiguousCase);
void fixupMissingCallbacks(PDECODER_RENDERER_CALLBACKS *drCallbacks, PAUDIO_RENDERER_CALLBACKS *arCallbacks,
PCONNECTION_LISTENER_CALLBACKS *clCallbacks, PPLATFORM_CALLBACKS *plCallbacks);

18
limelight-common/Misc.c Normal file
View File

@ -0,0 +1,18 @@
#include "Limelight-internal.h"
int isBeforeSignedInt(int numA, int numB, int ambiguousCase) {
// This should be the common case for most callers
if (numA == numB) {
return 0;
}
// If numA and numB have the same signs,
// we can just do a regular comparison.
if ((numA < 0 && numB < 0) || (numA >= 0 && numB >= 0)) {
return numA < numB;
}
else {
// The sign switch is ambiguous
return ambiguousCase;
}
}

View File

@ -310,6 +310,18 @@ int PltWaitForEvent(PLT_EVENT *event) {
#endif
}
uint64_t PltGetMillis(void) {
#if defined(LC_WINDOWS) || defined(LC_WINDOWS_PHONE)
return GetTickCount64();
#else
struct timeval tv;
gettimeofday(&tv, NULL);
return (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
#endif
}
int initializePlatform(void) {
int err;

View File

@ -2,6 +2,7 @@
#include <stdlib.h>
#include <string.h>
#include <stdint.h>
#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
@ -11,6 +12,7 @@
#else
#include <unistd.h>
#include <pthread.h>
#include <sys/time.h>
#endif
#ifdef _WIN32
@ -48,4 +50,6 @@ extern PLATFORM_CALLBACKS platformCallbacks;
#endif
int initializePlatform(void);
void cleanupPlatform(void);
void cleanupPlatform(void);
uint64_t PltGetMillis(void);

View File

@ -0,0 +1,257 @@
#include "Limelight-internal.h"
#include "RtpReorderQueue.h"
void RtpqInitializeQueue(PRTP_REORDER_QUEUE queue, int maxSize, int maxQueueTimeMs) {
queue->maxSize = maxSize;
queue->maxQueueTimeMs = maxQueueTimeMs;
queue->queueHead = NULL;
queue->queueTail = NULL;
queue->nextRtpSequenceNumber = UINT16_MAX;
queue->oldestQueuedTimeMs = UINT64_MAX;
queue->oldestQueuedEntry = NULL;
}
void RtpqCleanupQueue(PRTP_REORDER_QUEUE queue) {
while (queue->queueHead != NULL) {
PRTP_QUEUE_ENTRY entry = queue->queueHead;
queue->queueHead = entry->next;
free(entry);
}
}
// newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet
static int queuePacket(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY newEntry, int head, PRTP_PACKET packet) {
if (queue->nextRtpSequenceNumber != UINT16_MAX) {
PRTP_QUEUE_ENTRY entry;
// Don't queue packets we're already ahead of
if (isBeforeSignedInt(packet->sequenceNumber, queue->nextRtpSequenceNumber, 0)) {
return 0;
}
// Don't queue duplicates either
entry = queue->queueHead;
while (entry != NULL) {
if (entry->packet->sequenceNumber == packet->sequenceNumber) {
return 0;
}
entry = entry->next;
}
}
newEntry->packet = packet;
newEntry->queueTimeMs = PltGetMillis();
newEntry->prev = NULL;
newEntry->next = NULL;
if (queue->oldestQueuedTimeMs == UINT64_MAX) {
queue->oldestQueuedTimeMs = newEntry->queueTimeMs;
}
if (queue->queueHead == NULL) {
LC_ASSERT(queue->queueSize == 0);
queue->queueHead = queue->queueTail = newEntry;
}
else if (head) {
LC_ASSERT(queue->queueSize > 0);
PRTP_QUEUE_ENTRY oldHead = queue->queueHead;
newEntry->next = oldHead;
LC_ASSERT(oldHead->prev == NULL);
oldHead->prev = newEntry;
queue->queueHead = newEntry;
}
else {
LC_ASSERT(queue->queueSize > 0);
PRTP_QUEUE_ENTRY oldTail = queue->queueTail;
newEntry->prev = oldTail;
LC_ASSERT(oldTail->next == NULL);
oldTail->next = newEntry;
queue->queueTail = newEntry;
}
queue->queueSize++;
return 1;
}
static void updateOldestQueued(PRTP_REORDER_QUEUE queue) {
PRTP_QUEUE_ENTRY entry;
queue->oldestQueuedTimeMs = UINT64_MAX;
queue->oldestQueuedEntry = NULL;
entry = queue->queueHead;
while (entry != NULL) {
if (entry->queueTimeMs < queue->oldestQueuedTimeMs) {
queue->oldestQueuedEntry = entry;
queue->oldestQueuedTimeMs = entry->queueTimeMs;
}
entry = entry->next;
}
}
static PRTP_QUEUE_ENTRY getEntryByLowestSeq(PRTP_REORDER_QUEUE queue) {
PRTP_QUEUE_ENTRY lowestSeqEntry, entry;
lowestSeqEntry = queue->queueHead;
entry = queue->queueHead;
while (entry != NULL) {
if (isBeforeSignedInt(entry->packet->sequenceNumber, lowestSeqEntry->packet->sequenceNumber, 1)) {
lowestSeqEntry = entry;
}
entry = entry->next;
}
// Remember the updated lowest sequence number
if (lowestSeqEntry != NULL) {
queue->nextRtpSequenceNumber = lowestSeqEntry->packet->sequenceNumber;
}
return lowestSeqEntry;
}
static void removeEntry(PRTP_REORDER_QUEUE queue, PRTP_QUEUE_ENTRY entry) {
LC_ASSERT(entry != NULL);
LC_ASSERT(queue->queueSize > 0);
LC_ASSERT(queue->queueHead != NULL);
LC_ASSERT(queue->queueTail != NULL);
if (queue->queueHead == entry) {
queue->queueHead = entry->next;
}
if (queue->queueTail == entry) {
queue->queueTail = entry->prev;
}
if (entry->prev != NULL) {
entry->prev->next = entry->next;
}
if (entry->next != NULL) {
entry->next->prev = entry->prev;
}
queue->queueSize--;
}
static PRTP_QUEUE_ENTRY validateQueueConstraints(PRTP_REORDER_QUEUE queue) {
int needsUpdate = 0;
// Empty queue is fine
if (queue->queueHead == NULL) {
return NULL;
}
// Check that the queue's time constraint is satisfied
if (PltGetMillis() - queue->oldestQueuedTimeMs > queue->maxQueueTimeMs) {
Limelog("Discarding RTP packet queued for too long");
removeEntry(queue, queue->oldestQueuedEntry);
free(queue->oldestQueuedEntry->packet);
needsUpdate = 1;
}
// Check that the queue's size constraint is satisfied
if (queue->queueSize == queue->maxSize) {
Limelog("Discarding RTP packet after queue overgrowth");
removeEntry(queue, queue->oldestQueuedEntry);
free(queue->oldestQueuedEntry->packet);
needsUpdate = 1;
}
if (needsUpdate) {
// Recalculate the oldest entry if needed
updateOldestQueued(queue);
// Return the lowest seq queued
return getEntryByLowestSeq(queue);
}
else {
return NULL;
}
}
int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY packetEntry) {
if (queue->nextRtpSequenceNumber != UINT16_MAX &&
isBeforeSignedInt(packet->sequenceNumber, queue->nextRtpSequenceNumber, 0)) {
// Reject packets behind our current sequence number
return RTPQ_RET_REJECTED;
}
if (queue->queueHead == NULL) {
// Return immediately for an exact match with an empty queue
if (queue->nextRtpSequenceNumber == UINT16_MAX ||
packet->sequenceNumber == queue->nextRtpSequenceNumber) {
queue->nextRtpSequenceNumber = packet->sequenceNumber + 1;
return RTPQ_RET_HANDLE_IMMEDIATELY;
}
else {
// Queue is empty currently so we'll put this packet on there
if (!queuePacket(queue, packetEntry, 0, packet)) {
return RTPQ_RET_REJECTED;
}
else {
return RTPQ_RET_QUEUED_NOTHING_READY;
}
}
}
else {
PRTP_QUEUE_ENTRY lowestEntry;
// Validate that the queue remains within our contraints
// and get the lowest element
lowestEntry = validateQueueConstraints(queue);
// Queue has data inside, so we need to see where this packet fits
if (packet->sequenceNumber == queue->nextRtpSequenceNumber) {
// It fits in a hole where we need a packet, now we have some ready
if (!queuePacket(queue, packetEntry, 0, packet)) {
return RTPQ_RET_REJECTED;
}
else {
return RTPQ_RET_QUEUED_PACKETS_READY;
}
}
else {
if (!queuePacket(queue, packetEntry, 0, packet)) {
return RTPQ_RET_REJECTED;
}
else {
// Constraint validation may have changed the oldest packet to one that
// matches the next sequence number
return (lowestEntry != NULL) ? RTPQ_RET_QUEUED_PACKETS_READY :
RTPQ_RET_QUEUED_NOTHING_READY;
}
}
}
}
PRTP_PACKET RtpqGetQueuedPacket(PRTP_REORDER_QUEUE queue) {
PRTP_QUEUE_ENTRY queuedEntry, entry;
// Find the next queued packet
queuedEntry = NULL;
entry = queue->queueHead;
while (entry != NULL) {
if (entry->packet->sequenceNumber == queue->nextRtpSequenceNumber) {
queue->nextRtpSequenceNumber++;
queuedEntry = entry;
removeEntry(queue, entry);
break;
}
entry = entry->next;
}
// Bail if we found nothing
if (queuedEntry == NULL) {
// Update the oldest queued packet time
updateOldestQueued(queue);
return NULL;
}
// We don't update the oldest queued entry here, because we know
// the caller will call again until it receives null
return queuedEntry->packet;
}

View File

@ -0,0 +1,39 @@
#pragma once
#include "Video.h"
#define RTPQ_DEFAULT_MAX_SIZE 16
#define RTPQ_DEFUALT_QUEUE_TIME 40
typedef struct _RTP_QUEUE_ENTRY {
PRTP_PACKET packet;
uint64_t queueTimeMs;
struct _RTP_QUEUE_ENTRY *next;
struct _RTP_QUEUE_ENTRY *prev;
} RTP_QUEUE_ENTRY, *PRTP_QUEUE_ENTRY;
typedef struct _RTP_REORDER_QUEUE {
int maxSize;
int maxQueueTimeMs;
PRTP_QUEUE_ENTRY queueHead;
PRTP_QUEUE_ENTRY queueTail;
int queueSize;
unsigned short nextRtpSequenceNumber;
uint64_t oldestQueuedTimeMs;
PRTP_QUEUE_ENTRY oldestQueuedEntry;
} RTP_REORDER_QUEUE, *PRTP_REORDER_QUEUE;
#define RTPQ_RET_HANDLE_IMMEDIATELY 0
#define RTPQ_RET_QUEUED_NOTHING_READY 1
#define RTPQ_RET_QUEUED_PACKETS_READY 2
#define RTPQ_RET_REJECTED 3
void RtpqInitializeQueue(PRTP_REORDER_QUEUE queue, int maxSize, int maxQueueTimeMs);
void RtpqCleanupQueue(PRTP_REORDER_QUEUE queue);
int RtpqAddPacket(PRTP_REORDER_QUEUE queue, PRTP_PACKET packet, PRTP_QUEUE_ENTRY packetEntry);
PRTP_PACKET RtpqGetQueuedPacket(PRTP_REORDER_QUEUE queue);

View File

@ -319,23 +319,6 @@ static void processRtpPayloadFast(PNV_VIDEO_PACKET videoPacket, BUFFER_DESC loca
queueFragment(location.data, location.offset, location.length);
}
static int isBeforeSigned(int numA, int numB, int ambiguousCase) {
// This should be the common case for most callers
if (numA == numB) {
return 0;
}
// If numA and numB have the same signs,
// we can just do a regular comparison.
if ((numA < 0 && numB < 0) || (numA >= 0 && numB >= 0)) {
return numA < numB;
}
else {
// The sign switch is ambiguous
return ambiguousCase;
}
}
/* Process an RTP Payload */
void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length) {
BUFFER_DESC currentPos, specialSeq;
@ -358,12 +341,12 @@ void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length) {
// Drop duplicates or re-ordered packets
streamPacketIndex = videoPacket->streamPacketIndex;
if (isBeforeSigned((short) streamPacketIndex, (short) (lastPacketInStream + 1), 0)) {
if (isBeforeSignedInt((short) streamPacketIndex, (short) (lastPacketInStream + 1), 0)) {
return;
}
// Drop packets from a previously completed frame
if (isBeforeSigned(frameIndex, nextFrameNumber, 0)) {
if (isBeforeSignedInt(frameIndex, nextFrameNumber, 0)) {
return;
}
@ -405,7 +388,7 @@ void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length) {
// miss one in between
else if (firstPacket) {
// Make sure this is the next consecutive frame
if (isBeforeSigned(nextFrameNumber, frameIndex, 1)) {
if (isBeforeSignedInt(nextFrameNumber, frameIndex, 1)) {
Limelog("Network dropped an entire frame\n");
nextFrameNumber = frameIndex;

View File

@ -2,6 +2,7 @@
#include "PlatformSockets.h"
#include "PlatformThreads.h"
#include "LinkedBlockingQueue.h"
#include "RtpReorderQueue.h"
#define FIRST_FRAME_MAX 1500
@ -12,6 +13,7 @@ static DECODER_RENDERER_CALLBACKS callbacks;
static STREAM_CONFIGURATION configuration;
static IP_ADDRESS remoteHost;
static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks;
static RTP_REORDER_QUEUE rtpQueue;
static SOCKET rtpSocket = INVALID_SOCKET;
static SOCKET firstFrameSocket = INVALID_SOCKET;
@ -20,6 +22,11 @@ static PLT_THREAD udpPingThread;
static PLT_THREAD receiveThread;
static PLT_THREAD decoderThread;
// We can't request an IDR frame until the depacketizer knows
// that a packet was lost. This timeout bounds the time that
// the RTP queue will wait for missing/reordered packets.
#define RTP_QUEUE_DELAY 10
/* Initialize the video stream */
void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks,
PCONNECTION_LISTENER_CALLBACKS clCallbacks) {
@ -29,6 +36,7 @@ void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig,
listenerCallbacks = clCallbacks;
initializeVideoDepacketizer(configuration.packetSize);
RtpqInitializeQueue(&rtpQueue, RTPQ_DEFAULT_MAX_SIZE, RTP_QUEUE_DELAY);
}
/* Clean up the video stream */
@ -36,6 +44,7 @@ void destroyVideoStream(void) {
callbacks.release();
destroyVideoDepacketizer();
RtpqCleanupQueue(&rtpQueue);
}
/* UDP Ping proc */
@ -63,31 +72,52 @@ static void UdpPingThreadProc(void *context) {
/* Receive thread proc */
static void ReceiveThreadProc(void* context) {
SOCK_RET err;
int bufferSize;
int err;
int bufferSize, receiveSize;
char* buffer;
int queueStatus;
bufferSize = configuration.packetSize + MAX_RTP_HEADER_SIZE;
buffer = (char*)malloc(bufferSize);
if (buffer == NULL) {
Limelog("Receive thread terminating\n");
listenerCallbacks->connectionTerminated(-1);
return;
}
receiveSize = configuration.packetSize + MAX_RTP_HEADER_SIZE;
bufferSize = receiveSize + sizeof(int) + sizeof(RTP_QUEUE_ENTRY);
buffer = NULL;
while (!PltIsThreadInterrupted(&receiveThread)) {
err = recv(rtpSocket, buffer, bufferSize, 0);
if (buffer == NULL) {
buffer = (char*) malloc(bufferSize);
if (buffer == NULL) {
Limelog("Receive thread terminating\n");
listenerCallbacks->connectionTerminated(-1);
return;
}
}
err = (int) recv(rtpSocket, buffer, receiveSize, 0);
if (err <= 0) {
Limelog("Receive thread terminating #2\n");
listenerCallbacks->connectionTerminated(LastSocketError());
break;
}
// queueRtpPacket() copies the data it needs to we can reuse the buffer
queueRtpPacket((PRTP_PACKET) buffer, (int)err);
memcpy(&buffer[receiveSize], &err, sizeof(int));
queueStatus = RtpqAddPacket(&rtpQueue, (PRTP_PACKET) &buffer[0], (PRTP_QUEUE_ENTRY) &buffer[receiveSize + sizeof(int)]);
if (queueStatus == RTPQ_RET_HANDLE_IMMEDIATELY) {
// queueRtpPacket() copies the data it needs to we can reuse the buffer
queueRtpPacket((PRTP_PACKET) buffer, err);
}
else if (queueStatus == RTPQ_RET_QUEUED_PACKETS_READY) {
// The packet queue now has packets ready
while ((buffer = (char*) RtpqGetQueuedPacket(&rtpQueue)) != NULL) {
memcpy(&err, &buffer[receiveSize], sizeof(int));
queueRtpPacket((PRTP_PACKET) buffer, err);
free(buffer);
}
}
}
free(buffer);
if (buffer != NULL) {
free(buffer);
}
}
/* Decoder thread proc */

View File

@ -138,10 +138,12 @@
<ClCompile Include="FakeCallbacks.c" />
<ClCompile Include="InputStream.c" />
<ClCompile Include="LinkedBlockingQueue.c" />
<ClCompile Include="Misc.c" />
<ClCompile Include="OpenAES\oaes_base64.c" />
<ClCompile Include="OpenAES\oaes_lib.c" />
<ClCompile Include="PlatformSockets.c" />
<ClCompile Include="Platform.c" />
<ClCompile Include="RtpReorderQueue.c" />
<ClCompile Include="RtspConnection.c" />
<ClCompile Include="RtspParser.c" />
<ClCompile Include="SdpGenerator.c" />
@ -161,6 +163,7 @@
<ClInclude Include="Platform.h" />
<ClInclude Include="PlatformSockets.h" />
<ClInclude Include="PlatformThreads.h" />
<ClInclude Include="RtpReorderQueue.h" />
<ClInclude Include="Rtsp.h" />
<ClInclude Include="Video.h" />
</ItemGroup>

View File

@ -63,9 +63,15 @@
<ClCompile Include="FakeCallbacks.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="RtpReorderQueue.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="Platform.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="Misc.c">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="ByteBuffer.h">
@ -110,5 +116,8 @@
<ClInclude Include="OpenAES\oaes_lib.h">
<Filter>Source Files\OpenAES</Filter>
</ClInclude>
<ClInclude Include="RtpReorderQueue.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
</Project>