Save an allocation per video frame, audio packet, and input event by embedding an LBQ entry inside the linked data structure

This commit is contained in:
Cameron Gutman 2015-05-28 06:06:40 -05:00
parent fcd2a20e01
commit 601dd55d7c
8 changed files with 89 additions and 97 deletions

View File

@ -19,6 +19,12 @@ static PLT_THREAD decoderThread;
#define MAX_PACKET_SIZE 100
typedef struct _QUEUED_AUDIO_PACKET {
char data[MAX_PACKET_SIZE];
int size;
LINKED_BLOCKING_QUEUE_ENTRY entry;
} QUEUED_AUDIO_PACKET, *PQUEUED_AUDIO_PACKET;
/* Initialize the audio stream */
void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks) {
memcpy(&callbacks, arCallbacks, sizeof(callbacks));
@ -33,8 +39,10 @@ static void freePacketList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
while (entry != NULL) {
nextEntry = entry->flink;
// The entry is stored within the data allocation
free(entry->data);
free(entry);
entry = nextEntry;
}
}
@ -46,8 +54,6 @@ void destroyAudioStream(void) {
freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue));
}
static void UdpPingThreadProc(void *context) {
/* Ping in ASCII */
char pingData[] = { 0x50, 0x49, 0x4E, 0x47 };
@ -73,48 +79,45 @@ static void UdpPingThreadProc(void *context) {
}
static void ReceiveThreadProc(void* context) {
SOCK_RET err;
PRTP_PACKET rtp;
int packetSize;
char* buffer = NULL;
PQUEUED_AUDIO_PACKET packet;
int err;
packet = NULL;
while (!PltIsThreadInterrupted(&receiveThread)) {
if (buffer == NULL) {
buffer = (char*) malloc(MAX_PACKET_SIZE + sizeof(int));
if (buffer == NULL) {
if (packet == NULL) {
packet = (PQUEUED_AUDIO_PACKET) malloc(sizeof(*packet));
if (packet == NULL) {
Limelog("Receive thread terminating\n");
listenerCallbacks->connectionTerminated(-1);
return;
}
}
err = recv(rtpSocket, &buffer[sizeof(int)], MAX_PACKET_SIZE, 0);
if (err <= 0) {
packet->size = (int) recv(rtpSocket, &packet->data[0], MAX_PACKET_SIZE, 0);
if (packet->size <= 0) {
Limelog("Receive thread terminating #2\n");
free(buffer);
free(packet);
listenerCallbacks->connectionTerminated(LastSocketError());
return;
}
packetSize = (int)err;
if (packetSize < sizeof(RTP_PACKET)) {
if (packet->size < sizeof(RTP_PACKET)) {
// Runt packet
continue;
}
rtp = (PRTP_PACKET) &buffer[sizeof(int)];
rtp = (PRTP_PACKET) &packet->data[0];
if (rtp->packetType != 97) {
// Not audio
continue;
}
memcpy(buffer, &packetSize, sizeof(int));
err = LbqOfferQueueItem(&packetQueue, buffer);
err = LbqOfferQueueItem(&packetQueue, packet, &packet->entry);
if (err == LBQ_SUCCESS) {
// The queue owns the buffer now
buffer = NULL;
packet = NULL;
}
if (err == LBQ_BOUND_EXCEEDED) {
@ -123,7 +126,7 @@ static void ReceiveThreadProc(void* context) {
}
else if (err == LBQ_INTERRUPTED) {
Limelog("Receive thread terminating #2\n");
free(buffer);
free(packet);
return;
}
}
@ -131,30 +134,18 @@ static void ReceiveThreadProc(void* context) {
static void DecoderThreadProc(void* context) {
PRTP_PACKET rtp;
int length;
int err;
char *data;
PQUEUED_AUDIO_PACKET packet;
unsigned short lastSeq = 0;
while (!PltIsThreadInterrupted(&decoderThread)) {
err = LbqWaitForQueueElement(&packetQueue, (void**) &data);
err = LbqWaitForQueueElement(&packetQueue, (void**) &packet);
if (err != LBQ_SUCCESS) {
Limelog("Decoder thread terminating\n");
return;
}
memcpy(&length, data, sizeof(int));
rtp = (PRTP_PACKET) &data[sizeof(int)];
if (length < sizeof(RTP_PACKET)) {
// Runt packet
goto freeandcontinue;
}
if (rtp->packetType != 97) {
// Not audio
goto freeandcontinue;
}
rtp = (PRTP_PACKET) &packet->data[0];
rtp->sequenceNumber = htons(rtp->sequenceNumber);
@ -166,10 +157,9 @@ static void DecoderThreadProc(void* context) {
lastSeq = rtp->sequenceNumber;
callbacks.decodeAndPlaySample((char *) (rtp + 1), length - sizeof(*rtp));
callbacks.decodeAndPlaySample((char *) (rtp + 1), packet->size - sizeof(*rtp));
freeandcontinue:
free(data);
free(packet);
}
}

View File

@ -29,6 +29,7 @@ typedef struct _PACKET_HOLDER {
NV_MULTI_CONTROLLER_PACKET multiController;
NV_SCROLL_PACKET scroll;
} packet;
LINKED_BLOCKING_QUEUE_ENTRY entry;
} PACKET_HOLDER, *PPACKET_HOLDER;
/* Initializes the input stream */
@ -83,8 +84,10 @@ void destroyInputStream(void) {
while (entry != NULL) {
nextEntry = entry->flink;
// The entry is stored in the data buffer
free(entry->data);
free(entry);
entry = nextEntry;
}
@ -195,7 +198,7 @@ int LiSendMouseMoveEvent(short deltaX, short deltaY) {
holder->packet.mouseMove.deltaX = htons(deltaX);
holder->packet.mouseMove.deltaY = htons(deltaY);
err = LbqOfferQueueItem(&packetQueue, holder);
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err != LBQ_SUCCESS) {
free(holder);
}
@ -222,7 +225,7 @@ int LiSendMouseButtonEvent(char action, int button) {
holder->packet.mouseButton.action = action;
holder->packet.mouseButton.button = htonl(button);
err = LbqOfferQueueItem(&packetQueue, holder);
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err != LBQ_SUCCESS) {
free(holder);
}
@ -252,7 +255,7 @@ int LiSendKeyboardEvent(short keyCode, char keyAction, char modifiers) {
holder->packet.keyboard.modifiers = modifiers;
holder->packet.keyboard.zero2 = 0;
err = LbqOfferQueueItem(&packetQueue, holder);
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err != LBQ_SUCCESS) {
free(holder);
}
@ -312,7 +315,7 @@ static int sendControllerEventInternal(short controllerNumber, short buttonFlags
holder->packet.multiController.tailB = MC_TAIL_B;
}
err = LbqOfferQueueItem(&packetQueue, holder);
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err != LBQ_SUCCESS) {
free(holder);
}
@ -359,7 +362,7 @@ int LiSendScrollEvent(char scrollClicks) {
holder->packet.scroll.scrollAmt2 = holder->packet.scroll.scrollAmt1;
holder->packet.scroll.zero3 = 0;
err = LbqOfferQueueItem(&packetQueue, holder);
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err != LBQ_SUCCESS) {
free(holder);
}

View File

@ -30,8 +30,6 @@ void terminateRtspHandshake(void);
void initializeVideoDepacketizer(int pktSize);
void destroyVideoDepacketizer(void);
void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length);
int getNextDecodeUnit(PDECODE_UNIT *du);
void freeDecodeUnit(PDECODE_UNIT decodeUnit);
void queueRtpPacket(PRTP_PACKET rtpPacket, int length);
void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks);

View File

@ -50,14 +50,7 @@ int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeB
return 0;
}
int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) {
PLINKED_BLOCKING_QUEUE_ENTRY entry;
entry = (PLINKED_BLOCKING_QUEUE_ENTRY) malloc(sizeof(*entry));
if (entry == NULL) {
return LBQ_NO_MEMORY;
}
int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOCKING_QUEUE_ENTRY entry) {
entry->flink = NULL;
entry->data = data;
@ -65,7 +58,6 @@ int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) {
if (queueHead->currentSize == queueHead->sizeBound) {
PltUnlockMutex(&queueHead->mutex);
free(entry);
return LBQ_BOUND_EXCEEDED;
}
@ -125,8 +117,6 @@ int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) {
*data = entry->data;
free(entry);
PltUnlockMutex(&queueHead->mutex);
break;

View File

@ -6,7 +6,6 @@
#define LBQ_SUCCESS 0
#define LBQ_INTERRUPTED 1
#define LBQ_BOUND_EXCEEDED 2
#define LBQ_NO_MEMORY 3
typedef struct _LINKED_BLOCKING_QUEUE_ENTRY {
struct _LINKED_BLOCKING_QUEUE_ENTRY *flink;
@ -24,7 +23,7 @@ typedef struct _LINKED_BLOCKING_QUEUE {
} LINKED_BLOCKING_QUEUE, *PLINKED_BLOCKING_QUEUE;
int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound);
int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data);
int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOCKING_QUEUE_ENTRY entry);
int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data);
PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead);
PLINKED_BLOCKING_QUEUE_ENTRY LbqFlushQueueItems(PLINKED_BLOCKING_QUEUE queueHead);

View File

@ -1,5 +1,15 @@
#pragma once
#include "LinkedBlockingQueue.h"
typedef struct _QUEUED_DECODE_UNIT {
DECODE_UNIT decodeUnit;
LINKED_BLOCKING_QUEUE_ENTRY entry;
} QUEUED_DECODE_UNIT, *PQUEUED_DECODE_UNIT;
void freeQueuedDecodeUnit(PQUEUED_DECODE_UNIT qdu);
int getNextQueuedDecodeUnit(PQUEUED_DECODE_UNIT *qdu);
#pragma pack(push, 1)
#define FLAG_CONTAINS_PIC_DATA 0x1

View File

@ -82,8 +82,10 @@ static void freeDecodeUnitList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
while (entry != NULL) {
nextEntry = entry->flink;
// The entry is stored within the data allocation
free(entry->data);
free(entry);
entry = nextEntry;
}
}
@ -147,27 +149,51 @@ static int getSpecialSeq(PBUFFER_DESC current, PBUFFER_DESC candidate) {
return 0;
}
/* Get the first decode unit available */
int getNextQueuedDecodeUnit(PQUEUED_DECODE_UNIT *qdu) {
int err = LbqWaitForQueueElement(&decodeUnitQueue, (void**) qdu);
if (err == LBQ_SUCCESS) {
return 1;
}
else {
return 0;
}
}
/* Cleanup a decode unit by freeing the buffer chain and the holder */
void freeQueuedDecodeUnit(PQUEUED_DECODE_UNIT qdu) {
PLENTRY lastEntry;
while (qdu->decodeUnit.bufferList != NULL) {
lastEntry = qdu->decodeUnit.bufferList;
qdu->decodeUnit.bufferList = lastEntry->next;
free(lastEntry);
}
free(qdu);
}
/* Reassemble the frame with the given frame number */
static void reassembleAvcFrame(int frameNumber) {
if (nalChainHead != NULL) {
PDECODE_UNIT du = (PDECODE_UNIT) malloc(sizeof(*du));
if (du != NULL) {
du->bufferList = nalChainHead;
du->fullLength = nalChainDataLength;
PQUEUED_DECODE_UNIT qdu = (PQUEUED_DECODE_UNIT) malloc(sizeof(*qdu));
if (qdu != NULL) {
qdu->decodeUnit.bufferList = nalChainHead;
qdu->decodeUnit.fullLength = nalChainDataLength;
nalChainHead = NULL;
nalChainDataLength = 0;
if (LbqOfferQueueItem(&decodeUnitQueue, du) == LBQ_BOUND_EXCEEDED) {
if (LbqOfferQueueItem(&decodeUnitQueue, qdu, &qdu->entry) == LBQ_BOUND_EXCEEDED) {
Limelog("Decode unit queue overflow\n");
// Clear frame state and wait for an IDR
nalChainHead = du->bufferList;
nalChainDataLength = du->fullLength;
nalChainHead = qdu->decodeUnit.bufferList;
nalChainDataLength = qdu->decodeUnit.fullLength;
dropAvcFrameState();
// Free the DU
free(du);
free(qdu);
// Flush the decode unit queue
freeDecodeUnitList(LbqFlushQueueItems(&decodeUnitQueue));
@ -186,30 +212,6 @@ static void reassembleAvcFrame(int frameNumber) {
}
}
/* Given a decode unit, get the next one in the linked blocking queue */
int getNextDecodeUnit(PDECODE_UNIT *du) {
int err = LbqWaitForQueueElement(&decodeUnitQueue, (void**)du);
if (err == LBQ_SUCCESS) {
return 1;
}
else {
return 0;
}
}
/* Cleanup a decode unit by freeing malloced memory */
void freeDecodeUnit(PDECODE_UNIT decodeUnit) {
PLENTRY lastEntry;
while (decodeUnit->bufferList != NULL) {
lastEntry = decodeUnit->bufferList;
decodeUnit->bufferList = lastEntry->next;
free(lastEntry);
}
free(decodeUnit);
}
static void queueFragment(char *data, int offset, int length) {
PLENTRY entry = (PLENTRY) malloc(sizeof(*entry) + length);
if (entry != NULL) {

View File

@ -92,16 +92,16 @@ static void ReceiveThreadProc(void* context) {
/* Decoder thread proc */
static void DecoderThreadProc(void* context) {
PDECODE_UNIT du;
PQUEUED_DECODE_UNIT qdu;
while (!PltIsThreadInterrupted(&decoderThread)) {
if (!getNextDecodeUnit(&du)) {
if (!getNextQueuedDecodeUnit(&qdu)) {
printf("Decoder thread terminating\n");
return;
}
int ret = callbacks.submitDecodeUnit(du);
int ret = callbacks.submitDecodeUnit(&qdu->decodeUnit);
freeDecodeUnit(du);
freeQueuedDecodeUnit(qdu);
if (ret == DR_NEED_IDR) {
Limelog("Request IDR frame on behalf of DR\n");