From ac9c4e82ac10665aeee1dc5de5739003af3cbb6b Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Sat, 12 Jun 2021 10:43:50 -0500 Subject: [PATCH] Introduce a list to cache freed input packet entries for reuse --- src/InputStream.c | 84 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 20 deletions(-) diff --git a/src/InputStream.c b/src/InputStream.c index 4601913..c9605f9 100644 --- a/src/InputStream.c +++ b/src/InputStream.c @@ -6,11 +6,14 @@ static bool initialized; static PPLT_CRYPTO_CONTEXT cryptoContext; static LINKED_BLOCKING_QUEUE packetQueue; +static LINKED_BLOCKING_QUEUE packetHolderFreeList; static PLT_THREAD inputSendThread; #define MAX_INPUT_PACKET_SIZE 128 #define INPUT_STREAM_TIMEOUT_SEC 10 +#define MAX_QUEUED_INPUT_PACKETS 150 + // Contains input stream packets typedef struct _PACKET_HOLDER { int packetLength; @@ -33,7 +36,8 @@ int initializeInputStream(void) { // Set a high maximum queue size limit to ensure input isn't dropped // while the input send thread is blocked for short periods. - LbqInitializeLinkedBlockingQueue(&packetQueue, 150); + LbqInitializeLinkedBlockingQueue(&packetQueue, MAX_QUEUED_INPUT_PACKETS); + LbqInitializeLinkedBlockingQueue(&packetHolderFreeList, MAX_QUEUED_INPUT_PACKETS); cryptoContext = PltCreateCryptoContext(); return 0; @@ -55,6 +59,17 @@ void destroyInputStream(void) { entry = nextEntry; } + + entry = LbqDestroyLinkedBlockingQueue(&packetHolderFreeList); + + while (entry != NULL) { + nextEntry = entry->flink; + + // The entry is stored in the data buffer + free(entry->data); + + entry = nextEntry; + } } static int encryptData(unsigned char* plaintext, int plaintextLen, @@ -92,6 +107,34 @@ static int encryptData(unsigned char* plaintext, int plaintextLen, } } +static void freePacketHolder(PPACKET_HOLDER holder) { + // Place the packet holder back into the free list + if (LbqOfferQueueItem(&packetHolderFreeList, holder, &holder->entry) != LBQ_SUCCESS) { + free(holder); + } +} + +static PPACKET_HOLDER allocatePacketHolder(void) { + PPACKET_HOLDER holder; + int err; + + // Grab an entry from the free list (if available) + err = LbqPollQueueElement(&packetHolderFreeList, (void**)&holder); + if (err == LBQ_SUCCESS) { + return holder; + } + else if (err == LBQ_INTERRUPTED) { + // We're shutting down. Don't bother allocating. + return NULL; + } + else { + LC_ASSERT(err == LBQ_NO_ELEMENT); + + // Otherwise we'll have to allocate + return malloc(sizeof(*holder)); + } +} + // Input thread proc static void inputSendThreadProc(void* context) { SOCK_RET err; @@ -154,7 +197,7 @@ static void inputSendThreadProc(void* context) { origPkt->rightStickY = newPkt->rightStickY; // Free the batched packet holder - free(controllerBatchHolder); + freePacketHolder(controllerBatchHolder); } } // If it's a relative mouse move packet, we can also do batching @@ -198,7 +241,7 @@ static void inputSendThreadProc(void* context) { totalDeltaY += partialDeltaY; // Free the batched packet holder - free(mouseBatchHolder); + freePacketHolder(mouseBatchHolder); } // Update the original packet @@ -226,7 +269,7 @@ static void inputSendThreadProc(void* context) { } // Replace the current packet with the new one - free(holder); + freePacketHolder(holder); holder = mouseBatchHolder; } } @@ -236,7 +279,7 @@ static void inputSendThreadProc(void* context) { // the encryption. if (encryptedControlStream) { err = (SOCK_RET)sendInputPacketOnControlStream((unsigned char*)&holder->packet, holder->packetLength); - free(holder); + freePacketHolder(holder); if (err < 0) { Limelog("Input: sendInputPacketOnControlStream() failed: %d\n", (int) err); ListenerCallbacks.connectionTerminated(err); @@ -248,7 +291,7 @@ static void inputSendThreadProc(void* context) { encryptedSize = sizeof(encryptedBuffer) - 4; err = encryptData((unsigned char*)&holder->packet, holder->packetLength, (unsigned char*)&encryptedBuffer[4], (int*)&encryptedSize); - free(holder); + freePacketHolder(holder); if (err != 0) { Limelog("Input: Encryption failed: %d\n", (int)err); ListenerCallbacks.connectionTerminated(err); @@ -303,7 +346,7 @@ static int sendEnableHaptics(void) { return 0; } - holder = malloc(sizeof(*holder)); + holder = allocatePacketHolder(); if (holder == NULL) { return -1; } @@ -317,7 +360,7 @@ static int sendEnableHaptics(void) { if (err != LBQ_SUCCESS) { LC_ASSERT(err == LBQ_BOUND_EXCEEDED); Limelog("Input queue reached maximum size limit\n"); - free(holder); + freePacketHolder(holder); } return err; @@ -360,6 +403,7 @@ int startInputStream(void) { int stopInputStream(void) { // No more packets should be queued now initialized = false; + LbqSignalQueueShutdown(&packetHolderFreeList); // Signal the input send thread to drain all pending // input packets before shutting down. @@ -392,7 +436,7 @@ int LiSendMouseMoveEvent(short deltaX, short deltaY) { return 0; } - holder = malloc(sizeof(*holder)); + holder = allocatePacketHolder(); if (holder == NULL) { return -1; } @@ -412,7 +456,7 @@ int LiSendMouseMoveEvent(short deltaX, short deltaY) { if (err != LBQ_SUCCESS) { LC_ASSERT(err == LBQ_BOUND_EXCEEDED); Limelog("Input queue reached maximum size limit\n"); - free(holder); + freePacketHolder(holder); } return err; @@ -427,7 +471,7 @@ int LiSendMousePositionEvent(short x, short y, short referenceWidth, short refer return -2; } - holder = malloc(sizeof(*holder)); + holder = allocatePacketHolder(); if (holder == NULL) { return -1; } @@ -451,7 +495,7 @@ int LiSendMousePositionEvent(short x, short y, short referenceWidth, short refer if (err != LBQ_SUCCESS) { LC_ASSERT(err == LBQ_BOUND_EXCEEDED); Limelog("Input queue reached maximum size limit\n"); - free(holder); + freePacketHolder(holder); } return err; @@ -466,7 +510,7 @@ int LiSendMouseButtonEvent(char action, int button) { return -2; } - holder = malloc(sizeof(*holder)); + holder = allocatePacketHolder(); if (holder == NULL) { return -1; } @@ -483,7 +527,7 @@ int LiSendMouseButtonEvent(char action, int button) { if (err != LBQ_SUCCESS) { LC_ASSERT(err == LBQ_BOUND_EXCEEDED); Limelog("Input queue reached maximum size limit\n"); - free(holder); + freePacketHolder(holder); } return err; @@ -498,7 +542,7 @@ int LiSendKeyboardEvent(short keyCode, char keyAction, char modifiers) { return -2; } - holder = malloc(sizeof(*holder)); + holder = allocatePacketHolder(); if (holder == NULL) { return -1; } @@ -557,7 +601,7 @@ int LiSendKeyboardEvent(short keyCode, char keyAction, char modifiers) { if (err != LBQ_SUCCESS) { LC_ASSERT(err == LBQ_BOUND_EXCEEDED); Limelog("Input queue reached maximum size limit\n"); - free(holder); + freePacketHolder(holder); } return err; @@ -574,7 +618,7 @@ static int sendControllerEventInternal(short controllerNumber, short activeGamep return -2; } - holder = malloc(sizeof(*holder)); + holder = allocatePacketHolder(); if (holder == NULL) { return -1; } @@ -625,7 +669,7 @@ static int sendControllerEventInternal(short controllerNumber, short activeGamep if (err != LBQ_SUCCESS) { LC_ASSERT(err == LBQ_BOUND_EXCEEDED); Limelog("Input queue reached maximum size limit\n"); - free(holder); + freePacketHolder(holder); } return err; @@ -662,7 +706,7 @@ int LiSendHighResScrollEvent(short scrollAmount) { return 0; } - holder = malloc(sizeof(*holder)); + holder = allocatePacketHolder(); if (holder == NULL) { return -1; } @@ -684,7 +728,7 @@ int LiSendHighResScrollEvent(short scrollAmount) { if (err != LBQ_SUCCESS) { LC_ASSERT(err == LBQ_BOUND_EXCEEDED); Limelog("Input queue reached maximum size limit\n"); - free(holder); + freePacketHolder(holder); } return err;