Improve audio resync logic to use initial receive time rather send time

This commit is contained in:
Cameron Gutman 2021-04-26 22:43:49 -05:00
parent 33c4e98152
commit c1befbe2a8
3 changed files with 33 additions and 20 deletions

View File

@ -18,7 +18,7 @@ static PPLT_CRYPTO_CONTEXT audioDecryptionCtx;
static unsigned short lastSeq; static unsigned short lastSeq;
static bool receivedDataFromPeer; static bool receivedDataFromPeer;
static uint64_t pingStartTime; static uint64_t firstReceiveTime;
#define RTP_PORT 48000 #define RTP_PORT 48000
@ -49,7 +49,7 @@ static void UdpPingThreadProc(void* context) {
memcpy(&saddr, &RemoteAddr, sizeof(saddr)); memcpy(&saddr, &RemoteAddr, sizeof(saddr));
SET_PORT(&saddr, RTP_PORT); SET_PORT(&saddr, RTP_PORT);
// Send PING every second until we get data back then every 5 seconds after that. // Send PING every 500 milliseconds
while (!PltIsThreadInterrupted(&udpPingThread)) { while (!PltIsThreadInterrupted(&udpPingThread)) {
err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, RemoteAddrLen); err = sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, RemoteAddrLen);
if (err != sizeof(pingData)) { if (err != sizeof(pingData)) {
@ -58,6 +58,13 @@ static void UdpPingThreadProc(void* context) {
return; return;
} }
if (firstReceiveTime == 0 && isSocketReadable(rtpSocket)) {
// Remember the time when we got our first incoming audio packet.
// We will need to adjust for the delay between this event and
// when the real receive thread is ready to avoid falling behind.
firstReceiveTime = PltGetMillis();
}
PltSleepMsInterruptible(&udpPingThread, 500); PltSleepMsInterruptible(&udpPingThread, 500);
} }
} }
@ -68,6 +75,7 @@ int initializeAudioStream(void) {
RtpqInitializeQueue(&rtpReorderQueue, RTPQ_DEFAULT_MAX_SIZE, RTPQ_DEFAULT_QUEUE_TIME); RtpqInitializeQueue(&rtpReorderQueue, RTPQ_DEFAULT_MAX_SIZE, RTPQ_DEFAULT_QUEUE_TIME);
lastSeq = 0; lastSeq = 0;
receivedDataFromPeer = false; receivedDataFromPeer = false;
firstReceiveTime = 0;
audioDecryptionCtx = PltCreateCryptoContext(); audioDecryptionCtx = PltCreateCryptoContext();
// For GFE 3.22 compatibility, we must start the audio ping thread before the RTSP handshake. // For GFE 3.22 compatibility, we must start the audio ping thread before the RTSP handshake.
@ -77,15 +85,6 @@ int initializeAudioStream(void) {
return LastSocketFail(); return LastSocketFail();
} }
// Track the time we started pinging. Audio samples will begin arriving
// shortly after the first ping reaches the host. However, we won't be
// ready to start playback until later on. As a result, we'll drop samples
// equivalent to the amount of time before the receive thread is ready
// to accept traffic in real-time.
// FIXME: This could also take into account the size of the recv buffer,
// since long RTSP handshakes could cause that to fill to capacity.
pingStartTime = PltGetMillis();
// We may receive audio before our threads are started, but that's okay. We'll // We may receive audio before our threads are started, but that's okay. We'll
// drop the first 1 second of audio packets to catch up with the backlog. // drop the first 1 second of audio packets to catch up with the backlog.
int err = PltCreateThread("AudioPing", UdpPingThreadProc, NULL, &udpPingThread); int err = PltCreateThread("AudioPing", UdpPingThreadProc, NULL, &udpPingThread);
@ -195,16 +194,11 @@ static void ReceiveThreadProc(void* context) {
PQUEUED_AUDIO_PACKET packet; PQUEUED_AUDIO_PACKET packet;
int queueStatus; int queueStatus;
bool useSelect; bool useSelect;
int initialResyncDelay = PltGetMillis() - pingStartTime; uint32_t packetsToDrop;
int packetsToDrop;
int waitingForAudioMs; int waitingForAudioMs;
// Cap delay at 3 seconds to account for recv buffer cap
initialResyncDelay = initialResyncDelay > 3000 ? 3000 : initialResyncDelay;
packetsToDrop = initialResyncDelay / AudioPacketDuration;
Limelog("Initial audio resync period: %d milliseconds\n", initialResyncDelay);
packet = NULL; packet = NULL;
packetsToDrop = 0;
if (setNonFatalRecvTimeoutMs(rtpSocket, UDP_RECV_POLL_TIMEOUT_MS) < 0) { if (setNonFatalRecvTimeoutMs(rtpSocket, UDP_RECV_POLL_TIMEOUT_MS) < 0) {
// SO_RCVTIMEO failed, so use select() to wait // SO_RCVTIMEO failed, so use select() to wait
@ -259,11 +253,15 @@ static void ReceiveThreadProc(void* context) {
if (!receivedDataFromPeer) { if (!receivedDataFromPeer) {
receivedDataFromPeer = true; receivedDataFromPeer = true;
Limelog("Received first audio packet after %d ms\n", waitingForAudioMs); Limelog("Received first audio packet after %d ms\n", waitingForAudioMs);
if (firstReceiveTime != 0) {
packetsToDrop = (PltGetMillis() - firstReceiveTime) / AudioPacketDuration;
Limelog("Initial audio resync period: %d milliseconds\n", packetsToDrop * AudioPacketDuration);
}
} }
// GFE accumulates audio samples before we are ready to receive them, so // GFE accumulates audio samples before we are ready to receive them, so
// we will drop the first 1 second of packets to avoid accumulating latency // we will drop the ones that arrived before the receive thread was ready.
// by sending audio frames to the player faster than they can be played.
if (packetsToDrop > 0) { if (packetsToDrop > 0) {
packetsToDrop--; packetsToDrop--;
continue; continue;

View File

@ -150,6 +150,20 @@ int pollSockets(struct pollfd* pollFds, int pollFdsCount, int timeoutMs) {
#endif #endif
} }
bool isSocketReadable(SOCKET s) {
struct pollfd pfd;
int err;
pfd.fd = s;
pfd.events = POLLIN;
err = pollSockets(&pfd, 1, 0);
if (err <= 0) {
return false;
}
return true;
}
int recvUdpSocket(SOCKET s, char* buffer, int size, bool useSelect) { int recvUdpSocket(SOCKET s, char* buffer, int size, bool useSelect) {
int err; int err;

View File

@ -117,6 +117,7 @@ void setRecvTimeout(SOCKET s, int timeoutSec);
void closeSocket(SOCKET s); void closeSocket(SOCKET s);
bool isPrivateNetworkAddress(struct sockaddr_storage* address); bool isPrivateNetworkAddress(struct sockaddr_storage* address);
int pollSockets(struct pollfd* pollFds, int pollFdsCount, int timeoutMs); int pollSockets(struct pollfd* pollFds, int pollFdsCount, int timeoutMs);
bool isSocketReadable(SOCKET s);
#define TCP_PORT_MASK 0xFFFF #define TCP_PORT_MASK 0xFFFF
#define TCP_PORT_FLAG_ALWAYS_TEST 0x10000 #define TCP_PORT_FLAG_ALWAYS_TEST 0x10000