From c1befbe2a82b417ff4239985414126cd4db820d8 Mon Sep 17 00:00:00 2001 From: Cameron Gutman Date: Mon, 26 Apr 2021 22:43:49 -0500 Subject: [PATCH] Improve audio resync logic to use initial receive time rather send time --- src/AudioStream.c | 38 ++++++++++++++++++-------------------- src/PlatformSockets.c | 14 ++++++++++++++ src/PlatformSockets.h | 1 + 3 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/AudioStream.c b/src/AudioStream.c index bae0f9a..e082b82 100644 --- a/src/AudioStream.c +++ b/src/AudioStream.c @@ -18,7 +18,7 @@ static PPLT_CRYPTO_CONTEXT audioDecryptionCtx; static unsigned short lastSeq; static bool receivedDataFromPeer; -static uint64_t pingStartTime; +static uint64_t firstReceiveTime; #define RTP_PORT 48000 @@ -49,7 +49,7 @@ static void UdpPingThreadProc(void* context) { memcpy(&saddr, &RemoteAddr, sizeof(saddr)); SET_PORT(&saddr, RTP_PORT); - // Send PING every second until we get data back then every 5 seconds after that. + // Send PING every 500 milliseconds while (!PltIsThreadInterrupted(&udpPingThread)) { err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, RemoteAddrLen); if (err != sizeof(pingData)) { @@ -58,6 +58,13 @@ static void UdpPingThreadProc(void* context) { return; } + if (firstReceiveTime == 0 && isSocketReadable(rtpSocket)) { + // Remember the time when we got our first incoming audio packet. + // We will need to adjust for the delay between this event and + // when the real receive thread is ready to avoid falling behind. + firstReceiveTime = PltGetMillis(); + } + PltSleepMsInterruptible(&udpPingThread, 500); } } @@ -68,6 +75,7 @@ int initializeAudioStream(void) { RtpqInitializeQueue(&rtpReorderQueue, RTPQ_DEFAULT_MAX_SIZE, RTPQ_DEFAULT_QUEUE_TIME); lastSeq = 0; receivedDataFromPeer = false; + firstReceiveTime = 0; audioDecryptionCtx = PltCreateCryptoContext(); // For GFE 3.22 compatibility, we must start the audio ping thread before the RTSP handshake. @@ -77,15 +85,6 @@ int initializeAudioStream(void) { return LastSocketFail(); } - // Track the time we started pinging. Audio samples will begin arriving - // shortly after the first ping reaches the host. However, we won't be - // ready to start playback until later on. As a result, we'll drop samples - // equivalent to the amount of time before the receive thread is ready - // to accept traffic in real-time. - // FIXME: This could also take into account the size of the recv buffer, - // since long RTSP handshakes could cause that to fill to capacity. - pingStartTime = PltGetMillis(); - // We may receive audio before our threads are started, but that's okay. We'll // drop the first 1 second of audio packets to catch up with the backlog. int err = PltCreateThread("AudioPing", UdpPingThreadProc, NULL, &udpPingThread); @@ -195,16 +194,11 @@ static void ReceiveThreadProc(void* context) { PQUEUED_AUDIO_PACKET packet; int queueStatus; bool useSelect; - int initialResyncDelay = PltGetMillis() - pingStartTime; - int packetsToDrop; + uint32_t packetsToDrop; int waitingForAudioMs; - // Cap delay at 3 seconds to account for recv buffer cap - initialResyncDelay = initialResyncDelay > 3000 ? 3000 : initialResyncDelay; - packetsToDrop = initialResyncDelay / AudioPacketDuration; - Limelog("Initial audio resync period: %d milliseconds\n", initialResyncDelay); - packet = NULL; + packetsToDrop = 0; if (setNonFatalRecvTimeoutMs(rtpSocket, UDP_RECV_POLL_TIMEOUT_MS) < 0) { // SO_RCVTIMEO failed, so use select() to wait @@ -259,11 +253,15 @@ static void ReceiveThreadProc(void* context) { if (!receivedDataFromPeer) { receivedDataFromPeer = true; Limelog("Received first audio packet after %d ms\n", waitingForAudioMs); + + if (firstReceiveTime != 0) { + packetsToDrop = (PltGetMillis() - firstReceiveTime) / AudioPacketDuration; + Limelog("Initial audio resync period: %d milliseconds\n", packetsToDrop * AudioPacketDuration); + } } // GFE accumulates audio samples before we are ready to receive them, so - // we will drop the first 1 second of packets to avoid accumulating latency - // by sending audio frames to the player faster than they can be played. + // we will drop the ones that arrived before the receive thread was ready. if (packetsToDrop > 0) { packetsToDrop--; continue; diff --git a/src/PlatformSockets.c b/src/PlatformSockets.c index de9bfd4..e1e091c 100644 --- a/src/PlatformSockets.c +++ b/src/PlatformSockets.c @@ -150,6 +150,20 @@ int pollSockets(struct pollfd* pollFds, int pollFdsCount, int timeoutMs) { #endif } +bool isSocketReadable(SOCKET s) { + struct pollfd pfd; + int err; + + pfd.fd = s; + pfd.events = POLLIN; + err = pollSockets(&pfd, 1, 0); + if (err <= 0) { + return false; + } + + return true; +} + int recvUdpSocket(SOCKET s, char* buffer, int size, bool useSelect) { int err; diff --git a/src/PlatformSockets.h b/src/PlatformSockets.h index 071b2fa..89feb41 100644 --- a/src/PlatformSockets.h +++ b/src/PlatformSockets.h @@ -117,6 +117,7 @@ void setRecvTimeout(SOCKET s, int timeoutSec); void closeSocket(SOCKET s); bool isPrivateNetworkAddress(struct sockaddr_storage* address); int pollSockets(struct pollfd* pollFds, int pollFdsCount, int timeoutMs); +bool isSocketReadable(SOCKET s); #define TCP_PORT_MASK 0xFFFF #define TCP_PORT_FLAG_ALWAYS_TEST 0x10000