mirror of
https://github.com/moonlight-stream/moonlight-common-c.git
synced 2026-04-08 00:36:30 +00:00
Split control data into multiple channels and optimize packet flags based on type of data
This can significantly performance on lossy networks by avoiding HOL blocking.
This commit is contained in:
@@ -559,12 +559,17 @@ static bool isPacketSentWaitingForAck(ENetPacket* packet) {
|
||||
return false;
|
||||
}
|
||||
|
||||
static bool sendMessageEnet(short ptype, short paylen, const void* payload, bool reliable) {
|
||||
static bool sendMessageEnet(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags) {
|
||||
ENetPacket* enetPacket;
|
||||
int err;
|
||||
|
||||
LC_ASSERT(AppVersionQuad[0] >= 5);
|
||||
|
||||
// Only send reliable packets to GFE
|
||||
if (!IS_SUNSHINE()) {
|
||||
flags = ENET_PACKET_FLAG_RELIABLE;
|
||||
}
|
||||
|
||||
if (encryptedControlStream) {
|
||||
PNVCTL_ENCRYPTED_PACKET_HEADER encPacket;
|
||||
PNVCTL_ENET_PACKET_HEADER_V2 packet;
|
||||
@@ -572,7 +577,7 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload, bool
|
||||
|
||||
enetPacket = enet_packet_create(NULL,
|
||||
sizeof(*encPacket) + AES_GCM_TAG_LENGTH + sizeof(*packet) + paylen,
|
||||
reliable ? ENET_PACKET_FLAG_RELIABLE : ENET_PACKET_FLAG_UNSEQUENCED);
|
||||
flags);
|
||||
if (enetPacket == NULL) {
|
||||
return false;
|
||||
}
|
||||
@@ -606,7 +611,7 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload, bool
|
||||
else {
|
||||
PNVCTL_ENET_PACKET_HEADER_V1 packet;
|
||||
enetPacket = enet_packet_create(NULL, sizeof(*packet) + paylen,
|
||||
reliable ? ENET_PACKET_FLAG_RELIABLE : ENET_PACKET_FLAG_UNSEQUENCED);
|
||||
flags);
|
||||
if (enetPacket == NULL) {
|
||||
return false;
|
||||
}
|
||||
@@ -625,11 +630,24 @@ static bool sendMessageEnet(short ptype, short paylen, const void* payload, bool
|
||||
enetPacket->userData = (void*)&packetFreed;
|
||||
enetPacket->freeCallback = enetPacketFreeCb;
|
||||
|
||||
// channelCount == 0 is possible if the peer is disconnected,
|
||||
// so we need to assign channel ID under the enetMutex to prevent
|
||||
// racing updates the peer.
|
||||
if (!IS_SUNSHINE() || peer->channelCount == 0) {
|
||||
// We always use a single channel for GFE.
|
||||
channelId = 0;
|
||||
}
|
||||
else if (channelId >= peer->channelCount) {
|
||||
// If this peer doesn't support enough channels, distribute the remaining channels onto the ones
|
||||
// the peer does support. We don't use the channel to distinguish traffic types, so this is safe.
|
||||
channelId %= peer->channelCount;
|
||||
}
|
||||
|
||||
// Queue the packet to be sent
|
||||
err = enet_peer_send(peer, 0, enetPacket);
|
||||
err = enet_peer_send(peer, channelId, enetPacket);
|
||||
|
||||
// Wait until the packet is actually sent to provide backpressure on senders
|
||||
if (err == 0 && reliable) {
|
||||
if (err == 0 && (flags & ENET_PACKET_FLAG_RELIABLE)) {
|
||||
// Try to send the packet
|
||||
enet_host_service(client, NULL, 0);
|
||||
|
||||
@@ -697,13 +715,13 @@ static bool sendMessageTcp(short ptype, short paylen, const void* payload) {
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool sendMessageAndForget(short ptype, short paylen, const void* payload) {
|
||||
static bool sendMessageAndForget(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags) {
|
||||
bool ret;
|
||||
|
||||
// Unlike regular sockets, ENet sockets aren't safe to invoke from multiple
|
||||
// threads at once. We have to synchronize them with a lock.
|
||||
if (AppVersionQuad[0] >= 5) {
|
||||
ret = sendMessageEnet(ptype, paylen, payload, true);
|
||||
ret = sendMessageEnet(ptype, paylen, payload, channelId, flags);
|
||||
}
|
||||
else {
|
||||
ret = sendMessageTcp(ptype, paylen, payload);
|
||||
@@ -712,9 +730,9 @@ static bool sendMessageAndForget(short ptype, short paylen, const void* payload)
|
||||
return ret;
|
||||
}
|
||||
|
||||
static bool sendMessageAndDiscardReply(short ptype, short paylen, const void* payload) {
|
||||
static bool sendMessageAndDiscardReply(short ptype, short paylen, const void* payload, uint8_t channelId, uint32_t flags) {
|
||||
if (AppVersionQuad[0] >= 5) {
|
||||
if (!sendMessageEnet(ptype, paylen, payload, true)) {
|
||||
if (!sendMessageEnet(ptype, paylen, payload, channelId, flags)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
@@ -1099,7 +1117,11 @@ static void lossStatsThreadFunc(void* context) {
|
||||
|
||||
while (LbqPollQueueElement(&frameFecStatusQueue, (void**)&queuedFrameStatus) == LBQ_SUCCESS) {
|
||||
// Send as an unreliable packet, since it's not a critical message
|
||||
if (!sendMessageEnet(SS_FRAME_FEC_PTYPE, sizeof(queuedFrameStatus->fecStatus), &queuedFrameStatus->fecStatus, false)) {
|
||||
if (!sendMessageEnet(SS_FRAME_FEC_PTYPE,
|
||||
sizeof(queuedFrameStatus->fecStatus),
|
||||
&queuedFrameStatus->fecStatus,
|
||||
CTRL_CHANNEL_GENERIC,
|
||||
ENET_PACKET_FLAG_UNSEQUENCED)) {
|
||||
Limelog("Loss Stats: Sending frame FEC status message failed: %d\n", (int)LastSocketError());
|
||||
ListenerCallbacks.connectionTerminated(LastSocketFail());
|
||||
free(queuedFrameStatus);
|
||||
@@ -1111,7 +1133,11 @@ static void lossStatsThreadFunc(void* context) {
|
||||
}
|
||||
|
||||
// Send the message (and don't expect a response)
|
||||
if (!sendMessageAndForget(0x0200, sizeof(periodicPingPayload), periodicPingPayload)) {
|
||||
if (!sendMessageAndForget(0x0200,
|
||||
sizeof(periodicPingPayload),
|
||||
periodicPingPayload,
|
||||
CTRL_CHANNEL_GENERIC,
|
||||
ENET_PACKET_FLAG_RELIABLE)) {
|
||||
Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError());
|
||||
ListenerCallbacks.connectionTerminated(LastSocketFail());
|
||||
return;
|
||||
@@ -1147,7 +1173,10 @@ static void lossStatsThreadFunc(void* context) {
|
||||
|
||||
// Send the message (and don't expect a response)
|
||||
if (!sendMessageAndForget(packetTypes[IDX_LOSS_STATS],
|
||||
payloadLengths[IDX_LOSS_STATS], lossStatsPayload)) {
|
||||
payloadLengths[IDX_LOSS_STATS],
|
||||
lossStatsPayload,
|
||||
CTRL_CHANNEL_GENERIC,
|
||||
0)) {
|
||||
free(lossStatsPayload);
|
||||
Limelog("Loss Stats: Transaction failed: %d\n", (int)LastSocketError());
|
||||
ListenerCallbacks.connectionTerminated(LastSocketFail());
|
||||
@@ -1186,7 +1215,10 @@ static void requestIdrFrame(void) {
|
||||
|
||||
// Send the reference frame invalidation request and read the response
|
||||
if (!sendMessageAndDiscardReply(packetTypes[IDX_INVALIDATE_REF_FRAMES],
|
||||
payloadLengths[IDX_INVALIDATE_REF_FRAMES], payload)) {
|
||||
payloadLengths[IDX_INVALIDATE_REF_FRAMES],
|
||||
payload,
|
||||
CTRL_CHANNEL_URGENT,
|
||||
ENET_PACKET_FLAG_RELIABLE)) {
|
||||
Limelog("Request IDR Frame: Transaction failed: %d\n", (int)LastSocketError());
|
||||
ListenerCallbacks.connectionTerminated(LastSocketFail());
|
||||
return;
|
||||
@@ -1195,7 +1227,10 @@ static void requestIdrFrame(void) {
|
||||
else {
|
||||
// Send IDR frame request and read the response
|
||||
if (!sendMessageAndDiscardReply(packetTypes[IDX_REQUEST_IDR_FRAME],
|
||||
payloadLengths[IDX_REQUEST_IDR_FRAME], preconstructedPayloads[IDX_REQUEST_IDR_FRAME])) {
|
||||
payloadLengths[IDX_REQUEST_IDR_FRAME],
|
||||
preconstructedPayloads[IDX_REQUEST_IDR_FRAME],
|
||||
CTRL_CHANNEL_URGENT,
|
||||
ENET_PACKET_FLAG_RELIABLE)) {
|
||||
Limelog("Request IDR Frame: Transaction failed: %d\n", (int)LastSocketError());
|
||||
ListenerCallbacks.connectionTerminated(LastSocketFail());
|
||||
return;
|
||||
@@ -1217,7 +1252,9 @@ static void requestInvalidateReferenceFrames(int startFrame, int endFrame) {
|
||||
|
||||
// Send the reference frame invalidation request and read the response
|
||||
if (!sendMessageAndDiscardReply(packetTypes[IDX_INVALIDATE_REF_FRAMES],
|
||||
payloadLengths[IDX_INVALIDATE_REF_FRAMES], payload)) {
|
||||
payloadLengths[IDX_INVALIDATE_REF_FRAMES],
|
||||
payload, CTRL_CHANNEL_URGENT,
|
||||
ENET_PACKET_FLAG_RELIABLE)) {
|
||||
Limelog("Request Invaldiate Reference Frames: Transaction failed: %d\n", (int)LastSocketError());
|
||||
ListenerCallbacks.connectionTerminated(LastSocketFail());
|
||||
return;
|
||||
@@ -1326,11 +1363,11 @@ int stopControlStream(void) {
|
||||
}
|
||||
|
||||
// Called by the input stream to send a packet for Gen 5+ servers
|
||||
int sendInputPacketOnControlStream(unsigned char* data, int length) {
|
||||
int sendInputPacketOnControlStream(unsigned char* data, int length, uint8_t channelId, uint32_t flags) {
|
||||
LC_ASSERT(AppVersionQuad[0] >= 5);
|
||||
|
||||
// Send the input data (no reply expected)
|
||||
if (sendMessageAndForget(packetTypes[IDX_INPUT_DATA], length, data) == 0) {
|
||||
if (sendMessageAndForget(packetTypes[IDX_INPUT_DATA], length, data, channelId, flags) == 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -1384,8 +1421,8 @@ int startControlStream(void) {
|
||||
enet_address_set_address(&address, (struct sockaddr *)&RemoteAddr, RemoteAddrLen);
|
||||
enet_address_set_port(&address, ControlPortNumber);
|
||||
|
||||
// Create a client that can use 1 outgoing connection and 1 channel
|
||||
client = enet_host_create(address.address.ss_family, NULL, 1, 1, 0, 0);
|
||||
// Create a client
|
||||
client = enet_host_create(address.address.ss_family, NULL, 1, CTRL_CHANNEL_MAX + 1, 0, 0);
|
||||
if (client == NULL) {
|
||||
stopping = true;
|
||||
return -1;
|
||||
@@ -1394,7 +1431,7 @@ int startControlStream(void) {
|
||||
client->intercept = ignoreDisconnectIntercept;
|
||||
|
||||
// Connect to the host
|
||||
peer = enet_host_connect(client, &address, 1, 0);
|
||||
peer = enet_host_connect(client, &address, CTRL_CHANNEL_MAX + 1, 0);
|
||||
if (peer == NULL) {
|
||||
stopping = true;
|
||||
enet_host_destroy(client);
|
||||
@@ -1471,8 +1508,10 @@ int startControlStream(void) {
|
||||
|
||||
// Send START A
|
||||
if (!sendMessageAndDiscardReply(packetTypes[IDX_START_A],
|
||||
payloadLengths[IDX_START_A],
|
||||
preconstructedPayloads[IDX_START_A])) {
|
||||
payloadLengths[IDX_START_A],
|
||||
preconstructedPayloads[IDX_START_A],
|
||||
CTRL_CHANNEL_GENERIC,
|
||||
ENET_PACKET_FLAG_RELIABLE)) {
|
||||
Limelog("Start A failed: %d\n", (int)LastSocketError());
|
||||
err = LastSocketFail();
|
||||
stopping = true;
|
||||
@@ -1503,8 +1542,10 @@ int startControlStream(void) {
|
||||
|
||||
// Send START B
|
||||
if (!sendMessageAndDiscardReply(packetTypes[IDX_START_B],
|
||||
payloadLengths[IDX_START_B],
|
||||
preconstructedPayloads[IDX_START_B])) {
|
||||
payloadLengths[IDX_START_B],
|
||||
preconstructedPayloads[IDX_START_B],
|
||||
CTRL_CHANNEL_GENERIC,
|
||||
ENET_PACKET_FLAG_RELIABLE)) {
|
||||
Limelog("Start B failed: %d\n", (int)LastSocketError());
|
||||
err = LastSocketFail();
|
||||
stopping = true;
|
||||
|
||||
Reference in New Issue
Block a user