This commit is contained in:
Cameron Gutman 2015-08-11 19:24:11 -07:00
commit beb7ccb456
2 changed files with 90 additions and 13 deletions

View File

@ -10,6 +10,12 @@ typedef struct _NVCTL_PACKET_HEADER {
unsigned short payloadLength;
} NVCTL_PACKET_HEADER, *PNVCTL_PACKET_HEADER;
typedef struct _QUEUED_LOSS_STAT {
int startFrame;
int endFrame;
LINKED_BLOCKING_QUEUE_ENTRY entry;
} QUEUED_LOSS_STAT, *PQUEUED_LOSS_STAT;
static SOCKET ctlSock = INVALID_SOCKET;
static PLT_THREAD lossStatsThread;
static PLT_THREAD invalidateRefFramesThread;
@ -17,6 +23,9 @@ static PLT_EVENT invalidateRefFramesEvent;
static int lossCountSinceLastReport = 0;
static long currentFrame = 0;
static int requestIdr;
static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples;
#define IDX_START_A 0
#define IDX_REQUEST_IDR_FRAME 0
#define IDX_START_B 1
@ -77,6 +86,7 @@ static char **preconstructedPayloads;
/* Initializes the control stream */
int initializeControlStream(void) {
PltCreateEvent(&invalidateRefFramesEvent);
LbqInitializeLinkedBlockingQueue(&invalidReferenceFrameTuples, 20);
if (ServerMajorVersion == 3) {
packetTypes = (short*)packetTypesGen3;
@ -92,26 +102,57 @@ int initializeControlStream(void) {
return 0;
}
void freeLossStatList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
PLINKED_BLOCKING_QUEUE_ENTRY nextEntry;
while (entry != NULL) {
nextEntry = entry->flink;
free(entry->data);
entry = nextEntry;
}
}
/* Cleans up control stream */
void destroyControlStream(void) {
PltCloseEvent(&invalidateRefFramesEvent);
freeLossStatList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples));
}
int getNextLossStat(PQUEUED_LOSS_STAT *qls) {
int err = LbqPollQueueElement(&invalidReferenceFrameTuples, (void**) qls);
return (err == LBQ_SUCCESS);
}
void queueLossStat(int startFrame, int endFrame) {
PQUEUED_LOSS_STAT qls;
qls = malloc(sizeof(QUEUED_LOSS_STAT));
if (qls != NULL) {
qls->startFrame = startFrame;
qls->endFrame = endFrame;
if (LbqOfferQueueItem(&invalidReferenceFrameTuples, qls, &qls->entry) == LBQ_BOUND_EXCEEDED) {
free(qls);
requestIdr = 1;
}
PltSetEvent(&invalidateRefFramesEvent);
}
}
/* Request an IDR frame on demand by the decoder */
void requestIdrOnDemand(void) {
requestIdr = 1;
PltSetEvent(&invalidateRefFramesEvent);
}
/* Invalidate reference frames if the decoder is too slow */
void connectionSinkTooSlow(int startFrame, int endFrame) {
// FIXME: Send ranges
PltSetEvent(&invalidateRefFramesEvent);
queueLossStat(startFrame, endFrame);
}
/* Invalidate reference frames lost by the network */
void connectionDetectedFrameLoss(int startFrame, int endFrame) {
// FIXME: Send ranges
PltSetEvent(&invalidateRefFramesEvent);
queueLossStat(startFrame, endFrame);
}
/* When we receive a frame, update the number of our current frame */
@ -239,6 +280,7 @@ static void lossStatsThreadFunc(void* context) {
static void requestIdrFrame(void) {
long long payload[3];
Limelog("IDR frame requested\n");
if (ServerMajorVersion == 3) {
// Form the payload
payload[0] = 0;
@ -266,14 +308,53 @@ static void requestIdrFrame(void) {
Limelog("IDR frame request sent\n");
}
static void requestInvalidateRefFrames(void) {
long long payload[3];
PQUEUED_LOSS_STAT qls;
if (!getNextLossStat(&qls)) {
return;
}
payload[0] = qls->startFrame + 1;
// Aggregate all lost frames into one range
while (getNextLossStat(&qls));
// The server expects this to be the firstLostFrame + 1
payload[1] = qls->endFrame;
payload[2] = 0;
free(qls);
// Send the reference frame invalidation request and read the response
if (!sendMessageAndDiscardReply(packetTypes[IDX_INVALIDATE_REF_FRAMES],
payloadLengths[IDX_INVALIDATE_REF_FRAMES], payload)) {
Limelog("Request Invaldiate Reference Frames: Transaction failed: %d\n", (int) LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketError());
return;
}
Limelog("Invalidate Reference Frame request sent\n");
}
static void invalidateRefFramesFunc(void* context) {
while (!PltIsThreadInterrupted(&invalidateRefFramesThread)) {
// Wait for a request to invalidate reference frames
PltWaitForEvent(&invalidateRefFramesEvent);
PltClearEvent(&invalidateRefFramesEvent);
// Send an IDR frame request
requestIdrFrame();
if (requestIdr) {
// Send an IDR frame request
requestIdrFrame();
requestIdr = 0;
//Empty invalidate reference frames tuples
PQUEUED_LOSS_STAT qls;
while (getNextLossStat(&qls));
free(qls);
} else {
requestInvalidateRefFrames();
}
}
}

View File

@ -59,8 +59,6 @@ static void cleanupAvcFrameState(void) {
/* Cleanup AVC frame state and set that we're waiting for an IDR Frame*/
static void dropAvcFrameState(void) {
waitingForIdrFrame = 1;
// Count the number of consecutive frames dropped
consecutiveFrameDrops++;
@ -72,7 +70,8 @@ static void dropAvcFrameState(void) {
consecutiveFrameDrops = 0;
// Request an IDR frame
connectionDetectedFrameLoss(0, 0);
waitingForIdrFrame = 1;
requestIdrOnDemand();
}
cleanupAvcFrameState();
@ -371,9 +370,6 @@ void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length) {
// Unexpected start of next frame before terminating the last
waitingForNextSuccessfulFrame = 1;
waitingForIdrFrame = 1;
// Clear the old state and wait for an IDR
dropAvcFrameState();
}
// Look for a non-frame start before a frame start
@ -405,7 +401,7 @@ void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length) {
Limelog("Network dropped an entire frame\n");
nextFrameNumber = frameIndex;
// Wait until an IDR frame comes
// Wait until next complete frame
waitingForNextSuccessfulFrame = 1;
dropAvcFrameState();
}