Implement RTSP over ENet for GFE 2.11

This commit is contained in:
Cameron Gutman 2016-03-04 00:40:12 -05:00
parent c44cdffdd5
commit 31ce0f5c87
5 changed files with 213 additions and 9 deletions

View File

@ -8,6 +8,7 @@ static PLT_THREAD terminationCallbackThread;
static long terminationCallbackErrorCode;
// Common globals
char* RemoteAddrString;
struct sockaddr_storage RemoteAddr;
SOCKADDR_LEN RemoteAddrLen;
int ServerMajorVersion;
@ -108,6 +109,11 @@ void LiStopConnection(void) {
Limelog("done\n");
}
LC_ASSERT(stage == STAGE_NONE);
if (RemoteAddrString != NULL) {
free(RemoteAddrString);
RemoteAddrString = NULL;
}
}
static void terminationCallbackThreadFunc(void* context)
@ -181,6 +187,7 @@ int LiStartConnection(const char* host, PSTREAM_CONFIGURATION streamConfig, PCON
NegotiatedVideoFormat = 0;
ServerMajorVersion = _serverMajorVersion;
memcpy(&StreamConfig, streamConfig, sizeof(StreamConfig));
RemoteAddrString = strdup(host);
// Replace missing callbacks with placeholders
fixupMissingCallbacks(&drCallbacks, &arCallbacks, &clCallbacks);

View File

@ -7,6 +7,7 @@
#include "Video.h"
// Common globals
extern char* RemoteAddrString;
extern struct sockaddr_storage RemoteAddr;
extern SOCKADDR_LEN RemoteAddrLen;
extern int ServerMajorVersion;

View File

@ -1,6 +1,8 @@
#include "PlatformThreads.h"
#include "Platform.h"
#include <enet/enet.h>
int initializePlatformSockets(void);
void cleanupPlatformSockets(void);
@ -233,6 +235,11 @@ int initializePlatform(void) {
int err;
err = initializePlatformSockets();
if (err != 0) {
return err;
}
err = enet_initialize();
if (err != 0) {
return err;
}
@ -242,6 +249,8 @@ int initializePlatform(void) {
void cleanupPlatform(void) {
cleanupPlatformSockets();
enet_deinitialize();
LC_ASSERT(running_threads == 0);
}

View File

@ -1,10 +1,11 @@
#include "Limelight-internal.h"
#include "Rtsp.h"
#include <enet/enet.h>
#define RTSP_MAX_RESP_SIZE 32768
#define RTSP_TIMEOUT_SEC 10
static SOCKET sock = INVALID_SOCKET;
static int currentSeqNumber;
static char rtspTargetUrl[256];
static char sessionIdString[16];
@ -12,6 +13,10 @@ static int hasSessionId;
static char responseBuffer[RTSP_MAX_RESP_SIZE];
static int rtspClientVersion;
static SOCKET sock = INVALID_SOCKET;
static ENetHost* client;
static ENetPeer* peer;
// Create RTSP Option
static POPTION_ITEM createOptionItem(char* option, char* content)
{
@ -78,8 +83,120 @@ static int initializeRtspRequest(PRTSP_MESSAGE msg, char* command, char* target)
return 1;
}
// Send RTSP message and get response
static int transactRtspMessage(PRTSP_MESSAGE request, PRTSP_MESSAGE response, int* error) {
// Send RTSP message and get response over ENet
static int transactRtspMessageEnet(PRTSP_MESSAGE request, PRTSP_MESSAGE response, int expectingPayload, int* error) {
ENetEvent event;
char* serializedMessage;
int messageLen;
int offset;
ENetPacket* packet;
char* payload;
int payloadLength;
int ret;
// We're going to handle the payload separately, so temporarily set the payload to NULL
payload = request->payload;
payloadLength = request->payloadLength;
request->payload = NULL;
request->payloadLength = 0;
// Serialize the RTSP message into a message buffer
serializedMessage = serializeRtspMessage(request, &messageLen);
if (serializedMessage == NULL) {
ret = 0;
goto Exit;
}
// Create the reliable packet that describes our outgoing message
packet = enet_packet_create(serializedMessage, messageLen, ENET_PACKET_FLAG_RELIABLE);
if (packet == NULL) {
ret = 0;
goto Exit;
}
// Send the message
enet_peer_send(peer, 0, packet);
enet_host_flush(client);
// If we have a payload to send, we'll need to send that separately
if (payload != NULL) {
packet = enet_packet_create(payload, payloadLength, ENET_PACKET_FLAG_RELIABLE);
if (packet == NULL) {
ret = 0;
goto Exit;
}
// Send the payload
enet_peer_send(peer, 0, packet);
enet_host_flush(client);
}
// Wait for a reply
if (enet_host_service(client, &event, RTSP_TIMEOUT_SEC * 1000) <= 0 ||
event.type != ENET_EVENT_TYPE_RECEIVE) {
Limelog("Failed to receive RTSP reply\n");
ret = 0;
goto Exit;
}
if (event.packet->dataLength > RTSP_MAX_RESP_SIZE) {
Limelog("RTSP message too long\n");
ret = 0;
goto Exit;
}
// Copy the data out and destroy the packet
memcpy(responseBuffer, event.packet->data, event.packet->dataLength);
offset = event.packet->dataLength;
enet_packet_destroy(event.packet);
// Wait for the payload if we're expecting some
if (expectingPayload) {
// Only wait 1 second since the packets should be here immediately
// after the header.
if (enet_host_service(client, &event, 1000) <= 0 ||
event.type != ENET_EVENT_TYPE_RECEIVE) {
Limelog("Failed to receive RTSP reply payload\n");
ret = 0;
goto Exit;
}
if (event.packet->dataLength + offset > RTSP_MAX_RESP_SIZE) {
Limelog("RTSP message payload too long\n");
ret = 0;
goto Exit;
}
// Copy the payload out to the end of the response buffer and destroy the packet
memcpy(&responseBuffer[offset], event.packet->data, event.packet->dataLength);
offset += event.packet->dataLength;
enet_packet_destroy(event.packet);
}
if (parseRtspMessage(response, responseBuffer, offset) == RTSP_ERROR_SUCCESS) {
// Successfully parsed response
ret = 1;
}
else {
Limelog("Failed to parse RTSP response\n");
ret = 0;
}
Exit:
// Swap back the payload pointer to avoid leaking memory later
request->payload = payload;
request->payloadLength = payloadLength;
// Free the serialized buffer
if (serializedMessage != NULL) {
free(serializedMessage);
}
return ret;
}
// Send RTSP message and get response over TCP
static int transactRtspMessageTcp(PRTSP_MESSAGE request, PRTSP_MESSAGE response, int expectingPayload, int* error) {
SOCK_RET err;
int ret = 0;
int offset;
@ -146,12 +263,32 @@ Exit:
return ret;
}
static int transactRtspMessage(PRTSP_MESSAGE request, PRTSP_MESSAGE response, int expectingPayload, int* error) {
// Gen 5+ does RTSP over ENet not TCP
if (ServerMajorVersion >= 5) {
return transactRtspMessageEnet(request, response, expectingPayload, error);
}
else {
return transactRtspMessageTcp(request, response, expectingPayload, error);
}
}
// Terminate the RTSP Handshake process by shutting down the socket.
// The thread waiting on RTSP will close the socket.
void terminateRtspHandshake(void) {
if (sock != INVALID_SOCKET) {
shutdownTcpSocket(sock);
}
if (peer != NULL) {
enet_peer_reset(peer);
peer = NULL;
}
if (client != NULL) {
enet_host_destroy(client);
client = NULL;
}
}
// Send RTSP OPTIONS request
@ -163,7 +300,7 @@ static int requestOptions(PRTSP_MESSAGE response, int* error) {
ret = initializeRtspRequest(&request, "OPTIONS", rtspTargetUrl);
if (ret != 0) {
ret = transactRtspMessage(&request, response, error);
ret = transactRtspMessage(&request, response, 0, error);
freeMessage(&request);
}
@ -183,7 +320,7 @@ static int requestDescribe(PRTSP_MESSAGE response, int* error) {
"application/sdp") &&
addOption(&request, "If-Modified-Since",
"Thu, 01 Jan 1970 00:00:00 GMT")) {
ret = transactRtspMessage(&request, response, error);
ret = transactRtspMessage(&request, response, 1, error);
}
else {
ret = 0;
@ -213,7 +350,7 @@ static int setupStream(PRTSP_MESSAGE response, char* target, int* error) {
if (addOption(&request, "Transport", " ") &&
addOption(&request, "If-Modified-Since",
"Thu, 01 Jan 1970 00:00:00 GMT")) {
ret = transactRtspMessage(&request, response, error);
ret = transactRtspMessage(&request, response, 0, error);
}
else {
ret = 0;
@ -236,7 +373,7 @@ static int playStream(PRTSP_MESSAGE response, char* target, int* error) {
ret = initializeRtspRequest(&request, "PLAY", target);
if (ret != 0) {
if (addOption(&request, "Session", sessionIdString)) {
ret = transactRtspMessage(&request, response, error);
ret = transactRtspMessage(&request, response, 0, error);
}
else {
ret = 0;
@ -277,7 +414,7 @@ static int sendVideoAnnounce(PRTSP_MESSAGE response, int* error) {
goto FreeMessage;
}
ret = transactRtspMessage(&request, response, error);
ret = transactRtspMessage(&request, response, 0, error);
FreeMessage:
freeMessage(&request);
@ -305,6 +442,37 @@ int performRtspHandshake(void) {
else {
rtspClientVersion = 12;
}
// Gen 5 servers use ENet to do the RTSP handshake
if (ServerMajorVersion >= 5) {
ENetAddress address;
ENetEvent event;
// Create a client that can use 1 outgoing connection and 1 channel
client = enet_host_create(NULL, 1, 1, 0, 0);
if (client == NULL) {
return -1;
}
enet_address_set_host(&address, RemoteAddrString);
address.port = 48010;
// Connect to the host
peer = enet_host_connect(client, &address, 1, 0);
if (peer == NULL) {
return -1;
}
// Wait for the connect to complete
if (enet_host_service(client, &event, RTSP_TIMEOUT_SEC * 1000) <= 0 ||
event.type != ENET_EVENT_TYPE_CONNECT) {
Limelog("RTSP: Failed to connect to UDP port 48010\n");
return -1;
}
// Ensure the connect verify ACK is sent immediately
enet_host_flush(client);
}
{
RTSP_MESSAGE response;
@ -454,6 +622,19 @@ int performRtspHandshake(void) {
freeMessage(&response);
}
// Cleanup the ENet stuff
if (ServerMajorVersion >= 5) {
if (peer != NULL) {
enet_peer_reset(peer);
peer = NULL;
}
if (client != NULL) {
enet_host_destroy(client);
client = NULL;
}
}
return 0;
}

View File

@ -174,7 +174,13 @@ int parseRtspMessage(PRTSP_MESSAGE msg, char* rtspMessage, int length) {
endCheck = &token[0] + strlen(token) + 1;
// See if we've hit the end of the message. The first \r is missing because it's been tokenized
if (startsWith(endCheck, "\n\r\n")) {
if (startsWith(endCheck, "\n") && endCheck[1] == '\0') {
// RTSP over ENet doesn't always have the second CRLF for some reason
messageEnded = 1;
break;
}
else if (startsWith(endCheck, "\n\r\n")) {
// We've encountered the end of the message - mark it thus
messageEnded = 1;