Add support for LTR ACK control messages (#122)

* Add support for LTR ACK control messages
This commit is contained in:
ns6089
2026-01-21 06:47:38 +03:00
committed by GitHub
parent 435bc6a5a4
commit 2a5a1f3e8a
5 changed files with 121 additions and 44 deletions

View File

@@ -31,11 +31,12 @@ typedef struct _NVCTL_ENCRYPTED_PACKET_HEADER {
// encrypted NVCTL_ENET_PACKET_HEADER_V2 and payload data follow
} NVCTL_ENCRYPTED_PACKET_HEADER, *PNVCTL_ENCRYPTED_PACKET_HEADER;
typedef struct _QUEUED_FRAME_INVALIDATION_TUPLE {
typedef struct _QUEUED_REFERENCE_FRAME_CONTROL {
uint32_t startFrame;
uint32_t endFrame;
bool invalidate; // true: RFI(startFrame, endFrame); false: LTR_ACK(startFrame)
LINKED_BLOCKING_QUEUE_ENTRY entry;
} QUEUED_FRAME_INVALIDATION_TUPLE, *PQUEUED_FRAME_INVALIDATION_TUPLE;
} QUEUED_REFERENCE_FRAME_CONTROL, *PQUEUED_REFERENCE_FRAME_CONTROL;
typedef struct _QUEUED_FRAME_FEC_STATUS {
SS_FRAME_FEC_STATUS fecStatus;
@@ -113,7 +114,7 @@ static int lastConnectionStatusUpdate;
static uint32_t currentEnetSequenceNumber;
static uint64_t firstFrameTimeMs;
static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples;
static LINKED_BLOCKING_QUEUE referenceFrameControlQueue;
static LINKED_BLOCKING_QUEUE frameFecStatusQueue;
static LINKED_BLOCKING_QUEUE asyncCallbackQueue;
static PLT_EVENT idrFrameRequiredEvent;
@@ -300,7 +301,7 @@ static bool supportsIdrFrameRequest;
int initializeControlStream(void) {
stopping = false;
PltCreateEvent(&idrFrameRequiredEvent);
LbqInitializeLinkedBlockingQueue(&invalidReferenceFrameTuples, 20);
LbqInitializeLinkedBlockingQueue(&referenceFrameControlQueue, 20);
LbqInitializeLinkedBlockingQueue(&frameFecStatusQueue, 8); // Limits number of frame status reports per periodic ping interval
LbqInitializeLinkedBlockingQueue(&asyncCallbackQueue, 30);
PltCreateMutex(&enetMutex);
@@ -375,7 +376,7 @@ void destroyControlStream(void) {
PltDestroyCryptoContext(encryptionCtx);
PltDestroyCryptoContext(decryptionCtx);
PltCloseEvent(&idrFrameRequiredEvent);
freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples));
freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&referenceFrameControlQueue));
freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&frameFecStatusQueue));
freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&asyncCallbackQueue));
@@ -386,12 +387,15 @@ static void queueFrameInvalidationTuple(uint32_t startFrame, uint32_t endFrame)
LC_ASSERT(startFrame <= endFrame);
if (isReferenceFrameInvalidationEnabled()) {
PQUEUED_FRAME_INVALIDATION_TUPLE qfit;
PQUEUED_REFERENCE_FRAME_CONTROL qfit;
qfit = malloc(sizeof(*qfit));
if (qfit != NULL) {
qfit->startFrame = startFrame;
qfit->endFrame = endFrame;
if (LbqOfferQueueItem(&invalidReferenceFrameTuples, qfit, &qfit->entry) == LBQ_BOUND_EXCEEDED) {
*qfit = (QUEUED_REFERENCE_FRAME_CONTROL){
.startFrame = startFrame,
.endFrame = endFrame,
.invalidate = true,
};
if (LbqOfferQueueItem(&referenceFrameControlQueue, qfit, &qfit->entry) == LBQ_BOUND_EXCEEDED) {
// Too many invalidation tuples, so we need an IDR frame now
Limelog("RFI range list reached maximum size limit\n");
free(qfit);
@@ -411,7 +415,7 @@ static void queueFrameInvalidationTuple(uint32_t startFrame, uint32_t endFrame)
void LiRequestIdrFrame(void) {
// Any reference frame invalidation requests should be dropped now.
// We require a full IDR frame to recover.
freeBasicLbqList(LbqFlushQueueItems(&invalidReferenceFrameTuples));
freeBasicLbqList(LbqFlushQueueItems(&referenceFrameControlQueue));
// Request the IDR frame
PltSetEvent(&idrFrameRequiredEvent);
@@ -423,9 +427,29 @@ void connectionDetectedFrameLoss(uint32_t startFrame, uint32_t endFrame) {
}
// When we receive a frame, update the number of our current frame
void connectionReceivedCompleteFrame(uint32_t frameIndex) {
// and send ACK control message if the frame is LTR
void connectionReceivedCompleteFrame(uint32_t frameIndex, bool frameIsLTR) {
lastGoodFrame = frameIndex;
intervalGoodFrameCount++;
if (frameIsLTR && IS_SUNSHINE() && isReferenceFrameInvalidationEnabled()) {
// Queue LTR frame ACK control message
PQUEUED_REFERENCE_FRAME_CONTROL qfit;
qfit = malloc(sizeof(*qfit));
if (qfit != NULL) {
*qfit = (QUEUED_REFERENCE_FRAME_CONTROL){
.startFrame = frameIndex,
.invalidate = false,
};
if (LbqOfferQueueItem(&referenceFrameControlQueue, qfit, &qfit->entry) == LBQ_BOUND_EXCEEDED) {
// This shouldn't happen and indicates that something has gone wrong with the queue
LC_ASSERT(false);
Limelog("Couldn't queue LTR ACK because the list has reached maximum size limit\n");
free(qfit);
LiRequestIdrFrame();
}
}
}
}
void connectionSendFrameFecStatus(PSS_FRAME_FEC_STATUS fecStatus) {
@@ -1512,22 +1536,22 @@ static void requestIdrFrame(void) {
}
static void requestInvalidateReferenceFrames(uint32_t startFrame, uint32_t endFrame) {
int64_t payload[3];
LC_ASSERT(startFrame <= endFrame);
LC_ASSERT(isReferenceFrameInvalidationEnabled());
payload[0] = LE64(startFrame);
payload[1] = LE64(endFrame);
payload[2] = 0;
SS_RFI_REQUEST payload = {
.firstFrameIndex = LE32(startFrame),
.lastFrameIndex = LE32(endFrame),
};
// Send the reference frame invalidation request and read the response
if (!sendMessageAndDiscardReply(packetTypes[IDX_INVALIDATE_REF_FRAMES],
sizeof(payload),
payload, CTRL_CHANNEL_URGENT,
&payload,
CTRL_CHANNEL_URGENT,
ENET_PACKET_FLAG_RELIABLE,
false)) {
Limelog("Request Invaldiate Reference Frames: Transaction failed: %d\n", (int)LastSocketError());
Limelog("Request Invalidate Reference Frames: Transaction failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail());
return;
}
@@ -1535,32 +1559,65 @@ static void requestInvalidateReferenceFrames(uint32_t startFrame, uint32_t endFr
Limelog("Invalidate reference frame request sent (%d to %d)\n", startFrame, endFrame);
}
static void invalidateRefFramesFunc(void* context) {
static void confirmLongtermReferenceFrame(uint32_t frameIndex) {
LC_ASSERT(isReferenceFrameInvalidationEnabled());
SS_LTR_FRAME_ACK payload = {
.frameIndex = LE32(frameIndex),
};
// Send LTR frame ACK and don't wait for response
if (!sendMessageAndForget(SS_LTR_FRAME_ACK_PTYPE,
sizeof(payload),
&payload,
CTRL_CHANNEL_URGENT,
ENET_PACKET_FLAG_RELIABLE,
false)) {
Limelog("LTR frame ACK: Transaction failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail());
return;
}
}
static void referenceFrameControlFunc(void* context) {
LC_ASSERT(isReferenceFrameInvalidationEnabled());
while (!PltIsThreadInterrupted(&invalidateRefFramesThread)) {
PQUEUED_FRAME_INVALIDATION_TUPLE qfit;
uint32_t startFrame;
uint32_t endFrame;
PQUEUED_REFERENCE_FRAME_CONTROL qfit;
uint32_t invalidateStartFrame;
uint32_t invalidateEndFrame;
bool invalidate = false;
// Wait for a reference frame invalidation request or a request to shutdown
if (LbqWaitForQueueElement(&invalidReferenceFrameTuples, (void**)&qfit) != LBQ_SUCCESS) {
// Wait for a reference frame control message or a request to shutdown
if (LbqWaitForQueueElement(&referenceFrameControlQueue, (void**)&qfit) != LBQ_SUCCESS) {
// Bail if we're stopping
return;
}
startFrame = qfit->startFrame;
endFrame = qfit->endFrame;
// Aggregate all lost frames into one range
do {
LC_ASSERT(qfit->endFrame >= endFrame);
endFrame = qfit->endFrame;
if (qfit->invalidate) {
if (!invalidate) {
invalidateStartFrame = qfit->startFrame;
invalidateEndFrame = qfit->endFrame;
invalidate = true;
}
else {
// Aggregate all lost frames into one range
LC_ASSERT(qfit->endFrame >= invalidateEndFrame);
invalidateEndFrame = qfit->endFrame;
}
}
else {
// Send LTR frame ACK
confirmLongtermReferenceFrame(qfit->startFrame);
}
free(qfit);
} while (LbqPollQueueElement(&invalidReferenceFrameTuples, (void**)&qfit) == LBQ_SUCCESS);
} while (LbqPollQueueElement(&referenceFrameControlQueue, (void**)&qfit) == LBQ_SUCCESS);
if (invalidate) {
// Send the reference frame invalidation request
requestInvalidateReferenceFrames(startFrame, endFrame);
requestInvalidateReferenceFrames(invalidateStartFrame, invalidateEndFrame);
}
}
}
@@ -1574,8 +1631,8 @@ static void requestIdrFrameFunc(void* context) {
return;
}
// Any pending reference frame invalidation requests are now redundant
freeBasicLbqList(LbqFlushQueueItems(&invalidReferenceFrameTuples));
// Any pending RFI requests and LTR frame ACK messages are now redundant
freeBasicLbqList(LbqFlushQueueItems(&referenceFrameControlQueue));
// Request the IDR frame
requestIdrFrame();
@@ -1585,7 +1642,7 @@ static void requestIdrFrameFunc(void* context) {
// Stops the control stream
int stopControlStream(void) {
stopping = true;
LbqSignalQueueShutdown(&invalidReferenceFrameTuples);
LbqSignalQueueShutdown(&referenceFrameControlQueue);
LbqSignalQueueShutdown(&frameFecStatusQueue);
LbqSignalQueueDrain(&asyncCallbackQueue);
PltSetEvent(&idrFrameRequiredEvent);
@@ -1972,7 +2029,7 @@ int startControlStream(void) {
// Only create the reference frame invalidation thread if RFI is enabled
if (isReferenceFrameInvalidationEnabled()) {
err = PltCreateThread("InvRefFrames", invalidateRefFramesFunc, NULL, &invalidateRefFramesThread);
err = PltCreateThread("InvRefFrames", referenceFrameControlFunc, NULL, &invalidateRefFramesThread);
if (err != 0) {
stopping = true;
PltSetEvent(&idrFrameRequiredEvent);

View File

@@ -55,7 +55,7 @@ extern uint32_t EncryptionFeaturesEnabled;
// ENet channel ID values
#define CTRL_CHANNEL_GENERIC 0x00
#define CTRL_CHANNEL_URGENT 0x01 // IDR and reference frame invalidation requests
#define CTRL_CHANNEL_URGENT 0x01 // IDR, LTR ACK and RFI
#define CTRL_CHANNEL_KEYBOARD 0x02
#define CTRL_CHANNEL_MOUSE 0x03
#define CTRL_CHANNEL_PEN 0x04
@@ -119,7 +119,7 @@ int startControlStream(void);
int stopControlStream(void);
void destroyControlStream(void);
void connectionDetectedFrameLoss(uint32_t startFrame, uint32_t endFrame);
void connectionReceivedCompleteFrame(uint32_t frameIndex);
void connectionReceivedCompleteFrame(uint32_t frameIndex, bool frameIsLTR);
void connectionSawFrame(uint32_t frameIndex);
void connectionSendFrameFecStatus(PSS_FRAME_FEC_STATUS fecStatus);
int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags, bool moreData);

View File

@@ -382,9 +382,9 @@ cleanup_packets:
// Check all NV_VIDEO_PACKET fields except FEC stuff which differs in the recovered packet
LC_ASSERT_VT(nvPacket->flags == droppedNvPacket->flags);
LC_ASSERT_VT(nvPacket->extraFlags == droppedNvPacket->extraFlags);
LC_ASSERT_VT(nvPacket->frameIndex == droppedNvPacket->frameIndex);
LC_ASSERT_VT(nvPacket->streamPacketIndex == droppedNvPacket->streamPacketIndex);
LC_ASSERT_VT(nvPacket->reserved == droppedNvPacket->reserved);
LC_ASSERT_VT(!queue->multiFecCapable || nvPacket->multiFecBlocks == droppedNvPacket->multiFecBlocks);
// Check the data itself - use memcmp() and only loop if an error is detected

View File

@@ -22,11 +22,13 @@ typedef struct _ENC_VIDEO_HEADER {
#define FLAG_EOF 0x2
#define FLAG_SOF 0x4
#define NV_VIDEO_PACKET_EXTRA_FLAG_LTR_FRAME 0x1
typedef struct _NV_VIDEO_PACKET {
uint32_t streamPacketIndex;
uint32_t frameIndex;
uint8_t flags;
uint8_t reserved;
uint8_t extraFlags;
uint8_t multiFecFlags;
uint8_t multiFecBlocks;
uint32_t fecInfo;
@@ -67,4 +69,20 @@ typedef struct _SS_FRAME_FEC_STATUS {
uint8_t multiFecBlockCount;
} SS_FRAME_FEC_STATUS, *PSS_FRAME_FEC_STATUS;
// Fields are little-endian
#define SS_LTR_FRAME_ACK_PTYPE 0x0350
typedef struct _SS_LTR_FRAME_ACK {
uint32_t frameIndex;
uint32_t reserved;
} SS_LTR_FRAME_ACK, *PSS_LTR_FRAME_ACK;
// Fields are little-endian
#define SS_RFI_REQUEST_PTYPE 0x0301
typedef struct _SS_RFI_REQUEST {
uint32_t firstFrameIndex;
uint32_t reserved1;
uint32_t lastFrameIndex;
uint32_t reserved2[3];
} SS_RFI_REQUEST, *PSS_RFI_REQUEST;
#pragma pack(pop)

View File

@@ -466,7 +466,7 @@ static bool isIdrFrameStart(PBUFFER_DESC buffer) {
}
// Reassemble the frame with the given frame number
static void reassembleFrame(int frameNumber) {
static void reassembleFrame(int frameNumber, bool frameIsLTR) {
if (nalChainHead != NULL) {
QUEUED_DECODE_UNIT qduDS;
PQUEUED_DECODE_UNIT qdu;
@@ -539,7 +539,7 @@ static void reassembleFrame(int frameNumber) {
}
// Notify the control connection
connectionReceivedCompleteFrame(frameNumber);
connectionReceivedCompleteFrame(frameNumber, frameIsLTR);
// Clear frame drops
consecutiveFrameDrops = 0;
@@ -748,6 +748,7 @@ static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length,
BUFFER_DESC currentPos;
uint32_t frameIndex;
uint8_t flags;
uint8_t extraFlags;
bool firstPacket, lastPacket;
uint32_t streamPacketIndex;
uint8_t fecCurrentBlockNumber;
@@ -765,6 +766,7 @@ static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length,
fecLastBlockNumber = (videoPacket->multiFecBlocks >> 6) & 0x3;
frameIndex = videoPacket->frameIndex;
flags = videoPacket->flags;
extraFlags = videoPacket->extraFlags;
firstPacket = isFirstPacket(flags, fecCurrentBlockNumber);
lastPacket = (flags & FLAG_EOF) && fecCurrentBlockNumber == fecLastBlockNumber;
@@ -1119,7 +1121,7 @@ static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length,
}
}
reassembleFrame(frameIndex);
reassembleFrame(frameIndex, extraFlags & NV_VIDEO_PACKET_EXTRA_FLAG_LTR_FRAME);
}
}