From 67b70e9cbd72c5cda281e69242de5410614795a3 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Mon, 15 Feb 2016 17:00:31 -0500 Subject: [PATCH] Implement queue shutdown --- limelight-common/AudioStream.c | 4 +++- limelight-common/InputStream.c | 2 ++ limelight-common/Limelight-internal.h | 1 + limelight-common/LinkedBlockingQueue.c | 28 ++++++++++++++++++++++++++ limelight-common/LinkedBlockingQueue.h | 2 ++ limelight-common/VideoDepacketizer.c | 6 ++++++ limelight-common/VideoStream.c | 4 +++- 7 files changed, 45 insertions(+), 2 deletions(-) diff --git a/limelight-common/AudioStream.c b/limelight-common/AudioStream.c index 8b020a7..4f4e062 100644 --- a/limelight-common/AudioStream.c +++ b/limelight-common/AudioStream.c @@ -249,7 +249,9 @@ static void DecoderThreadProc(void* context) { void stopAudioStream(void) { PltInterruptThread(&udpPingThread); PltInterruptThread(&receiveThread); - if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { + if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { + // Signal threads waiting on the LBQ + LbqSignalQueueShutdown(&packetQueue); PltInterruptThread(&decoderThread); } diff --git a/limelight-common/InputStream.c b/limelight-common/InputStream.c index 60e431c..e99b2b7 100644 --- a/limelight-common/InputStream.c +++ b/limelight-common/InputStream.c @@ -289,6 +289,8 @@ int startInputStream(void) { // Stops the input stream int stopInputStream(void) { + // Signal the input send thread + LbqSignalQueueShutdown(&packetQueue); PltInterruptThread(&inputSendThread); if (inputSock != INVALID_SOCKET) { diff --git a/limelight-common/Limelight-internal.h b/limelight-common/Limelight-internal.h index 169214e..5a1b09d 100644 --- a/limelight-common/Limelight-internal.h +++ b/limelight-common/Limelight-internal.h @@ -39,6 +39,7 @@ void initializeVideoDepacketizer(int pktSize); void destroyVideoDepacketizer(void); void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length); void queueRtpPacket(PRTP_PACKET rtpPacket, int length); +void stopVideoDepacketizer(void); void initializeVideoStream(void); void destroyVideoStream(void); diff --git a/limelight-common/LinkedBlockingQueue.c b/limelight-common/LinkedBlockingQueue.c index 0364382..5b3eabf 100644 --- a/limelight-common/LinkedBlockingQueue.c +++ b/limelight-common/LinkedBlockingQueue.c @@ -2,6 +2,8 @@ // Destroy the linked blocking queue and associated mutex and event PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead) { + LC_ASSERT(queueHead->shutdown); + PltDeleteMutex(&queueHead->mutex); PltCloseEvent(&queueHead->containsDataEvent); @@ -46,11 +48,21 @@ int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeB queueHead->tail = NULL; queueHead->sizeBound = sizeBound; queueHead->currentSize = 0; + queueHead->shutdown = 0; return 0; } +void LbqSignalQueueShutdown(PLINKED_BLOCKING_QUEUE queueHead) { + queueHead->shutdown = 1; + PltSetEvent(&queueHead->containsDataEvent); +} + int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOCKING_QUEUE_ENTRY entry) { + if (queueHead->shutdown) { + return LBQ_INTERRUPTED; + } + entry->flink = NULL; entry->data = data; @@ -87,6 +99,10 @@ int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOC // This must be synchronized with LbqFlushQueueItems by the caller int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { + if (queueHead->shutdown) { + return LBQ_INTERRUPTED; + } + if (queueHead->head == NULL) { return LBQ_NO_ELEMENT; } @@ -107,6 +123,10 @@ int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { PLINKED_BLOCKING_QUEUE_ENTRY entry; + + if (queueHead->shutdown) { + return LBQ_INTERRUPTED; + } if (queueHead->head == NULL) { return LBQ_NO_ELEMENT; @@ -142,6 +162,10 @@ int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { PLINKED_BLOCKING_QUEUE_ENTRY entry; int err; + + if (queueHead->shutdown) { + return LBQ_INTERRUPTED; + } for (;;) { err = PltWaitForEvent(&queueHead->containsDataEvent); @@ -149,6 +173,10 @@ int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { return LBQ_INTERRUPTED; } + if (queueHead->shutdown) { + return LBQ_INTERRUPTED; + } + PltLockMutex(&queueHead->mutex); if (queueHead->head == NULL) { diff --git a/limelight-common/LinkedBlockingQueue.h b/limelight-common/LinkedBlockingQueue.h index e05c646..df74067 100644 --- a/limelight-common/LinkedBlockingQueue.h +++ b/limelight-common/LinkedBlockingQueue.h @@ -19,6 +19,7 @@ typedef struct _LINKED_BLOCKING_QUEUE { PLT_EVENT containsDataEvent; int sizeBound; int currentSize; + int shutdown; PLINKED_BLOCKING_QUEUE_ENTRY head; PLINKED_BLOCKING_QUEUE_ENTRY tail; } LINKED_BLOCKING_QUEUE, *PLINKED_BLOCKING_QUEUE; @@ -30,3 +31,4 @@ int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data); int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data); PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead); PLINKED_BLOCKING_QUEUE_ENTRY LbqFlushQueueItems(PLINKED_BLOCKING_QUEUE queueHead); +void LbqSignalQueueShutdown(PLINKED_BLOCKING_QUEUE queueHead); diff --git a/limelight-common/VideoDepacketizer.c b/limelight-common/VideoDepacketizer.c index 526d551..193070b 100644 --- a/limelight-common/VideoDepacketizer.c +++ b/limelight-common/VideoDepacketizer.c @@ -97,6 +97,12 @@ static void freeDecodeUnitList(PLINKED_BLOCKING_QUEUE_ENTRY entry) { } } +void stopVideoDepacketizer(void) { + if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { + LbqSignalQueueShutdown(&decodeUnitQueue); + } +} + // Cleanup video depacketizer and free malloced memory void destroyVideoDepacketizer(void) { if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { diff --git a/limelight-common/VideoStream.c b/limelight-common/VideoStream.c index f52f3df..5470c01 100644 --- a/limelight-common/VideoStream.c +++ b/limelight-common/VideoStream.c @@ -1,7 +1,6 @@ #include "Limelight-internal.h" #include "PlatformSockets.h" #include "PlatformThreads.h" -#include "LinkedBlockingQueue.h" #include "RtpReorderQueue.h" #define FIRST_FRAME_MAX 1500 @@ -150,6 +149,9 @@ int readFirstFrame(void) { // Terminate the video stream void stopVideoStream(void) { + // Wake up client code that may be waiting on the decode unit queue + stopVideoDepacketizer(); + PltInterruptThread(&udpPingThread); PltInterruptThread(&receiveThread); if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {