From e59a5f5b5384b11b5de063fc4dc6e17307f9eedc Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Sun, 9 Nov 2025 15:46:36 -0600 Subject: [PATCH] Rewrite gamepad input batching to batch on the enqueue-side - Allows successful batching of interleaved input from multiple controllers - Avoids enqueue/dequeue overhead and memory allocation when batching --- src/InputStream.c | 188 +++++++++++++++++++++++++--------------------- 1 file changed, 101 insertions(+), 87 deletions(-) diff --git a/src/InputStream.c b/src/InputStream.c index d9e97ce..06913ff 100644 --- a/src/InputStream.c +++ b/src/InputStream.c @@ -23,21 +23,6 @@ static float absCurrentPosY; static uint8_t currentPenButtonState; -static PLT_MUTEX batchedInputMutex; -static struct { - float x, y, z; - bool dirty; // Update ready to send (queued packet holder in packetQueue) -} currentGamepadSensorState[MAX_GAMEPADS][MAX_MOTION_EVENTS]; -static struct { - int deltaX, deltaY; - bool dirty; // Update ready to send (queued packet holder in packetQueue) -} currentRelativeMouseState; -static struct { - int x, y; - int width, height; - bool dirty; // Update ready to send (queued packet holder in packetQueue) -} currentAbsoluteMouseState; - #define CLAMP(val, min, max) (((val) < (min)) ? (min) : (((val) > (max)) ? (max) : (val))) #define MAX_INPUT_PACKET_SIZE 128 @@ -51,11 +36,10 @@ static struct { // Matches Win32 WHEEL_DELTA definition #define LI_WHEEL_DELTA 120 -// If we try to send more than one gamepad or mouse motion event +// If we try to send more than one stylus or mouse motion event // per millisecond, we'll wait a little bit to try to batch with // the next one. This batching wait paradoxically _decreases_ // effective input latency by avoiding packet queuing in ENet. -#define CONTROLLER_BATCHING_INTERVAL_MS 1 #define MOUSE_BATCHING_INTERVAL_MS 1 #define PEN_BATCHING_INTERVAL_MS 1 @@ -92,6 +76,22 @@ typedef struct _PACKET_HOLDER { } packet; } PACKET_HOLDER, *PPACKET_HOLDER; +static PLT_MUTEX batchedInputMutex; +static PPACKET_HOLDER currentQueuedControllerPacket[MAX_GAMEPADS]; +static struct { + float x, y, z; + bool dirty; // Update ready to send (queued packet holder in packetQueue) +} currentGamepadSensorState[MAX_GAMEPADS][MAX_MOTION_EVENTS]; +static struct { + int deltaX, deltaY; + bool dirty; // Update ready to send (queued packet holder in packetQueue) +} currentRelativeMouseState; +static struct { + int x, y; + int width, height; + bool dirty; // Update ready to send (queued packet holder in packetQueue) +} currentAbsoluteMouseState; + // Initializes the input stream int initializeInputStream(void) { memcpy(currentAesIv, StreamConfig.remoteInputAesIv, sizeof(currentAesIv)); @@ -335,7 +335,6 @@ static void inputSendThreadProc(void* context) { relMouseMagicLE = LE32(MOUSE_MOVE_REL_MAGIC); } - uint64_t lastControllerPacketTime[MAX_GAMEPADS] = { 0 }; uint64_t lastMousePacketTime = 0; uint64_t lastPenPacketTime = 0; @@ -345,70 +344,25 @@ static void inputSendThreadProc(void* context) { return; } - // If it's a multi-controller packet we can do batching + // If it's a multi-controller packet, latch it by clearing currentQueuedControllerPacket. + // This will prevent another thread from batching additional data into it while we're + // trying to send it. if (holder->packet.header.magic == multiControllerMagicLE) { - PPACKET_HOLDER controllerBatchHolder; - PNV_MULTI_CONTROLLER_PACKET origPkt; short controllerNumber = LE16(holder->packet.multiController.controllerNumber); - uint64_t now = PltGetMillis(); - LC_ASSERT(controllerNumber < MAX_GAMEPADS); + PltLockMutex(&batchedInputMutex); - // Delay for batching if required - if (now < lastControllerPacketTime[controllerNumber] + CONTROLLER_BATCHING_INTERVAL_MS) { - flushInputOnControlStream(); - PltSleepMs((int)(lastControllerPacketTime[controllerNumber] + CONTROLLER_BATCHING_INTERVAL_MS - now)); - now = PltGetMillis(); + // It's possible that the enqueuing code already moved on to batching into a new + // packet because something (like a button change) forced it to end the batch. + // We only need to stop batching into the current packet we're sending here, so + // it's fine if the input code continues to update a later packet concurrently. + if (holder == currentQueuedControllerPacket[controllerNumber]) { + currentQueuedControllerPacket[controllerNumber] = NULL; } - origPkt = &holder->packet.multiController; - for (;;) { - PNV_MULTI_CONTROLLER_PACKET newPkt; - - // Peek at the next packet - if (LbqPeekQueueElement(&packetQueue, (void**)&controllerBatchHolder) != LBQ_SUCCESS) { - break; - } - - // If it's not a controller packet, we're done - if (controllerBatchHolder->packet.header.magic != multiControllerMagicLE) { - break; - } - - // Check if it's able to be batched - // NB: GFE does some discarding of gamepad packets received very soon after another. - // Thus, this batching is needed for correctness in some cases, as GFE will inexplicably - // drop *newer* packets in that scenario. The brokenness can be tested with consecutive - // calls to LiSendMultiControllerEvent() with different values for analog sticks (max -> zero). - newPkt = &controllerBatchHolder->packet.multiController; - if (newPkt->buttonFlags != origPkt->buttonFlags || - newPkt->buttonFlags2 != origPkt->buttonFlags2 || - newPkt->controllerNumber != origPkt->controllerNumber || - newPkt->activeGamepadMask != origPkt->activeGamepadMask) { - // Batching not allowed - break; - } - - // Remove the batchable controller packet - if (LbqPollQueueElement(&packetQueue, (void**)&controllerBatchHolder) != LBQ_SUCCESS) { - break; - } - - // Update the original packet - origPkt->leftTrigger = newPkt->leftTrigger; - origPkt->rightTrigger = newPkt->rightTrigger; - origPkt->leftStickX = newPkt->leftStickX; - origPkt->leftStickY = newPkt->leftStickY; - origPkt->rightStickX = newPkt->rightStickX; - origPkt->rightStickY = newPkt->rightStickY; - - // Free the batched packet holder - freePacketHolder(controllerBatchHolder); - } - - lastControllerPacketTime[controllerNumber] = now; + PltUnlockMutex(&batchedInputMutex); } - // If it's a relative mouse move packet, we can also do batching + // If it's a relative mouse move packet, we can do batching else if (holder->packet.header.magic == relMouseMagicLE) { uint64_t now = PltGetMillis(); @@ -1039,6 +993,7 @@ static int sendControllerEventInternal(short controllerNumber, short activeGamep { PPACKET_HOLDER holder; int err; + bool enqueueHolder = false; if (!initialized) { return -2; @@ -1074,16 +1029,52 @@ static int sendControllerEventInternal(short controllerNumber, short activeGamep controllerNumber %= MAX_GAMEPADS; } - holder = allocatePacketHolder(0); - if (holder == NULL) { - return -1; + // The batched input mutex protects against the enqueued packet being processed + // and freed from underneath us when we're trying to update it. + PltLockMutex(&batchedInputMutex); + + // Start with the currently enqueued controller packet (if any) + holder = currentQueuedControllerPacket[controllerNumber]; + + // Check that this current input is compatible with the current batch + if (holder) { + // We do not support batching with the legacy controller packet format + LC_ASSERT(AppVersionQuad[0] > 3); + + // If this new packet has different button flags, end the batch to ensure the + // host receives the exact axis values present at the time of the button press. + if (holder->packet.multiController.buttonFlags != LE16((short)buttonFlags) || + holder->packet.multiController.buttonFlags2 != (IS_SUNSHINE() ? LE16((short)(buttonFlags >> 16)) : 0)) { + // Pretend there wasn't a currently queued controller packet + holder = NULL; + } } - // Send each controller on a separate channel - holder->channelId = CTRL_CHANNEL_GAMEPAD_BASE + controllerNumber; + if (!holder) { + // Because we're not using the currently queued packet, it's safe + // to unlock here without having to worry about the input thread + // touching our packet holder behind our back. + PltUnlockMutex(&batchedInputMutex); - // TODO: Send this as unreliable sequenced when we have a delayed reliable retransmission thread - holder->enetPacketFlags = ENET_PACKET_FLAG_RELIABLE; + holder = allocatePacketHolder(0); + if (holder == NULL) { + return -1; + } + + // Send each controller on a separate channel + holder->channelId = CTRL_CHANNEL_GAMEPAD_BASE + controllerNumber; + + // TODO: Send this as unreliable sequenced when we have a delayed reliable retransmission thread + holder->enetPacketFlags = ENET_PACKET_FLAG_RELIABLE; + + // Remember that we need to enqueue this holder since it's new + enqueueHolder = true; + + // Reacquire the batched input mutex before making it visible to + // the input thread by storing this in the input queue or in the + // currentQueuedControllerPacket array. + PltLockMutex(&batchedInputMutex); + } if (AppVersionQuad[0] == 3) { // Generation 3 servers don't support multiple controllers so we send @@ -1127,13 +1118,36 @@ static int sendControllerEventInternal(short controllerNumber, short activeGamep holder->packet.multiController.tailA = LE16(MC_TAIL_A); holder->packet.multiController.buttonFlags2 = IS_SUNSHINE() ? LE16((short)(buttonFlags >> 16)) : 0; holder->packet.multiController.tailB = LE16(MC_TAIL_B); + + if (enqueueHolder) { + // Make this new packet holder the current enqueued packet + currentQueuedControllerPacket[controllerNumber] = holder; + } } - err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry); - if (err != LBQ_SUCCESS) { - LC_ASSERT(err == LBQ_BOUND_EXCEEDED); - Limelog("Input queue reached maximum size limit\n"); - freePacketHolder(holder); + // We can unlock the batched input mutex before enqueuing the new holder because + // the input thread only cares if currentQueuedControllerPacket is equal to the + // holder it's currently processing. Since it cannot be processing the holder + // until we enqueue it, there's no risk here of it processing/freeing the + // holder when we're still touching it. + // + // Unlocking early saves a context switch in the common case where the newly + // queued packet wakes up the input thread which then immediately blocks on + // the batched input mutex until this thread runs again to release it. + PltUnlockMutex(&batchedInputMutex); + + if (enqueueHolder) { + // Enqueue the new packet holder + err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry); + if (err != LBQ_SUCCESS) { + LC_ASSERT(err == LBQ_BOUND_EXCEEDED); + Limelog("Input queue reached maximum size limit\n"); + freePacketHolder(holder); + } + } + else { + // The packet holder we updated was already enqueued + err = 0; } return err;