#include "Limelight-internal.h" static SOCKET rtpSocket = INVALID_SOCKET; static LINKED_BLOCKING_QUEUE packetQueue; static RTP_AUDIO_QUEUE rtpAudioQueue; static PLT_THREAD udpPingThread; static PLT_THREAD receiveThread; static PLT_THREAD decoderThread; static PPLT_CRYPTO_CONTEXT audioDecryptionCtx; static uint32_t avRiKeyId; static unsigned short lastSeq; static bool pingThreadStarted; static bool receivedDataFromPeer; static uint64_t firstReceiveTime; #ifdef LC_DEBUG #define INVALID_OPUS_HEADER 0x00 static uint8_t opusHeaderByte; #endif #define MAX_PACKET_SIZE 1400 // This is much larger than we should typically have buffered, but // it needs to be. We need a cushion in case our thread gets blocked // for longer than normal. #define RTP_RECV_BUFFER (64 * 1024) typedef struct _QUEUE_AUDIO_PACKET_HEADER { LINKED_BLOCKING_QUEUE_ENTRY lentry; int size; } QUEUED_AUDIO_PACKET_HEADER, *PQUEUED_AUDIO_PACKET_HEADER; typedef struct _QUEUED_AUDIO_PACKET { QUEUED_AUDIO_PACKET_HEADER header; char data[MAX_PACKET_SIZE]; } QUEUED_AUDIO_PACKET, *PQUEUED_AUDIO_PACKET; static void AudioPingThreadProc(void* context) { // Ping in ASCII char pingData[] = { 0x50, 0x49, 0x4E, 0x47 }; LC_SOCKADDR saddr; LC_ASSERT(AudioPortNumber != 0); memcpy(&saddr, &RemoteAddr, sizeof(saddr)); SET_PORT(&saddr, AudioPortNumber); // Send PING every 500 milliseconds while (!PltIsThreadInterrupted(&udpPingThread)) { // We do not check for errors here. Socket errors will be handled // on the read-side in ReceiveThreadProc(). This avoids potential // issues related to receiving ICMP port unreachable messages due // to sending a packet prior to the host PC binding to that port. sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, RemoteAddrLen); if (firstReceiveTime == 0 && isSocketReadable(rtpSocket)) { // Remember the time when we got our first incoming audio packet. // We will need to adjust for the delay between this event and // when the real receive thread is ready to avoid falling behind. firstReceiveTime = PltGetMillis(); } PltSleepMsInterruptible(&udpPingThread, 500); } } // Initialize the audio stream and start int initializeAudioStream(void) { LbqInitializeLinkedBlockingQueue(&packetQueue, 30); RtpaInitializeQueue(&rtpAudioQueue); lastSeq = 0; receivedDataFromPeer = false; pingThreadStarted = false; firstReceiveTime = 0; audioDecryptionCtx = PltCreateCryptoContext(); #ifdef LC_DEBUG opusHeaderByte = INVALID_OPUS_HEADER; #endif // Copy and byte-swap the AV RI key ID used for the audio encryption IV memcpy(&avRiKeyId, StreamConfig.remoteInputAesIv, sizeof(avRiKeyId)); avRiKeyId = BE32(avRiKeyId); // For GFE 3.22 compatibility, we must start the audio ping thread before the RTSP handshake. // It will not reply to our RTSP PLAY request until the audio ping has been received. rtpSocket = bindUdpSocket(RemoteAddr.ss_family, RTP_RECV_BUFFER); if (rtpSocket == INVALID_SOCKET) { return LastSocketFail(); } return 0; } // This is called when the RTSP SETUP message is parsed and the audio port // number is parsed out of it. Alternatively, it's also called if parsing fails // and will use the well known audio port instead. int notifyAudioPortNegotiationComplete(void) { LC_ASSERT(!pingThreadStarted); LC_ASSERT(AudioPortNumber != 0); // We may receive audio before our threads are started, but that's okay. We'll // drop the first 1 second of audio packets to catch up with the backlog. int err = PltCreateThread("AudioPing", AudioPingThreadProc, NULL, &udpPingThread); if (err != 0) { return err; } pingThreadStarted = true; return 0; } static void freePacketList(PLINKED_BLOCKING_QUEUE_ENTRY entry) { PLINKED_BLOCKING_QUEUE_ENTRY nextEntry; while (entry != NULL) { nextEntry = entry->flink; // The entry is stored within the data allocation free(entry->data); entry = nextEntry; } } // Tear down the audio stream once we're done with it void destroyAudioStream(void) { if (rtpSocket != INVALID_SOCKET) { if (pingThreadStarted) { PltInterruptThread(&udpPingThread); PltJoinThread(&udpPingThread); PltCloseThread(&udpPingThread); } closeSocket(rtpSocket); rtpSocket = INVALID_SOCKET; } PltDestroyCryptoContext(audioDecryptionCtx); freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue)); RtpaCleanupQueue(&rtpAudioQueue); } static bool queuePacketToLbq(PQUEUED_AUDIO_PACKET* packet) { int err; do { err = LbqOfferQueueItem(&packetQueue, *packet, &(*packet)->header.lentry); if (err == LBQ_SUCCESS) { // The LBQ owns the buffer now *packet = NULL; } else if (err == LBQ_BOUND_EXCEEDED) { Limelog("Audio packet queue overflow\n"); // The audio queue is full, so free all existing items and try again freePacketList(LbqFlushQueueItems(&packetQueue)); } } while (err == LBQ_BOUND_EXCEEDED); return err == LBQ_SUCCESS; } static void decodeInputData(PQUEUED_AUDIO_PACKET packet) { // If the packet size is zero, this is a placeholder for a missing // packet. Trigger packet loss concealment logic in libopus by // invoking the decoder with a NULL buffer. if (packet->header.size == 0) { AudioCallbacks.decodeAndPlaySample(NULL, 0); return; } PRTP_PACKET rtp = (PRTP_PACKET)&packet->data[0]; if (lastSeq != 0 && (unsigned short)(lastSeq + 1) != rtp->sequenceNumber) { Limelog("Network dropped audio data (expected %d, but received %d)\n", lastSeq + 1, rtp->sequenceNumber); } lastSeq = rtp->sequenceNumber; if (AudioEncryptionEnabled) { // We must have room for the AES padding which may be written to the buffer unsigned char decryptedOpusData[ROUND_TO_PKCS7_PADDED_LEN(MAX_PACKET_SIZE)]; unsigned char iv[16] = { 0 }; int dataLength = packet->header.size - sizeof(*rtp); LC_ASSERT(dataLength <= MAX_PACKET_SIZE); // The IV is the avkeyid (equivalent to the rikeyid) + // the RTP sequence number, in big endian. uint32_t ivSeq = BE32(avRiKeyId + rtp->sequenceNumber); memcpy(iv, &ivSeq, sizeof(ivSeq)); if (!PltDecryptMessage(audioDecryptionCtx, ALGORITHM_AES_CBC, CIPHER_FLAG_RESET_IV | CIPHER_FLAG_FINISH, (unsigned char*)StreamConfig.remoteInputAesKey, sizeof(StreamConfig.remoteInputAesKey), iv, sizeof(iv), NULL, 0, (unsigned char*)(rtp + 1), dataLength, decryptedOpusData, &dataLength)) { Limelog("Failed to decrypt audio packet (sequence number: %u)\n", rtp->sequenceNumber); LC_ASSERT(false); return; } #ifdef LC_DEBUG if (opusHeaderByte == INVALID_OPUS_HEADER) { opusHeaderByte = decryptedOpusData[0]; LC_ASSERT(opusHeaderByte != INVALID_OPUS_HEADER); } else { // Opus header should stay constant for the entire stream. // If it doesn't, it may indicate that the RtpAudioQueue // incorrectly recovered a data shard or the decryption // of the audio packet failed. LC_ASSERT(decryptedOpusData[0] == opusHeaderByte); } #endif AudioCallbacks.decodeAndPlaySample((char*)decryptedOpusData, dataLength); } else { #ifdef LC_DEBUG if (opusHeaderByte == INVALID_OPUS_HEADER) { opusHeaderByte = ((uint8_t*)(rtp + 1))[0]; LC_ASSERT(opusHeaderByte != INVALID_OPUS_HEADER); } else { // Opus header should stay constant for the entire stream. // If it doesn't, it may indicate that the RtpAudioQueue // incorrectly recovered a data shard. LC_ASSERT(((uint8_t*)(rtp + 1))[0] == opusHeaderByte); } #endif AudioCallbacks.decodeAndPlaySample((char*)(rtp + 1), packet->header.size - sizeof(*rtp)); } } static void AudioReceiveThreadProc(void* context) { PRTP_PACKET rtp; PQUEUED_AUDIO_PACKET packet; int queueStatus; bool useSelect; uint32_t packetsToDrop; int waitingForAudioMs; packet = NULL; packetsToDrop = 500 / AudioPacketDuration; if (setNonFatalRecvTimeoutMs(rtpSocket, UDP_RECV_POLL_TIMEOUT_MS) < 0) { // SO_RCVTIMEO failed, so use select() to wait useSelect = true; } else { // SO_RCVTIMEO timeout set for recv() useSelect = false; } waitingForAudioMs = 0; while (!PltIsThreadInterrupted(&receiveThread)) { if (packet == NULL) { packet = (PQUEUED_AUDIO_PACKET)malloc(sizeof(*packet)); if (packet == NULL) { Limelog("Audio Receive: malloc() failed\n"); ListenerCallbacks.connectionTerminated(-1); break; } } packet->header.size = recvUdpSocket(rtpSocket, &packet->data[0], MAX_PACKET_SIZE, useSelect); if (packet->header.size < 0) { Limelog("Audio Receive: recvUdpSocket() failed: %d\n", (int)LastSocketError()); ListenerCallbacks.connectionTerminated(LastSocketFail()); break; } else if (packet->header.size == 0) { // Receive timed out; try again if (!receivedDataFromPeer) { waitingForAudioMs += UDP_RECV_POLL_TIMEOUT_MS; } else { // If we hit this path, there are no queued audio packets on the host PC, // so we don't need to drop anything. packetsToDrop = 0; } continue; } if (packet->header.size < (int)sizeof(RTP_PACKET)) { // Runt packet continue; } rtp = (PRTP_PACKET)&packet->data[0]; if (!receivedDataFromPeer) { receivedDataFromPeer = true; Limelog("Received first audio packet after %d ms\n", waitingForAudioMs); if (firstReceiveTime != 0) { packetsToDrop += (uint32_t)(PltGetMillis() - firstReceiveTime) / AudioPacketDuration; } Limelog("Initial audio resync period: %d milliseconds\n", packetsToDrop * AudioPacketDuration); } // GFE accumulates audio samples before we are ready to receive them, so // we will drop the ones that arrived before the receive thread was ready. if (packetsToDrop > 0) { // Only count actual audio data (not FEC) in the packets to drop calculation if (rtp->packetType == 97) { packetsToDrop--; } continue; } // Convert fields to host byte-order rtp->sequenceNumber = BE16(rtp->sequenceNumber); rtp->timestamp = BE32(rtp->timestamp); rtp->ssrc = BE32(rtp->ssrc); queueStatus = RtpaAddPacket(&rtpAudioQueue, (PRTP_PACKET)&packet->data[0], (uint16_t)packet->header.size); if (RTPQ_HANDLE_NOW(queueStatus)) { if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if (!queuePacketToLbq(&packet)) { // An exit signal was received break; } else { // Ownership should have been taken by the LBQ LC_ASSERT(packet == NULL); } } else { decodeInputData(packet); } } else { if (RTPQ_PACKET_CONSUMED(queueStatus)) { // The queue consumed our packet, so we must allocate a new one packet = NULL; } if (RTPQ_PACKET_READY(queueStatus)) { // If packets are ready, pull them and send them to the decoder uint16_t length; PQUEUED_AUDIO_PACKET queuedPacket; while ((queuedPacket = (PQUEUED_AUDIO_PACKET)RtpaGetQueuedPacket(&rtpAudioQueue, sizeof(QUEUED_AUDIO_PACKET_HEADER), &length)) != NULL) { // Populate header data (not preserved in queued packets) queuedPacket->header.size = length; if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { if (!queuePacketToLbq(&queuedPacket)) { // An exit signal was received free(queuedPacket); break; } else { // Ownership should have been taken by the LBQ LC_ASSERT(queuedPacket == NULL); } } else { decodeInputData(queuedPacket); free(queuedPacket); } } // Break on exit if (queuedPacket != NULL) { break; } } } } if (packet != NULL) { free(packet); } } static void AudioDecoderThreadProc(void* context) { int err; PQUEUED_AUDIO_PACKET packet; while (!PltIsThreadInterrupted(&decoderThread)) { err = LbqWaitForQueueElement(&packetQueue, (void**)&packet); if (err != LBQ_SUCCESS) { // An exit signal was received return; } decodeInputData(packet); free(packet); } } void stopAudioStream(void) { if (!receivedDataFromPeer) { Limelog("No audio traffic was ever received from the host!\n"); } AudioCallbacks.stop(); PltInterruptThread(&receiveThread); if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { // Signal threads waiting on the LBQ LbqSignalQueueShutdown(&packetQueue); PltInterruptThread(&decoderThread); } PltJoinThread(&receiveThread); if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { PltJoinThread(&decoderThread); } PltCloseThread(&receiveThread); if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { PltCloseThread(&decoderThread); } AudioCallbacks.cleanup(); } int startAudioStream(void* audioContext, int arFlags) { int err; OPUS_MULTISTREAM_CONFIGURATION chosenConfig; if (HighQualitySurroundEnabled) { LC_ASSERT(HighQualitySurroundSupported); LC_ASSERT(HighQualityOpusConfig.channelCount != 0); LC_ASSERT(HighQualityOpusConfig.streams != 0); chosenConfig = HighQualityOpusConfig; } else { LC_ASSERT(NormalQualityOpusConfig.channelCount != 0); LC_ASSERT(NormalQualityOpusConfig.streams != 0); chosenConfig = NormalQualityOpusConfig; } chosenConfig.samplesPerFrame = 48 * AudioPacketDuration; err = AudioCallbacks.init(StreamConfig.audioConfiguration, &chosenConfig, audioContext, arFlags); if (err != 0) { return err; } AudioCallbacks.start(); err = PltCreateThread("AudioRecv", AudioReceiveThreadProc, NULL, &receiveThread); if (err != 0) { AudioCallbacks.stop(); closeSocket(rtpSocket); AudioCallbacks.cleanup(); return err; } if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { err = PltCreateThread("AudioDec", AudioDecoderThreadProc, NULL, &decoderThread); if (err != 0) { AudioCallbacks.stop(); PltInterruptThread(&receiveThread); PltJoinThread(&receiveThread); PltCloseThread(&receiveThread); closeSocket(rtpSocket); AudioCallbacks.cleanup(); return err; } } return 0; } int LiGetPendingAudioFrames(void) { return LbqGetItemCount(&packetQueue); } int LiGetPendingAudioDuration(void) { return LiGetPendingAudioFrames() * AudioPacketDuration; }