// Video stream lifecycle: binds the RTP socket, runs the UDP ping,
// receive, and (optionally) decoder threads, and tears everything down.

#include "Limelight-internal.h"

#define FIRST_FRAME_MAX 1500
#define FIRST_FRAME_TIMEOUT_SEC 10

// TCP port used by Gen 3 servers to gate the start of the video stream
#define FIRST_FRAME_PORT 47996

// Requested SO_RCVBUF size for the RTP socket
#define RTP_RECV_BUFFER (512 * 1024)

static RTP_VIDEO_QUEUE rtpQueue;

static SOCKET rtpSocket = INVALID_SOCKET;
static SOCKET firstFrameSocket = INVALID_SOCKET;

static PLT_THREAD udpPingThread;
static PLT_THREAD receiveThread;
static PLT_THREAD decoderThread;

// Set by the receive thread once the first video packet arrives;
// read by stopVideoStream() for diagnostics.
static bool receivedDataFromPeer;
// PltGetMillis() timestamp of the first received video packet.
static uint64_t firstDataTimeMs;
// Set via notifyKeyFrameReceived() once a full frame has been received.
static bool receivedFullFrame;

// We can't request an IDR frame until the depacketizer knows
// that a packet was lost. This timeout bounds the time that
// the RTP queue will wait for missing/reordered packets.
#define RTP_QUEUE_DELAY 10

// Initialize the video stream state (depacketizer, RTP queue, and the
// first-packet/first-frame tracking flags). Must be called before
// startVideoStream().
void initializeVideoStream(void) {
    initializeVideoDepacketizer(StreamConfig.packetSize);
    RtpvInitializeQueue(&rtpQueue);
    receivedDataFromPeer = false;
    firstDataTimeMs = 0;
    receivedFullFrame = false;
}

// Clean up the video stream state created by initializeVideoStream()
void destroyVideoStream(void) {
    destroyVideoDepacketizer();
    RtpvCleanupQueue(&rtpQueue);
}

// UDP ping thread proc: sends "PING" to the host's video port every
// 500 ms so the host learns where to send the UDP video stream (and to
// keep any NAT/firewall mapping alive).
static void VideoPingThreadProc(void* context) {
    // ASCII "PING"
    char pingData[] = { 0x50, 0x49, 0x4E, 0x47 };
    LC_SOCKADDR saddr;

    // Target the remote host's video port rather than our bound port
    memcpy(&saddr, &RemoteAddr, sizeof(saddr));
    SET_PORT(&saddr, VideoPortNumber);

    while (!PltIsThreadInterrupted(&udpPingThread)) {
        // We do not check for errors here. Socket errors will be handled
        // on the read-side in ReceiveThreadProc(). This avoids potential
        // issues related to receiving ICMP port unreachable messages due
        // to sending a packet prior to the host PC binding to that port.
        sendto(rtpSocket, pingData, sizeof(pingData), 0, (struct sockaddr*)&saddr, RemoteAddrLen);

        PltSleepMsInterruptible(&udpPingThread, 500);
    }
}

