From fb9aab0e573cb7489139f8b16fc964b7fd9a8b4d Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Wed, 9 Jun 2021 19:59:52 -0500 Subject: [PATCH] Reimplement LBQ on condition variables --- src/LinkedBlockingQueue.c | 142 +++++++++++++++----------------------- src/LinkedBlockingQueue.h | 2 +- 2 files changed, 58 insertions(+), 86 deletions(-) diff --git a/src/LinkedBlockingQueue.c b/src/LinkedBlockingQueue.c index 3d96855..1551425 100644 --- a/src/LinkedBlockingQueue.c +++ b/src/LinkedBlockingQueue.c @@ -5,7 +5,7 @@ PLINKED_BLOCKING_QUEUE_ENTRY LbqDestroyLinkedBlockingQueue(PLINKED_BLOCKING_QUEU LC_ASSERT(queueHead->shutdown || queueHead->draining || queueHead->lifetimeSize == 0); PltDeleteMutex(&queueHead->mutex); - PltCloseEvent(&queueHead->containsDataEvent); + PltDeleteConditionVariable(&queueHead->cond); return queueHead->head; } @@ -24,7 +24,6 @@ PLINKED_BLOCKING_QUEUE_ENTRY LbqFlushQueueItems(PLINKED_BLOCKING_QUEUE queueHead queueHead->head = NULL; queueHead->tail = NULL; queueHead->currentSize = 0; - PltClearEvent(&queueHead->containsDataEvent); } else { LC_ASSERT(queueHead->tail == NULL); @@ -42,13 +41,14 @@ int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeB memset(queueHead, 0, sizeof(*queueHead)); - err = PltCreateEvent(&queueHead->containsDataEvent); + err = PltCreateMutex(&queueHead->mutex); if (err != 0) { return err; } - err = PltCreateMutex(&queueHead->mutex); + err = PltCreateConditionVariable(&queueHead->cond, &queueHead->mutex); if (err != 0) { + PltDeleteMutex(&queueHead->mutex); return err; } @@ -58,13 +58,17 @@ int LbqInitializeLinkedBlockingQueue(PLINKED_BLOCKING_QUEUE queueHead, int sizeB } void LbqSignalQueueShutdown(PLINKED_BLOCKING_QUEUE queueHead) { + PltLockMutex(&queueHead->mutex); queueHead->shutdown = true; - PltSetEvent(&queueHead->containsDataEvent); + PltUnlockMutex(&queueHead->mutex); + PltSignalConditionVariable(&queueHead->cond); } void LbqSignalQueueDrain(PLINKED_BLOCKING_QUEUE queueHead) { + PltLockMutex(&queueHead->mutex); queueHead->draining = true; - PltSetEvent(&queueHead->containsDataEvent); + PltUnlockMutex(&queueHead->mutex); + PltSignalConditionVariable(&queueHead->cond); } int LbqGetItemCount(PLINKED_BLOCKING_QUEUE queueHead) { @@ -73,16 +77,17 @@ int LbqGetItemCount(PLINKED_BLOCKING_QUEUE queueHead) { int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOCKING_QUEUE_ENTRY entry) { bool wasEmpty; - - if (queueHead->shutdown || queueHead->draining) { - return LBQ_INTERRUPTED; - } entry->flink = NULL; entry->data = data; PltLockMutex(&queueHead->mutex); + if (queueHead->shutdown || queueHead->draining) { + PltUnlockMutex(&queueHead->mutex); + return LBQ_INTERRUPTED; + } + if (queueHead->currentSize == queueHead->sizeBound) { PltUnlockMutex(&queueHead->mutex); return LBQ_BOUND_EXCEEDED; @@ -110,9 +115,9 @@ int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOC PltUnlockMutex(&queueHead->mutex); if (wasEmpty) { - // Only call PltSetEvent() when transitioning from empty -> non-empty - // to avoid a useless syscall for each additional entry. - PltSetEvent(&queueHead->containsDataEvent); + // Only call PltSignalConditionVariable() when transitioning from + // empty -> non-empty to avoid a useless syscall for each new entry. + PltSignalConditionVariable(&queueHead->cond); } return LBQ_SUCCESS; @@ -120,20 +125,12 @@ int LbqOfferQueueItem(PLINKED_BLOCKING_QUEUE queueHead, void* data, PLINKED_BLOC // This must be synchronized with LbqFlushQueueItems by the caller int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { + PltLockMutex(&queueHead->mutex); + if (queueHead->shutdown) { + PltUnlockMutex(&queueHead->mutex); return LBQ_INTERRUPTED; } - - if (queueHead->head == NULL) { - if (queueHead->draining) { - return LBQ_INTERRUPTED; - } - else { - return LBQ_NO_ELEMENT; - } - } - - PltLockMutex(&queueHead->mutex); if (queueHead->head == NULL) { if (queueHead->draining) { @@ -155,22 +152,14 @@ int LbqPeekQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { PLINKED_BLOCKING_QUEUE_ENTRY entry; - - if (queueHead->shutdown) { - return LBQ_INTERRUPTED; - } - - if (queueHead->head == NULL) { - if (queueHead->draining) { - return LBQ_INTERRUPTED; - } - else { - return LBQ_NO_ELEMENT; - } - } PltLockMutex(&queueHead->mutex); + if (queueHead->shutdown) { + PltUnlockMutex(&queueHead->mutex); + return LBQ_INTERRUPTED; + } + if (queueHead->head == NULL) { if (queueHead->draining) { PltUnlockMutex(&queueHead->mutex); @@ -188,7 +177,6 @@ int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { if (queueHead->head == NULL) { LC_ASSERT(queueHead->currentSize == 0); queueHead->tail = NULL; - PltClearEvent(&queueHead->containsDataEvent); } else { LC_ASSERT(queueHead->currentSize != 0); @@ -205,59 +193,43 @@ int LbqPollQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { int LbqWaitForQueueElement(PLINKED_BLOCKING_QUEUE queueHead, void** data) { PLINKED_BLOCKING_QUEUE_ENTRY entry; + PltLockMutex(&queueHead->mutex); + + // Wait for a waking condition: either data available or rundown + while (queueHead->head == NULL && !queueHead->draining && !queueHead->shutdown) { + PltWaitForConditionVariable(&queueHead->cond, &queueHead->mutex); + } + + // If we're shutting down, abort immediately, even if there's data available if (queueHead->shutdown) { + PltUnlockMutex(&queueHead->mutex); return LBQ_INTERRUPTED; } - for (;;) { - // We can also avoid a syscall if the head pointer is non-NULL. - // It's not safe to dereference the value due to potential memory - // ordering hazards, but we can use its presence to elide the wait. - // - // If we're draining, we will never wait on the queue. - if (queueHead->head == NULL && !queueHead->draining) { - PltWaitForEvent(&queueHead->containsDataEvent); - } - - PltLockMutex(&queueHead->mutex); - - if (queueHead->shutdown) { - PltUnlockMutex(&queueHead->mutex); - return LBQ_INTERRUPTED; - } - - if (queueHead->head == NULL) { - PltClearEvent(&queueHead->containsDataEvent); - - if (queueHead->draining) { - PltUnlockMutex(&queueHead->mutex); - return LBQ_INTERRUPTED; - } - else { - PltUnlockMutex(&queueHead->mutex); - continue; - } - } - - entry = queueHead->head; - queueHead->head = entry->flink; - queueHead->currentSize--; - if (queueHead->head == NULL) { - LC_ASSERT(queueHead->currentSize == 0); - queueHead->tail = NULL; - PltClearEvent(&queueHead->containsDataEvent); - } - else { - LC_ASSERT(queueHead->currentSize != 0); - queueHead->head->blink = NULL; - } - - *data = entry->data; - + // If we're draining, only abort if we have no data available + if (queueHead->draining && queueHead->head == NULL) { PltUnlockMutex(&queueHead->mutex); - - break; + return LBQ_INTERRUPTED; } + // We should have bailed by this point if there was no data + LC_ASSERT(queueHead->head != NULL); + + entry = queueHead->head; + queueHead->head = entry->flink; + queueHead->currentSize--; + if (queueHead->head == NULL) { + LC_ASSERT(queueHead->currentSize == 0); + queueHead->tail = NULL; + } + else { + LC_ASSERT(queueHead->currentSize != 0); + queueHead->head->blink = NULL; + } + + *data = entry->data; + + PltUnlockMutex(&queueHead->mutex); + return LBQ_SUCCESS; } diff --git a/src/LinkedBlockingQueue.h b/src/LinkedBlockingQueue.h index 1836325..5f58802 100644 --- a/src/LinkedBlockingQueue.h +++ b/src/LinkedBlockingQueue.h @@ -16,7 +16,7 @@ typedef struct _LINKED_BLOCKING_QUEUE_ENTRY { typedef struct _LINKED_BLOCKING_QUEUE { PLT_MUTEX mutex; - PLT_EVENT containsDataEvent; + PLT_COND cond; PLINKED_BLOCKING_QUEUE_ENTRY head; PLINKED_BLOCKING_QUEUE_ENTRY tail; int sizeBound;