Introduce a list to cache freed input packet entries for reuse

This commit is contained in:
Cameron Gutman
2021-06-12 10:43:50 -05:00
parent a13eeddacf
commit ac9c4e82ac

View File

@@ -6,11 +6,14 @@ static bool initialized;
static PPLT_CRYPTO_CONTEXT cryptoContext;
static LINKED_BLOCKING_QUEUE packetQueue;
static LINKED_BLOCKING_QUEUE packetHolderFreeList;
static PLT_THREAD inputSendThread;
#define MAX_INPUT_PACKET_SIZE 128
#define INPUT_STREAM_TIMEOUT_SEC 10
#define MAX_QUEUED_INPUT_PACKETS 150
// Contains input stream packets
typedef struct _PACKET_HOLDER {
int packetLength;
@@ -33,7 +36,8 @@ int initializeInputStream(void) {
// Set a high maximum queue size limit to ensure input isn't dropped
// while the input send thread is blocked for short periods.
LbqInitializeLinkedBlockingQueue(&packetQueue, 150);
LbqInitializeLinkedBlockingQueue(&packetQueue, MAX_QUEUED_INPUT_PACKETS);
LbqInitializeLinkedBlockingQueue(&packetHolderFreeList, MAX_QUEUED_INPUT_PACKETS);
cryptoContext = PltCreateCryptoContext();
return 0;
@@ -55,6 +59,17 @@ void destroyInputStream(void) {
entry = nextEntry;
}
entry = LbqDestroyLinkedBlockingQueue(&packetHolderFreeList);
while (entry != NULL) {
nextEntry = entry->flink;
// The entry is stored in the data buffer
free(entry->data);
entry = nextEntry;
}
}
static int encryptData(unsigned char* plaintext, int plaintextLen,
@@ -92,6 +107,34 @@ static int encryptData(unsigned char* plaintext, int plaintextLen,
}
}
static void freePacketHolder(PPACKET_HOLDER holder) {
// Place the packet holder back into the free list
if (LbqOfferQueueItem(&packetHolderFreeList, holder, &holder->entry) != LBQ_SUCCESS) {
free(holder);
}
}
static PPACKET_HOLDER allocatePacketHolder(void) {
PPACKET_HOLDER holder;
int err;
// Grab an entry from the free list (if available)
err = LbqPollQueueElement(&packetHolderFreeList, (void**)&holder);
if (err == LBQ_SUCCESS) {
return holder;
}
else if (err == LBQ_INTERRUPTED) {
// We're shutting down. Don't bother allocating.
return NULL;
}
else {
LC_ASSERT(err == LBQ_NO_ELEMENT);
// Otherwise we'll have to allocate
return malloc(sizeof(*holder));
}
}
// Input thread proc
static void inputSendThreadProc(void* context) {
SOCK_RET err;
@@ -154,7 +197,7 @@ static void inputSendThreadProc(void* context) {
origPkt->rightStickY = newPkt->rightStickY;
// Free the batched packet holder
free(controllerBatchHolder);
freePacketHolder(controllerBatchHolder);
}
}
// If it's a relative mouse move packet, we can also do batching
@@ -198,7 +241,7 @@ static void inputSendThreadProc(void* context) {
totalDeltaY += partialDeltaY;
// Free the batched packet holder
free(mouseBatchHolder);
freePacketHolder(mouseBatchHolder);
}
// Update the original packet
@@ -226,7 +269,7 @@ static void inputSendThreadProc(void* context) {
}
// Replace the current packet with the new one
free(holder);
freePacketHolder(holder);
holder = mouseBatchHolder;
}
}
@@ -236,7 +279,7 @@ static void inputSendThreadProc(void* context) {
// the encryption.
if (encryptedControlStream) {
err = (SOCK_RET)sendInputPacketOnControlStream((unsigned char*)&holder->packet, holder->packetLength);
free(holder);
freePacketHolder(holder);
if (err < 0) {
Limelog("Input: sendInputPacketOnControlStream() failed: %d\n", (int) err);
ListenerCallbacks.connectionTerminated(err);
@@ -248,7 +291,7 @@ static void inputSendThreadProc(void* context) {
encryptedSize = sizeof(encryptedBuffer) - 4;
err = encryptData((unsigned char*)&holder->packet, holder->packetLength,
(unsigned char*)&encryptedBuffer[4], (int*)&encryptedSize);
free(holder);
freePacketHolder(holder);
if (err != 0) {
Limelog("Input: Encryption failed: %d\n", (int)err);
ListenerCallbacks.connectionTerminated(err);
@@ -303,7 +346,7 @@ static int sendEnableHaptics(void) {
return 0;
}
holder = malloc(sizeof(*holder));
holder = allocatePacketHolder();
if (holder == NULL) {
return -1;
}
@@ -317,7 +360,7 @@ static int sendEnableHaptics(void) {
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
free(holder);
freePacketHolder(holder);
}
return err;
@@ -360,6 +403,7 @@ int startInputStream(void) {
int stopInputStream(void) {
// No more packets should be queued now
initialized = false;
LbqSignalQueueShutdown(&packetHolderFreeList);
// Signal the input send thread to drain all pending
// input packets before shutting down.
@@ -392,7 +436,7 @@ int LiSendMouseMoveEvent(short deltaX, short deltaY) {
return 0;
}
holder = malloc(sizeof(*holder));
holder = allocatePacketHolder();
if (holder == NULL) {
return -1;
}
@@ -412,7 +456,7 @@ int LiSendMouseMoveEvent(short deltaX, short deltaY) {
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
free(holder);
freePacketHolder(holder);
}
return err;
@@ -427,7 +471,7 @@ int LiSendMousePositionEvent(short x, short y, short referenceWidth, short refer
return -2;
}
holder = malloc(sizeof(*holder));
holder = allocatePacketHolder();
if (holder == NULL) {
return -1;
}
@@ -451,7 +495,7 @@ int LiSendMousePositionEvent(short x, short y, short referenceWidth, short refer
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
free(holder);
freePacketHolder(holder);
}
return err;
@@ -466,7 +510,7 @@ int LiSendMouseButtonEvent(char action, int button) {
return -2;
}
holder = malloc(sizeof(*holder));
holder = allocatePacketHolder();
if (holder == NULL) {
return -1;
}
@@ -483,7 +527,7 @@ int LiSendMouseButtonEvent(char action, int button) {
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
free(holder);
freePacketHolder(holder);
}
return err;
@@ -498,7 +542,7 @@ int LiSendKeyboardEvent(short keyCode, char keyAction, char modifiers) {
return -2;
}
holder = malloc(sizeof(*holder));
holder = allocatePacketHolder();
if (holder == NULL) {
return -1;
}
@@ -557,7 +601,7 @@ int LiSendKeyboardEvent(short keyCode, char keyAction, char modifiers) {
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
free(holder);
freePacketHolder(holder);
}
return err;
@@ -574,7 +618,7 @@ static int sendControllerEventInternal(short controllerNumber, short activeGamep
return -2;
}
holder = malloc(sizeof(*holder));
holder = allocatePacketHolder();
if (holder == NULL) {
return -1;
}
@@ -625,7 +669,7 @@ static int sendControllerEventInternal(short controllerNumber, short activeGamep
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
free(holder);
freePacketHolder(holder);
}
return err;
@@ -662,7 +706,7 @@ int LiSendHighResScrollEvent(short scrollAmount) {
return 0;
}
holder = malloc(sizeof(*holder));
holder = allocatePacketHolder();
if (holder == NULL) {
return -1;
}
@@ -684,7 +728,7 @@ int LiSendHighResScrollEvent(short scrollAmount) {
if (err != LBQ_SUCCESS) {
LC_ASSERT(err == LBQ_BOUND_EXCEEDED);
Limelog("Input queue reached maximum size limit\n");
free(holder);
freePacketHolder(holder);
}
return err;