Batch control stream messages

This commit is contained in:
Cameron Gutman
2023-08-19 11:27:16 -05:00
parent 8b0dbf7158
commit 78e06eb613
3 changed files with 67 additions and 39 deletions
+35 -16
View File
@@ -559,7 +559,7 @@ static bool isPacketSentWaitingForAck(ENetPacket* packet) {
return false; return false;
} }
static bool sendMessageEnet(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags) { static bool sendMessageEnet(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags, bool moreData) {
ENetPacket* enetPacket; ENetPacket* enetPacket;
int err; int err;
@@ -639,7 +639,8 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload, uint
// Queue the packet to be sent // Queue the packet to be sent
err = enet_peer_send(peer, channelId, enetPacket); err = enet_peer_send(peer, channelId, enetPacket);
// Try to send the packet // If there is no more data coming soon, send the packet now
if (!moreData) {
enet_host_service(client, NULL, 0); enet_host_service(client, NULL, 0);
// Wait until the packet is actually sent to provide backpressure on senders // Wait until the packet is actually sent to provide backpressure on senders
@@ -665,6 +666,7 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload, uint
peer->roundTripTime, peer->packetLoss / (float)ENET_PEER_PACKET_LOSS_SCALE); peer->roundTripTime, peer->packetLoss / (float)ENET_PEER_PACKET_LOSS_SCALE);
} }
} }
}
// Remove the free callback now that the packet was sent // Remove the free callback now that the packet was sent
if (!packetFreed) { if (!packetFreed) {
@@ -708,13 +710,13 @@ static bool sendMessageTcp(short ptype, short paylen, const void* payload) {
return true; return true;
} }
static bool sendMessageAndForget(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags) { static bool sendMessageAndForget(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags, bool moreData) {
bool ret; bool ret;
// Unlike regular sockets, ENet sockets aren't safe to invoke from multiple // Unlike regular sockets, ENet sockets aren't safe to invoke from multiple
// threads at once. We have to synchronize them with a lock. // threads at once. We have to synchronize them with a lock.
if (AppVersionQuad[0] >= 5) { if (AppVersionQuad[0] >= 5) {
ret = sendMessageEnet(ptype, paylen, payload, channelId, flags); ret = sendMessageEnet(ptype, paylen, payload, channelId, flags, moreData);
} }
else { else {
ret = sendMessageTcp(ptype, paylen, payload); ret = sendMessageTcp(ptype, paylen, payload);
@@ -723,9 +725,9 @@ static bool sendMessageAndForget(short ptype, short paylen, const void* payload,
return ret; return ret;
} }
static bool sendMessageAndDiscardReply(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags) { static bool sendMessageAndDiscardReply(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags, bool moreData) {
if (AppVersionQuad[0] >= 5) { if (AppVersionQuad[0] >= 5) {
if (!sendMessageEnet(ptype, paylen, payload, channelId, flags)) { if (!sendMessageEnet(ptype, paylen, payload, channelId, flags, moreData)) {
return false; return false;
} }
} }
@@ -1114,7 +1116,8 @@ static void lossStatsThreadFunc(void* context) {
sizeof(queuedFrameStatus->fecStatus), sizeof(queuedFrameStatus->fecStatus),
&queuedFrameStatus->fecStatus, &queuedFrameStatus->fecStatus,
CTRL_CHANNEL_GENERIC, CTRL_CHANNEL_GENERIC,
ENET_PACKET_FLAG_UNSEQUENCED)) { ENET_PACKET_FLAG_UNSEQUENCED,
LbqGetItemCount(&frameFecStatusQueue) > 0)) {
Limelog("Loss Stats: Sending frame FEC status message failed: %d\n", (int)LastSocketError()); Limelog("Loss Stats: Sending frame FEC status message failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail()); ListenerCallbacks.connectionTerminated(LastSocketFail());
free(queuedFrameStatus); free(queuedFrameStatus);
@@ -1135,7 +1138,8 @@ static void lossStatsThreadFunc(void* context) {
sizeof(periodicPingPayload), sizeof(periodicPingPayload),
periodicPingPayload, periodicPingPayload,
CTRL_CHANNEL_GENERIC, CTRL_CHANNEL_GENERIC,
ENET_PACKET_FLAG_RELIABLE)) { ENET_PACKET_FLAG_RELIABLE,
false)) {
Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError()); Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail()); ListenerCallbacks.connectionTerminated(LastSocketFail());
return; return;
@@ -1174,7 +1178,8 @@ static void lossStatsThreadFunc(void* context) {
payloadLengths[IDX_LOSS_STATS], payloadLengths[IDX_LOSS_STATS],
lossStatsPayload, lossStatsPayload,
CTRL_CHANNEL_GENERIC, CTRL_CHANNEL_GENERIC,
0)) { 0,
false)) {
free(lossStatsPayload); free(lossStatsPayload);
Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError()); Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail()); ListenerCallbacks.connectionTerminated(LastSocketFail());
@@ -1216,7 +1221,8 @@ static void requestIdrFrame(void) {
payloadLengths[IDX_INVALIDATE_REF_FRAMES], payloadLengths[IDX_INVALIDATE_REF_FRAMES],
payload, payload,
CTRL_CHANNEL_URGENT, CTRL_CHANNEL_URGENT,
ENET_PACKET_FLAG_RELIABLE)) { ENET_PACKET_FLAG_RELIABLE,
false)) {
Limelog("Request IDR Frame: Transaction failed: %d\n", (int)LastSocketError()); Limelog("Request IDR Frame: Transaction failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail()); ListenerCallbacks.connectionTerminated(LastSocketFail());
return; return;
@@ -1228,7 +1234,8 @@ static void requestIdrFrame(void) {
payloadLengths[IDX_REQUEST_IDR_FRAME], payloadLengths[IDX_REQUEST_IDR_FRAME],
preconstructedPayloads[IDX_REQUEST_IDR_FRAME], preconstructedPayloads[IDX_REQUEST_IDR_FRAME],
CTRL_CHANNEL_URGENT, CTRL_CHANNEL_URGENT,
ENET_PACKET_FLAG_RELIABLE)) { ENET_PACKET_FLAG_RELIABLE,
false)) {
Limelog("Request IDR Frame: Transaction failed: %d\n", (int)LastSocketError()); Limelog("Request IDR Frame: Transaction failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail()); ListenerCallbacks.connectionTerminated(LastSocketFail());
return; return;
@@ -1252,7 +1259,8 @@ static void requestInvalidateReferenceFrames(int startFrame, int endFrame) {
if (!sendMessageAndDiscardReply(packetTypes[IDX_INVALIDATE_REF_FRAMES], if (!sendMessageAndDiscardReply(packetTypes[IDX_INVALIDATE_REF_FRAMES],
payloadLengths[IDX_INVALIDATE_REF_FRAMES], payloadLengths[IDX_INVALIDATE_REF_FRAMES],
payload, CTRL_CHANNEL_URGENT, payload, CTRL_CHANNEL_URGENT,
ENET_PACKET_FLAG_RELIABLE)) { ENET_PACKET_FLAG_RELIABLE,
false)) {
Limelog("Request Invaldiate Reference Frames: Transaction failed: %d\n", (int)LastSocketError()); Limelog("Request Invaldiate Reference Frames: Transaction failed: %d\n", (int)LastSocketError());
ListenerCallbacks.connectionTerminated(LastSocketFail()); ListenerCallbacks.connectionTerminated(LastSocketFail());
return; return;
@@ -1361,17 +1369,26 @@ int stopControlStream(void) {
} }
// Called by the input stream to send a packet for Gen 5+ servers // Called by the input stream to send a packet for Gen 5+ servers
int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags) { int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags, bool moreData) {
LC_ASSERT(AppVersionQuad[0] >= 5); LC_ASSERT(AppVersionQuad[0] >= 5);
// Send the input data (no reply expected) // Send the input data (no reply expected)
if (sendMessageAndForget(packetTypes[IDX_INPUT_DATA], length, data, channelId, flags) == 0) { if (sendMessageAndForget(packetTypes[IDX_INPUT_DATA], length, data, channelId, flags, moreData) == 0) {
return -1; return -1;
} }
return 0; return 0;
} }
// Called by the input stream to flush queued packets before a batching wait
void flushInputOnControlStream(void) {
if (AppVersionQuad[0] >= 5) {
PltLockMutex(&enetMutex);
enet_host_flush(client);
PltUnlockMutex(&enetMutex);
}
}
bool isControlDataInTransit(void) { bool isControlDataInTransit(void) {
bool ret = false; bool ret = false;
@@ -1515,7 +1532,8 @@ int startControlStream(void) {
payloadLengths[IDX_START_A], payloadLengths[IDX_START_A],
preconstructedPayloads[IDX_START_A], preconstructedPayloads[IDX_START_A],
CTRL_CHANNEL_GENERIC, CTRL_CHANNEL_GENERIC,
ENET_PACKET_FLAG_RELIABLE)) { ENET_PACKET_FLAG_RELIABLE,
false)) {
Limelog("Start A failed: %d\n", (int)LastSocketError()); Limelog("Start A failed: %d\n", (int)LastSocketError());
err = LastSocketFail(); err = LastSocketFail();
stopping = true; stopping = true;
@@ -1549,7 +1567,8 @@ int startControlStream(void) {
payloadLengths[IDX_START_B], payloadLengths[IDX_START_B],
preconstructedPayloads[IDX_START_B], preconstructedPayloads[IDX_START_B],
CTRL_CHANNEL_GENERIC, CTRL_CHANNEL_GENERIC,
ENET_PACKET_FLAG_RELIABLE)) { ENET_PACKET_FLAG_RELIABLE,
false)) {
Limelog("Start B failed: %d\n", (int)LastSocketError()); Limelog("Start B failed: %d\n", (int)LastSocketError());
err = LastSocketFail(); err = LastSocketFail();
stopping = true; stopping = true;
+10 -2
View File
@@ -221,7 +221,8 @@ static bool sendInputPacket(PPACKET_HOLDER holder) {
err = (SOCK_RET)sendInputPacketOnControlStream((unsigned char*)&holder->packet, err = (SOCK_RET)sendInputPacketOnControlStream((unsigned char*)&holder->packet,
PACKET_SIZE(holder), PACKET_SIZE(holder),
holder->channelId, holder->channelId,
holder->enetPacketFlags); holder->enetPacketFlags,
LbqGetItemCount(&packetQueue) > 0);
if (err < 0) { if (err < 0) {
Limelog("Input: sendInputPacketOnControlStream() failed: %d\n", (int) err); Limelog("Input: sendInputPacketOnControlStream() failed: %d\n", (int) err);
ListenerCallbacks.connectionTerminated(err); ListenerCallbacks.connectionTerminated(err);
@@ -271,7 +272,8 @@ static bool sendInputPacket(PPACKET_HOLDER holder) {
err = (SOCK_RET)sendInputPacketOnControlStream((unsigned char*) encryptedBuffer, err = (SOCK_RET)sendInputPacketOnControlStream((unsigned char*) encryptedBuffer,
(int)(encryptedSize + sizeof(encryptedLengthPrefix)), (int)(encryptedSize + sizeof(encryptedLengthPrefix)),
holder->channelId, holder->channelId,
holder->enetPacketFlags); holder->enetPacketFlags,
LbqGetItemCount(&packetQueue) > 0);
if (err < 0) { if (err < 0) {
Limelog("Input: sendInputPacketOnControlStream() failed: %d\n", (int) err); Limelog("Input: sendInputPacketOnControlStream() failed: %d\n", (int) err);
ListenerCallbacks.connectionTerminated(err); ListenerCallbacks.connectionTerminated(err);
@@ -318,6 +320,7 @@ static void inputSendThreadProc(void* context) {
// Delay for batching if required // Delay for batching if required
if (now < lastControllerPacketTime + CONTROLLER_BATCHING_INTERVAL_MS) { if (now < lastControllerPacketTime + CONTROLLER_BATCHING_INTERVAL_MS) {
flushInputOnControlStream();
PltSleepMs((int)(lastControllerPacketTime + CONTROLLER_BATCHING_INTERVAL_MS - now)); PltSleepMs((int)(lastControllerPacketTime + CONTROLLER_BATCHING_INTERVAL_MS - now));
now = PltGetMillis(); now = PltGetMillis();
} }
@@ -378,6 +381,7 @@ static void inputSendThreadProc(void* context) {
// Delay for batching if required // Delay for batching if required
if (now < lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS) { if (now < lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS) {
flushInputOnControlStream();
PltSleepMs((int)(lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS - now)); PltSleepMs((int)(lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS - now));
now = PltGetMillis(); now = PltGetMillis();
} }
@@ -432,6 +436,7 @@ static void inputSendThreadProc(void* context) {
// Delay for batching if required // Delay for batching if required
if (now < lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS) { if (now < lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS) {
flushInputOnControlStream();
PltSleepMs((int)(lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS - now)); PltSleepMs((int)(lastMousePacketTime + MOUSE_BATCHING_INTERVAL_MS - now));
now = PltGetMillis(); now = PltGetMillis();
} }
@@ -467,6 +472,7 @@ static void inputSendThreadProc(void* context) {
// Delay for batching if required // Delay for batching if required
if (now < lastPenPacketTime + PEN_BATCHING_INTERVAL_MS) { if (now < lastPenPacketTime + PEN_BATCHING_INTERVAL_MS) {
flushInputOnControlStream();
PltSleepMs((int)(lastPenPacketTime + PEN_BATCHING_INTERVAL_MS - now)); PltSleepMs((int)(lastPenPacketTime + PEN_BATCHING_INTERVAL_MS - now));
now = PltGetMillis(); now = PltGetMillis();
} }
@@ -508,6 +514,7 @@ static void inputSendThreadProc(void* context) {
// Delay for batching if required // Delay for batching if required
if (now < lastMotionPacketTime + MOTION_BATCHING_INTERVAL_MS) { if (now < lastMotionPacketTime + MOTION_BATCHING_INTERVAL_MS) {
flushInputOnControlStream();
PltSleepMs((int)(lastMotionPacketTime + MOTION_BATCHING_INTERVAL_MS - now)); PltSleepMs((int)(lastMotionPacketTime + MOTION_BATCHING_INTERVAL_MS - now));
now = PltGetMillis(); now = PltGetMillis();
} }
@@ -553,6 +560,7 @@ static void inputSendThreadProc(void* context) {
// and UTF-8 text events with each other. We need to make sure any previous keyboard events // and UTF-8 text events with each other. We need to make sure any previous keyboard events
// have been processed prior to sending these UTF-8 events to avoid interference between // have been processed prior to sending these UTF-8 events to avoid interference between
// the two (especially with modifier keys). // the two (especially with modifier keys).
flushInputOnControlStream();
while (!PltIsThreadInterrupted(&inputSendThread) && isControlDataInTransit()) { while (!PltIsThreadInterrupted(&inputSendThread) && isControlDataInTransit()) {
PltSleepMs(10); PltSleepMs(10);
} }
+2 -1
View File
@@ -112,7 +112,8 @@ void connectionReceivedCompleteFrame(int frameIndex);
void connectionSawFrame(int frameIndex); void connectionSawFrame(int frameIndex);
void connectionLostPackets(int lastReceivedPacket, int nextReceivedPacket); void connectionLostPackets(int lastReceivedPacket, int nextReceivedPacket);
void connectionSendFrameFecStatus(PSS_FRAME_FEC_STATUS fecStatus); void connectionSendFrameFecStatus(PSS_FRAME_FEC_STATUS fecStatus);
int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags); int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags, bool moreData);
void flushInputOnControlStream();
bool isControlDataInTransit(void); bool isControlDataInTransit(void);
int performRtspHandshake(PSERVER_INFORMATION serverInfo); int performRtspHandshake(PSERVER_INFORMATION serverInfo);