// moonlight-common-c/src/AudioStream.c

#include "Limelight-internal.h"
#include "PlatformSockets.h"
#include "PlatformThreads.h"
#include "LinkedBlockingQueue.h"
#include "RtpReorderQueue.h"

static SOCKET rtpSocket = INVALID_SOCKET;
static LINKED_BLOCKING_QUEUE packetQueue;
static RTP_REORDER_QUEUE rtpReorderQueue;

static PLT_THREAD udpPingThread;
static PLT_THREAD receiveThread;
static PLT_THREAD decoderThread;

static PPLT_CRYPTO_CONTEXT audioDecryptionCtx;
static uint32_t avRiKeyId;

static unsigned short lastSeq;
static bool receivedDataFromPeer;
static uint64_t firstReceiveTime;

#define RTP_PORT 48000
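
// 1400 bytes presumably leaves headroom for the IP and UDP headers within
// the typical 1500-byte Ethernet MTU, avoiding IP fragmentation.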
#define MAX_PACKET_SIZE 1400
// This is much larger than we should typically have buffered, but
// it needs to be. We need a cushion in case our thread gets blocked
// for longer than normal.
#define RTP_RECV_BUFFER (64 * 1024)
typedef struct _QUEUED_AUDIO_PACKET {
    // data must remain at the front
    char data[MAX_PACKET_SIZE];
    int size;
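
    // A packet sits in at most one queue at a time (the RTP reorder queue
    // or the decoder's blocking queue), so both entries can share storage.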
    union {
        RTP_QUEUE_ENTRY rentry;
        LINKED_BLOCKING_QUEUE_ENTRY lentry;
    } q;
} QUEUED_AUDIO_PACKET, *PQUEUED_AUDIO_PACKET;

static void UdpPingThreadProc(void* context) {
    // Ping in ASCII
    char pingData[] = { 0x50, 0x49, 0x4E, 0x47 };
    LC_SOCKADDR saddr;

    memcpy(&saddr, &RemoteAddr, sizeof(saddr));
    SET_PORT(&saddr, RTP_PORT);
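
    // These pings let the host learn the client's UDP endpoint for the
    // audio stream and keep any NAT or firewall mappings for this flow alive.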
    // Send PING every 500 milliseconds
    while (!PltIsThreadInterrupted(&udpPingThread)) {
        // We do not check for errors here. Socket errors will be handled
        // on the read-side in ReceiveThreadProc(). This avoids potential
        // issues related to receiving ICMP port unreachable messages due
        // to sending a packet prior to the host PC binding to that port.
        sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, RemoteAddrLen);

        if (firstReceiveTime == 0 && isSocketReadable(rtpSocket)) {
            // Remember the time when we got our first incoming audio packet.
            // We will need to adjust for the delay between this event and
            // when the real receive thread is ready to avoid falling behind.
            firstReceiveTime = PltGetMillis();
        }

        PltSleepMsInterruptible(&udpPingThread, 500);
    }
}

// Initialize the audio stream and start the audio ping thread
int initializeAudioStream(void) {
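    // The 30-packet bound caps queued audio at 30 * AudioPacketDuration ms
    // (150 ms at the typical 5 ms frame duration) before queuePacketToLbq()
    // flushes the queue.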
    LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
    RtpqInitializeQueue(&rtpReorderQueue, RTPQ_DEFAULT_MAX_SIZE, RTPQ_DEFAULT_QUEUE_TIME);
    lastSeq = 0;
    receivedDataFromPeer = false;
    firstReceiveTime = 0;

    audioDecryptionCtx = PltCreateCryptoContext();

    // Copy and byte-swap the AV RI key ID used for the audio encryption IV
    memcpy(&avRiKeyId, StreamConfig.remoteInputAesIv, sizeof(avRiKeyId));
    avRiKeyId = BE32(avRiKeyId);
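
    // decodeInputData() adds each packet's RTP sequence number to this value
    // to derive the per-packet AES IV.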

    // For GFE 3.22 compatibility, we must start the audio ping thread before the RTSP handshake.
    // It will not reply to our RTSP PLAY request until the audio ping has been received.
    rtpSocket = bindUdpSocket(RemoteAddr.ss_family, RTP_RECV_BUFFER);
    if (rtpSocket == INVALID_SOCKET) {
        return LastSocketFail();
    }

    // We may receive audio before our threads are started, but that's okay. We'll
    // drop any packets that accumulated before the receive thread was ready in
    // order to catch up with the backlog.
    int err = PltCreateThread("AudioPing", UdpPingThreadProc, NULL, &udpPingThread);
    if (err != 0) {
        closeSocket(rtpSocket);
        rtpSocket = INVALID_SOCKET;
        return err;
    }

    return 0;
}

static void freePacketList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
    PLINKED_BLOCKING_QUEUE_ENTRY nextEntry;

    while (entry != NULL) {
        nextEntry = entry->flink;

        // The entry is stored within the data allocation
        free(entry->data);

        entry = nextEntry;
    }
}

// Tear down the audio stream once we're done with it
void destroyAudioStream(void) {
    if (rtpSocket != INVALID_SOCKET) {
        PltInterruptThread(&udpPingThread);
        PltJoinThread(&udpPingThread);
        PltCloseThread(&udpPingThread);

        closeSocket(rtpSocket);
        rtpSocket = INVALID_SOCKET;
    }

    PltDestroyCryptoContext(audioDecryptionCtx);
    freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue));
    RtpqCleanupQueue(&rtpReorderQueue);
}

static bool queuePacketToLbq(PQUEUED_AUDIO_PACKET* packet) {
    int err;

    err = LbqOfferQueueItem(&packetQueue, *packet, &(*packet)->q.lentry);
    if (err == LBQ_SUCCESS) {
        // The LBQ owns the buffer now
        *packet = NULL;
    }
    else if (err == LBQ_BOUND_EXCEEDED) {
        Limelog("Audio packet queue overflow\n");
        freePacketList(LbqFlushQueueItems(&packetQueue));
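        // Flushing the whole queue (rather than dropping one packet) lets
        // playback latency snap back to zero instead of staying a full queue
        // length behind.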
    }
    else if (err == LBQ_INTERRUPTED) {
        return false;
    }

    return true;
}

static void decodeInputData(PQUEUED_AUDIO_PACKET packet) {
    PRTP_PACKET rtp;

    rtp = (PRTP_PACKET)&packet->data[0];
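
    // The unsigned short cast makes lastSeq + 1 wrap at 65535 along with the
    // 16-bit RTP sequence number. lastSeq == 0 means no packet has been seen yet.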
    if (lastSeq != 0 && (unsigned short)(lastSeq + 1) != rtp->sequenceNumber) {
        Limelog("Received OOS audio data (expected %d, but got %d)\n", lastSeq + 1, rtp->sequenceNumber);
        AudioCallbacks.decodeAndPlaySample(NULL, 0);
    }

    lastSeq = rtp->sequenceNumber;

    if (AudioEncryptionEnabled) {
        // We must have room for the AES padding which may be written to the buffer
        unsigned char decryptedOpusData[ROUND_TO_PKCS7_PADDED_LEN(MAX_PACKET_SIZE)];
        unsigned char iv[16] = { 0 };
        int dataLength = packet->size - sizeof(*rtp);

        LC_ASSERT(dataLength <= MAX_PACKET_SIZE);

        // The IV is the avkeyid (equivalent to the rikeyid) +
        // the RTP sequence number, in big endian.
        uint32_t ivSeq = BE32(avRiKeyId + rtp->sequenceNumber);
        memcpy(iv, &ivSeq, sizeof(ivSeq));

        if (!PltDecryptMessage(audioDecryptionCtx, ALGORITHM_AES_CBC, CIPHER_FLAG_RESET_IV | CIPHER_FLAG_FINISH,
                               (unsigned char*)StreamConfig.remoteInputAesKey, sizeof(StreamConfig.remoteInputAesKey),
                               iv, sizeof(iv),
                               NULL, 0,
                               (unsigned char*)(rtp + 1), dataLength,
                               decryptedOpusData, &dataLength)) {
            return;
        }

        AudioCallbacks.decodeAndPlaySample((char*)decryptedOpusData, dataLength);
    }
    else {
        AudioCallbacks.decodeAndPlaySample((char*)(rtp + 1), packet->size - sizeof(*rtp));
    }
}

static void ReceiveThreadProc(void* context) {
    PRTP_PACKET rtp;
    PQUEUED_AUDIO_PACKET packet;
    int queueStatus;
    bool useSelect;
    uint32_t packetsToDrop;
    int waitingForAudioMs;

    packet = NULL;
    packetsToDrop = 0;

    if (setNonFatalRecvTimeoutMs(rtpSocket, UDP_RECV_POLL_TIMEOUT_MS) < 0) {
        // SO_RCVTIMEO failed, so use select() to wait
        useSelect = true;
    }
    else {
        // SO_RCVTIMEO timeout set for recv()
        useSelect = false;
    }

    waitingForAudioMs = 0;
    while (!PltIsThreadInterrupted(&receiveThread)) {
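        // Reuse the previous allocation if no queue took ownership of it
        // on the last iteration.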
        if (packet == NULL) {
            packet = (PQUEUED_AUDIO_PACKET)malloc(sizeof(*packet));
            if (packet == NULL) {
                Limelog("Audio Receive: malloc() failed\n");
                ListenerCallbacks.connectionTerminated(-1);
                break;
            }
        }

        packet->size = recvUdpSocket(rtpSocket, &packet->data[0], MAX_PACKET_SIZE, useSelect);
        if (packet->size < 0) {
            Limelog("Audio Receive: recvUdpSocket() failed: %d\n", (int)LastSocketError());
            ListenerCallbacks.connectionTerminated(LastSocketFail());
            break;
        }
        else if (packet->size == 0) {
            // Receive timed out; try again

            if (!receivedDataFromPeer) {
                waitingForAudioMs += UDP_RECV_POLL_TIMEOUT_MS;
            }

            // If we hit this path, there are no queued audio packets on the host PC,
            // so we don't need to drop anything.
            packetsToDrop = 0;
            continue;
        }

        if (packet->size < (int)sizeof(RTP_PACKET)) {
            // Runt packet
            continue;
        }

        rtp = (PRTP_PACKET)&packet->data[0];
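
        // GFE tags its audio stream with RTP payload type 97.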
        if (rtp->packetType != 97) {
            // Not audio
            continue;
        }

        if (!receivedDataFromPeer) {
            receivedDataFromPeer = true;
            Limelog("Received first audio packet after %d ms\n", waitingForAudioMs);

            if (firstReceiveTime != 0) {
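                // Estimate how many packets piled up between the ping thread
                // first seeing data and this thread being ready; e.g. a 500 ms
                // gap with 5 ms packets means dropping 100 packets.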
                packetsToDrop = (uint32_t)(PltGetMillis() - firstReceiveTime) / AudioPacketDuration;
                Limelog("Initial audio resync period: %d milliseconds\n", packetsToDrop * AudioPacketDuration);
            }
        }

        // GFE accumulates audio samples before we are ready to receive them, so
        // we will drop the ones that arrived before the receive thread was ready.
        if (packetsToDrop > 0) {
            packetsToDrop--;
            continue;
        }

        // Convert fields to host byte-order
        rtp->sequenceNumber = BE16(rtp->sequenceNumber);
        rtp->timestamp = BE32(rtp->timestamp);
        rtp->ssrc = BE32(rtp->ssrc);
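
        // The reorder queue either tells us to handle this packet now (it is
        // in order), holds onto it while waiting to fill a gap, or releases a
        // batch of previously held packets once they are ready.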
        queueStatus = RtpqAddPacket(&rtpReorderQueue, (PRTP_PACKET)packet, &packet->q.rentry);
        if (RTPQ_HANDLE_NOW(queueStatus)) {
            if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
                if (!queuePacketToLbq(&packet)) {
                    // An exit signal was received
                    break;
                }
            }
            else {
                decodeInputData(packet);
            }
        }
        else {
            if (RTPQ_PACKET_CONSUMED(queueStatus)) {
                // The queue consumed our packet, so we must allocate a new one
                packet = NULL;
            }

            if (RTPQ_PACKET_READY(queueStatus)) {
                // If packets are ready, pull them and send them to the decoder
                while ((packet = (PQUEUED_AUDIO_PACKET)RtpqGetQueuedPacket(&rtpReorderQueue)) != NULL) {
                    if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
                        if (!queuePacketToLbq(&packet)) {
                            // An exit signal was received
                            break;
                        }
                    }
                    else {
                        decodeInputData(packet);
                        free(packet);
                    }
                }

                // Break on exit
                if (packet != NULL) {
                    break;
                }
            }
        }
    }

    if (packet != NULL) {
        free(packet);
    }
}

static void DecoderThreadProc(void* context) {
    int err;
    PQUEUED_AUDIO_PACKET packet;

    while (!PltIsThreadInterrupted(&decoderThread)) {
        err = LbqWaitForQueueElement(&packetQueue, (void**)&packet);
        if (err != LBQ_SUCCESS) {
            // An exit signal was received
            return;
        }

        decodeInputData(packet);
        free(packet);
    }
}

void stopAudioStream(void) {
    if (!receivedDataFromPeer) {
        Limelog("No audio traffic was ever received from the host!\n");
    }

    AudioCallbacks.stop();

    PltInterruptThread(&receiveThread);
    if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
        // Signal threads waiting on the LBQ
        LbqSignalQueueShutdown(&packetQueue);
        PltInterruptThread(&decoderThread);
    }

    PltJoinThread(&receiveThread);
    if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
        PltJoinThread(&decoderThread);
    }

    PltCloseThread(&receiveThread);
    if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
        PltCloseThread(&decoderThread);
    }

    AudioCallbacks.cleanup();
}

int startAudioStream(void* audioContext, int arFlags) {
    int err;
    OPUS_MULTISTREAM_CONFIGURATION chosenConfig;

    if (HighQualitySurroundEnabled) {
        LC_ASSERT(HighQualitySurroundSupported);
        LC_ASSERT(HighQualityOpusConfig.channelCount != 0);
        LC_ASSERT(HighQualityOpusConfig.streams != 0);
        chosenConfig = HighQualityOpusConfig;
    }
    else {
        LC_ASSERT(NormalQualityOpusConfig.channelCount != 0);
        LC_ASSERT(NormalQualityOpusConfig.streams != 0);
        chosenConfig = NormalQualityOpusConfig;
    }
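
    // Opus operates at 48 kHz, so one millisecond of audio is 48 samples.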
    chosenConfig.samplesPerFrame = 48 * AudioPacketDuration;

    err = AudioCallbacks.init(StreamConfig.audioConfiguration, &chosenConfig, audioContext, arFlags);
    if (err != 0) {
        return err;
    }

    AudioCallbacks.start();

    err = PltCreateThread("AudioRecv", ReceiveThreadProc, NULL, &receiveThread);
    if (err != 0) {
        AudioCallbacks.stop();
        closeSocket(rtpSocket);
        AudioCallbacks.cleanup();
        return err;
    }

    if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
        err = PltCreateThread("AudioDec", DecoderThreadProc, NULL, &decoderThread);
        if (err != 0) {
            AudioCallbacks.stop();
            PltInterruptThread(&receiveThread);
            PltJoinThread(&receiveThread);
            PltCloseThread(&receiveThread);
            closeSocket(rtpSocket);
            AudioCallbacks.cleanup();
            return err;
        }
    }

    return 0;
}

int LiGetPendingAudioFrames(void) {
    return LbqGetItemCount(&packetQueue);
}

int LiGetPendingAudioDuration(void) {
    return LiGetPendingAudioFrames() * AudioPacketDuration;
}