From 431e188b0796f94091d5221ae79bd473a0228e71 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Sat, 15 May 2021 22:01:50 -0500 Subject: [PATCH] Allow the input queue to drain before terminating the thread This ensures any final input messages (like key ups) make it to the host --- src/InputStream.c | 7 +++-- src/LinkedBlockingQueue.c | 65 ++++++++++++++++++++++++++++++--------- src/LinkedBlockingQueue.h | 2 ++ 3 files changed, 57 insertions(+), 17 deletions(-) diff --git a/src/InputStream.c b/src/InputStream.c index 29f0546..734ed15 100644 --- a/src/InputStream.c +++ b/src/InputStream.c @@ -361,14 +361,15 @@ int stopInputStream(void) { // No more packets should be queued now initialized = false; - // Signal the input send thread - LbqSignalQueueShutdown(&packetQueue); - PltInterruptThread(&inputSendThread); + // Signal the input send thread to drain all pending + // input packets before shutting down. + LbqSignalQueueDrain(&packetQueue); if (inputSock != INVALID_SOCKET) { shutdownTcpSocket(inputSock); } + PltInterruptThread(&inputSendThread); PltJoinThread(&inputSendThread); PltCloseThread(&inputSendThread); diff --git a/src/LinkedBlockingQueue.c b/src/LinkedBlockingQueue.c index 04a8734..0b40101 100644 --- a/src/LinkedBlockingQueue.c +++ b/src/LinkedBlockingQueue.c @@ -2,7 +2,7 @@ // Destroy the linked blocking queue and associated mutex and event PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead) { - LC_ASSERT(queueHead->shutdown || queueHead->lifetimeSize == 0); + LC_ASSERT(queueHead->shutdown || queueHead->draining || queueHead->lifetimeSize == 0); PltDeleteMutex(&queueHead->mutex); PltCloseEvent(&queueHead->containsDataEvent); @@ -62,6 +62,11 @@ void LbqSignalQueueShutdown(PLINKED_BLOCKING_QUEUE queueHead) { PltSetEvent(&queueHead->containsDataEvent); } +void LbqSignalQueueDrain(PLINKED_BLOCKING_QUEUE queueHead) { + queueHead->draining = true; + PltSetEvent(&queueHead->containsDataEvent); +} + int LbqGetItemCount(PLINKED_BLOCKING_QUEUE queueHead) { return queueHead->currentSize; } @@ -69,7 +74,7 @@ int LbqGetItemCount(PLINKED_BLOCKING_QUEUE queueHead) { int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOCKING_QUEUE_ENTRY entry) { bool wasEmpty; - if (queueHead->shutdown) { + if (queueHead->shutdown || queueHead->draining) { return LBQ_INTERRUPTED; } @@ -120,14 +125,25 @@ int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { } if (queueHead->head == NULL) { - return LBQ_NO_ELEMENT; + if (queueHead->draining) { + return LBQ_INTERRUPTED; + } + else { + return LBQ_NO_ELEMENT; + } } PltLockMutex(&queueHead->mutex); if (queueHead->head == NULL) { - PltUnlockMutex(&queueHead->mutex); - return LBQ_NO_ELEMENT; + if (queueHead->draining) { + PltUnlockMutex(&queueHead->mutex); + return LBQ_INTERRUPTED; + } + else { + PltUnlockMutex(&queueHead->mutex); + return LBQ_NO_ELEMENT; + } } *data = queueHead->head->data; @@ -145,14 +161,25 @@ int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { } if (queueHead->head == NULL) { - return LBQ_NO_ELEMENT; + if (queueHead->draining) { + return LBQ_INTERRUPTED; + } + else { + return LBQ_NO_ELEMENT; + } } PltLockMutex(&queueHead->mutex); if (queueHead->head == NULL) { - PltUnlockMutex(&queueHead->mutex); - return LBQ_NO_ELEMENT; + if (queueHead->draining) { + PltUnlockMutex(&queueHead->mutex); + return LBQ_INTERRUPTED; + } + else { + PltUnlockMutex(&queueHead->mutex); + return LBQ_NO_ELEMENT; + } } entry = queueHead->head; @@ -178,7 +205,7 @@ int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { PLINKED_BLOCKING_QUEUE_ENTRY entry; int err; - + if (queueHead->shutdown) { return LBQ_INTERRUPTED; } @@ -187,23 +214,33 @@ int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { // We can also avoid a syscall if the head pointer is non-NULL. // It's not safe to dereference the value due to potential memory // ordering hazards, but we can use its presence to elide the wait. - if (queueHead->head == NULL) { + // + // If we're draining, we will never wait on the queue. + if (queueHead->head == NULL && !queueHead->draining) { err = PltWaitForEvent(&queueHead->containsDataEvent); if (err != PLT_WAIT_SUCCESS) { return LBQ_INTERRUPTED; } } + PltLockMutex(&queueHead->mutex); + if (queueHead->shutdown) { + PltUnlockMutex(&queueHead->mutex); return LBQ_INTERRUPTED; } - PltLockMutex(&queueHead->mutex); - if (queueHead->head == NULL) { PltClearEvent(&queueHead->containsDataEvent); - PltUnlockMutex(&queueHead->mutex); - continue; + + if (queueHead->draining) { + PltUnlockMutex(&queueHead->mutex); + return LBQ_INTERRUPTED; + } + else { + PltUnlockMutex(&queueHead->mutex); + continue; + } } entry = queueHead->head; diff --git a/src/LinkedBlockingQueue.h b/src/LinkedBlockingQueue.h index 7df7ff3..ac3b126 100644 --- a/src/LinkedBlockingQueue.h +++ b/src/LinkedBlockingQueue.h @@ -20,6 +20,7 @@ typedef struct _LINKED_BLOCKING_QUEUE { int sizeBound; int currentSize; bool shutdown; + bool draining; int lifetimeSize; PLINKED_BLOCKING_QUEUE_ENTRY head; PLINKED_BLOCKING_QUEUE_ENTRY tail; @@ -33,4 +34,5 @@ int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data); PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead); PLINKED_BLOCKING_QUEUE_ENTRY LbqFlushQueueItems(PLINKED_BLOCKING_QUEUE queueHead); void LbqSignalQueueShutdown(PLINKED_BLOCKING_QUEUE queueHead); +void LbqSignalQueueDrain(PLINKED_BLOCKING_QUEUE queueHead); int LbqGetItemCount(PLINKED_BLOCKING_QUEUE queueHead);