diff --git a/src/ControlStream.c b/src/ControlStream.c index 51f16ad..399407c 100644 --- a/src/ControlStream.c +++ b/src/ControlStream.c @@ -559,7 +559,7 @@ static bool isPacketSentWaitingForAck(ENetPacket* packet) { return false; } -static bool sendMessageEnet(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags) { +static bool sendMessageEnet(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags, bool moreData) { ENetPacket* enetPacket; int err; @@ -639,30 +639,32 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload, uint // Queue the packet to be sent err = enet_peer_send(peer, channelId, enetPacket); - // Try to send the packet - enet_host_service(client, NULL, 0); + // If there is no more data coming soon, send the packet now + if (!moreData) { + enet_host_service(client, NULL, 0); - // Wait until the packet is actually sent to provide backpressure on senders - if (err == 0 && (flags & ENET_PACKET_FLAG_RELIABLE)) { - // Don't wait longer than 10 milliseconds to avoid blocking callers for too long - for (int i = 0; i < 10; i++) { - // Break on disconnected, acked/freed, or sent (pending ack). - if (peer->state != ENET_PEER_STATE_CONNECTED || packetFreed || isPacketSentWaitingForAck(enetPacket)) { - break; + // Wait until the packet is actually sent to provide backpressure on senders + if (err == 0 && (flags & ENET_PACKET_FLAG_RELIABLE)) { + // Don't wait longer than 10 milliseconds to avoid blocking callers for too long + for (int i = 0; i < 10; i++) { + // Break on disconnected, acked/freed, or sent (pending ack). + if (peer->state != ENET_PEER_STATE_CONNECTED || packetFreed || isPacketSentWaitingForAck(enetPacket)) { + break; + } + + // Release the lock before sleeping to allow another thread to send/receive + PltUnlockMutex(&enetMutex); + PltSleepMs(1); + PltLockMutex(&enetMutex); + + // Try to send the packet again + enet_host_service(client, NULL, 0); } - // Release the lock before sleeping to allow another thread to send/receive - PltUnlockMutex(&enetMutex); - PltSleepMs(1); - PltLockMutex(&enetMutex); - - // Try to send the packet again - enet_host_service(client, NULL, 0); - } - - if (peer->state == ENET_PEER_STATE_CONNECTED && !packetFreed && !isPacketSentWaitingForAck(enetPacket)) { - Limelog("Control message took over 10 ms to send (net latency: %u ms | packet loss: %f%%)\n", - peer->roundTripTime, peer->packetLoss / (float)ENET_PEER_PACKET_LOSS_SCALE); + if (peer->state == ENET_PEER_STATE_CONNECTED && !packetFreed && !isPacketSentWaitingForAck(enetPacket)) { + Limelog("Control message took over 10 ms to send (net latency: %u ms | packet loss: %f%%)\n", + peer->roundTripTime, peer->packetLoss / (float)ENET_PEER_PACKET_LOSS_SCALE); + } } } @@ -708,13 +710,13 @@ static bool sendMessageTcp(short ptype, short paylen, const void* payload) { return true; } -static bool sendMessageAndForget(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags) { +static bool sendMessageAndForget(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags, bool moreData) { bool ret; // Unlike regular sockets, ENet sockets aren't safe to invoke from multiple // threads at once. We have to synchronize them with a lock. if (AppVersionQuad[0] >= 5) { - ret = sendMessageEnet(ptype, paylen, payload, channelId, flags); + ret = sendMessageEnet(ptype, paylen, payload, channelId, flags, moreData); } else { ret = sendMessageTcp(ptype, paylen, payload); @@ -723,9 +725,9 @@ static bool sendMessageAndForget(short ptype, short paylen, const void* payload, return ret; } -static bool sendMessageAndDiscardReply(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags) { +static bool sendMessageAndDiscardReply(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags, bool moreData) { if (AppVersionQuad[0] >= 5) { - if (!sendMessageEnet(ptype, paylen, payload, channelId, flags)) { + if (!sendMessageEnet(ptype, paylen, payload, channelId, flags, moreData)) { return false; } } @@ -1114,7 +1116,8 @@ static void lossStatsThreadFunc(void* context) { sizeof(queuedFrameStatus->fecStatus), &queuedFrameStatus->fecStatus, CTRL_CHANNEL_GENERIC, - ENET_PACKET_FLAG_UNSEQUENCED)) { + ENET_PACKET_FLAG_UNSEQUENCED, + LbqGetItemCount(&frameFecStatusQueue) > 0)) { Limelog("Loss Stats: Sending frame FEC status message failed: %d\n", (int)LastSocketError()); ListenerCallbacks.connectionTerminated(LastSocketFail()); free(queuedFrameStatus); @@ -1135,7 +1138,8 @@ static void lossStatsThreadFunc(void* context) { sizeof(periodicPingPayload), periodicPingPayload, CTRL_CHANNEL_GENERIC, - ENET_PACKET_FLAG_RELIABLE)) { + ENET_PACKET_FLAG_RELIABLE, + false)) { Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError()); ListenerCallbacks.connectionTerminated(LastSocketFail()); return; @@ -1174,7 +1178,8 @@ static void lossStatsThreadFunc(void* context) { payloadLengths[IDX_LOSS_STATS], lossStatsPayload, CTRL_CHANNEL_GENERIC, - 0)) { + 0, + false)) { free(lossStatsPayload); Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError()); ListenerCallbacks.connectionTerminated(LastSocketFail()); @@ -1216,7 +1221,8 @@ static void requestIdrFrame(void) { payloadLengths[IDX_INVALIDATE_REF_FRAMES], payload, CTRL_CHANNEL_URGENT, - ENET_PACKET_FLAG_RELIABLE)) { + ENET_PACKET_FLAG_RELIABLE, + false)) { Limelog("Request IDR Frame: Transaction failed: %d\n", (int)LastSocketError()); ListenerCallbacks.connectionTerminated(LastSocketFail()); return; @@ -1228,7 +1234,8 @@ static void requestIdrFrame(void) { payloadLengths[IDX_REQUEST_IDR_FRAME], preconstructedPayloads[IDX_REQUEST_IDR_FRAME], CTRL_CHANNEL_URGENT, - ENET_PACKET_FLAG_RELIABLE)) { + ENET_PACKET_FLAG_RELIABLE, + false)) { Limelog("Request IDR Frame: Transaction failed: %d\n", (int)LastSocketError()); ListenerCallbacks.connectionTerminated(LastSocketFail()); return; @@ -1252,7 +1259,8 @@ static void requestInvalidateReferenceFrames(int startFrame, int endFrame) { if (!sendMessageAndDiscardReply(packetTypes[IDX_INVALIDATE_REF_FRAMES], payloadLengths[IDX_INVALIDATE_REF_FRAMES], payload, CTRL_CHANNEL_URGENT, - ENET_PACKET_FLAG_RELIABLE)) { + ENET_PACKET_FLAG_RELIABLE, + false)) { Limelog("Request Invaldiate Reference Frames: Transaction failed: %d\n", (int)LastSocketError()); ListenerCallbacks.connectionTerminated(LastSocketFail()); return; @@ -1361,17 +1369,26 @@ int stopControlStream(void) { } // Called by the input stream to send a packet for Gen 5+ servers -int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags) { +int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags, bool moreData) { LC_ASSERT(AppVersionQuad[0] >= 5); // Send the input data (no reply expected) - if (sendMessageAndForget(packetTypes[IDX_INPUT_DATA], length, data, channelId, flags) == 0) { + if (sendMessageAndForget(packetTypes[IDX_INPUT_DATA], length, data, channelId, flags, moreData) == 0) { return -1; } return 0; } +// Called by the input stream to flush queued packets before a batching wait +void flushInputOnControlStream(void) { + if (AppVersionQuad[0] >= 5) { + PltLockMutex(&enetMutex); + enet_host_flush(client); + PltUnlockMutex(&enetMutex); + } +} + bool isControlDataInTransit(void) { bool ret = false; @@ -1515,7 +1532,8 @@ int startControlStream(void) { payloadLengths[IDX_START_A], preconstructedPayloads[IDX_START_A], CTRL_CHANNEL_GENERIC, - ENET_PACKET_FLAG_RELIABLE)) { + ENET_PACKET_FLAG_RELIABLE, + false)) { Limelog("Start A failed: %d\n", (int)LastSocketError()); err = LastSocketFail(); stopping = true; @@ -1549,7 +1567,8 @@ int startControlStream(void) { payloadLengths[IDX_START_B], preconstructedPayloads[IDX_START_B], CTRL_CHANNEL_GENERIC, - ENET_PACKET_FLAG_RELIABLE)) { + ENET_PACKET_FLAG_RELIABLE, + false)) { Limelog("Start B failed: %d\n", (int)LastSocketError()); err = LastSocketFail(); stopping = true; diff --git a/src/InputStream.c b/src/InputStream.c index cfb9e10..d2c5589 100644 --- a/src/InputStream.c +++ b/src/InputStream.c @@ -221,7 +221,8 @@ static bool sendInputPacket(PPACKET_HOLDER holder) { err = (SOCK_RET)sendInputPacketOnControlStream((unsigned char*)&holder->packet, PACKET_SIZE(holder), holder->channelId, - holder->enetPacketFlags); + holder->enetPacketFlags, + LbqGetItemCount(&packetQueue) > 0); if (err < 0) { Limelog("Input: sendInputPacketOnControlStream() failed: %d\n", (int) err); ListenerCallbacks.connectionTerminated(err); @@ -271,7 +272,8 @@ static bool sendInputPacket(PPACKET_HOLDER holder) { err = (SOCK_RET)sendInputPacketOnControlStream((unsigned char*) encryptedBuffer, (int)(encryptedSize + sizeof(encryptedLengthPrefix)), holder->channelId, - holder->enetPacketFlags); + holder->enetPacketFlags, + LbqGetItemCount(&packetQueue) > 0); if (err < 0) { Limelog("Input: sendInputPacketOnControlStream() failed: %d\n", (int) err); ListenerCallbacks.connectionTerminated(err); @@ -318,6 +320,7 @@ static void inputSendThreadProc(void* context) { // Delay for batching if required if (now < lastControllerPacketTime + CONTROLLER_BATCHING_INTERVAL_MS) { + flushInputOnControlStream(); PltSleepMs((int)(lastControllerPacketTime + CONTROLLER_BATCHING_INTERVAL_MS - now)); now = PltGetMillis(); } @@ -378,6 +381,7 @@ static void inputSendThreadProc(void* context) { // Delay for batching if required if (now < lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS) { + flushInputOnControlStream(); PltSleepMs((int)(lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS - now)); now = PltGetMillis(); } @@ -432,6 +436,7 @@ static void inputSendThreadProc(void* context) { // Delay for batching if required if (now < lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS) { + flushInputOnControlStream(); PltSleepMs((int)(lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS - now)); now = PltGetMillis(); } @@ -467,6 +472,7 @@ static void inputSendThreadProc(void* context) { // Delay for batching if required if (now < lastPenPacketTime + PEN_BATCHING_INTERVAL_MS) { + flushInputOnControlStream(); PltSleepMs((int)(lastPenPacketTime + PEN_BATCHING_INTERVAL_MS - now)); now = PltGetMillis(); } @@ -508,6 +514,7 @@ static void inputSendThreadProc(void* context) { // Delay for batching if required if (now < lastMotionPacketTime + MOTION_BATCHING_INTERVAL_MS) { + flushInputOnControlStream(); PltSleepMs((int)(lastMotionPacketTime + MOTION_BATCHING_INTERVAL_MS - now)); now = PltGetMillis(); } @@ -553,6 +560,7 @@ static void inputSendThreadProc(void* context) { // and UTF-8 text events with each other. We need to make sure any previous keyboard events // have been processed prior to sending these UTF-8 events to avoid interference between // the two (especially with modifier keys). + flushInputOnControlStream(); while (!PltIsThreadInterrupted(&inputSendThread) && isControlDataInTransit()) { PltSleepMs(10); } diff --git a/src/Limelight-internal.h b/src/Limelight-internal.h index 4b5789d..0427fe9 100644 --- a/src/Limelight-internal.h +++ b/src/Limelight-internal.h @@ -112,7 +112,8 @@ void connectionReceivedCompleteFrame(int frameIndex); void connectionSawFrame(int frameIndex); void connectionLostPackets(int lastReceivedPacket, int nextReceivedPacket); void connectionSendFrameFecStatus(PSS_FRAME_FEC_STATUS fecStatus); -int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags); +int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags, bool moreData); +void flushInputOnControlStream(); bool isControlDataInTransit(void); int performRtspHandshake(PSERVER_INFORMATION serverInfo);