// Receive thread proc: reads RTP packets from the video socket, converts
// the RTP header to host byte order, and feeds packets into the RTP
// reordering queue. Also enforces the "no video traffic" and "no full
// frame" startup timeouts.
static void VideoReceiveThreadProc(void* context) {
    int err;
    int bufferSize, receiveSize;
    char* buffer;
    int queueStatus;
    bool useSelect;
    int waitingForVideoMs;

    // Each receive buffer holds one max-size packet plus an RTPV_QUEUE_ENTRY
    // appended at the end, so the queue can track the packet without a
    // separate allocation.
    receiveSize = StreamConfig.packetSize + MAX_RTP_HEADER_SIZE;
    bufferSize = receiveSize + sizeof(RTPV_QUEUE_ENTRY);
    buffer = NULL;

    if (setNonFatalRecvTimeoutMs(rtpSocket, UDP_RECV_POLL_TIMEOUT_MS) < 0) {
        // SO_RCVTIMEO failed, so use select() to wait
        useSelect = true;
    }
    else {
        // SO_RCVTIMEO timeout set for recv()
        useSelect = false;
    }

    waitingForVideoMs = 0;
    while (!PltIsThreadInterrupted(&receiveThread)) {
        PRTP_PACKET packet;

        // Allocate a fresh buffer only when the previous one was handed
        // off to the RTP queue (see RTPF_RET_QUEUED below).
        if (buffer == NULL) {
            buffer = (char*)malloc(bufferSize);
            if (buffer == NULL) {
                Limelog("Video Receive: malloc() failed\n");
                ListenerCallbacks.connectionTerminated(-1);
                return;
            }
        }

        err = recvUdpSocket(rtpSocket, buffer, receiveSize, useSelect);
        if (err < 0) {
            Limelog("Video Receive: recvUdpSocket() failed: %d\n", (int)LastSocketError());
            ListenerCallbacks.connectionTerminated(LastSocketFail());
            break;
        }
        else if (err == 0) {
            if (!receivedDataFromPeer) {
                // If we wait many seconds without ever receiving a video packet,
                // assume something is broken and terminate the connection.
                waitingForVideoMs += UDP_RECV_POLL_TIMEOUT_MS;
                if (waitingForVideoMs >= FIRST_FRAME_TIMEOUT_SEC * 1000) {
                    Limelog("Terminating connection due to lack of video traffic\n");
                    ListenerCallbacks.connectionTerminated(ML_ERROR_NO_VIDEO_TRAFFIC);
                    break;
                }
            }

            // Receive timed out; try again
            continue;
        }

        if (!receivedDataFromPeer) {
            receivedDataFromPeer = true;
            Limelog("Received first video packet after %d ms\n", waitingForVideoMs);
            firstDataTimeMs = PltGetMillis();
        }

        // Packets are flowing, but if no complete frame is assembled within
        // the timeout after the first packet, terminate the connection.
        if (!receivedFullFrame) {
            uint64_t now = PltGetMillis();
            if (now - firstDataTimeMs >= FIRST_FRAME_TIMEOUT_SEC * 1000) {
                Limelog("Terminating connection due to lack of a successful video frame\n");
                ListenerCallbacks.connectionTerminated(ML_ERROR_NO_VIDEO_FRAME);
                break;
            }
        }

        // Convert fields to host byte-order
        packet = (PRTP_PACKET)&buffer[0];
        packet->sequenceNumber = BE16(packet->sequenceNumber);
        packet->timestamp = BE32(packet->timestamp);
        packet->ssrc = BE32(packet->ssrc);

        // The queue entry lives in the tail of the same allocation
        queueStatus = RtpvAddPacket(&rtpQueue, packet, err, (PRTPV_QUEUE_ENTRY)&buffer[receiveSize]);
        if (queueStatus == RTPF_RET_QUEUED) {
            // The queue owns the buffer
            buffer = NULL;
        }
    }

    // Free any buffer not handed off to the queue
    if (buffer != NULL) {
        free(buffer);
    }
}

// Called by the depacketizer when a complete frame has been received,
// which disarms the "no full frame" timeout in the receive thread.
void notifyKeyFrameReceived(void) {
    // Remember that we got a full frame successfully
    receivedFullFrame = true;
}

// Decoder thread proc: pulls decode units from the frame queue and
// submits them to the decoder callback. Only runs when the renderer has
// neither CAPABILITY_DIRECT_SUBMIT nor CAPABILITY_PULL_RENDERER.
static void VideoDecoderThreadProc(void* context) {
    while (!PltIsThreadInterrupted(&decoderThread)) {
        VIDEO_FRAME_HANDLE frameHandle;
        PDECODE_UNIT decodeUnit;

        // Returns false when the stream is being torn down
        if (!LiWaitForNextVideoFrame(&frameHandle, &decodeUnit)) {
            return;
        }

        LiCompleteVideoFrame(frameHandle, VideoCallbacks.submitDecodeUnit(decodeUnit));
    }
}

// Read the first frame of the video stream.
// Returns 0 on success.
int readFirstFrame(void) {
    // All that matters is that we close this socket.
    // This starts the flow of video on Gen 3 servers.
    closeSocket(firstFrameSocket);
    firstFrameSocket = INVALID_SOCKET;

    return 0;
}

// Terminate the video stream: stop the decoder callbacks, wake and join
// all video threads, and close the sockets. Teardown order matters:
// interrupt threads first, shut down blocking sockets to unblock them,
// then join and close.
void stopVideoStream(void) {
    if (!receivedDataFromPeer) {
        Limelog("No video traffic was ever received from the host!\n");
    }

    VideoCallbacks.stop();

    // Wake up client code that may be waiting on the decode unit queue
    stopVideoDepacketizer();

    PltInterruptThread(&udpPingThread);
    PltInterruptThread(&receiveThread);
    if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
        PltInterruptThread(&decoderThread);
    }

    // Unblock any pending first-frame read before joining
    if (firstFrameSocket != INVALID_SOCKET) {
        shutdownTcpSocket(firstFrameSocket);
    }

    PltJoinThread(&udpPingThread);
    PltJoinThread(&receiveThread);
    if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
        PltJoinThread(&decoderThread);
    }

    PltCloseThread(&udpPingThread);
    PltCloseThread(&receiveThread);
    if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
        PltCloseThread(&decoderThread);
    }

    if (firstFrameSocket != INVALID_SOCKET) {
        closeSocket(firstFrameSocket);
        firstFrameSocket = INVALID_SOCKET;
    }
    if (rtpSocket != INVALID_SOCKET) {
        closeSocket(rtpSocket);
        rtpSocket = INVALID_SOCKET;
    }

    VideoCallbacks.cleanup();
}

