Add LiInterruptConnection() to interrupt LiStartConnection()

This commit is contained in:
Cameron Gutman
2017-05-21 12:24:22 -07:00
parent 9ebb429f66
commit 92951e1309
6 changed files with 63 additions and 57 deletions
+13 -3
View File
@@ -17,6 +17,7 @@ CONNECTION_LISTENER_CALLBACKS ListenerCallbacks;
DECODER_RENDERER_CALLBACKS VideoCallbacks; DECODER_RENDERER_CALLBACKS VideoCallbacks;
AUDIO_RENDERER_CALLBACKS AudioCallbacks; AUDIO_RENDERER_CALLBACKS AudioCallbacks;
int NegotiatedVideoFormat; int NegotiatedVideoFormat;
volatile int ConnectionInterrupted;
// Connection stages // Connection stages
static const char* stageNames[STAGE_MAX] = { static const char* stageNames[STAGE_MAX] = {
@@ -39,11 +40,21 @@ const char* LiGetStageName(int stage) {
return stageNames[stage]; return stageNames[stage];
} }
// Interrupt a pending connection attempt. This interruption happens asynchronously
// so it is not safe to start another connection before LiStartConnection() returns.
void LiInterruptConnection(void) {
// Signal anyone waiting on the global interrupted flag
ConnectionInterrupted = 1;
}
// Stop the connection by undoing the step at the current stage and those before it // Stop the connection by undoing the step at the current stage and those before it
void LiStopConnection(void) { void LiStopConnection(void) {
// Disable termination callbacks now // Disable termination callbacks now
alreadyTerminated = 1; alreadyTerminated = 1;
// Set the interrupted flag
LiInterruptConnection();
if (stage == STAGE_INPUT_STREAM_START) { if (stage == STAGE_INPUT_STREAM_START) {
Limelog("Stopping input stream..."); Limelog("Stopping input stream...");
stopInputStream(); stopInputStream();
@@ -93,10 +104,8 @@ void LiStopConnection(void) {
Limelog("done\n"); Limelog("done\n");
} }
if (stage == STAGE_RTSP_HANDSHAKE) { if (stage == STAGE_RTSP_HANDSHAKE) {
Limelog("Terminating RTSP handshake..."); // Nothing to do
terminateRtspHandshake();
stage--; stage--;
Limelog("done\n");
} }
if (stage == STAGE_NAME_RESOLUTION) { if (stage == STAGE_NAME_RESOLUTION) {
// Nothing to do // Nothing to do
@@ -236,6 +245,7 @@ int LiStartConnection(PSERVER_INFORMATION serverInfo, PSTREAM_CONFIGURATION stre
ListenerCallbacks.connectionTerminated = ClInternalConnectionTerminated; ListenerCallbacks.connectionTerminated = ClInternalConnectionTerminated;
alreadyTerminated = 0; alreadyTerminated = 0;
ConnectionInterrupted = 0;
Limelog("Initializing platform..."); Limelog("Initializing platform...");
ListenerCallbacks.stageStarting(STAGE_PLATFORM_INIT); ListenerCallbacks.stageStarting(STAGE_PLATFORM_INIT);
+24 -34
View File
@@ -289,10 +289,20 @@ static int sendMessageEnet(short ptype, short paylen, const void* payload) {
int err; int err;
LC_ASSERT(AppVersionQuad[0] >= 5); LC_ASSERT(AppVersionQuad[0] >= 5);
// Gen 5+ servers do control protocol over ENet instead of TCP
while ((err = serviceEnetHost(client, &event, 0)) > 0) {
if (event.type == ENET_EVENT_TYPE_RECEIVE) {
enet_packet_destroy(event.packet);
}
else if (event.type == ENET_EVENT_TYPE_DISCONNECT) {
Limelog("Control stream received disconnect event\n");
return 0;
}
}
// We may be trying to disconnect, so our peer could be gone. if (err < 0) {
// This check is safe because we're guaranteed to be holding enetMutex. Limelog("Control stream connection failed\n");
if (peer == NULL) {
return 0; return 0;
} }
@@ -304,23 +314,6 @@ static int sendMessageEnet(short ptype, short paylen, const void* payload) {
packet->type = ptype; packet->type = ptype;
memcpy(&packet[1], payload, paylen); memcpy(&packet[1], payload, paylen);
// Gen 5+ servers do control protocol over ENet instead of TCP
while ((err = serviceEnetHost(client, &event, 0)) > 0) {
if (event.type == ENET_EVENT_TYPE_RECEIVE) {
enet_packet_destroy(event.packet);
}
else if (event.type == ENET_EVENT_TYPE_DISCONNECT) {
Limelog("Control stream received disconnect event\n");
free(packet);
return 0;
}
}
if (err < 0) {
Limelog("Control stream connection failed\n");
return 0;
}
enetPacket = enet_packet_create(packet, sizeof(*packet) + paylen, ENET_PACKET_FLAG_RELIABLE); enetPacket = enet_packet_create(packet, sizeof(*packet) + paylen, ENET_PACKET_FLAG_RELIABLE);
if (enetPacket == NULL) { if (enetPacket == NULL) {
free(packet); free(packet);
@@ -570,13 +563,9 @@ int stopControlStream(void) {
stopping = 1; stopping = 1;
LbqSignalQueueShutdown(&invalidReferenceFrameTuples); LbqSignalQueueShutdown(&invalidReferenceFrameTuples);
PltSetEvent(&invalidateRefFramesEvent); PltSetEvent(&invalidateRefFramesEvent);
if (peer != NULL) { // This must be set to stop in a timely manner
PltLockMutex(&enetMutex); LC_ASSERT(ConnectionInterrupted);
enet_peer_disconnect_now(peer, 0);
peer = NULL;
PltUnlockMutex(&enetMutex);
}
if (ctlSock != INVALID_SOCKET) { if (ctlSock != INVALID_SOCKET) {
shutdownTcpSocket(ctlSock); shutdownTcpSocket(ctlSock);
@@ -591,6 +580,10 @@ int stopControlStream(void) {
PltCloseThread(&lossStatsThread); PltCloseThread(&lossStatsThread);
PltCloseThread(&invalidateRefFramesThread); PltCloseThread(&invalidateRefFramesThread);
if (peer != NULL) {
enet_peer_reset(peer);
peer = NULL;
}
if (client != NULL) { if (client != NULL) {
enet_host_destroy(client); enet_host_destroy(client);
client = NULL; client = NULL;
@@ -731,13 +724,8 @@ int startControlStream(void) {
if (ctlSock != INVALID_SOCKET) { if (ctlSock != INVALID_SOCKET) {
shutdownTcpSocket(ctlSock); shutdownTcpSocket(ctlSock);
} }
else {
if (peer != NULL) { ConnectionInterrupted = 1;
// We must use the mutex here because we have a live thread now.
PltLockMutex(&enetMutex);
enet_peer_disconnect_now(peer, 0);
peer = NULL;
PltUnlockMutex(&enetMutex);
} }
PltInterruptThread(&lossStatsThread); PltInterruptThread(&lossStatsThread);
@@ -749,6 +737,8 @@ int startControlStream(void) {
ctlSock = INVALID_SOCKET; ctlSock = INVALID_SOCKET;
} }
else { else {
enet_peer_disconnect_now(peer, 0);
peer = NULL;
enet_host_destroy(client); enet_host_destroy(client);
client = NULL; client = NULL;
} }
+1 -1
View File
@@ -18,6 +18,7 @@ extern CONNECTION_LISTENER_CALLBACKS ListenerCallbacks;
extern DECODER_RENDERER_CALLBACKS VideoCallbacks; extern DECODER_RENDERER_CALLBACKS VideoCallbacks;
extern AUDIO_RENDERER_CALLBACKS AudioCallbacks; extern AUDIO_RENDERER_CALLBACKS AudioCallbacks;
extern int NegotiatedVideoFormat; extern int NegotiatedVideoFormat;
extern volatile int ConnectionInterrupted;
int isBeforeSignedInt(int numA, int numB, int ambiguousCase); int isBeforeSignedInt(int numA, int numB, int ambiguousCase);
int serviceEnetHost(ENetHost* client, ENetEvent* event, enet_uint32 timeoutMs); int serviceEnetHost(ENetHost* client, ENetEvent* event, enet_uint32 timeoutMs);
@@ -40,7 +41,6 @@ void connectionLostPackets(int lastReceivedPacket, int nextReceivedPacket);
int sendInputPacketOnControlStream(unsigned char* data, int length); int sendInputPacketOnControlStream(unsigned char* data, int length);
int performRtspHandshake(void); int performRtspHandshake(void);
void terminateRtspHandshake(void);
void initializeVideoDepacketizer(int pktSize); void initializeVideoDepacketizer(int pktSize);
void destroyVideoDepacketizer(void); void destroyVideoDepacketizer(void);
+7 -1
View File
@@ -240,12 +240,18 @@ void LiInitializeServerInformation(PSERVER_INFORMATION serverInfo);
// Callbacks are all optional. Pass NULL for individual callbacks within each struct or pass NULL for the entire struct // Callbacks are all optional. Pass NULL for individual callbacks within each struct or pass NULL for the entire struct
// to use the defaults for all callbacks. // to use the defaults for all callbacks.
// //
// This function is not thread-safe.
//
int LiStartConnection(PSERVER_INFORMATION serverInfo, PSTREAM_CONFIGURATION streamConfig, PCONNECTION_LISTENER_CALLBACKS clCallbacks, int LiStartConnection(PSERVER_INFORMATION serverInfo, PSTREAM_CONFIGURATION streamConfig, PCONNECTION_LISTENER_CALLBACKS clCallbacks,
PDECODER_RENDERER_CALLBACKS drCallbacks, PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags); PDECODER_RENDERER_CALLBACKS drCallbacks, PAUDIO_RENDERER_CALLBACKS arCallbacks, void* renderContext, int drFlags);
// This function stops streaming. // This function stops streaming. This function is not thread-safe.
void LiStopConnection(void); void LiStopConnection(void);
// This function interrupts a pending LiStartConnection() call. This interruption happens asynchronously
// so it is not safe to start another connection before the first LiStartConnection() call returns.
void LiInterruptConnection(void);
// Use to get a user-visible string to display initialization progress // Use to get a user-visible string to display initialization progress
// from the integer passed to the ConnListenerStageXXX callbacks // from the integer passed to the ConnListenerStageXXX callbacks
const char* LiGetStageName(int stage); const char* LiGetStageName(int stage);
+18 -7
View File
@@ -1,20 +1,31 @@
#include "Limelight-internal.h" #include "Limelight-internal.h"
#define ENET_SERVICE_RETRIES 10 #define ENET_INTERNAL_TIMEOUT_MS 100
// This function wraps enet_host_service() and hides the fact that it must be called // This function wraps enet_host_service() and hides the fact that it must be called
// multiple times for retransmissions to work correctly. It is meant to be a drop-in // multiple times for retransmissions to work correctly. It is meant to be a drop-in
// replacement for enet_host_service(). // replacement for enet_host_service(). It also handles cancellation of the connection
// attempt during the wait.
int serviceEnetHost(ENetHost* client, ENetEvent* event, enet_uint32 timeoutMs) { int serviceEnetHost(ENetHost* client, ENetEvent* event, enet_uint32 timeoutMs) {
int i; int ret;
int ret = -1;
// We need to call enet_host_service() multiple times to make sure retransmissions happen // We need to call enet_host_service() multiple times to make sure retransmissions happen
for (i = 0; i < ENET_SERVICE_RETRIES; i++) { for (;;) {
ret = enet_host_service(client, event, timeoutMs / ENET_SERVICE_RETRIES); int selectedTimeout = timeoutMs < ENET_INTERNAL_TIMEOUT_MS ? timeoutMs : ENET_INTERNAL_TIMEOUT_MS;
// We want to report an interrupt event if we are able to read data
if (ConnectionInterrupted) {
Limelog("ENet wait interrupted\n");
ret = -1;
break;
}
ret = enet_host_service(client, event, selectedTimeout);
if (ret != 0 || timeoutMs == 0) { if (ret != 0 || timeoutMs == 0) {
break; break;
} }
timeoutMs -= selectedTimeout;
} }
return ret; return ret;
-11
View File
@@ -281,17 +281,6 @@ static int transactRtspMessage(PRTSP_MESSAGE request, PRTSP_MESSAGE response, in
} }
} }
// Terminate the RTSP Handshake process by shutting down the socket.
// The thread waiting on RTSP will close the socket.
void terminateRtspHandshake(void) {
if (sock != INVALID_SOCKET) {
shutdownTcpSocket(sock);
}
// FIXME: We should try to interrupt ENet here, but we must
// be sure to do it safely. We may need to add a new lock for this.
}
// Send RTSP OPTIONS request // Send RTSP OPTIONS request
static int requestOptions(PRTSP_MESSAGE response, int* error) { static int requestOptions(PRTSP_MESSAGE response, int* error) {
RTSP_MESSAGE request; RTSP_MESSAGE request;