diff --git a/limelight-common/ControlStream.c b/limelight-common/ControlStream.c index 7ad6a6a..118517b 100644 --- a/limelight-common/ControlStream.c +++ b/limelight-common/ControlStream.c @@ -4,11 +4,17 @@ #include "ByteBuffer.h" -// NV control stream packet header -typedef struct _NVCTL_PACKET_HEADER { +#include + +// NV control stream packet header for TCP +typedef struct _NVCTL_TCP_PACKET_HEADER { unsigned short type; unsigned short payloadLength; -} NVCTL_PACKET_HEADER, *PNVCTL_PACKET_HEADER; +} NVCTL_TCP_PACKET_HEADER, *PNVCTL_TCP_PACKET_HEADER; + +typedef struct _NVCTL_ENET_PACKET_HEADER { + unsigned short type; +} NVCTL_ENET_PACKET_HEADER, *PNVCTL_ENET_PACKET_HEADER; typedef struct _QUEUED_FRAME_INVALIDATION_TUPLE { int startFrame; @@ -17,6 +23,10 @@ typedef struct _QUEUED_FRAME_INVALIDATION_TUPLE { } QUEUED_FRAME_INVALIDATION_TUPLE, *PQUEUED_FRAME_INVALIDATION_TUPLE; static SOCKET ctlSock = INVALID_SOCKET; +static ENetHost* client; +static ENetPeer* peer; +static PLT_MUTEX enetMutex; + static PLT_THREAD lossStatsThread; static PLT_THREAD invalidateRefFramesThread; static PLT_EVENT invalidateRefFramesEvent; @@ -214,10 +224,10 @@ void connectionLostPackets(int lastReceivedPacket, int nextReceivedPacket) { lossCountSinceLastReport += (nextReceivedPacket - lastReceivedPacket) - 1; } -// Reads an NV control stream packet -static PNVCTL_PACKET_HEADER readNvctlPacket(void) { - NVCTL_PACKET_HEADER staticHeader; - PNVCTL_PACKET_HEADER fullPacket; +// Reads an NV control stream packet from the TCP connection +static PNVCTL_TCP_PACKET_HEADER readNvctlPacketTcp(void) { + NVCTL_TCP_PACKET_HEADER staticHeader; + PNVCTL_TCP_PACKET_HEADER fullPacket; SOCK_RET err; err = recv(ctlSock, (char*)&staticHeader, sizeof(staticHeader), 0); @@ -225,7 +235,7 @@ static PNVCTL_PACKET_HEADER readNvctlPacket(void) { return NULL; } - fullPacket = (PNVCTL_PACKET_HEADER)malloc(staticHeader.payloadLength + sizeof(staticHeader)); + fullPacket = (PNVCTL_TCP_PACKET_HEADER)malloc(staticHeader.payloadLength + sizeof(staticHeader)); if (fullPacket == NULL) { return NULL; } @@ -242,10 +252,53 @@ static PNVCTL_PACKET_HEADER readNvctlPacket(void) { return fullPacket; } -static int sendMessageAndForget(short ptype, short paylen, const void* payload) { - PNVCTL_PACKET_HEADER packet; +static int sendMessageEnet(short ptype, short paylen, const void* payload) { + PNVCTL_ENET_PACKET_HEADER packet; + ENetPacket* enetPacket; + ENetEvent event; + + LC_ASSERT(ServerMajorVersion > 5); + + packet = malloc(sizeof(*packet) + paylen); + if (packet == NULL) { + return 0; + } + + packet->type = ptype; + memcpy(&packet[1], payload, paylen); + + // Gen 5+ servers do control protocol over ENet instead of TCP + while (enet_host_service(client, &event, 0) > 0) { + if (event.type == ENET_EVENT_TYPE_RECEIVE) { + enet_packet_destroy(event.packet); + } + else if (event.type == ENET_EVENT_TYPE_DISCONNECT) { + Limelog("Control stream received disconnect event\n"); + free(packet); + return 0; + } + } + + enetPacket = enet_packet_create(packet, sizeof(*packet) + paylen, ENET_PACKET_FLAG_RELIABLE); + if (packet == NULL) { + free(packet); + return 0; + } + + enet_peer_send(peer, 0, enetPacket); + enet_host_flush(client); + + free(packet); + + return 1; +} + +static int sendMessageTcp(short ptype, short paylen, const void* payload) { + PNVCTL_TCP_PACKET_HEADER packet; SOCK_RET err; + LC_ASSERT(ServerMajorVersion < 5); + packet = malloc(sizeof(*packet) + paylen); if (packet == NULL) { return 0; @@ -255,7 +308,7 @@ static int sendMessageAndForget(short ptype, short paylen, const void* payload) packet->payloadLength = paylen; memcpy(&packet[1], payload, paylen); - err = send(ctlSock, (char*)packet, sizeof(*packet) + paylen, 0); + err = send(ctlSock, (char*) packet, sizeof(*packet) + paylen, 0); free(packet); if (err != sizeof(*packet) + paylen) { @@ -265,23 +318,60 @@ static int sendMessageAndForget(short ptype, short paylen, const void* payload) return 1; } -static PNVCTL_PACKET_HEADER sendMessage(short ptype, short paylen, const void* payload) { - if (!sendMessageAndForget(ptype, paylen, payload)) { - return NULL; +static int sendMessageAndForget(short ptype, short paylen, const void* payload) { + int ret; + + // Unlike regular sockets, ENet sockets aren't safe to invoke from multiple + // threads at once. We have to synchronize them with a lock. + if (ServerMajorVersion >= 5) { + PltLockMutex(&enetMutex); + ret = sendMessageEnet(ptype, paylen, payload); + PltUnlockMutex(&enetMutex); + } + else { + ret = sendMessageTcp(ptype, paylen, payload); } - return readNvctlPacket(); + return ret; } static int sendMessageAndDiscardReply(short ptype, short paylen, const void* payload) { - PNVCTL_PACKET_HEADER reply; + // Discard the response + if (ServerMajorVersion >= 5) { + ENetEvent event; - reply = sendMessage(ptype, paylen, payload); - if (reply == NULL) { - return 0; + PltLockMutex(&enetMutex); + + if (!sendMessageEnet(ptype, paylen, payload)) { + PltUnlockMutex(&enetMutex); + return 0; + } + + if (enet_host_service(client, &event, CONTROL_STREAM_TIMEOUT_SEC * 1000) <= 0 || + event.type != ENET_EVENT_TYPE_RECEIVE) { + PltUnlockMutex(&enetMutex); + return 0; + } + + enet_packet_destroy(event.packet); + + PltUnlockMutex(&enetMutex); + } + else { + PNVCTL_TCP_PACKET_HEADER reply; + + if (!sendMessageTcp(ptype, paylen, payload)) { + return 0; + } + + reply = readNvctlPacketTcp(); + if (reply == NULL) { + return 0; + } + + free(reply); } - free(reply); return 1; } @@ -433,6 +523,10 @@ int stopControlStream(void) { LbqSignalQueueShutdown(&invalidReferenceFrameTuples); PltSetEvent(&invalidateRefFramesEvent); + if (peer != NULL) { + enet_peer_reset(peer); + } + if (ctlSock != INVALID_SOCKET) { shutdownTcpSocket(ctlSock); } @@ -445,12 +539,32 @@ int stopControlStream(void) { PltCloseThread(&lossStatsThread); PltCloseThread(&invalidateRefFramesThread); + + peer = NULL; + if (client != NULL) { + enet_host_destroy(client); + client = NULL; + } if (ctlSock != INVALID_SOCKET) { closeSocket(ctlSock); ctlSock = INVALID_SOCKET; } + PltDeleteMutex(&enetMutex); + + return 0; +} + +// Called by the input stream to send a packet for Gen 5+ servers +int sendInputPacketOnControlStream(unsigned char* data, int length) { + LC_ASSERT(ServerMajorVersion >= 5); + + // Send the input data (no reply expected) + if (sendMessageAndForget(0x0207, length, data) == 0) { + return -1; + } + return 0; } @@ -458,13 +572,46 @@ int stopControlStream(void) { int startControlStream(void) { int err; - ctlSock = connectTcpSocket(&RemoteAddr, RemoteAddrLen, - 47995, CONTROL_STREAM_TIMEOUT_SEC); - if (ctlSock == INVALID_SOCKET) { - return LastSocketFail(); - } + PltCreateMutex(&enetMutex); - enableNoDelay(ctlSock); + if (ServerMajorVersion >= 5) { + ENetAddress address; + ENetEvent event; + + // Create a client that can use 1 outgoing connection and 1 channel + client = enet_host_create(NULL, 1, 1, 0, 0); + if (client == NULL) { + return -1; + } + + enet_address_set_host(&address, RemoteAddrString); + address.port = 47999; + + // Connect to the host + peer = enet_host_connect(client, &address, 1, 0); + if (peer == NULL) { + return -1; + } + + // Wait for the connect to complete + if (enet_host_service(client, &event, CONTROL_STREAM_TIMEOUT_SEC * 1000) <= 0 || + event.type != ENET_EVENT_TYPE_CONNECT) { + Limelog("RTSP: Failed to connect to UDP port 47999\n"); + return -1; + } + + // Ensure the connect verify ACK is sent immediately + enet_host_flush(client); + } + else { + ctlSock = connectTcpSocket(&RemoteAddr, RemoteAddrLen, + 47995, CONTROL_STREAM_TIMEOUT_SEC); + if (ctlSock == INVALID_SOCKET) { + return LastSocketFail(); + } + + enableNoDelay(ctlSock); + } // Send START A if (!sendMessageAndDiscardReply(packetTypes[IDX_START_A], diff --git a/limelight-common/InputStream.c b/limelight-common/InputStream.c index 7e77152..a1c5721 100644 --- a/limelight-common/InputStream.c +++ b/limelight-common/InputStream.c @@ -258,13 +258,24 @@ static void inputSendThreadProc(void* context) { memcpy(&encryptedBuffer[OAES_DATA_OFFSET - sizeof(encryptedLengthPrefix)], &encryptedLengthPrefix, sizeof(encryptedLengthPrefix)); - // Send the encrypted payload - err = send(inputSock, (const char*)&encryptedBuffer[OAES_DATA_OFFSET - sizeof(encryptedLengthPrefix)], - (int)(encryptedSize + sizeof(encryptedLengthPrefix)), 0); - if (err <= 0) { - Limelog("Input: send() failed: %d\n", (int)LastSocketError()); - ListenerCallbacks.connectionTerminated(LastSocketError()); - return; + if (ServerMajorVersion < 5) { + // Send the encrypted payload + err = send(inputSock, (const char*) &encryptedBuffer[OAES_DATA_OFFSET - sizeof(encryptedLengthPrefix)], + (int) (encryptedSize + sizeof(encryptedLengthPrefix)), 0); + if (err <= 0) { + Limelog("Input: send() failed: %d\n", (int) LastSocketError()); + ListenerCallbacks.connectionTerminated(LastSocketError()); + return; + } + } + else { + err = sendInputPacketOnControlStream(&encryptedBuffer[OAES_DATA_OFFSET - sizeof(encryptedLengthPrefix)], + (int) (encryptedSize + sizeof(encryptedLengthPrefix))); + if (err < 0) { + Limelog("Input: sendInputPacketOnControlStream() failed: %d\n", err); + ListenerCallbacks.connectionTerminated(LastSocketError()); + return; + } } } } @@ -273,13 +284,16 @@ static void inputSendThreadProc(void* context) { int startInputStream(void) { int err; - inputSock = connectTcpSocket(&RemoteAddr, RemoteAddrLen, - 35043, INPUT_STREAM_TIMEOUT_SEC); - if (inputSock == INVALID_SOCKET) { - return LastSocketFail(); - } + // After Gen 5, we send input on the control stream + if (ServerMajorVersion < 5) { + inputSock = connectTcpSocket(&RemoteAddr, RemoteAddrLen, + 35043, INPUT_STREAM_TIMEOUT_SEC); + if (inputSock == INVALID_SOCKET) { + return LastSocketFail(); + } - enableNoDelay(inputSock); + enableNoDelay(inputSock); + } err = PltCreateThread(inputSendThreadProc, NULL, &inputSendThread); if (err != 0) { diff --git a/limelight-common/Limelight-internal.h b/limelight-common/Limelight-internal.h index f32e025..2e337b0 100644 --- a/limelight-common/Limelight-internal.h +++ b/limelight-common/Limelight-internal.h @@ -34,6 +34,7 @@ void connectionDetectedFrameLoss(int startFrame, int endFrame); void connectionReceivedCompleteFrame(int frameIndex); void connectionSawFrame(int frameIndex); void connectionLostPackets(int lastReceivedPacket, int nextReceivedPacket); +int sendInputPacketOnControlStream(unsigned char* data, int length); int performRtspHandshake(void); void terminateRtspHandshake(void); diff --git a/limelight-common/SdpGenerator.c b/limelight-common/SdpGenerator.c index 5c87bba..013df7e 100644 --- a/limelight-common/SdpGenerator.c +++ b/limelight-common/SdpGenerator.c @@ -148,9 +148,9 @@ static int addGen4Options(PSDP_OPTION* head, char* addrStr) { static int addGen5Options(PSDP_OPTION* head) { int err = 0; - // We want to use the legacy TCP connections for control and input rather than the new UDP stuff - err |= addAttributeString(head, "x-nv-general.useReliableUdp", "0"); - err |= addAttributeString(head, "x-nv-ri.useControlChannel", "0"); + // We want to use the new ENet connections for control and input + err |= addAttributeString(head, "x-nv-general.useReliableUdp", "1"); + err |= addAttributeString(head, "x-nv-ri.useControlChannel", "1"); // Disable dynamic resolution switching err |= addAttributeString(head, "x-nv-vqos[0].drc.enable", "0");