Reorder audio initialization and RTSP handshake to avoid RTSP PLAY failure on GFE 3.22

This commit is contained in:
Cameron Gutman 2021-04-09 08:38:40 -05:00
parent 2c13835f32
commit d0c3513504
5 changed files with 80 additions and 89 deletions

View File

@ -37,33 +37,6 @@ typedef struct _QUEUED_AUDIO_PACKET {
} q;
} QUEUED_AUDIO_PACKET, *PQUEUED_AUDIO_PACKET;
// Initialize the audio stream
void initializeAudioStream(void) {
LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
RtpqInitializeQueue(&rtpReorderQueue, RTPQ_DEFAULT_MAX_SIZE, RTPQ_DEFAULT_QUEUE_TIME);
lastSeq = 0;
receivedDataFromPeer = false;
}
static void freePacketList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
PLINKED_BLOCKING_QUEUE_ENTRY nextEntry;
while (entry != NULL) {
nextEntry = entry->flink;
// The entry is stored within the data allocation
free(entry->data);
entry = nextEntry;
}
}
// Tear down the audio stream once we're done with it
void destroyAudioStream(void) {
freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue));
RtpqCleanupQueue(&rtpReorderQueue);
}
static void UdpPingThreadProc(void* context) {
// Ping in ASCII
char pingData[] = { 0x50, 0x49, 0x4E, 0x47 };
@ -86,6 +59,59 @@ static void UdpPingThreadProc(void* context) {
}
}
// Initialize the audio stream and start
int initializeAudioStream(void) {
LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
RtpqInitializeQueue(&rtpReorderQueue, RTPQ_DEFAULT_MAX_SIZE, RTPQ_DEFAULT_QUEUE_TIME);
lastSeq = 0;
receivedDataFromPeer = false;
// For GFE 3.22 compatibility, we must start the audio ping thread before the RTSP handshake.
// It will not reply to our RTSP PLAY request until the audio ping has been received.
rtpSocket = bindUdpSocket(RemoteAddr.ss_family, RTP_RECV_BUFFER);
if (rtpSocket == INVALID_SOCKET) {
return LastSocketFail();
}
// We may receive audio before our threads are started, but that's okay. We'll
// drop the first 1 second of audio packets to catch up with the backlog.
int err = PltCreateThread("AudioPing", UdpPingThreadProc, NULL, &udpPingThread);
if (err != 0) {
closeSocket(rtpSocket);
return err;
}
return 0;
}
static void freePacketList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
PLINKED_BLOCKING_QUEUE_ENTRY nextEntry;
while (entry != NULL) {
nextEntry = entry->flink;
// The entry is stored within the data allocation
free(entry->data);
entry = nextEntry;
}
}
// Tear down the audio stream once we're done with it
void destroyAudioStream(void) {
if (rtpSocket != INVALID_SOCKET) {
PltInterruptThread(&udpPingThread);
PltJoinThread(&udpPingThread);
PltCloseThread(&udpPingThread);
closeSocket(rtpSocket);
rtpSocket = INVALID_SOCKET;
}
freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue));
RtpqCleanupQueue(&rtpReorderQueue);
}
static bool queuePacketToLbq(PQUEUED_AUDIO_PACKET* packet) {
int err;
@ -268,7 +294,6 @@ void stopAudioStream(void) {
AudioCallbacks.stop();
PltInterruptThread(&udpPingThread);
PltInterruptThread(&receiveThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
// Signal threads waiting on the LBQ
@ -276,23 +301,16 @@ void stopAudioStream(void) {
PltInterruptThread(&decoderThread);
}
PltJoinThread(&udpPingThread);
PltJoinThread(&receiveThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltJoinThread(&decoderThread);
}
PltCloseThread(&udpPingThread);
PltCloseThread(&receiveThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltCloseThread(&decoderThread);
}
if (rtpSocket != INVALID_SOCKET) {
closeSocket(rtpSocket);
rtpSocket = INVALID_SOCKET;
}
AudioCallbacks.cleanup();
}
@ -319,13 +337,6 @@ int startAudioStream(void* audioContext, int arFlags) {
return err;
}
rtpSocket = bindUdpSocket(RemoteAddr.ss_family, RTP_RECV_BUFFER);
if (rtpSocket == INVALID_SOCKET) {
err = LastSocketFail();
AudioCallbacks.cleanup();
return err;
}
AudioCallbacks.start();
err = PltCreateThread("AudioRecv", ReceiveThreadProc, NULL, &receiveThread);
@ -349,32 +360,6 @@ int startAudioStream(void* audioContext, int arFlags) {
}
}
// Don't start pinging (which will cause GFE to start sending us traffic)
// until everything else is started. Otherwise we could accumulate a
// bunch of audio packets in the socket receive buffer while our audio
// backend is starting up and create audio latency.
err = PltCreateThread("AudioPing", UdpPingThreadProc, NULL, &udpPingThread);
if (err != 0) {
AudioCallbacks.stop();
PltInterruptThread(&receiveThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
// Signal threads waiting on the LBQ
LbqSignalQueueShutdown(&packetQueue);
PltInterruptThread(&decoderThread);
}
PltJoinThread(&receiveThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltJoinThread(&decoderThread);
}
PltCloseThread(&receiveThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltCloseThread(&decoderThread);
}
closeSocket(rtpSocket);
AudioCallbacks.cleanup();
return err;
}
return 0;
}

View File

@ -30,10 +30,10 @@ static const char* stageNames[STAGE_MAX] = {
"none",
"platform initialization",
"name resolution",
"audio stream initialization",
"RTSP handshake",
"control stream initialization",
"video stream initialization",
"audio stream initialization",
"input stream initialization",
"control stream establishment",
"video stream establishment",
@ -91,12 +91,6 @@ void LiStopConnection(void) {
stage--;
Limelog("done\n");
}
if (stage == STAGE_AUDIO_STREAM_INIT) {
Limelog("Cleaning up audio stream...");
destroyAudioStream();
stage--;
Limelog("done\n");
}
if (stage == STAGE_VIDEO_STREAM_INIT) {
Limelog("Cleaning up video stream...");
destroyVideoStream();
@ -113,6 +107,12 @@ void LiStopConnection(void) {
// Nothing to do
stage--;
}
if (stage == STAGE_AUDIO_STREAM_INIT) {
Limelog("Cleaning up audio stream...");
destroyAudioStream();
stage--;
Limelog("done\n");
}
if (stage == STAGE_NAME_RESOLUTION) {
// Nothing to do
stage--;
@ -289,6 +289,20 @@ int LiStartConnection(PSERVER_INFORMATION serverInfo, PSTREAM_CONFIGURATION stre
}
}
Limelog("Initializing audio stream...");
ListenerCallbacks.stageStarting(STAGE_AUDIO_STREAM_INIT);
if (initializeAudioStream() != 0) {
if (err != 0) {
Limelog("failed: %d\n", err);
ListenerCallbacks.stageFailed(STAGE_AUDIO_STREAM_INIT, err);
goto Cleanup;
}
}
stage++;
LC_ASSERT(stage == STAGE_AUDIO_STREAM_INIT);
ListenerCallbacks.stageComplete(STAGE_AUDIO_STREAM_INIT);
Limelog("done\n");
Limelog("Starting RTSP handshake...");
ListenerCallbacks.stageStarting(STAGE_RTSP_HANDSHAKE);
err = performRtspHandshake();
@ -323,14 +337,6 @@ int LiStartConnection(PSERVER_INFORMATION serverInfo, PSTREAM_CONFIGURATION stre
ListenerCallbacks.stageComplete(STAGE_VIDEO_STREAM_INIT);
Limelog("done\n");
Limelog("Initializing audio stream...");
ListenerCallbacks.stageStarting(STAGE_AUDIO_STREAM_INIT);
initializeAudioStream();
stage++;
LC_ASSERT(stage == STAGE_AUDIO_STREAM_INIT);
ListenerCallbacks.stageComplete(STAGE_AUDIO_STREAM_INIT);
Limelog("done\n");
Limelog("Initializing input stream...");
ListenerCallbacks.stageStarting(STAGE_INPUT_STREAM_INIT);
initializeInputStream();

View File

@ -92,7 +92,7 @@ int startVideoStream(void* rendererContext, int drFlags);
void submitFrame(PQUEUED_DECODE_UNIT qdu);
void stopVideoStream(void);
void initializeAudioStream(void);
int initializeAudioStream(void);
void destroyAudioStream(void);
int startAudioStream(void* audioContext, int arFlags);
void stopAudioStream(void);

View File

@ -325,10 +325,10 @@ void LiInitializeAudioCallbacks(PAUDIO_RENDERER_CALLBACKS arCallbacks);
#define STAGE_NONE 0
#define STAGE_PLATFORM_INIT 1
#define STAGE_NAME_RESOLUTION 2
#define STAGE_RTSP_HANDSHAKE 3
#define STAGE_CONTROL_STREAM_INIT 4
#define STAGE_VIDEO_STREAM_INIT 5
#define STAGE_AUDIO_STREAM_INIT 6
#define STAGE_AUDIO_STREAM_INIT 3
#define STAGE_RTSP_HANDSHAKE 4
#define STAGE_CONTROL_STREAM_INIT 5
#define STAGE_VIDEO_STREAM_INIT 6
#define STAGE_INPUT_STREAM_INIT 7
#define STAGE_CONTROL_STREAM_START 8
#define STAGE_VIDEO_STREAM_START 9

View File

@ -855,7 +855,7 @@ int performRtspHandshake(void) {
int error = -1;
if (!playStream(&response, "/", &error)) {
Limelog("RTSP PLAY streamid=video request failed: %d\n", error);
Limelog("RTSP PLAY request failed: %d\n", error);
ret = error;
goto Exit;
}