Implement queue shutdown

This commit is contained in:
Cameron Gutman 2016-02-15 17:00:31 -05:00
parent cb43494cd0
commit 67b70e9cbd
7 changed files with 45 additions and 2 deletions

View File

@ -250,6 +250,8 @@ void stopAudioStream(void) {
PltInterruptThread(&udpPingThread); PltInterruptThread(&udpPingThread);
PltInterruptThread(&receiveThread); PltInterruptThread(&receiveThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
// Signal threads waiting on the LBQ
LbqSignalQueueShutdown(&packetQueue);
PltInterruptThread(&decoderThread); PltInterruptThread(&decoderThread);
} }

View File

@ -289,6 +289,8 @@ int startInputStream(void) {
// Stops the input stream // Stops the input stream
int stopInputStream(void) { int stopInputStream(void) {
// Signal the input send thread
LbqSignalQueueShutdown(&packetQueue);
PltInterruptThread(&inputSendThread); PltInterruptThread(&inputSendThread);
if (inputSock != INVALID_SOCKET) { if (inputSock != INVALID_SOCKET) {

View File

@ -39,6 +39,7 @@ void initializeVideoDepacketizer(int pktSize);
void destroyVideoDepacketizer(void); void destroyVideoDepacketizer(void);
void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length); void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length);
void queueRtpPacket(PRTP_PACKET rtpPacket, int length); void queueRtpPacket(PRTP_PACKET rtpPacket, int length);
void stopVideoDepacketizer(void);
void initializeVideoStream(void); void initializeVideoStream(void);
void destroyVideoStream(void); void destroyVideoStream(void);

View File

@ -2,6 +2,8 @@
// Destroy the linked blocking queue and associated mutex and event // Destroy the linked blocking queue and associated mutex and event
PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead) { PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead) {
LC_ASSERT(queueHead->shutdown);
PltDeleteMutex(&queueHead->mutex); PltDeleteMutex(&queueHead->mutex);
PltCloseEvent(&queueHead->containsDataEvent); PltCloseEvent(&queueHead->containsDataEvent);
@ -46,11 +48,21 @@ int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeB
queueHead->tail = NULL; queueHead->tail = NULL;
queueHead->sizeBound = sizeBound; queueHead->sizeBound = sizeBound;
queueHead->currentSize = 0; queueHead->currentSize = 0;
queueHead->shutdown = 0;
return 0; return 0;
} }
void LbqSignalQueueShutdown(PLINKED_BLOCKING_QUEUE queueHead) {
queueHead->shutdown = 1;
PltSetEvent(&queueHead->containsDataEvent);
}
int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOCKING_QUEUE_ENTRY entry) { int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOCKING_QUEUE_ENTRY entry) {
if (queueHead->shutdown) {
return LBQ_INTERRUPTED;
}
entry->flink = NULL; entry->flink = NULL;
entry->data = data; entry->data = data;
@ -87,6 +99,10 @@ int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOC
// This must be synchronized with LbqFlushQueueItems by the caller // This must be synchronized with LbqFlushQueueItems by the caller
int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) {
if (queueHead->shutdown) {
return LBQ_INTERRUPTED;
}
if (queueHead->head == NULL) { if (queueHead->head == NULL) {
return LBQ_NO_ELEMENT; return LBQ_NO_ELEMENT;
} }
@ -108,6 +124,10 @@ int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) {
int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) {
PLINKED_BLOCKING_QUEUE_ENTRY entry; PLINKED_BLOCKING_QUEUE_ENTRY entry;
if (queueHead->shutdown) {
return LBQ_INTERRUPTED;
}
if (queueHead->head == NULL) { if (queueHead->head == NULL) {
return LBQ_NO_ELEMENT; return LBQ_NO_ELEMENT;
} }
@ -143,12 +163,20 @@ int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) {
PLINKED_BLOCKING_QUEUE_ENTRY entry; PLINKED_BLOCKING_QUEUE_ENTRY entry;
int err; int err;
if (queueHead->shutdown) {
return LBQ_INTERRUPTED;
}
for (;;) { for (;;) {
err = PltWaitForEvent(&queueHead->containsDataEvent); err = PltWaitForEvent(&queueHead->containsDataEvent);
if (err != PLT_WAIT_SUCCESS) { if (err != PLT_WAIT_SUCCESS) {
return LBQ_INTERRUPTED; return LBQ_INTERRUPTED;
} }
if (queueHead->shutdown) {
return LBQ_INTERRUPTED;
}
PltLockMutex(&queueHead->mutex); PltLockMutex(&queueHead->mutex);
if (queueHead->head == NULL) { if (queueHead->head == NULL) {

View File

@ -19,6 +19,7 @@ typedef struct _LINKED_BLOCKING_QUEUE {
PLT_EVENT containsDataEvent; PLT_EVENT containsDataEvent;
int sizeBound; int sizeBound;
int currentSize; int currentSize;
int shutdown;
PLINKED_BLOCKING_QUEUE_ENTRY head; PLINKED_BLOCKING_QUEUE_ENTRY head;
PLINKED_BLOCKING_QUEUE_ENTRY tail; PLINKED_BLOCKING_QUEUE_ENTRY tail;
} LINKED_BLOCKING_QUEUE, *PLINKED_BLOCKING_QUEUE; } LINKED_BLOCKING_QUEUE, *PLINKED_BLOCKING_QUEUE;
@ -30,3 +31,4 @@ int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data);
int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data); int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data);
PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead); PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead);
PLINKED_BLOCKING_QUEUE_ENTRY LbqFlushQueueItems(PLINKED_BLOCKING_QUEUE queueHead); PLINKED_BLOCKING_QUEUE_ENTRY LbqFlushQueueItems(PLINKED_BLOCKING_QUEUE queueHead);
void LbqSignalQueueShutdown(PLINKED_BLOCKING_QUEUE queueHead);

View File

@ -97,6 +97,12 @@ static void freeDecodeUnitList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
} }
} }
void stopVideoDepacketizer(void) {
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
LbqSignalQueueShutdown(&decodeUnitQueue);
}
}
// Cleanup video depacketizer and free malloced memory // Cleanup video depacketizer and free malloced memory
void destroyVideoDepacketizer(void) { void destroyVideoDepacketizer(void) {
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {

View File

@ -1,7 +1,6 @@
#include "Limelight-internal.h" #include "Limelight-internal.h"
#include "PlatformSockets.h" #include "PlatformSockets.h"
#include "PlatformThreads.h" #include "PlatformThreads.h"
#include "LinkedBlockingQueue.h"
#include "RtpReorderQueue.h" #include "RtpReorderQueue.h"
#define FIRST_FRAME_MAX 1500 #define FIRST_FRAME_MAX 1500
@ -150,6 +149,9 @@ int readFirstFrame(void) {
// Terminate the video stream // Terminate the video stream
void stopVideoStream(void) { void stopVideoStream(void) {
// Wake up client code that may be waiting on the decode unit queue
stopVideoDepacketizer();
PltInterruptThread(&udpPingThread); PltInterruptThread(&udpPingThread);
PltInterruptThread(&receiveThread); PltInterruptThread(&receiveThread);
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {