diff --git a/.gitignore b/.gitignore index bc8390e..275532a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ .idea/ +.vscode/ limelight-common/ARM/ limelight-common/Debug/ Build/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 2856c7b..603e9dd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2,6 +2,7 @@ cmake_minimum_required(VERSION 3.1...4.0) project(moonlight-common-c LANGUAGES C) +string(TOUPPER "x${CMAKE_BUILD_TYPE}" BUILD_TYPE) set(CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) option(USE_MBEDTLS "Use MbedTLS instead of OpenSSL" OFF) @@ -61,7 +62,6 @@ else() target_include_directories(moonlight-common-c SYSTEM PRIVATE ${OPENSSL_INCLUDE_DIR}) endif() -string(TOUPPER "x${CMAKE_BUILD_TYPE}" BUILD_TYPE) if("${BUILD_TYPE}" STREQUAL "XDEBUG") target_compile_definitions(moonlight-common-c PRIVATE LC_DEBUG) else() @@ -74,10 +74,40 @@ else() endif() endif() +if (NOT(MSVC OR APPLE)) + include(CheckLibraryExists) + CHECK_LIBRARY_EXISTS(rt clock_gettime "" HAVE_CLOCK_GETTIME) + + if (NOT HAVE_CLOCK_GETTIME) + set(CMAKE_EXTRA_INCLUDE_FILES time.h) + CHECK_FUNCTION_EXISTS(clock_gettime HAVE_CLOCK_GETTIME) + SET(CMAKE_EXTRA_INCLUDE_FILES) + endif() + + foreach(clock CLOCK_MONOTONIC CLOCK_MONOTONIC_RAW) + message(STATUS "Testing whether ${clock} can be used") + CHECK_CXX_SOURCE_COMPILES( +"#define _POSIX_C_SOURCE 200112L +#include +int main () +{ + struct timespec ts[1]; + clock_gettime (${clock}, ts); + return 0; +}" HAVE_${clock}) + if(HAVE_${clock}) + message(STATUS "Testing whether ${clock} can be used -- Success") + else() + message(STATUS "Testing whether ${clock} can be used -- Failed") + endif() + endforeach() + +endif() + target_include_directories(moonlight-common-c SYSTEM PUBLIC src) target_include_directories(moonlight-common-c PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/reedsolomon ) -target_compile_definitions(moonlight-common-c PRIVATE HAS_SOCKLEN_T) \ No newline at end of file +target_compile_definitions(moonlight-common-c PRIVATE HAS_SOCKLEN_T) diff --git a/src/AudioStream.c b/src/AudioStream.c index d14d572..2fcc1f0 100644 --- a/src/AudioStream.c +++ b/src/AudioStream.c @@ -275,7 +275,7 @@ static void AudioReceiveThreadProc(void* context) { } else if (packet->header.size == 0) { // Receive timed out; try again - + if (!receivedDataFromPeer) { waitingForAudioMs += UDP_RECV_POLL_TIMEOUT_MS; } @@ -299,6 +299,8 @@ static void AudioReceiveThreadProc(void* context) { Limelog("Received first audio packet after %d ms\n", waitingForAudioMs); if (firstReceiveTime != 0) { + // XXX firstReceiveTime is never set here... + // We're already dropping 500ms of audio so this probably doesn't matter packetsToDrop += (uint32_t)(PltGetMillis() - firstReceiveTime) / AudioPacketDuration; } @@ -366,7 +368,7 @@ static void AudioReceiveThreadProc(void* context) { free(queuedPacket); } } - + // Break on exit if (queuedPacket != NULL) { break; @@ -374,7 +376,7 @@ static void AudioReceiveThreadProc(void* context) { } } } - + if (packet != NULL) { free(packet); } @@ -405,12 +407,12 @@ void stopAudioStream(void) { AudioCallbacks.stop(); PltInterruptThread(&receiveThread); - if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { + if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { // Signal threads waiting on the LBQ LbqSignalQueueShutdown(&packetQueue); PltInterruptThread(&decoderThread); } - + PltJoinThread(&receiveThread); if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) { PltJoinThread(&decoderThread); @@ -474,3 +476,7 @@ int LiGetPendingAudioFrames(void) { int LiGetPendingAudioDuration(void) { return LiGetPendingAudioFrames() * AudioPacketDuration; } + +const RTP_AUDIO_STATS* LiGetRTPAudioStats(void) { + return &rtpAudioQueue.stats; +} diff --git a/src/InputStream.c b/src/InputStream.c index e61e2be..d9e97ce 100644 --- a/src/InputStream.c +++ b/src/InputStream.c @@ -95,7 +95,7 @@ typedef struct _PACKET_HOLDER { // Initializes the input stream int initializeInputStream(void) { memcpy(currentAesIv, StreamConfig.remoteInputAesIv, sizeof(currentAesIv)); - + // Set a high maximum queue size limit to ensure input isn't dropped // while the input send thread is blocked for short periods. LbqInitializeLinkedBlockingQueue(&packetQueue, MAX_QUEUED_INPUT_PACKETS); @@ -129,7 +129,7 @@ int initializeInputStream(void) { // Destroys and cleans up the input stream void destroyInputStream(void) { PLINKED_BLOCKING_QUEUE_ENTRY entry, nextEntry; - + PltDestroyCryptoContext(cryptoContext); entry = LbqDestroyLinkedBlockingQueue(&packetQueue); @@ -740,7 +740,7 @@ int stopInputStream(void) { if (inputSock != INVALID_SOCKET) { shutdownTcpSocket(inputSock); } - + if (inputSock != INVALID_SOCKET) { closeSocket(inputSock); inputSock = INVALID_SOCKET; diff --git a/src/Limelight.h b/src/Limelight.h index 3d8aded..395083a 100644 --- a/src/Limelight.h +++ b/src/Limelight.h @@ -154,16 +154,15 @@ typedef struct _DECODE_UNIT { // (happens when the frame is repeated). uint16_t frameHostProcessingLatency; - // Receive time of first buffer. This value uses an implementation-defined epoch, - // but the same epoch as enqueueTimeMs and LiGetMillis(). - uint64_t receiveTimeMs; + // Receive time of first buffer in microseconds. + uint64_t receiveTimeUs; // Time the frame was fully assembled and queued for the video decoder to process. // This is also approximately the same time as the final packet was received, so - // enqueueTimeMs - receiveTimeMs is the time taken to receive the frame. At the + // enqueueTimeUs - receiveTimeUs is the time taken to receive the frame. At the // time the decode unit is passed to submitDecodeUnit(), the total queue delay - // can be calculated by LiGetMillis() - enqueueTimeMs. - uint64_t enqueueTimeMs; + // can be calculated. This value is in microseconds. + uint64_t enqueueTimeUs; // Presentation time in milliseconds with the epoch at the first captured frame. // This can be used to aid frame pacing or to drop old frames that were queued too @@ -833,7 +832,12 @@ int LiSendHighResScrollEvent(short scrollAmount); int LiSendHScrollEvent(signed char scrollClicks); int LiSendHighResHScrollEvent(short scrollAmount); +// This function returns a time in microseconds with an implementation-defined epoch. +// It should only ever be compared with the return value from a previous call to itself. +uint64_t LiGetMicroseconds(void); + // This function returns a time in milliseconds with an implementation-defined epoch. +// It should only ever be compared with the return value from a previous call to itself. uint64_t LiGetMillis(void); // This is a simplistic STUN function that can assist clients in getting the WAN address @@ -856,6 +860,36 @@ int LiGetPendingAudioFrames(void); // negotiated audio frame duration. int LiGetPendingAudioDuration(void); +// Returns a pointer to a struct containing various statistics about the RTP audio stream. +// The data should be considered read-only and must not be modified. +typedef struct _RTP_AUDIO_STATS { + uint32_t packetCountAudio; // total audio packets + uint32_t packetCountFec; // total packets of type FEC + uint32_t packetCountFecRecovered; // a packet was saved + uint32_t packetCountFecFailed; // tried to recover but too much was lost + uint32_t packetCountOOS; // out-of-sequence packets + uint32_t packetCountInvalid; // corrupted packets, etc + uint32_t packetCountFecInvalid; // invalid FEC packet +} RTP_AUDIO_STATS, *PRTP_AUDIO_STATS; + +const RTP_AUDIO_STATS* LiGetRTPAudioStats(void); + +// Returns a pointer to a struct containing various statistics about the RTP video stream. +// The data should be considered read-only and must not be modified. +// Right now this is mainly used to track total video and FEC packets, as there are +// many video stats already implemented at a higher level in moonlight-qt. +typedef struct _RTP_VIDEO_STATS { + uint32_t packetCountVideo; // total video packets + uint32_t packetCountFec; // total packets of type FEC + uint32_t packetCountFecRecovered; // a packet was saved + uint32_t packetCountFecFailed; // tried to recover but too much was lost + uint32_t packetCountOOS; // out-of-sequence packets + uint32_t packetCountInvalid; // corrupted packets, etc + uint32_t packetCountFecInvalid; // invalid FEC packet +} RTP_VIDEO_STATS, *PRTP_VIDEO_STATS; + +const RTP_VIDEO_STATS* LiGetRTPVideoStats(void); + // Port index flags for use with LiGetPortFromPortFlagIndex() and LiGetProtocolFromPortFlagIndex() #define ML_PORT_INDEX_TCP_47984 0 #define ML_PORT_INDEX_TCP_47989 1 @@ -883,7 +917,7 @@ int LiGetPendingAudioDuration(void); unsigned int LiGetPortFlagsFromStage(int stage); unsigned int LiGetPortFlagsFromTerminationErrorCode(int errorCode); -// Returns the IPPROTO_* value for the specified port index +// Returns the IPPROTO_* value for the specified port index int LiGetProtocolFromPortFlagIndex(int portFlagIndex); // Returns the port number for the specified port index diff --git a/src/Misc.c b/src/Misc.c index 4988b12..e3cd6b0 100644 --- a/src/Misc.c +++ b/src/Misc.c @@ -99,7 +99,7 @@ int extractVersionQuadFromString(const char* string, int* quad) { nextNumber++; } } - + return 0; } @@ -148,6 +148,10 @@ uint64_t LiGetMillis(void) { return PltGetMillis(); } +uint64_t LiGetMicroseconds(void) { + return PltGetMicroseconds(); +} + uint32_t LiGetHostFeatureFlags(void) { return SunshineFeatureFlags; } diff --git a/src/Platform.c b/src/Platform.c index 9782e00..61bc3ba 100644 --- a/src/Platform.c +++ b/src/Platform.c @@ -419,24 +419,157 @@ void PltWaitForConditionVariable(PLT_COND* cond, PLT_MUTEX* mutex) { #endif } -uint64_t PltGetMillis(void) { +//// Begin timing functions + +// These functions return a number of microseconds or milliseconds since an opaque start time. + +static bool has_monotonic_time = false; +static bool ticks_started = false; + #if defined(LC_WINDOWS) - return GetTickCount64(); -#elif defined(CLOCK_MONOTONIC) && !defined(NO_CLOCK_GETTIME) - struct timespec tv; - clock_gettime(CLOCK_MONOTONIC, &tv); +static LARGE_INTEGER start_ticks; +static LARGE_INTEGER ticks_per_second; - return ((uint64_t)tv.tv_sec * 1000) + (tv.tv_nsec / 1000000); -#else - struct timeval tv; - - gettimeofday(&tv, NULL); - - return ((uint64_t)tv.tv_sec * 1000) + (tv.tv_usec / 1000); -#endif +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; + QueryPerformanceFrequency(&ticks_per_second); + QueryPerformanceCounter(&start_ticks); } +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + LARGE_INTEGER now; + QueryPerformanceCounter(&now); + return (uint64_t)(((now.QuadPart - start_ticks.QuadPart) * 1000000) / ticks_per_second.QuadPart); +} + +#elif defined(LC_DARWIN) + +static mach_timebase_info_data_t mach_base_info; +static uint64_t start; + +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; + mach_timebase_info(&mach_base_info); + has_monotonic_time = true; + start = mach_absolute_time(); +} + +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + const uint64_t now = mach_absolute_time(); + return (((now - start) * mach_base_info.numer) / mach_base_info.denom) / 1000; +} + +#elif defined(__vita__) + +static uint64_t start; + +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; + start = sceKernelGetProcessTimeWide(); +} + +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + uint64_t now = sceKernelGetProcessTimeWide(); + return (uint64_t)(now - start); +} + +#elif defined(__3DS__) + +static uint64_t start; + +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; + start = svcGetSystemTick(); +} + +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + uint64_t elapsed = svcGetSystemTick() - start; + return elapsed * 1000 / CPU_TICKS_PER_MSEC; +} + +#else + +/* Use CLOCK_MONOTONIC_RAW, if available, which is not subject to adjustment by NTP */ +#ifdef HAVE_CLOCK_GETTIME +static struct timespec start_ts; +# ifdef CLOCK_MONOTONIC_RAW +# define PLT_MONOTONIC_CLOCK CLOCK_MONOTONIC_RAW +# else +# define PLT_MONOTONIC_CLOCK CLOCK_MONOTONIC +# endif +#endif + +static struct timeval start_tv; + +void PltTicksInit(void) { + if (ticks_started) { + return; + } + ticks_started = true; +#ifdef HAVE_CLOCK_GETTIME + if (clock_gettime(PLT_MONOTONIC_CLOCK, &start_ts) == 0) { + has_monotonic_time = true; + } else +#endif + { + gettimeofday(&start_tv, NULL); + } +} + +uint64_t PltGetMicroseconds(void) { + if (!ticks_started) { + PltTicksInit(); + } + + if (has_monotonic_time) { +#ifdef HAVE_CLOCK_GETTIME + struct timespec now; + clock_gettime(PLT_MONOTONIC_CLOCK, &now); + return (uint64_t)(((int64_t)(now.tv_sec - start_ts.tv_sec) * 1000000) + ((now.tv_nsec - start_ts.tv_nsec) / 1000)); +#else + LC_ASSERT(false); + return 0; +#endif + } else { + struct timeval now; + gettimeofday(&now, NULL); + return (uint64_t)(((int64_t)(now.tv_sec - start_tv.tv_sec) * 1000000) + (now.tv_usec - start_tv.tv_usec)); + } +} + +#endif + +uint64_t PltGetMillis(void) { + return PltGetMicroseconds() / 1000; +} + +//// End timing functions + bool PltSafeStrcpy(char* dest, size_t dest_size, const char* src) { LC_ASSERT(dest_size > 0); @@ -474,6 +607,8 @@ bool PltSafeStrcpy(char* dest, size_t dest_size, const char* src) { int initializePlatform(void) { int err; + PltTicksInit(); + err = initializePlatformSockets(); if (err != 0) { return err; diff --git a/src/Platform.h b/src/Platform.h index 20d03f5..4c21b57 100644 --- a/src/Platform.h +++ b/src/Platform.h @@ -18,6 +18,15 @@ #include #include #include +#elif defined(__APPLE__) +#include +#include +#include +#include +#include +#include +#include +#include #elif defined(__vita__) #include #include @@ -146,6 +155,11 @@ int initializePlatform(void); void cleanupPlatform(void); +bool PltSafeStrcpy(char* dest, size_t dest_size, const char* src); + +void PltTicksInit(void); + +uint64_t PltGetMicroseconds(void); uint64_t PltGetMillis(void); -bool PltSafeStrcpy(char* dest, size_t dest_size, const char* src); + diff --git a/src/RtpAudioQueue.c b/src/RtpAudioQueue.c index be734d4..0f7235b 100644 --- a/src/RtpAudioQueue.c +++ b/src/RtpAudioQueue.c @@ -204,15 +204,19 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK if (packet->packetType == RTP_PAYLOAD_TYPE_AUDIO) { if (length < sizeof(RTP_PACKET)) { + queue->stats.packetCountInvalid++; Limelog("RTP audio data packet too small: %u\n", length); LC_ASSERT_VT(false); return NULL; } + queue->stats.packetCountAudio++; + // Remember if we've received out-of-sequence packets lately. We can use // this knowledge to more quickly give up on FEC blocks. if (!queue->synchronizing && isBefore16(packet->sequenceNumber, queue->oldestRtpBaseSequenceNumber)) { queue->lastOosSequenceNumber = packet->sequenceNumber; + queue->stats.packetCountOOS++; if (!queue->receivedOosData) { Limelog("Leaving fast audio recovery mode after OOS audio data (%u < %u)\n", packet->sequenceNumber, queue->oldestRtpBaseSequenceNumber); @@ -238,11 +242,14 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK PAUDIO_FEC_HEADER fecHeader = (PAUDIO_FEC_HEADER)(packet + 1); if (length < sizeof(RTP_PACKET) + sizeof(AUDIO_FEC_HEADER)) { + queue->stats.packetCountFecInvalid++; Limelog("RTP audio FEC packet too small: %u\n", length); LC_ASSERT_VT(false); return NULL; } + queue->stats.packetCountFec++; + // This is an FEC packet, so we can just copy (and byteswap) the FEC header fecBlockPayloadType = fecHeader->payloadType; fecBlockBaseSeqNum = BE16(fecHeader->baseSequenceNumber); @@ -252,6 +259,7 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK // Ensure the FEC shard index is valid to prevent OOB access // later during recovery. if (fecHeader->fecShardIndex >= RTPA_FEC_SHARDS) { + queue->stats.packetCountFecInvalid++; Limelog("Too many audio FEC shards: %u\n", fecHeader->fecShardIndex); LC_ASSERT_VT(false); return NULL; @@ -261,6 +269,7 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK // The FEC blocks must start on a RTPA_DATA_SHARDS boundary for our queuing logic to work. This isn't // the case for older versions of GeForce Experience (at least 3.13). Disable the FEC logic if this // invariant is validated. + queue->stats.packetCountFecInvalid++; Limelog("Invalid FEC block base sequence number (got %u, expected %u)\n", fecBlockBaseSeqNum, (fecBlockBaseSeqNum / RTPA_DATA_SHARDS) * RTPA_DATA_SHARDS); Limelog("Audio FEC has been disabled due to an incompatibility with your host's old software!\n"); @@ -304,6 +313,7 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK if (existingBlock->blockSize != blockSize) { // This can happen with older versions of GeForce Experience (3.13) and Sunshine that don't use a // constant size for audio packets. + queue->stats.packetCountFecInvalid++; Limelog("Audio block size mismatch (got %u, expected %u)\n", blockSize, existingBlock->blockSize); Limelog("Audio FEC has been disabled due to an incompatibility with your host's old software!\n"); LC_ASSERT_VT(existingBlock->blockSize == blockSize); @@ -331,7 +341,7 @@ static PRTPA_FEC_BLOCK getFecBlockForRtpPacket(PRTP_AUDIO_QUEUE queue, PRTP_PACK memset(block, 0, sizeof(*block)); - block->queueTimeMs = PltGetMillis(); + block->queueTimeUs = PltGetMicroseconds(); block->blockSize = blockSize; memset(block->marks, 1, sizeof(block->marks)); @@ -454,13 +464,15 @@ static bool completeFecBlock(PRTP_AUDIO_QUEUE queue, PRTPA_FEC_BLOCK block) { } } -#ifdef FEC_VERBOSE + if (block->dataShardsReceived != RTPA_DATA_SHARDS) { + queue->stats.packetCountFecRecovered += RTPA_DATA_SHARDS - block->dataShardsReceived; +#ifdef FEC_VERBOSE Limelog("Recovered %d audio data shards from block %d\n", RTPA_DATA_SHARDS - block->dataShardsReceived, block->fecHeader.baseSequenceNumber); - } #endif + } #ifdef FEC_VALIDATION_MODE // Check the RTP header values @@ -531,9 +543,10 @@ static void handleMissingPackets(PRTP_AUDIO_QUEUE queue) { // At this point, we know we've got a second FEC block queued up waiting on the first one to complete. // If we've never seen OOS data from this host, we'll assume the first one is lost and skip forward. // If we have seen OOS data, we'll wait for a little while longer to see if OOS packets arrive before giving up. - if (!queue->receivedOosData || PltGetMillis() - queue->blockHead->queueTimeMs > (uint32_t)(AudioPacketDuration * RTPA_DATA_SHARDS) + RTPQ_OOS_WAIT_TIME_MS) { + if (!queue->receivedOosData || PltGetMicroseconds() - queue->blockHead->queueTimeUs > (uint64_t)(AudioPacketDuration * RTPA_DATA_SHARDS) + (RTPQ_OOS_WAIT_TIME_MS * 1000)) { LC_ASSERT(!isBefore16(queue->nextRtpSequenceNumber, queue->blockHead->fecHeader.baseSequenceNumber)); + queue->stats.packetCountFecFailed++; Limelog("Unable to recover audio data block %u to %u (%u+%u=%u received < %u needed)\n", queue->blockHead->fecHeader.baseSequenceNumber, queue->blockHead->fecHeader.baseSequenceNumber + RTPA_DATA_SHARDS - 1, diff --git a/src/RtpAudioQueue.h b/src/RtpAudioQueue.h index 1c8ed09..bbe562e 100644 --- a/src/RtpAudioQueue.h +++ b/src/RtpAudioQueue.h @@ -33,7 +33,7 @@ typedef struct _RTPA_FEC_BLOCK { AUDIO_FEC_HEADER fecHeader; - uint64_t queueTimeMs; + uint64_t queueTimeUs; uint8_t dataShardsReceived; uint8_t fecShardsReceived; bool fullyReassembled; @@ -63,6 +63,8 @@ typedef struct _RTP_AUDIO_QUEUE { bool receivedOosData; bool synchronizing; bool incompatibleServer; + + RTP_AUDIO_STATS stats; } RTP_AUDIO_QUEUE, *PRTP_AUDIO_QUEUE; #define RTPQ_RET_PACKET_CONSUMED 0x1 diff --git a/src/RtpVideoQueue.c b/src/RtpVideoQueue.c index d5dbe94..dc82a94 100644 --- a/src/RtpVideoQueue.c +++ b/src/RtpVideoQueue.c @@ -91,7 +91,7 @@ static void removeEntryFromList(PRTPV_QUEUE_LIST list, PRTPV_QUEUE_ENTRY entry) static void reportFinalFrameFecStatus(PRTP_VIDEO_QUEUE queue) { SS_FRAME_FEC_STATUS fecStatus; - + fecStatus.frameIndex = BE32(queue->currentFrameNumber); fecStatus.highestReceivedSequenceNumber = BE16(queue->receivedHighestSequenceNumber); fecStatus.nextContiguousSequenceNumber = BE16(queue->nextContiguousSequenceNumber); @@ -103,7 +103,7 @@ static void reportFinalFrameFecStatus(PRTP_VIDEO_QUEUE queue) { fecStatus.fecPercentage = (uint8_t)queue->fecPercentage; fecStatus.multiFecBlockIndex = (uint8_t)queue->multiFecCurrentBlockNumber; fecStatus.multiFecBlockCount = (uint8_t)(queue->multiFecLastBlockNumber + 1); - + connectionSendFrameFecStatus(&fecStatus); } @@ -111,7 +111,7 @@ static void reportFinalFrameFecStatus(PRTP_VIDEO_QUEUE queue) { static bool queuePacket(PRTP_VIDEO_QUEUE queue, PRTPV_QUEUE_ENTRY newEntry, PRTP_PACKET packet, int length, bool isParity, bool isFecRecovery) { PRTPV_QUEUE_ENTRY entry; bool outOfSequence; - + LC_ASSERT(!(isFecRecovery && isParity)); LC_ASSERT(!isBefore16(packet->sequenceNumber, queue->nextContiguousSequenceNumber)); @@ -195,7 +195,7 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { int ret; LC_ASSERT(totalPackets == U16(queue->bufferHighestSequenceNumber - queue->bufferLowestSequenceNumber) + 1U); - + #ifdef FEC_VALIDATION_MODE // We'll need an extra packet to run in FEC validation mode, because we will // be "dropping" one below and recovering it using parity. However, some frames @@ -263,9 +263,9 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { ret = -2; goto cleanup; } - + rs = reed_solomon_new(queue->bufferDataPackets, queue->bufferParityPackets); - + // This could happen in an OOM condition, but it could also mean the FEC data // that we fed to reed_solomon_new() is bogus, so we'll assert to get a better look. LC_ASSERT(rs != NULL); @@ -273,9 +273,9 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { ret = -3; goto cleanup; } - + memset(marks, 1, sizeof(char) * (totalPackets)); - + int receiveSize = StreamConfig.packetSize + MAX_RTP_HEADER_SIZE; int packetBufferSize = receiveSize + sizeof(RTPV_QUEUE_ENTRY); @@ -307,7 +307,7 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { packets[index] = (unsigned char*) entry->packet; marks[index] = 0; - + //Set padding to zero if (entry->length < receiveSize) { memset(&packets[index][entry->length], 0, receiveSize - entry->length); @@ -326,9 +326,9 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { } } } - + ret = reed_solomon_reconstruct(rs, packets, marks, totalPackets, receiveSize); - + // We should always provide enough parity to recover the missing data successfully. // If this fails, something is probably wrong with our FEC state. LC_ASSERT(ret == 0); @@ -339,7 +339,7 @@ static int reconstructFrame(PRTP_VIDEO_QUEUE queue) { queue->bufferDataPackets - queue->receivedDataPackets, queue->currentFrameNumber); #endif - + // Report the final FEC status if we needed to perform a recovery reportFinalFrameFecStatus(queue); } @@ -355,7 +355,7 @@ cleanup_packets: rtpPacket->header = queue->pendingFecBlockList.head->packet->header; rtpPacket->timestamp = queue->pendingFecBlockList.head->packet->timestamp; rtpPacket->ssrc = queue->pendingFecBlockList.head->packet->ssrc; - + int dataOffset = sizeof(*rtpPacket); if (rtpPacket->header & FLAG_EXTENSION) { dataOffset += 4; // 2 additional fields @@ -457,7 +457,7 @@ cleanup: if (marks != NULL) free(marks); - + return ret; } @@ -497,8 +497,8 @@ static void stageCompleteFecBlock(PRTP_VIDEO_QUEUE queue) { // and use the first packet's receive time for all packets. This ends up // actually being better for the measurements that the depacketizer does, // since it properly handles out of order packets. - LC_ASSERT(queue->bufferFirstRecvTimeMs != 0); - entry->receiveTimeMs = queue->bufferFirstRecvTimeMs; + LC_ASSERT(queue->bufferFirstRecvTimeUs != 0); + entry->receiveTimeUs = queue->bufferFirstRecvTimeUs; // Move this packet to the completed FEC block list insertEntryIntoList(&queue->completedFecBlockList, entry); @@ -631,7 +631,7 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ queue->bufferDataPackets); } } - + // We must either start on the current FEC block number for the current frame, // or block 0 of a new frame. uint8_t expectedFecBlockNumber = (queue->currentFrameNumber == nvPacket->frameIndex ? queue->multiFecCurrentBlockNumber : 0); @@ -689,8 +689,8 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ // Tell the control stream logic about this frame, even if we don't end up // being able to reconstruct a full frame from it. connectionSawFrame(queue->currentFrameNumber); - - queue->bufferFirstRecvTimeMs = PltGetMillis(); + + queue->bufferFirstRecvTimeUs = PltGetMicroseconds(); queue->bufferLowestSequenceNumber = U16(packet->sequenceNumber - fecIndex); queue->nextContiguousSequenceNumber = queue->bufferLowestSequenceNumber; queue->receivedDataPackets = 0; @@ -706,6 +706,9 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ queue->bufferHighestSequenceNumber = U16(queue->bufferFirstParitySequenceNumber + queue->bufferParityPackets - 1); queue->multiFecCurrentBlockNumber = fecCurrentBlockNumber; queue->multiFecLastBlockNumber = (nvPacket->multiFecBlocks >> 6) & 0x3; + + queue->stats.packetCountVideo += queue->bufferDataPackets; + queue->stats.packetCountFec += queue->bufferParityPackets; } // Reject packets above our FEC queue valid sequence number range @@ -762,18 +765,18 @@ int RtpvAddPacket(PRTP_VIDEO_QUEUE queue, PRTP_PACKET packet, int length, PRTPV_ queue->receivedParityPackets++; LC_ASSERT(queue->receivedParityPackets <= queue->bufferParityPackets); } - + // Try to submit this frame. If we haven't received enough packets, // this will fail and we'll keep waiting. if (reconstructFrame(queue) == 0) { // Stage the complete FEC block for use once reassembly is complete stageCompleteFecBlock(queue); - + // stageCompleteFecBlock() should have consumed all pending FEC data LC_ASSERT(queue->pendingFecBlockList.head == NULL); LC_ASSERT(queue->pendingFecBlockList.tail == NULL); LC_ASSERT(queue->pendingFecBlockList.count == 0); - + // If we're not yet at the last FEC block for this frame, move on to the next block. // Otherwise, the frame is complete and we can move on to the next frame. if (queue->multiFecCurrentBlockNumber < queue->multiFecLastBlockNumber) { diff --git a/src/RtpVideoQueue.h b/src/RtpVideoQueue.h index ec42c04..a79ada8 100644 --- a/src/RtpVideoQueue.h +++ b/src/RtpVideoQueue.h @@ -6,7 +6,7 @@ typedef struct _RTPV_QUEUE_ENTRY { struct _RTPV_QUEUE_ENTRY* next; struct _RTPV_QUEUE_ENTRY* prev; PRTP_PACKET packet; - uint64_t receiveTimeMs; + uint64_t receiveTimeUs; uint32_t presentationTimeMs; int length; bool isParity; @@ -22,7 +22,7 @@ typedef struct _RTP_VIDEO_QUEUE { RTPV_QUEUE_LIST pendingFecBlockList; RTPV_QUEUE_LIST completedFecBlockList; - uint64_t bufferFirstRecvTimeMs; + uint64_t bufferFirstRecvTimeUs; uint32_t bufferLowestSequenceNumber; uint32_t bufferHighestSequenceNumber; uint32_t bufferFirstParitySequenceNumber; @@ -45,6 +45,8 @@ typedef struct _RTP_VIDEO_QUEUE { uint32_t lastOosFramePresentationTimestamp; bool receivedOosData; + + RTP_VIDEO_STATS stats; // the above values are short-lived, this tracks stats for the life of the queue } RTP_VIDEO_QUEUE, *PRTP_VIDEO_QUEUE; #define RTPF_RET_QUEUED 0 diff --git a/src/RtspConnection.c b/src/RtspConnection.c index 6f2a183..d8f1b35 100644 --- a/src/RtspConnection.c +++ b/src/RtspConnection.c @@ -267,19 +267,19 @@ static bool transactRtspMessageEnet(PRTSP_MESSAGE request, PRTSP_MESSAGE respons payloadLength = request->payloadLength; request->payload = NULL; request->payloadLength = 0; - + // Serialize the RTSP message into a message buffer serializedMessage = serializeRtspMessage(request, &messageLen); if (serializedMessage == NULL) { goto Exit; } - + // Create the reliable packet that describes our outgoing message packet = enet_packet_create(serializedMessage, messageLen, ENET_PACKET_FLAG_RELIABLE); if (packet == NULL) { goto Exit; } - + // Send the message if (enet_peer_send(peer, 0, packet) < 0) { enet_packet_destroy(packet); @@ -299,10 +299,10 @@ static bool transactRtspMessageEnet(PRTSP_MESSAGE request, PRTSP_MESSAGE respons enet_packet_destroy(packet); goto Exit; } - + enet_host_flush(client); } - + // Wait for a reply if (serviceEnetHost(client, &event, RTSP_RECEIVE_TIMEOUT_SEC * 1000) <= 0 || event.type != ENET_EVENT_TYPE_RECEIVE) { @@ -343,7 +343,7 @@ static bool transactRtspMessageEnet(PRTSP_MESSAGE request, PRTSP_MESSAGE respons offset += (int) event.packet->dataLength; enet_packet_destroy(event.packet); } - + if (parseRtspMessage(response, responseBuffer, offset) == RTSP_ERROR_SUCCESS) { // Successfully parsed response ret = true; @@ -583,7 +583,7 @@ static bool setupStream(PRTSP_MESSAGE response, char* target, int* error) { else { transportValue = " "; } - + if (addOption(&request, "Transport", transportValue) && addOption(&request, "If-Modified-Since", "Thu, 01 Jan 1970 00:00:00 GMT")) { @@ -992,21 +992,21 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { rtspClientVersion = 14; break; } - + // Setup ENet if required by this GFE version if (useEnet) { ENetAddress address; ENetEvent event; - + enet_address_set_address(&address, (struct sockaddr *)&RemoteAddr, AddrLen); enet_address_set_port(&address, RtspPortNumber); - + // Create a client that can use 1 outgoing connection and 1 channel client = enet_host_create(RemoteAddr.ss_family, NULL, 1, 1, 0, 0); if (client == NULL) { return -1; } - + // Connect to the host peer = enet_host_connect(client, &address, 1, 0); if (peer == NULL) { @@ -1014,7 +1014,7 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { client = NULL; return -1; } - + // Wait for the connect to complete if (serviceEnetHost(client, &event, RTSP_CONNECT_TIMEOUT_SEC * 1000) <= 0 || event.type != ENET_EVENT_TYPE_CONNECT) { @@ -1072,7 +1072,7 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { ret = -1; goto Exit; } - + if ((StreamConfig.supportedVideoFormats & VIDEO_FORMAT_MASK_AV1) && strstr(response.payload, "AV1/90000")) { if ((serverInfo->serverCodecModeSupport & SCM_AV1_HIGH10_444) && (StreamConfig.supportedVideoFormats & VIDEO_FORMAT_AV1_HIGH10_444)) { NegotiatedVideoFormat = VIDEO_FORMAT_AV1_HIGH10_444; @@ -1205,10 +1205,10 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { } // Given there is a non-null session id, get the - // first token of the session until ";", which + // first token of the session until ";", which // resolves any 454 session not found errors on // standard RTSP server implementations. - // (i.e - sessionId = "DEADBEEFCAFE;timeout = 90") + // (i.e - sessionId = "DEADBEEFCAFE;timeout = 90") sessionIdString = strdup(strtok_r(sessionId, ";", &strtokCtx)); if (sessionIdString == NULL) { Limelog("Failed to duplicate session ID string\n"); @@ -1262,7 +1262,7 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { freeMessage(&response); } - + if (AppVersionQuad[0] >= 5) { RTSP_MESSAGE response; int error = -1; @@ -1389,9 +1389,9 @@ int performRtspHandshake(PSERVER_INFORMATION serverInfo) { } } - + ret = 0; - + Exit: // Cleanup the ENet stuff if (useEnet) { @@ -1399,7 +1399,7 @@ Exit: enet_peer_disconnect_now(peer, 0); peer = NULL; } - + if (client != NULL) { enet_host_destroy(client); client = NULL; diff --git a/src/VideoDepacketizer.c b/src/VideoDepacketizer.c index b5321c1..c6859ef 100644 --- a/src/VideoDepacketizer.c +++ b/src/VideoDepacketizer.c @@ -17,9 +17,9 @@ static bool decodingFrame; static int frameType; static uint16_t lastPacketPayloadLength; static bool strictIdrFrameWait; -static uint64_t syntheticPtsBase; +static uint64_t syntheticPtsBaseUs; static uint16_t frameHostProcessingLatency; -static uint64_t firstPacketReceiveTime; +static uint64_t firstPacketReceiveTimeUs; static unsigned int firstPacketPresentationTime; static bool dropStatePending; static bool idrFrameProcessed; @@ -68,9 +68,9 @@ void initializeVideoDepacketizer(int pktSize) { waitingForRefInvalFrame = false; lastPacketInStream = UINT32_MAX; decodingFrame = false; - syntheticPtsBase = 0; + syntheticPtsBaseUs = 0; frameHostProcessingLatency = 0; - firstPacketReceiveTime = 0; + firstPacketReceiveTimeUs = 0; firstPacketPresentationTime = 0; lastPacketPayloadLength = 0; dropStatePending = false; @@ -483,9 +483,9 @@ static void reassembleFrame(int frameNumber) { qdu->decodeUnit.frameType = frameType; qdu->decodeUnit.frameNumber = frameNumber; qdu->decodeUnit.frameHostProcessingLatency = frameHostProcessingLatency; - qdu->decodeUnit.receiveTimeMs = firstPacketReceiveTime; + qdu->decodeUnit.receiveTimeUs = firstPacketReceiveTimeUs; qdu->decodeUnit.presentationTimeMs = firstPacketPresentationTime; - qdu->decodeUnit.enqueueTimeMs = LiGetMillis(); + qdu->decodeUnit.enqueueTimeUs = PltGetMicroseconds(); // These might be wrong for a few frames during a transition between SDR and HDR, // but the effects shouldn't very noticable since that's an infrequent operation. @@ -714,16 +714,16 @@ static void processAvcHevcRtpPayloadSlow(PBUFFER_DESC currentPos, PLENTRY_INTERN void requestDecoderRefresh(void) { // Wait for the next IDR frame waitingForIdrFrame = true; - + // Flush the decode unit queue freeDecodeUnitList(LbqFlushQueueItems(&decodeUnitQueue)); - + // Request the receive thread drop its state // on the next call. We can't do it here because // it may be trying to queue DUs and we'll nuke // the state out from under it. dropStatePending = true; - + // Request the IDR frame LiRequestIdrFrame(); } @@ -740,7 +740,7 @@ static bool isFirstPacket(uint8_t flags, uint8_t fecBlockNumber) { // Process an RTP Payload // The caller will free *existingEntry unless we NULL it static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, - uint64_t receiveTimeMs, unsigned int presentationTimeMs, + uint64_t receiveTimeUs, uint64_t presentationTimeMs, PLENTRY_INTERNAL* existingEntry) { BUFFER_DESC currentPos; uint32_t frameIndex; @@ -768,7 +768,7 @@ static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, LC_ASSERT_VT((flags & ~(FLAG_SOF | FLAG_EOF | FLAG_CONTAINS_PIC_DATA)) == 0); streamPacketIndex = videoPacket->streamPacketIndex; - + // Drop packets from a previously corrupt frame if (isBefore32(frameIndex, nextFrameNumber)) { return; @@ -791,10 +791,10 @@ static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, } return; } - + // Verify that we didn't receive an incomplete frame LC_ASSERT(firstPacket ^ decodingFrame); - + // Check sequencing of this frame to ensure we didn't // miss one in between if (firstPacket) { @@ -823,19 +823,19 @@ static void processRtpPayload(PNV_VIDEO_PACKET videoPacket, int length, // We're now decoding a frame decodingFrame = true; frameType = FRAME_TYPE_PFRAME; - firstPacketReceiveTime = receiveTimeMs; - + firstPacketReceiveTimeUs = receiveTimeUs; + // Some versions of Sunshine don't send a valid PTS, so we will // synthesize one using the receive time as the time base. - if (!syntheticPtsBase) { - syntheticPtsBase = receiveTimeMs; + if (!syntheticPtsBaseUs) { + syntheticPtsBaseUs = receiveTimeUs; } - + if (!presentationTimeMs && frameIndex > 0) { - firstPacketPresentationTime = (unsigned int)(receiveTimeMs - syntheticPtsBase); + firstPacketPresentationTime = (unsigned int)((receiveTimeUs - syntheticPtsBaseUs) / 1000); } else { - firstPacketPresentationTime = presentationTimeMs; + firstPacketPresentationTime = (unsigned int)presentationTimeMs; } } @@ -1154,7 +1154,7 @@ void queueRtpPacket(PRTPV_QUEUE_ENTRY queueEntryPtr) { RTPV_QUEUE_ENTRY queueEntry = *queueEntryPtr; LC_ASSERT(!queueEntry.isParity); - LC_ASSERT(queueEntry.receiveTimeMs != 0); + LC_ASSERT(queueEntry.receiveTimeUs != 0); dataOffset = sizeof(*queueEntry.packet); if (queueEntry.packet->header & FLAG_EXTENSION) { @@ -1173,7 +1173,7 @@ void queueRtpPacket(PRTPV_QUEUE_ENTRY queueEntryPtr) { processRtpPayload((PNV_VIDEO_PACKET)(((char*)queueEntry.packet) + dataOffset), queueEntry.length - dataOffset, - queueEntry.receiveTimeMs, + queueEntry.receiveTimeUs, queueEntry.presentationTimeMs, &existingEntry); diff --git a/src/VideoStream.c b/src/VideoStream.c index a2def84..402b056 100644 --- a/src/VideoStream.c +++ b/src/VideoStream.c @@ -154,7 +154,7 @@ static void VideoReceiveThreadProc(void* context) { break; } } - + // Receive timed out; try again continue; } @@ -168,9 +168,7 @@ static void VideoReceiveThreadProc(void* context) { #ifndef LC_FUZZING if (!receivedFullFrame) { - uint64_t now = PltGetMillis(); - - if (now - firstDataTimeMs >= FIRST_FRAME_TIMEOUT_SEC * 1000) { + if (PltGetMillis() - firstDataTimeMs >= FIRST_FRAME_TIMEOUT_SEC * 1000) { Limelog("Terminating connection due to lack of a successful video frame\n"); ListenerCallbacks.connectionTerminated(ML_ERROR_NO_VIDEO_FRAME); break; @@ -286,7 +284,7 @@ void stopVideoStream(void) { // Wake up client code that may be waiting on the decode unit queue stopVideoDepacketizer(); - + PltInterruptThread(&udpPingThread); PltInterruptThread(&receiveThread); if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) { @@ -302,7 +300,7 @@ void stopVideoStream(void) { if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) { PltJoinThread(&decoderThread); } - + if (firstFrameSocket != INVALID_SOCKET) { closeSocket(firstFrameSocket); firstFrameSocket = INVALID_SOCKET; @@ -415,3 +413,7 @@ int startVideoStream(void* rendererContext, int drFlags) { return 0; } + +const RTP_VIDEO_STATS* LiGetRTPVideoStats(void) { + return &rtpQueue.stats; +}