From 601dd55d7c33099c0704fbc80e3ccf07f6dce33a Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Thu, 28 May 2015 06:06:40 -0500 Subject: [PATCH] Save an allocation per video frame, audio packet, and input event by embedding an LBQ entry inside the linked data structure --- limelight-common/AudioStream.c | 68 +++++++++++--------------- limelight-common/InputStream.c | 15 +++--- limelight-common/Limelight-internal.h | 2 - limelight-common/LinkedBlockingQueue.c | 12 +---- limelight-common/LinkedBlockingQueue.h | 3 +- limelight-common/Video.h | 10 ++++ limelight-common/VideoDepacketizer.c | 68 +++++++++++++------------- limelight-common/VideoStream.c | 8 +-- 8 files changed, 89 insertions(+), 97 deletions(-) diff --git a/limelight-common/AudioStream.c b/limelight-common/AudioStream.c index 7b79313..c6efc36 100644 --- a/limelight-common/AudioStream.c +++ b/limelight-common/AudioStream.c @@ -19,6 +19,12 @@ static PLT_THREAD decoderThread; #define MAX_PACKET_SIZE 100 +typedef struct _QUEUED_AUDIO_PACKET { + char data[MAX_PACKET_SIZE]; + int size; + LINKED_BLOCKING_QUEUE_ENTRY entry; +} QUEUED_AUDIO_PACKET, *PQUEUED_AUDIO_PACKET; + /* Initialize the audio stream */ void initializeAudioStream(IP_ADDRESS host, PAUDIO_RENDERER_CALLBACKS arCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks) { memcpy(&callbacks, arCallbacks, sizeof(callbacks)); @@ -33,8 +39,10 @@ static void freePacketList(PLINKED_BLOCKING_QUEUE_ENTRY entry) { while (entry != NULL) { nextEntry = entry->flink; + + // The entry is stored within the data allocation free(entry->data); - free(entry); + entry = nextEntry; } } @@ -46,8 +54,6 @@ void destroyAudioStream(void) { freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue)); } - - static void UdpPingThreadProc(void *context) { /* Ping in ASCII */ char pingData[] = { 0x50, 0x49, 0x4E, 0x47 }; @@ -73,48 +79,45 @@ static void UdpPingThreadProc(void *context) { } static void ReceiveThreadProc(void* context) { - SOCK_RET err; PRTP_PACKET rtp; - int packetSize; - char* buffer = NULL; + PQUEUED_AUDIO_PACKET packet; + int err; + + packet = NULL; while (!PltIsThreadInterrupted(&receiveThread)) { - if (buffer == NULL) { - buffer = (char*) malloc(MAX_PACKET_SIZE + sizeof(int)); - if (buffer == NULL) { + if (packet == NULL) { + packet = (PQUEUED_AUDIO_PACKET) malloc(sizeof(*packet)); + if (packet == NULL) { Limelog("Receive thread terminating\n"); listenerCallbacks->connectionTerminated(-1); return; } } - err = recv(rtpSocket, &buffer[sizeof(int)], MAX_PACKET_SIZE, 0); - if (err <= 0) { + packet->size = (int) recv(rtpSocket, &packet->data[0], MAX_PACKET_SIZE, 0); + if (packet->size <= 0) { Limelog("Receive thread terminating #2\n"); - free(buffer); + free(packet); listenerCallbacks->connectionTerminated(LastSocketError()); return; } - - packetSize = (int)err; - if (packetSize < sizeof(RTP_PACKET)) { + if (packet->size < sizeof(RTP_PACKET)) { // Runt packet continue; } - rtp = (PRTP_PACKET) &buffer[sizeof(int)]; + rtp = (PRTP_PACKET) &packet->data[0]; if (rtp->packetType != 97) { // Not audio continue; } - memcpy(buffer, &packetSize, sizeof(int)); - - err = LbqOfferQueueItem(&packetQueue, buffer); + err = LbqOfferQueueItem(&packetQueue, packet, &packet->entry); if (err == LBQ_SUCCESS) { // The queue owns the buffer now - buffer = NULL; + packet = NULL; } if (err == LBQ_BOUND_EXCEEDED) { @@ -123,7 +126,7 @@ static void ReceiveThreadProc(void* context) { } else if (err == LBQ_INTERRUPTED) { Limelog("Receive thread terminating #2\n"); - free(buffer); + free(packet); return; } } @@ -131,30 +134,18 @@ static void ReceiveThreadProc(void* context) { static void DecoderThreadProc(void* context) { PRTP_PACKET rtp; - int length; int err; - char *data; + PQUEUED_AUDIO_PACKET packet; unsigned short lastSeq = 0; while (!PltIsThreadInterrupted(&decoderThread)) { - err = LbqWaitForQueueElement(&packetQueue, (void**) &data); + err = LbqWaitForQueueElement(&packetQueue, (void**) &packet); if (err != LBQ_SUCCESS) { Limelog("Decoder thread terminating\n"); return; } - memcpy(&length, data, sizeof(int)); - rtp = (PRTP_PACKET) &data[sizeof(int)]; - - if (length < sizeof(RTP_PACKET)) { - // Runt packet - goto freeandcontinue; - } - - if (rtp->packetType != 97) { - // Not audio - goto freeandcontinue; - } + rtp = (PRTP_PACKET) &packet->data[0]; rtp->sequenceNumber = htons(rtp->sequenceNumber); @@ -166,10 +157,9 @@ static void DecoderThreadProc(void* context) { lastSeq = rtp->sequenceNumber; - callbacks.decodeAndPlaySample((char *) (rtp + 1), length - sizeof(*rtp)); + callbacks.decodeAndPlaySample((char *) (rtp + 1), packet->size - sizeof(*rtp)); - freeandcontinue: - free(data); + free(packet); } } diff --git a/limelight-common/InputStream.c b/limelight-common/InputStream.c index 008e535..cc59348 100644 --- a/limelight-common/InputStream.c +++ b/limelight-common/InputStream.c @@ -29,6 +29,7 @@ typedef struct _PACKET_HOLDER { NV_MULTI_CONTROLLER_PACKET multiController; NV_SCROLL_PACKET scroll; } packet; + LINKED_BLOCKING_QUEUE_ENTRY entry; } PACKET_HOLDER, *PPACKET_HOLDER; /* Initializes the input stream */ @@ -83,8 +84,10 @@ void destroyInputStream(void) { while (entry != NULL) { nextEntry = entry->flink; + + // The entry is stored in the data buffer free(entry->data); - free(entry); + entry = nextEntry; } @@ -195,7 +198,7 @@ int LiSendMouseMoveEvent(short deltaX, short deltaY) { holder->packet.mouseMove.deltaX = htons(deltaX); holder->packet.mouseMove.deltaY = htons(deltaY); - err = LbqOfferQueueItem(&packetQueue, holder); + err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry); if (err != LBQ_SUCCESS) { free(holder); } @@ -222,7 +225,7 @@ int LiSendMouseButtonEvent(char action, int button) { holder->packet.mouseButton.action = action; holder->packet.mouseButton.button = htonl(button); - err = LbqOfferQueueItem(&packetQueue, holder); + err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry); if (err != LBQ_SUCCESS) { free(holder); } @@ -252,7 +255,7 @@ int LiSendKeyboardEvent(short keyCode, char keyAction, char modifiers) { holder->packet.keyboard.modifiers = modifiers; holder->packet.keyboard.zero2 = 0; - err = LbqOfferQueueItem(&packetQueue, holder); + err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry); if (err != LBQ_SUCCESS) { free(holder); } @@ -312,7 +315,7 @@ static int sendControllerEventInternal(short controllerNumber, short buttonFlags holder->packet.multiController.tailB = MC_TAIL_B; } - err = LbqOfferQueueItem(&packetQueue, holder); + err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry); if (err != LBQ_SUCCESS) { free(holder); } @@ -359,7 +362,7 @@ int LiSendScrollEvent(char scrollClicks) { holder->packet.scroll.scrollAmt2 = holder->packet.scroll.scrollAmt1; holder->packet.scroll.zero3 = 0; - err = LbqOfferQueueItem(&packetQueue, holder); + err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry); if (err != LBQ_SUCCESS) { free(holder); } diff --git a/limelight-common/Limelight-internal.h b/limelight-common/Limelight-internal.h index e1d322d..9d9d1e5 100644 --- a/limelight-common/Limelight-internal.h +++ b/limelight-common/Limelight-internal.h @@ -30,8 +30,6 @@ void terminateRtspHandshake(void); void initializeVideoDepacketizer(int pktSize); void destroyVideoDepacketizer(void); void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length); -int getNextDecodeUnit(PDECODE_UNIT *du); -void freeDecodeUnit(PDECODE_UNIT decodeUnit); void queueRtpPacket(PRTP_PACKET rtpPacket, int length); void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks, PCONNECTION_LISTENER_CALLBACKS clCallbacks); diff --git a/limelight-common/LinkedBlockingQueue.c b/limelight-common/LinkedBlockingQueue.c index 5f5e3d2..c30b165 100644 --- a/limelight-common/LinkedBlockingQueue.c +++ b/limelight-common/LinkedBlockingQueue.c @@ -50,14 +50,7 @@ int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeB return 0; } -int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) { - PLINKED_BLOCKING_QUEUE_ENTRY entry; - - entry = (PLINKED_BLOCKING_QUEUE_ENTRY) malloc(sizeof(*entry)); - if (entry == NULL) { - return LBQ_NO_MEMORY; - } - +int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOCKING_QUEUE_ENTRY entry) { entry->flink = NULL; entry->data = data; @@ -65,7 +58,6 @@ int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data) { if (queueHead->currentSize == queueHead->sizeBound) { PltUnlockMutex(&queueHead->mutex); - free(entry); return LBQ_BOUND_EXCEEDED; } @@ -125,8 +117,6 @@ int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { *data = entry->data; - free(entry); - PltUnlockMutex(&queueHead->mutex); break; diff --git a/limelight-common/LinkedBlockingQueue.h b/limelight-common/LinkedBlockingQueue.h index e501bd5..59a2571 100644 --- a/limelight-common/LinkedBlockingQueue.h +++ b/limelight-common/LinkedBlockingQueue.h @@ -6,7 +6,6 @@ #define LBQ_SUCCESS 0 #define LBQ_INTERRUPTED 1 #define LBQ_BOUND_EXCEEDED 2 -#define LBQ_NO_MEMORY 3 typedef struct _LINKED_BLOCKING_QUEUE_ENTRY { struct _LINKED_BLOCKING_QUEUE_ENTRY *flink; @@ -24,7 +23,7 @@ typedef struct _LINKED_BLOCKING_QUEUE { } LINKED_BLOCKING_QUEUE, *PLINKED_BLOCKING_QUEUE; int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeBound); -int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data); +int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOCKING_QUEUE_ENTRY entry); int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data); PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead); PLINKED_BLOCKING_QUEUE_ENTRY LbqFlushQueueItems(PLINKED_BLOCKING_QUEUE queueHead); diff --git a/limelight-common/Video.h b/limelight-common/Video.h index db40805..a447120 100644 --- a/limelight-common/Video.h +++ b/limelight-common/Video.h @@ -1,5 +1,15 @@ #pragma once +#include "LinkedBlockingQueue.h" + +typedef struct _QUEUED_DECODE_UNIT { + DECODE_UNIT decodeUnit; + LINKED_BLOCKING_QUEUE_ENTRY entry; +} QUEUED_DECODE_UNIT, *PQUEUED_DECODE_UNIT; + +void freeQueuedDecodeUnit(PQUEUED_DECODE_UNIT qdu); +int getNextQueuedDecodeUnit(PQUEUED_DECODE_UNIT *qdu); + #pragma pack(push, 1) #define FLAG_CONTAINS_PIC_DATA 0x1 diff --git a/limelight-common/VideoDepacketizer.c b/limelight-common/VideoDepacketizer.c index f54696c..ba22065 100644 --- a/limelight-common/VideoDepacketizer.c +++ b/limelight-common/VideoDepacketizer.c @@ -82,8 +82,10 @@ static void freeDecodeUnitList(PLINKED_BLOCKING_QUEUE_ENTRY entry) { while (entry != NULL) { nextEntry = entry->flink; + + // The entry is stored within the data allocation free(entry->data); - free(entry); + entry = nextEntry; } } @@ -147,27 +149,51 @@ static int getSpecialSeq(PBUFFER_DESC current, PBUFFER_DESC candidate) { return 0; } +/* Get the first decode unit available */ +int getNextQueuedDecodeUnit(PQUEUED_DECODE_UNIT *qdu) { + int err = LbqWaitForQueueElement(&decodeUnitQueue, (void**) qdu); + if (err == LBQ_SUCCESS) { + return 1; + } + else { + return 0; + } +} + +/* Cleanup a decode unit by freeing the buffer chain and the holder */ +void freeQueuedDecodeUnit(PQUEUED_DECODE_UNIT qdu) { + PLENTRY lastEntry; + + while (qdu->decodeUnit.bufferList != NULL) { + lastEntry = qdu->decodeUnit.bufferList; + qdu->decodeUnit.bufferList = lastEntry->next; + free(lastEntry); + } + + free(qdu); +} + /* Reassemble the frame with the given frame number */ static void reassembleAvcFrame(int frameNumber) { if (nalChainHead != NULL) { - PDECODE_UNIT du = (PDECODE_UNIT) malloc(sizeof(*du)); - if (du != NULL) { - du->bufferList = nalChainHead; - du->fullLength = nalChainDataLength; + PQUEUED_DECODE_UNIT qdu = (PQUEUED_DECODE_UNIT) malloc(sizeof(*qdu)); + if (qdu != NULL) { + qdu->decodeUnit.bufferList = nalChainHead; + qdu->decodeUnit.fullLength = nalChainDataLength; nalChainHead = NULL; nalChainDataLength = 0; - if (LbqOfferQueueItem(&decodeUnitQueue, du) == LBQ_BOUND_EXCEEDED) { + if (LbqOfferQueueItem(&decodeUnitQueue, qdu, &qdu->entry) == LBQ_BOUND_EXCEEDED) { Limelog("Decode unit queue overflow\n"); // Clear frame state and wait for an IDR - nalChainHead = du->bufferList; - nalChainDataLength = du->fullLength; + nalChainHead = qdu->decodeUnit.bufferList; + nalChainDataLength = qdu->decodeUnit.fullLength; dropAvcFrameState(); // Free the DU - free(du); + free(qdu); // Flush the decode unit queue freeDecodeUnitList(LbqFlushQueueItems(&decodeUnitQueue)); @@ -186,30 +212,6 @@ static void reassembleAvcFrame(int frameNumber) { } } -/* Given a decode unit, get the next one in the linked blocking queue */ -int getNextDecodeUnit(PDECODE_UNIT *du) { - int err = LbqWaitForQueueElement(&decodeUnitQueue, (void**)du); - if (err == LBQ_SUCCESS) { - return 1; - } - else { - return 0; - } -} - -/* Cleanup a decode unit by freeing malloced memory */ -void freeDecodeUnit(PDECODE_UNIT decodeUnit) { - PLENTRY lastEntry; - - while (decodeUnit->bufferList != NULL) { - lastEntry = decodeUnit->bufferList; - decodeUnit->bufferList = lastEntry->next; - free(lastEntry); - } - - free(decodeUnit); -} - static void queueFragment(char *data, int offset, int length) { PLENTRY entry = (PLENTRY) malloc(sizeof(*entry) + length); if (entry != NULL) { diff --git a/limelight-common/VideoStream.c b/limelight-common/VideoStream.c index a3f0d4b..22689e8 100644 --- a/limelight-common/VideoStream.c +++ b/limelight-common/VideoStream.c @@ -92,16 +92,16 @@ static void ReceiveThreadProc(void* context) { /* Decoder thread proc */ static void DecoderThreadProc(void* context) { - PDECODE_UNIT du; + PQUEUED_DECODE_UNIT qdu; while (!PltIsThreadInterrupted(&decoderThread)) { - if (!getNextDecodeUnit(&du)) { + if (!getNextQueuedDecodeUnit(&qdu)) { printf("Decoder thread terminating\n"); return; } - int ret = callbacks.submitDecodeUnit(du); + int ret = callbacks.submitDecodeUnit(&qdu->decodeUnit); - freeDecodeUnit(du); + freeQueuedDecodeUnit(qdu); if (ret == DR_NEED_IDR) { Limelog("Request IDR frame on behalf of DR\n");