From c1744de06938b5a5c8897a705be1bc6508dc7580 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Mon, 18 Sep 2023 23:02:44 -0500 Subject: [PATCH] Batch async control stream callbacks --- src/ControlStream.c | 82 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 74 insertions(+), 8 deletions(-) diff --git a/src/ControlStream.c b/src/ControlStream.c index 3476d71..fc6a7aa 100644 --- a/src/ControlStream.c +++ b/src/ControlStream.c @@ -819,26 +819,75 @@ static int ignoreDisconnectIntercept(ENetHost* host, ENetEvent* event) { } static void asyncCallbackThreadFunc(void* context) { - PQUEUED_ASYNC_CALLBACK queuedCb; + PQUEUED_ASYNC_CALLBACK queuedCb, nextCb; while (LbqWaitForQueueElement(&asyncCallbackQueue, (void**)&queuedCb) == LBQ_SUCCESS) { switch (queuedCb->typeIndex) { case IDX_RUMBLE_DATA: + // Look for another rumble packet to batch with + while (LbqPeekQueueElement(&asyncCallbackQueue, (void**)&nextCb) == LBQ_SUCCESS) { + // Don't batch with the next packet if it is a different type or controller number + if (nextCb->typeIndex != queuedCb->typeIndex || + nextCb->data.rumble.controllerNumber != queuedCb->data.rumble.controllerNumber) { + break; + } + + // This entry is batchable, so pop it off the queue + if (LbqPollQueueElement(&asyncCallbackQueue, (void**)&nextCb) != LBQ_SUCCESS) { + break; + } + + // Replace the old entry with the new one + free(queuedCb); + queuedCb = nextCb; + } + ListenerCallbacks.rumble(queuedCb->data.rumble.controllerNumber, queuedCb->data.rumble.lowFreqRumble, queuedCb->data.rumble.highFreqRumble); break; case IDX_RUMBLE_TRIGGER_DATA: + // Look for another rumble triggers packet to batch with + while (LbqPeekQueueElement(&asyncCallbackQueue, (void**)&nextCb) == LBQ_SUCCESS) { + // Don't batch with the next packet if it is a different type or controller number + if (nextCb->typeIndex != queuedCb->typeIndex || + nextCb->data.rumbleTriggers.controllerNumber != queuedCb->data.rumbleTriggers.controllerNumber) { + break; + } + + // This entry is batchable, so pop it off the queue + if (LbqPollQueueElement(&asyncCallbackQueue, (void**)&nextCb) != LBQ_SUCCESS) { + break; + } + + // Replace the old entry with the new one + free(queuedCb); + queuedCb = nextCb; + } + ListenerCallbacks.rumbleTriggers(queuedCb->data.rumbleTriggers.controllerNumber, queuedCb->data.rumbleTriggers.leftTriggerMotor, queuedCb->data.rumbleTriggers.rightTriggerMotor); break; - case IDX_SET_MOTION_EVENT: - ListenerCallbacks.setMotionEventState(queuedCb->data.setMotionEventState.controllerNumber, - queuedCb->data.setMotionEventState.motionType, - queuedCb->data.setMotionEventState.reportRateHz); - break; case IDX_SET_RGB_LED: + // Look for another controller LED packet to batch with + while (LbqPeekQueueElement(&asyncCallbackQueue, (void**)&nextCb) == LBQ_SUCCESS) { + // Don't batch with the next packet if it is a different type or controller number + if (nextCb->typeIndex != queuedCb->typeIndex || + nextCb->data.setControllerLed.controllerNumber != queuedCb->data.setControllerLed.controllerNumber) { + break; + } + + // This entry is batchable, so pop it off the queue + if (LbqPollQueueElement(&asyncCallbackQueue, (void**)&nextCb) != LBQ_SUCCESS) { + break; + } + + // Replace the old entry with the new one + free(queuedCb); + queuedCb = nextCb; + } + ListenerCallbacks.setControllerLED(queuedCb->data.setControllerLed.controllerNumber, queuedCb->data.setControllerLed.r, queuedCb->data.setControllerLed.g, @@ -846,10 +895,27 @@ static void asyncCallbackThreadFunc(void* context) { break; case IDX_HDR_INFO: // HDR state is maintained globally, so we just invoke the client callback here. - // If the state has changed multiple times, the client could receive duplicate - // callbacks but that is fine. + // These events are stateless, so we can consume all of them now. + while (LbqPeekQueueElement(&asyncCallbackQueue, (void**)&nextCb) == LBQ_SUCCESS && nextCb->typeIndex == queuedCb->typeIndex) { + // This entry is batchable, so pop it off the queue + if (LbqPollQueueElement(&asyncCallbackQueue, (void**)&nextCb) != LBQ_SUCCESS) { + break; + } + + // Replace the old entry with the new one + free(queuedCb); + queuedCb = nextCb; + } + ListenerCallbacks.setHdrMode(hdrEnabled); break; + + case IDX_SET_MOTION_EVENT: + // These events are infrequent and cannot be batched + ListenerCallbacks.setMotionEventState(queuedCb->data.setMotionEventState.controllerNumber, + queuedCb->data.setMotionEventState.motionType, + queuedCb->data.setMotionEventState.reportRateHz); + break; default: // Unhandled packet type from queueAsyncCallback() LC_ASSERT(false);