// Start the video stream: set up the decoder, bind the RTP socket, and
// spawn the receive, (optional) decoder, and ping threads. Each failure
// path unwinds exactly the resources created so far, in reverse order.
// Returns 0 on success or a non-zero error code on failure.
int startVideoStream(void* rendererContext, int drFlags) {
    int err;

    firstFrameSocket = INVALID_SOCKET;

    // This must be called before the decoder thread starts submitting
    // decode units
    LC_ASSERT(NegotiatedVideoFormat != 0);
    err = VideoCallbacks.setup(NegotiatedVideoFormat, StreamConfig.width,
        StreamConfig.height, StreamConfig.fps, rendererContext, drFlags);
    if (err != 0) {
        return err;
    }

    rtpSocket = bindUdpSocket(RemoteAddr.ss_family, RTP_RECV_BUFFER);
    if (rtpSocket == INVALID_SOCKET) {
        VideoCallbacks.cleanup();
        return LastSocketError();
    }

    VideoCallbacks.start();

    err = PltCreateThread("VideoRecv", VideoReceiveThreadProc, NULL, &receiveThread);
    if (err != 0) {
        // Unwind: start() -> stop(), socket, setup() -> cleanup()
        VideoCallbacks.stop();
        closeSocket(rtpSocket);
        VideoCallbacks.cleanup();
        return err;
    }

    // The decoder thread is only needed when the renderer can't take
    // frames directly (no direct-submit or pull capability)
    if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
        err = PltCreateThread("VideoDec", VideoDecoderThreadProc, NULL, &decoderThread);
        if (err != 0) {
            VideoCallbacks.stop();
            PltInterruptThread(&receiveThread);
            PltJoinThread(&receiveThread);
            PltCloseThread(&receiveThread);
            closeSocket(rtpSocket);
            VideoCallbacks.cleanup();
            return err;
        }
    }

    if (AppVersionQuad[0] == 3) {
        // Connect this socket to open port 47998 for our ping thread
        firstFrameSocket = connectTcpSocket(&RemoteAddr, RemoteAddrLen,
                                            FIRST_FRAME_PORT, FIRST_FRAME_TIMEOUT_SEC);
        if (firstFrameSocket == INVALID_SOCKET) {
            // Unwind the receive thread (and decoder thread, if created),
            // unblocking them via the depacketizer before joining
            VideoCallbacks.stop();
            stopVideoDepacketizer();
            PltInterruptThread(&receiveThread);
            if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
                PltInterruptThread(&decoderThread);
            }
            PltJoinThread(&receiveThread);
            if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
                PltJoinThread(&decoderThread);
            }
            PltCloseThread(&receiveThread);
            if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
                PltCloseThread(&decoderThread);
            }
            closeSocket(rtpSocket);
            VideoCallbacks.cleanup();
            return LastSocketError();
        }
    }

    // Start pinging before reading the first frame so GFE knows where
    // to send UDP data
    err = PltCreateThread("VideoPing", VideoPingThreadProc, NULL, &udpPingThread);
    if (err != 0) {
        // Unwind all threads and sockets created so far
        VideoCallbacks.stop();
        stopVideoDepacketizer();
        PltInterruptThread(&receiveThread);
        if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
            PltInterruptThread(&decoderThread);
        }
        PltJoinThread(&receiveThread);
        if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
            PltJoinThread(&decoderThread);
        }
        PltCloseThread(&receiveThread);
        if ((VideoCallbacks.capabilities & (CAPABILITY_DIRECT_SUBMIT | CAPABILITY_PULL_RENDERER)) == 0) {
            PltCloseThread(&decoderThread);
        }
        closeSocket(rtpSocket);
        if (firstFrameSocket != INVALID_SOCKET) {
            closeSocket(firstFrameSocket);
            firstFrameSocket = INVALID_SOCKET;
        }
        VideoCallbacks.cleanup();
        return err;
    }

    if (AppVersionQuad[0] == 3) {
        // Read the first frame to start the flow of video
        err = readFirstFrame();
        if (err != 0) {
            stopVideoStream();
            return err;
        }
    }

    return 0;
}