Eliminate a copy and allocation for each incoming video packet

This commit is contained in:
Cameron Gutman 2019-04-25 16:07:34 -07:00
parent c87d4a755e
commit f395d3056a
4 changed files with 107 additions and 42 deletions

View File

@ -65,7 +65,6 @@ int performRtspHandshake(void);
void initializeVideoDepacketizer(int pktSize);
void destroyVideoDepacketizer(void);
void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, unsigned long long receiveTimeMs);
void queueRtpPacket(PRTPFEC_QUEUE_ENTRY queueEntry);
void stopVideoDepacketizer(void);
void requestDecoderRefresh(void);

View File

@ -5,6 +5,7 @@
typedef struct _QUEUED_DECODE_UNIT {
DECODE_UNIT decodeUnit;
LINKED_BLOCKING_QUEUE_ENTRY entry;
int onStack;
} QUEUED_DECODE_UNIT, *PQUEUED_DECODE_UNIT;
void completeQueuedDecodeUnit(PQUEUED_DECODE_UNIT qdu, int drStatus);

View File

@ -4,6 +4,7 @@
#include "Video.h"
static PLENTRY nalChainHead;
static PLENTRY nalChainTail;
static int nalChainDataLength;
static unsigned int nextFrameNumber;
@ -30,6 +31,11 @@ typedef struct _BUFFER_DESC {
unsigned int length;
} BUFFER_DESC, *PBUFFER_DESC;
typedef struct _LENTRY_INTERNAL {
LENTRY entry;
void* allocPtr;
} LENTRY_INTERNAL, *PLENTRY_INTERNAL;
// Init
void initializeVideoDepacketizer(int pktSize) {
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
@ -50,14 +56,16 @@ void initializeVideoDepacketizer(int pktSize) {
// Free the NAL chain
static void cleanupFrameState(void) {
PLENTRY lastEntry;
PLENTRY_INTERNAL lastEntry;
while (nalChainHead != NULL) {
lastEntry = nalChainHead;
nalChainHead = lastEntry->next;
free(lastEntry);
lastEntry = (PLENTRY_INTERNAL)nalChainHead;
nalChainHead = lastEntry->entry.next;
free(lastEntry->allocPtr);
}
nalChainTail = NULL;
nalChainDataLength = 0;
}
@ -178,6 +186,7 @@ static int getSpecialSeq(PBUFFER_DESC current, PBUFFER_DESC candidate) {
int getNextQueuedDecodeUnit(PQUEUED_DECODE_UNIT* qdu) {
int err = LbqWaitForQueueElement(&decodeUnitQueue, (void**)qdu);
if (err == LBQ_SUCCESS) {
LC_ASSERT(!(*qdu)->onStack);
return 1;
}
else {
@ -187,7 +196,7 @@ int getNextQueuedDecodeUnit(PQUEUED_DECODE_UNIT* qdu) {
// Cleanup a decode unit by freeing the buffer chain and the holder
void completeQueuedDecodeUnit(PQUEUED_DECODE_UNIT qdu, int drStatus) {
PLENTRY lastEntry;
PLENTRY_INTERNAL lastEntry;
if (drStatus == DR_NEED_IDR) {
Limelog("Requesting IDR frame on behalf of DR\n");
@ -200,12 +209,14 @@ void completeQueuedDecodeUnit(PQUEUED_DECODE_UNIT qdu, int drStatus) {
}
while (qdu->decodeUnit.bufferList != NULL) {
lastEntry = qdu->decodeUnit.bufferList;
qdu->decodeUnit.bufferList = lastEntry->next;
free(lastEntry);
lastEntry = (PLENTRY_INTERNAL)qdu->decodeUnit.bufferList;
qdu->decodeUnit.bufferList = lastEntry->entry.next;
free(lastEntry->allocPtr);
}
free(qdu);
if (!qdu->onStack) {
free(qdu);
}
}
// Returns 1 if the special sequence describes an I-frame
@ -241,7 +252,17 @@ static int isIdrFrameStart(PBUFFER_DESC buffer) {
// Reassemble the frame with the given frame number
static void reassembleFrame(int frameNumber) {
if (nalChainHead != NULL) {
PQUEUED_DECODE_UNIT qdu = (PQUEUED_DECODE_UNIT)malloc(sizeof(*qdu));
QUEUED_DECODE_UNIT qduDS;
PQUEUED_DECODE_UNIT qdu;
// Use a stack allocation if we won't be queuing this
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
qdu = (PQUEUED_DECODE_UNIT)malloc(sizeof(*qdu));
}
else {
qdu = &qduDS;
}
if (qdu != NULL) {
qdu->decodeUnit.bufferList = nalChainHead;
qdu->decodeUnit.fullLength = nalChainDataLength;
@ -256,10 +277,13 @@ static void reassembleFrame(int frameNumber) {
qdu->decodeUnit.frameType = FRAME_TYPE_PFRAME;
}
nalChainHead = NULL;
nalChainHead = nalChainTail = NULL;
nalChainDataLength = 0;
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
// Dynamically allocated
qdu->onStack = 0;
if (LbqOfferQueueItem(&decodeUnitQueue, qdu, &qdu->entry) == LBQ_BOUND_EXCEEDED) {
Limelog("Video decode unit queue overflow\n");
@ -280,6 +304,9 @@ static void reassembleFrame(int frameNumber) {
}
}
else {
// Allocated on stack
qdu->onStack = 1;
int ret = VideoCallbacks.submitDecodeUnit(&qdu->decodeUnit);
completeQueuedDecodeUnit(qdu, ret);
@ -330,44 +357,68 @@ static int getBufferFlags(char* data, int length) {
}
}
static void queueFragment(char* data, int offset, int length) {
PLENTRY entry = (PLENTRY)malloc(sizeof(*entry) + length);
// As an optimization, we can cast the existing packet buffer to a PLENTRY and avoid
// a malloc() and a memcpy() of the packet data.
static void queueFragment(PLENTRY_INTERNAL* existingEntry, char* data, int offset, int length) {
PLENTRY_INTERNAL entry;
if (existingEntry == NULL || *existingEntry == NULL) {
entry = (PLENTRY_INTERNAL)malloc(sizeof(*entry) + length);
}
else {
entry = *existingEntry;
}
if (entry != NULL) {
entry->next = NULL;
entry->length = length;
entry->data = (char*)(entry + 1);
entry->entry.next = NULL;
entry->entry.length = length;
memcpy(entry->data, &data[offset], entry->length);
// If we had to allocate a new entry, we must copy the data. If not,
// the data already resides within the LENTRY allocation.
if (existingEntry == NULL || *existingEntry == NULL) {
entry->allocPtr = entry;
entry->bufferType = getBufferFlags(entry->data, entry->length);
nalChainDataLength += entry->length;
if (nalChainHead == NULL) {
nalChainHead = entry;
entry->entry.data = (char*)(entry + 1);
memcpy(entry->entry.data, &data[offset], entry->entry.length);
}
else {
PLENTRY currentEntry = nalChainHead;
entry->entry.data = &data[offset];
while (currentEntry->next != NULL) {
currentEntry = currentEntry->next;
}
// The caller should have already set this up for us
LC_ASSERT(entry->allocPtr != NULL);
currentEntry->next = entry;
// We now own the packet buffer and will manage freeing it
*existingEntry = NULL;
}
entry->entry.bufferType = getBufferFlags(entry->entry.data, entry->entry.length);
nalChainDataLength += entry->entry.length;
if (nalChainTail == NULL) {
LC_ASSERT(nalChainHead == NULL);
nalChainHead = nalChainTail = (PLENTRY)entry;
}
else {
LC_ASSERT(nalChainHead != NULL);
nalChainTail->next = (PLENTRY)entry;
nalChainTail = nalChainTail->next;
}
}
}
// Process an RTP Payload
static void processRtpPayloadSlow(PNV_VIDEO_PACKET videoPacket, PBUFFER_DESC currentPos) {
// Process an RTP Payload using the slow path that handles multiple NALUs per packet
static void processRtpPayloadSlow(PBUFFER_DESC currentPos, PLENTRY_INTERNAL* existingEntry) {
BUFFER_DESC specialSeq;
int decodingVideo = 0;
// We should not have any NALUs when processing the first packet in an IDR frame
LC_ASSERT(nalChainHead == NULL);
LC_ASSERT(nalChainTail == NULL);
while (currentPos->length != 0) {
int start = currentPos->offset;
int containsPicData = 0;
if (getSpecialSeq(currentPos, &specialSeq)) {
if (isSeqAnnexBStart(&specialSeq)) {
@ -384,6 +435,10 @@ static void processRtpPayloadSlow(PNV_VIDEO_PACKET videoPacket, PBUFFER_DESC cur
// Cancel any pending IDR frame request
waitingForNextSuccessfulFrame = 0;
// Use the cached LENTRY for this NALU since it will be
// the bulk of the data in this packet.
containsPicData = 1;
}
}
@ -416,7 +471,10 @@ static void processRtpPayloadSlow(PNV_VIDEO_PACKET videoPacket, PBUFFER_DESC cur
}
if (decodingVideo) {
queueFragment(currentPos->data, start, currentPos->offset - start);
// To minimize copies, we'll use allocate for SPS, PPS, and VPS to allow
// us to reuse the packet buffer for the picture data in the I-frame.
queueFragment(containsPicData ? existingEntry : NULL,
currentPos->data, start, currentPos->offset - start);
}
}
}
@ -452,13 +510,9 @@ static int isFirstPacket(char flags) {
flags == FLAG_SOF);
}
// Adds a fragment directly to the queue
static void processRtpPayloadFast(BUFFER_DESC location) {
queueFragment(location.data, location.offset, location.length);
}
// Process an RTP Payload
void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, unsigned long long receiveTimeMs) {
// The caller will free *existingEntry unless we NULL it
void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, unsigned long long receiveTimeMs, PLENTRY_INTERNAL* existingEntry) {
BUFFER_DESC currentPos;
int frameIndex;
char flags;
@ -557,11 +611,11 @@ void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, unsigned long l
if (firstPacket && isIdrFrameStart(&currentPos))
{
// SPS and PPS prefix is padded between NALs, so we must decode it with the slow path
processRtpPayloadSlow(videoPacket, &currentPos);
processRtpPayloadSlow(&currentPos, existingEntry);
}
else
{
processRtpPayloadFast(currentPos);
queueFragment(existingEntry, currentPos.data, currentPos.offset, currentPos.length);
}
if (flags & FLAG_EOF) {
@ -620,7 +674,18 @@ void queueRtpPacket(PRTPFEC_QUEUE_ENTRY queueEntry) {
dataOffset += 4; // 2 additional fields
}
// Reuse the memory reserved for the RTPFEC_QUEUE_ENTRY to store the LENTRY_INTERNAL
// now that we're in the depacketizer.
PLENTRY_INTERNAL existingEntry = (PLENTRY_INTERNAL)queueEntry;
existingEntry->allocPtr = queueEntry->packet;
processRtpPayload((PNV_VIDEO_PACKET)(((char*)queueEntry->packet) + dataOffset),
queueEntry->length - dataOffset,
queueEntry->receiveTimeMs);
queueEntry->receiveTimeMs,
&existingEntry);
if (existingEntry != NULL) {
// processRtpPayload didn't want this packet, so just free it
free(existingEntry->allocPtr);
}
}

View File

@ -126,8 +126,8 @@ static void ReceiveThreadProc(void* context) {
// The packet queue now has packets ready
buffer = NULL;
while ((queueEntry = RtpfGetQueuedPacket(&rtpQueue)) != NULL) {
// queueRtpPacket takes ownership of the packet
queueRtpPacket(queueEntry);
free(queueEntry->packet);
}
}
else if (queueStatus == RTPF_RET_QUEUED_NOTHING_READY) {