From 54d46ca80f1b1e8dabeb2edb1136170489722599 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Tue, 14 Feb 2023 23:02:18 -0600 Subject: [PATCH] Put backpressure on control stream sender when the window fills up Without any backpressure, ENet will just let packets accumulate forever. --- src/ControlStream.c | 68 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 2 deletions(-) diff --git a/src/ControlStream.c b/src/ControlStream.c index 90ff624..b9a9063 100644 --- a/src/ControlStream.c +++ b/src/ControlStream.c @@ -489,6 +489,33 @@ static bool decryptControlMessageToV1(PNVCTL_ENCRYPTED_PACKET_HEADER encPacket, return true; } +static void enetPacketFreeCb(ENetPacket* packet) { + if (packet->userData) { + // userData contains a bool that we will set when freed + *(bool*)packet->userData = true; + } +} + + +// Must be called with enetMutex held +static bool isPacketSentWaitingForAck(ENetPacket* packet) { + ENetOutgoingCommand* outgoingCommand = NULL; + ENetListIterator currentCommand; + + // Look for our packet on the sent commands list + for (currentCommand = enet_list_begin(&peer->sentReliableCommands); + currentCommand != enet_list_end(&peer->sentReliableCommands); + currentCommand = enet_list_next(currentCommand)) + { + outgoingCommand = (ENetOutgoingCommand*)currentCommand; + if (outgoingCommand->packet == packet) { + return true; + } + } + + return false; +} + static bool sendMessageEnet(short ptype, short paylen, const void* payload) { ENetPacket* enetPacket; int err; @@ -547,11 +574,48 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload) { PltLockMutex(&enetMutex); } + bool packetFreed = false; + + // Set a callback to use to let us know if the packet has been freed. + // Freeing can only happen when the packet is acked or send fails. + enetPacket->userData = &packetFreed; + enetPacket->freeCallback = enetPacketFreeCb; + // Queue the packet to be sent err = enet_peer_send(peer, 0, enetPacket); - // Actually send it - enet_host_service(client, NULL, 0); + // Wait until the packet is actually sent to provide backpressure on senders + if (err == 0) { + // Try to send the packet + enet_host_service(client, NULL, 0); + + // Don't wait longer than 10 milliseconds to avoid blocking callers for too long + for (int i = 0; i < 10; i++) { + // Break on disconnected, acked/freed, or sent (pending ack). + if (peer->state != ENET_PEER_STATE_CONNECTED || packetFreed || isPacketSentWaitingForAck(enetPacket)) { + break; + } + + // Release the lock before sleeping to allow another thread to send/receive + PltUnlockMutex(&enetMutex); + PltSleepMs(1); + PltLockMutex(&enetMutex); + + // Try to send the packet again + enet_host_service(client, NULL, 0); + } + + if (peer->state == ENET_PEER_STATE_CONNECTED && !packetFreed && !isPacketSentWaitingForAck(enetPacket)) { + Limelog("Control message took over 10 ms to send (net latency: %u ms | packet loss: %f%%)\n", + peer->roundTripTime, peer->packetLoss / (float)ENET_PEER_PACKET_LOSS_SCALE); + } + } + + // Remove the free callback now that the packet was sent + if (!packetFreed) { + enetPacket->userData = NULL; + enetPacket->freeCallback = NULL; + } PltUnlockMutex(&enetMutex);