Rewrite input batching for mouse and gamepad sensors

The old method was too inflexible (depending on consecutive events to batch) that it couldn't
really handle stressful cases like high polling rate mice combined with multiple gamepads
reporting motion sensor events.
This commit is contained in:
Cameron Gutman 2023-10-16 21:43:05 -05:00
parent 8d30079033
commit 615b5e2bba

View File

@ -18,8 +18,26 @@ static float absCurrentPosY;
// Limited by number of bits in activeGamepadMask // Limited by number of bits in activeGamepadMask
#define MAX_GAMEPADS 16 #define MAX_GAMEPADS 16
// Accelerometer and gyro
#define MAX_MOTION_EVENTS 2
static uint8_t currentPenButtonState; static uint8_t currentPenButtonState;
static PLT_MUTEX batchedInputMutex;
static struct {
float x, y, z;
bool dirty; // Update ready to send (queued packet holder in packetQueue)
} currentGamepadSensorState[MAX_GAMEPADS][MAX_MOTION_EVENTS];
static struct {
int deltaX, deltaY;
bool dirty; // Update ready to send (queued packet holder in packetQueue)
} currentRelativeMouseState;
static struct {
int x, y;
int width, height;
bool dirty; // Update ready to send (queued packet holder in packetQueue)
} currentAbsoluteMouseState;
#define CLAMP(val, min, max) (((val) < (min)) ? (min) : (((val) > (max)) ? (max) : (val))) #define CLAMP(val, min, max) (((val) < (min)) ? (min) : (((val) > (max)) ? (max) : (val)))
#define MAX_INPUT_PACKET_SIZE 128 #define MAX_INPUT_PACKET_SIZE 128
@ -100,6 +118,12 @@ int initializeInputStream(void) {
// Start with the virtual mouse centered // Start with the virtual mouse centered
absCurrentPosX = absCurrentPosY = 0.5f; absCurrentPosX = absCurrentPosY = 0.5f;
memset(currentGamepadSensorState, 0, sizeof(currentGamepadSensorState));
memset(&currentRelativeMouseState, 0, sizeof(currentRelativeMouseState));
memset(&currentAbsoluteMouseState, 0, sizeof(currentAbsoluteMouseState));
PltCreateMutex(&batchedInputMutex);
return 0; return 0;
} }
@ -130,6 +154,8 @@ void destroyInputStream(void) {
entry = nextEntry; entry = nextEntry;
} }
PltDeleteMutex(&batchedInputMutex);
} }
static int encryptData(unsigned char* plaintext, int plaintextLen, static int encryptData(unsigned char* plaintext, int plaintextLen,
@ -281,6 +307,19 @@ static bool sendInputPacket(PPACKET_HOLDER holder, bool moreData) {
return true; return true;
} }
static void floatToNetfloat(float in, netfloat out) {
if (IS_LITTLE_ENDIAN()) {
memcpy(out, &in, sizeof(in));
}
else {
uint8_t* inb = (uint8_t*)&in;
out[0] = inb[3];
out[1] = inb[2];
out[2] = inb[1];
out[3] = inb[0];
}
}
// Input thread proc // Input thread proc
static void inputSendThreadProc(void* context) { static void inputSendThreadProc(void* context) {
SOCK_RET err; SOCK_RET err;
@ -370,9 +409,6 @@ static void inputSendThreadProc(void* context) {
} }
// If it's a relative mouse move packet, we can also do batching // If it's a relative mouse move packet, we can also do batching
else if (holder->packet.header.magic == relMouseMagicLE) { else if (holder->packet.header.magic == relMouseMagicLE) {
PPACKET_HOLDER mouseBatchHolder;
int totalDeltaX = (short)BE16(holder->packet.mouseMoveRel.deltaX);
int totalDeltaY = (short)BE16(holder->packet.mouseMoveRel.deltaY);
uint64_t now = PltGetMillis(); uint64_t now = PltGetMillis();
// Delay for batching if required // Delay for batching if required
@ -382,49 +418,65 @@ static void inputSendThreadProc(void* context) {
now = PltGetMillis(); now = PltGetMillis();
} }
for (;;) { PltLockMutex(&batchedInputMutex);
int partialDeltaX;
int partialDeltaY;
// Peek at the next packet // Send as many packets as it takes to get the entire delta through
if (LbqPeekQueueElement(&packetQueue, (void**)&mouseBatchHolder) != LBQ_SUCCESS) { while (currentRelativeMouseState.deltaX != 0 || currentRelativeMouseState.deltaY != 0) {
break; bool more = false;
if (currentRelativeMouseState.deltaX < INT16_MIN) {
holder->packet.mouseMoveRel.deltaX = BE16(INT16_MIN);
currentRelativeMouseState.deltaX -= INT16_MIN;
more = true;
}
else if (currentRelativeMouseState.deltaX > INT16_MAX) {
holder->packet.mouseMoveRel.deltaX = BE16(INT16_MAX);
currentRelativeMouseState.deltaX -= INT16_MAX;
more = true;
}
else {
holder->packet.mouseMoveRel.deltaX = BE16(currentRelativeMouseState.deltaX);
currentRelativeMouseState.deltaX = 0;
} }
// If it's not a mouse move packet, we're done if (currentRelativeMouseState.deltaY < INT16_MIN) {
if (mouseBatchHolder->packet.header.magic != relMouseMagicLE) { holder->packet.mouseMoveRel.deltaY = BE16(INT16_MIN);
break; currentRelativeMouseState.deltaY -= INT16_MIN;
more = true;
}
else if (currentRelativeMouseState.deltaY > INT16_MAX) {
holder->packet.mouseMoveRel.deltaY = BE16(INT16_MAX);
currentRelativeMouseState.deltaY -= INT16_MAX;
more = true;
}
else {
holder->packet.mouseMoveRel.deltaY = BE16(currentRelativeMouseState.deltaY);
currentRelativeMouseState.deltaY = 0;
} }
partialDeltaX = (short)BE16(mouseBatchHolder->packet.mouseMoveRel.deltaX); // Don't hold the batching lock while we're doing network I/O
partialDeltaY = (short)BE16(mouseBatchHolder->packet.mouseMoveRel.deltaY); PltUnlockMutex(&batchedInputMutex);
// Check for overflow // Encrypt and send the split packet
if (partialDeltaX + totalDeltaX > INT16_MAX || if (!sendInputPacket(holder, more)) {
partialDeltaX + totalDeltaX < INT16_MIN || freePacketHolder(holder);
partialDeltaY + totalDeltaY > INT16_MAX || return;
partialDeltaY + totalDeltaY < INT16_MIN) {
// Total delta would overflow our 16-bit short
break;
} }
// Remove the batchable mouse move packet PltLockMutex(&batchedInputMutex);
if (LbqPollQueueElement(&packetQueue, (void**)&mouseBatchHolder) != LBQ_SUCCESS) {
break;
}
totalDeltaX += partialDeltaX;
totalDeltaY += partialDeltaY;
// Free the batched packet holder
freePacketHolder(mouseBatchHolder);
} }
// The state change is no longer pending
currentRelativeMouseState.dirty = false;
PltUnlockMutex(&batchedInputMutex);
lastMousePacketTime = now; lastMousePacketTime = now;
// Update the original packet // We sent everything we needed in the loop above, so we can just free the
holder->packet.mouseMoveRel.deltaX = BE16((short)totalDeltaX); // holder of the original packet and wait for another input event.
holder->packet.mouseMoveRel.deltaY = BE16((short)totalDeltaY); freePacketHolder(holder);
continue;
} }
// If it's an absolute mouse move packet, we should only send the latest // If it's an absolute mouse move packet, we should only send the latest
else if (holder->packet.header.magic == LE32(MOUSE_MOVE_ABS_MAGIC)) { else if (holder->packet.header.magic == LE32(MOUSE_MOVE_ABS_MAGIC)) {
@ -437,28 +489,24 @@ static void inputSendThreadProc(void* context) {
now = PltGetMillis(); now = PltGetMillis();
} }
for (;;) { PltLockMutex(&batchedInputMutex);
PPACKET_HOLDER mouseBatchHolder;
// Peek at the next packet // Populate the packet with the latest state
if (LbqPeekQueueElement(&packetQueue, (void**)&mouseBatchHolder) != LBQ_SUCCESS) { holder->packet.mouseMoveAbs.x = BE16(currentAbsoluteMouseState.x);
break; holder->packet.mouseMoveAbs.y = BE16(currentAbsoluteMouseState.y);
}
// If it's not a mouse position packet, we're done // There appears to be a rounding error in GFE's scaling calculation which prevents
if (mouseBatchHolder->packet.header.magic != LE32(MOUSE_MOVE_ABS_MAGIC)) { // the cursor from reaching the far edge of the screen when streaming at smaller
break; // resolutions with a higher desktop resolution (like streaming 720p with a desktop
} // resolution of 1080p, or streaming 720p/1080p with a desktop resolution of 4K).
// Subtracting one from the reference dimensions seems to work around this issue.
holder->packet.mouseMoveAbs.width = BE16(currentAbsoluteMouseState.width - 1);
holder->packet.mouseMoveAbs.height = BE16(currentAbsoluteMouseState.height - 1);
// Remove the mouse position packet // The state change is no longer pending
if (LbqPollQueueElement(&packetQueue, (void**)&mouseBatchHolder) != LBQ_SUCCESS) { currentAbsoluteMouseState.dirty = false;
break;
}
// Replace the current packet with the new one PltUnlockMutex(&batchedInputMutex);
freePacketHolder(holder);
holder = mouseBatchHolder;
}
lastMousePacketTime = now; lastMousePacketTime = now;
} }
@ -515,34 +563,33 @@ static void inputSendThreadProc(void* context) {
now = PltGetMillis(); now = PltGetMillis();
} }
for (;;) { PltLockMutex(&batchedInputMutex);
PPACKET_HOLDER motionBatchHolder;
// Peek at the next packet // LI_MOTION_TYPE_* values are 1-based, so we have to subtract 1 to index into our state array
if (LbqPeekQueueElement(&packetQueue, (void**)&motionBatchHolder) != LBQ_SUCCESS) { float x = currentGamepadSensorState[holder->packet.controllerMotion.controllerNumber][holder->packet.controllerMotion.motionType - 1].x;
break; float y = currentGamepadSensorState[holder->packet.controllerMotion.controllerNumber][holder->packet.controllerMotion.motionType - 1].y;
} float z = currentGamepadSensorState[holder->packet.controllerMotion.controllerNumber][holder->packet.controllerMotion.motionType - 1].z;
// If it's not a motion packet, we're done // Motion events are so rapid that we can just drop any events that are lost in transit,
if (motionBatchHolder->packet.header.magic != LE32(SS_CONTROLLER_MOTION_MAGIC)) { // but we will treat (0, 0, 0) as a special value for gyro events to allow clients to
break; // reliably set the gyro to a null state when sensor events are halted due to focus loss
} // or similar client-side constraints.
if (holder->packet.controllerMotion.motionType == LI_MOTION_TYPE_GYRO && x == 0.0f && y == 0.0f && z == 0.0f) {
// If the controller or sensor type is different, we cannot batch holder->enetPacketFlags = ENET_PACKET_FLAG_RELIABLE;
if (holder->packet.controllerMotion.motionType != motionBatchHolder->packet.controllerMotion.motionType ||
holder->packet.controllerMotion.controllerNumber != motionBatchHolder->packet.controllerMotion.controllerNumber) {
break;
}
// Remove the next packet
if (LbqPollQueueElement(&packetQueue, (void**)&motionBatchHolder) != LBQ_SUCCESS) {
break;
}
// Replace the current packet with the new one
freePacketHolder(holder);
holder = motionBatchHolder;
} }
else {
holder->enetPacketFlags = 0;
}
// Populate the packet with the latest state
floatToNetfloat(x, holder->packet.controllerMotion.x);
floatToNetfloat(y, holder->packet.controllerMotion.y);
floatToNetfloat(z, holder->packet.controllerMotion.z);
// The state change is no longer pending
currentGamepadSensorState[holder->packet.controllerMotion.controllerNumber][holder->packet.controllerMotion.motionType - 1].dirty = false;
PltUnlockMutex(&batchedInputMutex);
lastMotionPacketTime = now; lastMotionPacketTime = now;
} }
@ -707,19 +754,6 @@ int stopInputStream(void) {
return 0; return 0;
} }
void floatToNetfloat(float in, netfloat out) {
if (IS_LITTLE_ENDIAN()) {
memcpy(out, &in, sizeof(in));
}
else {
uint8_t* inb = (uint8_t*)&in;
out[0] = inb[3];
out[1] = inb[2];
out[2] = inb[1];
out[3] = inb[0];
}
}
// Send a mouse move event to the streaming machine // Send a mouse move event to the streaming machine
int LiSendMouseMoveEvent(short deltaX, short deltaY) { int LiSendMouseMoveEvent(short deltaX, short deltaY) {
PPACKET_HOLDER holder; PPACKET_HOLDER holder;
@ -733,33 +767,52 @@ int LiSendMouseMoveEvent(short deltaX, short deltaY) {
return 0; return 0;
} }
holder = allocatePacketHolder(0); PltLockMutex(&batchedInputMutex);
if (holder == NULL) {
return -1;
}
holder->channelId = CTRL_CHANNEL_MOUSE; // Combine the previous deltas with the new one
currentRelativeMouseState.deltaX += deltaX;
currentRelativeMouseState.deltaY += deltaY;
// TODO: Send this as unreliable sequenced when we have a delayed reliable retransmission thread // Queue a packet holder if this is the only pending relative mouse event
// and protocol updates to allow us to determine which unreliable messages were dropped. if (!currentRelativeMouseState.dirty) {
holder->enetPacketFlags = ENET_PACKET_FLAG_RELIABLE; holder = allocatePacketHolder(0);
if (holder == NULL) {
PltUnlockMutex(&batchedInputMutex);
return -1;
}
holder->packet.mouseMoveRel.header.size = BE32(sizeof(NV_REL_MOUSE_MOVE_PACKET) - sizeof(uint32_t)); holder->channelId = CTRL_CHANNEL_MOUSE;
if (AppVersionQuad[0] >= 5) {
holder->packet.mouseMoveRel.header.magic = LE32(MOUSE_MOVE_REL_MAGIC_GEN5); // TODO: Send this as unreliable sequenced when we have a delayed reliable retransmission thread
// and protocol updates to allow us to determine which unreliable messages were dropped.
holder->enetPacketFlags = ENET_PACKET_FLAG_RELIABLE;
holder->packet.mouseMoveRel.header.size = BE32(sizeof(NV_REL_MOUSE_MOVE_PACKET) - sizeof(uint32_t));
if (AppVersionQuad[0] >= 5) {
holder->packet.mouseMoveRel.header.magic = LE32(MOUSE_MOVE_REL_MAGIC_GEN5);
}
else {
holder->packet.mouseMoveRel.header.magic = LE32(MOUSE_MOVE_REL_MAGIC);
}
// Remaining fields are set in the input thread based on the latest currentRelativeMouseState values
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err == LBQ_SUCCESS) {
currentRelativeMouseState.dirty = true;
}
else {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
freePacketHolder(holder);
}
} }
else { else {
holder->packet.mouseMoveRel.header.magic = LE32(MOUSE_MOVE_REL_MAGIC); // There's already a packet holder queued to send this event
err = 0;
} }
holder->packet.mouseMoveRel.deltaX = BE16(deltaX);
holder->packet.mouseMoveRel.deltaY = BE16(deltaY);
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry); PltUnlockMutex(&batchedInputMutex);
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
freePacketHolder(holder);
}
return err; return err;
} }
@ -773,36 +826,49 @@ int LiSendMousePositionEvent(short x, short y, short referenceWidth, short refer
return -2; return -2;
} }
holder = allocatePacketHolder(0); PltLockMutex(&batchedInputMutex);
if (holder == NULL) {
return -1; // Overwrite the previous mouse location with the new one
currentAbsoluteMouseState.x = x;
currentAbsoluteMouseState.y = y;
currentAbsoluteMouseState.width = referenceWidth;
currentAbsoluteMouseState.height = referenceHeight;
// Queue a packet holder if this is the only pending absolute mouse event
if (!currentAbsoluteMouseState.dirty) {
holder = allocatePacketHolder(0);
if (holder == NULL) {
PltUnlockMutex(&batchedInputMutex);
return -1;
}
holder->channelId = CTRL_CHANNEL_MOUSE;
// TODO: Send this as unreliable sequenced when we have a delayed reliable retransmission thread
holder->enetPacketFlags = ENET_PACKET_FLAG_RELIABLE;
holder->packet.mouseMoveAbs.header.size = BE32(sizeof(NV_ABS_MOUSE_MOVE_PACKET) - sizeof(uint32_t));
holder->packet.mouseMoveAbs.header.magic = LE32(MOUSE_MOVE_ABS_MAGIC);
holder->packet.mouseMoveAbs.unused = 0;
// Remaining fields are set in the input thread based on the latest currentAbsoluteMouseState values
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err == LBQ_SUCCESS) {
currentAbsoluteMouseState.dirty = true;
}
else {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
freePacketHolder(holder);
}
}
else {
// There's already a packet holder queued to send this event
err = 0;
} }
holder->channelId = CTRL_CHANNEL_MOUSE; PltUnlockMutex(&batchedInputMutex);
// TODO: Send this as unreliable sequenced when we have a delayed reliable retransmission thread
holder->enetPacketFlags = ENET_PACKET_FLAG_RELIABLE;
holder->packet.mouseMoveAbs.header.size = BE32(sizeof(NV_ABS_MOUSE_MOVE_PACKET) - sizeof(uint32_t));
holder->packet.mouseMoveAbs.header.magic = LE32(MOUSE_MOVE_ABS_MAGIC);
holder->packet.mouseMoveAbs.x = BE16(x);
holder->packet.mouseMoveAbs.y = BE16(y);
holder->packet.mouseMoveAbs.unused = 0;
// There appears to be a rounding error in GFE's scaling calculation which prevents
// the cursor from reaching the far edge of the screen when streaming at smaller
// resolutions with a higher desktop resolution (like streaming 720p with a desktop
// resolution of 1080p, or streaming 720p/1080p with a desktop resolution of 4K).
// Subtracting one from the reference dimensions seems to work around this issue.
holder->packet.mouseMoveAbs.width = BE16(referenceWidth - 1);
holder->packet.mouseMoveAbs.height = BE16(referenceHeight - 1);
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
freePacketHolder(holder);
}
// This is not thread safe, but it's not a big deal because callers that want to // This is not thread safe, but it's not a big deal because callers that want to
// use LiSendRelativeMotionAsMousePositionEvent() must not mix these function // use LiSendRelativeMotionAsMousePositionEvent() must not mix these function
@ -1439,55 +1505,61 @@ int LiSendControllerMotionEvent(uint8_t controllerNumber, uint8_t motionType, fl
return -2; return -2;
} }
// Check for valid motion type values
if (motionType - 1 >= MAX_MOTION_EVENTS) {
LC_ASSERT(motionType - 1 < MAX_MOTION_EVENTS);
return -3;
}
// This is a protocol extension only supported with Sunshine // This is a protocol extension only supported with Sunshine
if (!(SunshineFeatureFlags & LI_FF_CONTROLLER_TOUCH_EVENTS)) { if (!(SunshineFeatureFlags & LI_FF_CONTROLLER_TOUCH_EVENTS)) {
return LI_ERR_UNSUPPORTED; return LI_ERR_UNSUPPORTED;
} }
// Since these events can be sent incredibly frequently, let's avoid queuing more if
// we already have more than half of our max input queue full of other events.
if (LbqGetItemCount(&packetQueue) > MAX_QUEUED_INPUT_PACKETS / 2) {
Limelog("Dropping motion event due to high input queue length\n");
return -1;
}
// Sunshine supports up to 16 controllers // Sunshine supports up to 16 controllers
controllerNumber %= MAX_GAMEPADS; controllerNumber %= MAX_GAMEPADS;
holder = allocatePacketHolder(0); PltLockMutex(&batchedInputMutex);
if (holder == NULL) {
return -1;
}
// Send each controller on a separate channel specific to motion sensors currentGamepadSensorState[controllerNumber][motionType - 1].x = x;
holder->channelId = CTRL_CHANNEL_SENSOR_BASE + controllerNumber; currentGamepadSensorState[controllerNumber][motionType - 1].y = y;
currentGamepadSensorState[controllerNumber][motionType - 1].z = z;
// Motion events are so rapid that we can just drop any events that are lost in transit, // Queue a packet holder if this is the only pending sensor event
// but we will treat (0, 0, 0) as a special value for gyro events to allow clients to if (!currentGamepadSensorState[controllerNumber][motionType - 1].dirty) {
// reliably set the gyro to a null state when sensor events are halted due to focus loss holder = allocatePacketHolder(0);
// or similar client-side constraints. if (holder == NULL) {
if (motionType == LI_MOTION_TYPE_GYRO && x == 0.0f && y == 0.0f && z == 0.0f) { PltUnlockMutex(&batchedInputMutex);
holder->enetPacketFlags = ENET_PACKET_FLAG_RELIABLE; return -1;
}
// Send each controller on a separate channel specific to motion sensors
holder->channelId = CTRL_CHANNEL_SENSOR_BASE + controllerNumber;
holder->packet.controllerMotion.header.size = BE32(sizeof(SS_CONTROLLER_MOTION_PACKET) - sizeof(uint32_t));
holder->packet.controllerMotion.header.magic = LE32(SS_CONTROLLER_MOTION_MAGIC);
holder->packet.controllerMotion.controllerNumber = controllerNumber;
holder->packet.controllerMotion.motionType = motionType;
memset(holder->packet.controllerMotion.zero, 0, sizeof(holder->packet.controllerMotion.zero));
// Remaining fields are set in the input thread based on the latest currentGamepadSensorState values
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err == LBQ_SUCCESS) {
currentGamepadSensorState[controllerNumber][motionType - 1].dirty = true;
}
else {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
freePacketHolder(holder);
}
} }
else { else {
holder->enetPacketFlags = 0; // There's already a packet holder queued to send this event
err = 0;
} }
holder->packet.controllerMotion.header.size = BE32(sizeof(SS_CONTROLLER_MOTION_PACKET) - sizeof(uint32_t)); PltUnlockMutex(&batchedInputMutex);
holder->packet.controllerMotion.header.magic = LE32(SS_CONTROLLER_MOTION_MAGIC);
holder->packet.controllerMotion.controllerNumber = controllerNumber;
holder->packet.controllerMotion.motionType = motionType;
memset(holder->packet.controllerMotion.zero, 0, sizeof(holder->packet.controllerMotion.zero));
floatToNetfloat(x, holder->packet.controllerMotion.x);
floatToNetfloat(y, holder->packet.controllerMotion.y);
floatToNetfloat(z, holder->packet.controllerMotion.z);
err = LbqOfferQueueItem(&packetQueue, holder, &holder->entry);
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
freePacketHolder(holder);
}
return err; return err;
} }