mirror of
https://github.com/moonlight-stream/moonlight-common-c.git
synced 2026-04-14 03:36:12 +00:00
Forward Error Correction (#24)
* Preperation for FEC by adding new queue which buffers whole frames * Add code for creating recovered RTP packet * Add checks before repair missing packets * Initial implementation for single FEC packet * Implement FEC for multiple packets
This commit is contained in:
committed by
Cameron Gutman
parent
dbb7cee399
commit
9bf8d361a1
291
src/RtpFecQueue.c
Normal file
291
src/RtpFecQueue.c
Normal file
@@ -0,0 +1,291 @@
|
||||
#include "Limelight-internal.h"
|
||||
#include "RtpFecQueue.h"
|
||||
#include "rs.h"
|
||||
|
||||
#define ushort(x) (unsigned short) ((x) % (UINT16_MAX+1))
|
||||
#define isBefore(x, y) ushort((x) - (y)) > (UINT16_MAX/2)
|
||||
|
||||
void RtpfInitializeQueue(PRTP_FEC_QUEUE queue) {
|
||||
reed_solomon_init();
|
||||
memset(queue, 0, sizeof(*queue));
|
||||
queue->nextRtpSequenceNumber = UINT16_MAX;
|
||||
|
||||
queue->currentFrameNumber = UINT16_MAX;
|
||||
}
|
||||
|
||||
void RtpfCleanupQueue(PRTP_FEC_QUEUE queue) {
|
||||
while (queue->bufferHead != NULL) {
|
||||
PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead;
|
||||
queue->bufferHead = entry->next;
|
||||
free(entry->packet);
|
||||
}
|
||||
}
|
||||
|
||||
// newEntry is contained within the packet buffer so we free the whole entry by freeing entry->packet
|
||||
static int queuePacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY newEntry, int head, PRTP_PACKET packet) {
|
||||
PRTPFEC_QUEUE_ENTRY entry;
|
||||
|
||||
// Don't queue packets we're already ahead of
|
||||
if (isBefore(packet->sequenceNumber, queue->nextRtpSequenceNumber)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Don't queue duplicates either
|
||||
entry = queue->bufferHead;
|
||||
while (entry != NULL) {
|
||||
if (entry->packet->sequenceNumber == packet->sequenceNumber) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
entry = entry->next;
|
||||
}
|
||||
|
||||
newEntry->packet = packet;
|
||||
newEntry->prev = NULL;
|
||||
newEntry->next = NULL;
|
||||
|
||||
if (queue->bufferHead == NULL) {
|
||||
LC_ASSERT(queue->bufferSize == 0);
|
||||
queue->bufferHead = queue->bufferTail = newEntry;
|
||||
}
|
||||
else if (head) {
|
||||
LC_ASSERT(queue->bufferSize > 0);
|
||||
PRTPFEC_QUEUE_ENTRY oldHead = queue->bufferHead;
|
||||
newEntry->next = oldHead;
|
||||
LC_ASSERT(oldHead->prev == NULL);
|
||||
oldHead->prev = newEntry;
|
||||
queue->bufferHead = newEntry;
|
||||
}
|
||||
else {
|
||||
LC_ASSERT(queue->bufferSize > 0);
|
||||
PRTPFEC_QUEUE_ENTRY oldTail = queue->bufferTail;
|
||||
newEntry->prev = oldTail;
|
||||
LC_ASSERT(oldTail->next == NULL);
|
||||
oldTail->next = newEntry;
|
||||
queue->bufferTail = newEntry;
|
||||
}
|
||||
queue->bufferSize++;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
static void queueRecoveredPacket(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY entry, PRTP_PACKET packet, int sequenceNumber) {
|
||||
//Correct RTP header as it isn't in the FEC data
|
||||
packet->header = queue->queueHead->packet->header;
|
||||
packet->sequenceNumber = sequenceNumber;
|
||||
|
||||
int dataOffset = sizeof(*packet);
|
||||
if (packet->header & FLAG_EXTENSION) {
|
||||
dataOffset += 4; // 2 additional fields
|
||||
}
|
||||
|
||||
PNV_VIDEO_PACKET nvPacket = (PNV_VIDEO_PACKET)(((char*)packet) + dataOffset);
|
||||
nvPacket->frameIndex = queue->currentFrameNumber;
|
||||
|
||||
//Set size of generated packet
|
||||
int size = StreamConfig.packetSize + dataOffset;
|
||||
|
||||
int receiveSize = StreamConfig.packetSize + MAX_RTP_HEADER_SIZE;
|
||||
memcpy(&((unsigned char*) packet)[receiveSize], &size, sizeof(int));
|
||||
|
||||
queuePacket(queue, entry, 0, packet);
|
||||
}
|
||||
|
||||
static void repairPackets(PRTP_FEC_QUEUE queue) {
|
||||
int totalPackets = ushort(queue->bufferHighestSequenceNumber - queue->bufferLowestSequenceNumber) + 1;
|
||||
int totalParityPackets = (queue->bufferDataPackets * queue->fecPercentage + 99) / 100;
|
||||
int parityPackets = totalPackets - queue->bufferDataPackets;
|
||||
int missingPackets = totalPackets - queue->bufferSize;
|
||||
|
||||
if (parityPackets < missingPackets || parityPackets <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
reed_solomon* rs = reed_solomon_new(queue->bufferDataPackets, totalParityPackets);
|
||||
|
||||
int* missing = malloc(missingPackets * sizeof(int));
|
||||
unsigned char** packets = malloc(totalPackets * sizeof(unsigned char*));
|
||||
unsigned char* marks = malloc(totalPackets * sizeof(unsigned char));
|
||||
if (rs == NULL || missing == NULL || packets == NULL || marks == NULL)
|
||||
goto cleanup;
|
||||
|
||||
rs->shards = queue->bufferDataPackets + missingPackets; //Don't let RS complain about missing parity packets
|
||||
|
||||
memset(marks, 1, sizeof(char) * (totalPackets));
|
||||
|
||||
int receiveSize = StreamConfig.packetSize + MAX_RTP_HEADER_SIZE;
|
||||
int packetBufferSize = receiveSize + sizeof(int) + sizeof(RTPFEC_QUEUE_ENTRY);
|
||||
|
||||
PRTPFEC_QUEUE_ENTRY entry = queue->bufferHead;
|
||||
while (entry != NULL) {
|
||||
int index = ushort(entry->packet->sequenceNumber - queue->bufferLowestSequenceNumber);
|
||||
packets[index] = (unsigned char*) entry->packet;
|
||||
marks[index] = 0;
|
||||
|
||||
int size;
|
||||
memcpy(&size, &packets[index][StreamConfig.packetSize + MAX_RTP_HEADER_SIZE], sizeof(int));
|
||||
|
||||
//Set padding to zero
|
||||
if (size < receiveSize) {
|
||||
memset(&packets[index][size], 0, receiveSize - size);
|
||||
}
|
||||
|
||||
entry = entry->next;
|
||||
}
|
||||
|
||||
int i;
|
||||
int ret = -1;
|
||||
for (i = 0; i < totalPackets; i++) {
|
||||
if (marks[i]) {
|
||||
packets[i] = malloc(packetBufferSize);
|
||||
if (packets[i] == NULL) {
|
||||
goto cleanup_packets;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ret = reed_solomon_reconstruct(rs, packets, marks, totalPackets, receiveSize);
|
||||
|
||||
cleanup_packets:
|
||||
for (i = 0; i < totalPackets; i++) {
|
||||
if (marks[i]) {
|
||||
if (ret == 0) {
|
||||
PRTPFEC_QUEUE_ENTRY queueEntry = (PRTPFEC_QUEUE_ENTRY)&packets[i][receiveSize + sizeof(int)];
|
||||
PRTP_PACKET rtpPacket = (PRTP_PACKET) packets[i];
|
||||
rtpPacket->sequenceNumber = ushort(i + queue->bufferLowestSequenceNumber);
|
||||
rtpPacket->header = queue->bufferHead->packet->header;
|
||||
|
||||
int dataOffset = sizeof(*rtpPacket);
|
||||
if (rtpPacket->header & FLAG_EXTENSION) {
|
||||
dataOffset += 4; // 2 additional fields
|
||||
}
|
||||
|
||||
PNV_VIDEO_PACKET nvPacket = (PNV_VIDEO_PACKET)(((char*)rtpPacket) + dataOffset);
|
||||
nvPacket->frameIndex = queue->currentFrameNumber;
|
||||
|
||||
//Remove padding of generated packet
|
||||
int size = StreamConfig.packetSize + dataOffset;
|
||||
if (rtpPacket->sequenceNumber == ushort(queue->bufferLowestSequenceNumber + queue->bufferDataPackets - 1)) {
|
||||
while (packets[i][size-1] == 0) {
|
||||
size--;
|
||||
}
|
||||
}
|
||||
|
||||
memcpy(&packets[i][receiveSize], &size, sizeof(int));
|
||||
queuePacket(queue, queueEntry, 0, rtpPacket);
|
||||
} else if (packets[i] != NULL) {
|
||||
free(packets[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cleanup:
|
||||
reed_solomon_release(rs);
|
||||
|
||||
if (missing != NULL)
|
||||
free(missing);
|
||||
|
||||
if (packets != NULL)
|
||||
free(packets);
|
||||
|
||||
if (marks != NULL)
|
||||
free(marks);
|
||||
}
|
||||
|
||||
static void removeEntry(PRTP_FEC_QUEUE queue, PRTPFEC_QUEUE_ENTRY entry) {
|
||||
LC_ASSERT(entry != NULL);
|
||||
LC_ASSERT(queue->queueSize > 0);
|
||||
LC_ASSERT(queue->readyQueueHead != NULL);
|
||||
LC_ASSERT(queue->readyQueueTail != NULL);
|
||||
|
||||
if (queue->queueHead == entry) {
|
||||
queue->queueHead = entry->next;
|
||||
}
|
||||
if (queue->queueTail == entry) {
|
||||
queue->queueTail = entry->prev;
|
||||
}
|
||||
|
||||
if (entry->prev != NULL) {
|
||||
entry->prev->next = entry->next;
|
||||
}
|
||||
if (entry->next != NULL) {
|
||||
entry->next->prev = entry->prev;
|
||||
}
|
||||
queue->queueSize--;
|
||||
}
|
||||
|
||||
int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, PRTPFEC_QUEUE_ENTRY packetEntry) {
|
||||
if (isBefore(packet->sequenceNumber, queue->nextRtpSequenceNumber)) {
|
||||
// Reject packets behind our current sequence number
|
||||
return RTPF_RET_REJECTED;
|
||||
}
|
||||
|
||||
int dataOffset = sizeof(*packet);
|
||||
if (packet->header & FLAG_EXTENSION) {
|
||||
dataOffset += 4; // 2 additional fields
|
||||
}
|
||||
|
||||
PNV_VIDEO_PACKET nvPacket = (PNV_VIDEO_PACKET)(((char*)packet) + dataOffset);
|
||||
|
||||
if (nvPacket->frameIndex != queue->currentFrameNumber) {
|
||||
//Make current frame available to depacketizer
|
||||
queue->currentFrameNumber = nvPacket->frameIndex;
|
||||
if (queue->queueTail == NULL) {
|
||||
queue->queueHead = queue->bufferHead;
|
||||
queue->queueTail = queue->bufferTail;
|
||||
} else {
|
||||
queue->queueTail->next = queue->bufferHead;
|
||||
queue->queueTail = queue->bufferTail;
|
||||
}
|
||||
queue->bufferHead = NULL;
|
||||
queue->bufferTail = NULL;
|
||||
|
||||
queue->queueSize = queue->bufferSize;
|
||||
queue->nextRtpSequenceNumber = queue->bufferHighestSequenceNumber;
|
||||
|
||||
int fecIndex = (nvPacket->fecInfo & 0xFF000) >> 12;
|
||||
queue->bufferLowestSequenceNumber = ushort(packet->sequenceNumber - fecIndex);
|
||||
queue->bufferSize = 0;
|
||||
queue->bufferHighestSequenceNumber = packet->sequenceNumber;
|
||||
queue->bufferDataPackets = ((nvPacket->fecInfo & 0xFF00000) >> 20) / 4;
|
||||
queue->fecPercentage = ((nvPacket->fecInfo & 0xFF0) >> 4);
|
||||
} else if (isBefore(queue->bufferHighestSequenceNumber, packet->sequenceNumber)) {
|
||||
queue->bufferHighestSequenceNumber = packet->sequenceNumber;
|
||||
}
|
||||
|
||||
if (!queuePacket(queue, packetEntry, 0, packet)) {
|
||||
return RTPF_RET_REJECTED;
|
||||
}
|
||||
else {
|
||||
if (queue->bufferSize < (ushort(packet->sequenceNumber - queue->bufferLowestSequenceNumber) + 1)) {
|
||||
repairPackets(queue);
|
||||
}
|
||||
|
||||
return (queue->queueHead != NULL) ? RTPF_RET_QUEUED_PACKETS_READY : RTPF_RET_QUEUED_NOTHING_READY;
|
||||
}
|
||||
}
|
||||
|
||||
PRTP_PACKET RtpfGetQueuedPacket(PRTP_FEC_QUEUE queue) {
|
||||
PRTPFEC_QUEUE_ENTRY queuedEntry, entry;
|
||||
|
||||
// Find the next queued packet
|
||||
queuedEntry = NULL;
|
||||
entry = queue->queueHead;
|
||||
unsigned int lowestRtpSequenceNumber = UINT16_MAX;
|
||||
|
||||
while (entry != NULL) {
|
||||
if (queuedEntry == NULL || isBefore(entry->packet->sequenceNumber, lowestRtpSequenceNumber)) {
|
||||
lowestRtpSequenceNumber = entry->packet->sequenceNumber;
|
||||
queuedEntry = entry;
|
||||
}
|
||||
|
||||
entry = entry->next;
|
||||
}
|
||||
|
||||
if (queuedEntry != NULL) {
|
||||
removeEntry(queue, queuedEntry);
|
||||
return queuedEntry->packet;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
36
src/RtpFecQueue.h
Normal file
36
src/RtpFecQueue.h
Normal file
@@ -0,0 +1,36 @@
|
||||
#pragma once
|
||||
|
||||
#include "Video.h"
|
||||
|
||||
typedef struct _RTPFEC_QUEUE_ENTRY {
|
||||
PRTP_PACKET packet;
|
||||
|
||||
struct _RTPFEC_QUEUE_ENTRY* next;
|
||||
struct _RTPFEC_QUEUE_ENTRY* prev;
|
||||
} RTPFEC_QUEUE_ENTRY, *PRTPFEC_QUEUE_ENTRY;
|
||||
|
||||
typedef struct _RTP_FEC_QUEUE {
|
||||
PRTPFEC_QUEUE_ENTRY queueHead;
|
||||
PRTPFEC_QUEUE_ENTRY queueTail;
|
||||
int queueSize;
|
||||
|
||||
PRTPFEC_QUEUE_ENTRY bufferHead;
|
||||
PRTPFEC_QUEUE_ENTRY bufferTail;
|
||||
int bufferSize;
|
||||
int bufferLowestSequenceNumber;
|
||||
int bufferHighestSequenceNumber;
|
||||
int bufferDataPackets;
|
||||
int fecPercentage;
|
||||
|
||||
unsigned short currentFrameNumber;
|
||||
unsigned int nextRtpSequenceNumber;
|
||||
} RTP_FEC_QUEUE, *PRTP_FEC_QUEUE;
|
||||
|
||||
#define RTPF_RET_QUEUED_NOTHING_READY 0
|
||||
#define RTPF_RET_QUEUED_PACKETS_READY 1
|
||||
#define RTPF_RET_REJECTED 2
|
||||
|
||||
void RtpfInitializeQueue(PRTP_FEC_QUEUE queue);
|
||||
void RtpfCleanupQueue(PRTP_FEC_QUEUE queue);
|
||||
int RtpfAddPacket(PRTP_FEC_QUEUE queue, PRTP_PACKET packet, PRTPFEC_QUEUE_ENTRY packetEntry);
|
||||
PRTP_PACKET RtpfGetQueuedPacket(PRTP_FEC_QUEUE queue);
|
||||
@@ -202,12 +202,6 @@ static PSDP_OPTION getAttributesList(char*urlSafeAddr) {
|
||||
err |= addAttributeString(&optionHead, "x-nv-vqos[0].bw.maximumBitrate", payloadStr);
|
||||
}
|
||||
|
||||
// Using FEC turns padding on which makes us have to take the slow path
|
||||
// in the depacketizer, not to mention exposing some ambiguous cases with
|
||||
// distinguishing padding from valid sequences. Since we can only perform
|
||||
// execute an FEC recovery on a 1 packet frame, we'll just turn it off completely.
|
||||
err |= addAttributeString(&optionHead, "x-nv-vqos[0].fec.enable", "0");
|
||||
|
||||
err |= addAttributeString(&optionHead, "x-nv-vqos[0].videoQualityScoreUpdateTime", "5000");
|
||||
|
||||
if (StreamConfig.streamingRemotely) {
|
||||
|
||||
@@ -21,7 +21,7 @@ typedef struct _NV_VIDEO_PACKET {
|
||||
int frameIndex;
|
||||
char flags;
|
||||
char reserved[3];
|
||||
int reserved2;
|
||||
int fecInfo;
|
||||
} NV_VIDEO_PACKET, *PNV_VIDEO_PACKET;
|
||||
|
||||
#define FLAG_EXTENSION 0x10
|
||||
@@ -36,4 +36,4 @@ typedef struct _RTP_PACKET {
|
||||
char reserved[8];
|
||||
} RTP_PACKET, *PRTP_PACKET;
|
||||
|
||||
#pragma pack(pop)
|
||||
#pragma pack(pop)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
#include "Limelight-internal.h"
|
||||
#include "PlatformSockets.h"
|
||||
#include "PlatformThreads.h"
|
||||
#include "RtpReorderQueue.h"
|
||||
#include "RtpFecQueue.h"
|
||||
|
||||
#define FIRST_FRAME_MAX 1500
|
||||
#define FIRST_FRAME_TIMEOUT_SEC 10
|
||||
@@ -11,7 +11,7 @@
|
||||
|
||||
#define RTP_RECV_BUFFER (512 * 1024)
|
||||
|
||||
static RTP_REORDER_QUEUE rtpQueue;
|
||||
static RTP_FEC_QUEUE rtpQueue;
|
||||
|
||||
static SOCKET rtpSocket = INVALID_SOCKET;
|
||||
static SOCKET firstFrameSocket = INVALID_SOCKET;
|
||||
@@ -25,16 +25,17 @@ static PLT_THREAD decoderThread;
|
||||
// the RTP queue will wait for missing/reordered packets.
|
||||
#define RTP_QUEUE_DELAY 10
|
||||
|
||||
|
||||
// Initialize the video stream
|
||||
void initializeVideoStream(void) {
|
||||
initializeVideoDepacketizer(StreamConfig.packetSize);
|
||||
RtpqInitializeQueue(&rtpQueue, RTPQ_DEFAULT_MAX_SIZE, RTP_QUEUE_DELAY);
|
||||
RtpfInitializeQueue(&rtpQueue); //TODO RTP_QUEUE_DELAY
|
||||
}
|
||||
|
||||
// Clean up the video stream
|
||||
void destroyVideoStream(void) {
|
||||
destroyVideoDepacketizer();
|
||||
RtpqCleanupQueue(&rtpQueue);
|
||||
RtpfCleanupQueue(&rtpQueue);
|
||||
}
|
||||
|
||||
// UDP Ping proc
|
||||
@@ -66,7 +67,7 @@ static void ReceiveThreadProc(void* context) {
|
||||
int queueStatus;
|
||||
|
||||
receiveSize = StreamConfig.packetSize + MAX_RTP_HEADER_SIZE;
|
||||
bufferSize = receiveSize + sizeof(int) + sizeof(RTP_QUEUE_ENTRY);
|
||||
bufferSize = receiveSize + sizeof(int) + sizeof(RTPFEC_QUEUE_ENTRY);
|
||||
buffer = NULL;
|
||||
|
||||
while (!PltIsThreadInterrupted(&receiveThread)) {
|
||||
@@ -98,20 +99,16 @@ static void ReceiveThreadProc(void* context) {
|
||||
packet = (PRTP_PACKET)&buffer[0];
|
||||
packet->sequenceNumber = htons(packet->sequenceNumber);
|
||||
|
||||
queueStatus = RtpqAddPacket(&rtpQueue, packet, (PRTP_QUEUE_ENTRY)&buffer[receiveSize + sizeof(int)]);
|
||||
if (queueStatus == RTPQ_RET_HANDLE_IMMEDIATELY) {
|
||||
// queueRtpPacket() copies the data it needs to we can reuse the buffer
|
||||
queueRtpPacket(packet, err);
|
||||
}
|
||||
else if (queueStatus == RTPQ_RET_QUEUED_PACKETS_READY) {
|
||||
queueStatus = RtpfAddPacket(&rtpQueue, packet, (PRTPFEC_QUEUE_ENTRY)&buffer[receiveSize + sizeof(int)]);
|
||||
if (queueStatus == RTPF_RET_QUEUED_PACKETS_READY) {
|
||||
// The packet queue now has packets ready
|
||||
while ((buffer = (char*)RtpqGetQueuedPacket(&rtpQueue)) != NULL) {
|
||||
while ((buffer = (char*)RtpfGetQueuedPacket(&rtpQueue)) != NULL) {
|
||||
memcpy(&err, &buffer[receiveSize], sizeof(int));
|
||||
queueRtpPacket((PRTP_PACKET)buffer, err);
|
||||
free(buffer);
|
||||
}
|
||||
}
|
||||
else if (queueStatus == RTPQ_RET_QUEUED_NOTHING_READY) {
|
||||
else if (queueStatus == RTPF_RET_QUEUED_NOTHING_READY) {
|
||||
// The queue owns the buffer
|
||||
buffer = NULL;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user