diff --git a/limelight-common/VideoStream.c b/limelight-common/VideoStream.c index 6629dbd..28172ca 100644 --- a/limelight-common/VideoStream.c +++ b/limelight-common/VideoStream.c @@ -16,11 +16,8 @@ static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks; static SOCKET rtpSocket = INVALID_SOCKET; static SOCKET firstFrameSocket = INVALID_SOCKET; -static LINKED_BLOCKING_QUEUE packetQueue; - static PLT_THREAD udpPingThread; static PLT_THREAD receiveThread; -static PLT_THREAD depacketizerThread; static PLT_THREAD decoderThread; void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks, @@ -30,26 +27,13 @@ void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, remoteHost = host; listenerCallbacks = clCallbacks; - LbqInitializeLinkedBlockingQueue(&packetQueue, 30); - initializeVideoDepacketizer(configuration.packetSize); } void destroyVideoStream(void) { - PLINKED_BLOCKING_QUEUE_ENTRY entry, nextEntry; - callbacks.release(); destroyVideoDepacketizer(); - - entry = LbqDestroyLinkedBlockingQueue(&packetQueue); - - while (entry != NULL) { - nextEntry = entry->flink; - free(entry->data); - free(entry); - entry = nextEntry; - } } static void UdpPingThreadProc(void *context) { @@ -77,56 +61,26 @@ static void UdpPingThreadProc(void *context) { static void ReceiveThreadProc(void* context) { int err; - while (!PltIsThreadInterrupted(&receiveThread)) { - char* buffer = (char*) malloc(1500 + sizeof(int)); - if (buffer == NULL) { - Limelog("Receive thread terminating\n"); - listenerCallbacks->connectionTerminated(-1); - return; - } + char* buffer = (char*) malloc(configuration.packetSize); + if (buffer == NULL) { + Limelog("Receive thread terminating\n"); + listenerCallbacks->connectionTerminated(-1); + return; + } - err = recv(rtpSocket, &buffer[sizeof(int)], 1500, 0); + while (!PltIsThreadInterrupted(&receiveThread)) { + err = recv(rtpSocket, buffer, configuration.packetSize, 0); if (err <= 0) { Limelog("Receive thread terminating #2\n"); - free(buffer); listenerCallbacks->connectionTerminated(LastSocketError()); - return; - } - - memcpy(buffer, &err, sizeof(err)); - - err = LbqOfferQueueItem(&packetQueue, buffer); - if (err != LBQ_SUCCESS) { - free(buffer); + break; } - if (err == LBQ_BOUND_EXCEEDED) { - Limelog("Video packet queue overflow\n"); - } - else if (err == LBQ_INTERRUPTED) { - Limelog("Receive thread terminating #2\n"); - return; - } + // queueRtpPacket() copies the data it needs to we can reuse the buffer + queueRtpPacket((PRTP_PACKET) buffer, err); } -} -static void DepacketizerThreadProc(void* context) { - int length; - int err; - char *data; - - while (!PltIsThreadInterrupted(&depacketizerThread)) { - err = LbqWaitForQueueElement(&packetQueue, (void**)&data); - if (err != LBQ_SUCCESS) { - Limelog("Depacketizer thread terminating\n"); - return; - } - - memcpy(&length, data, sizeof(int)); - queueRtpPacket((PRTP_PACKET) &data[sizeof(int)], length); - - free(data); - } + free(buffer); } static void DecoderThreadProc(void* context) { @@ -179,7 +133,6 @@ void stopVideoStream(void) { PltInterruptThread(&udpPingThread); PltInterruptThread(&receiveThread); - PltInterruptThread(&depacketizerThread); PltInterruptThread(&decoderThread); if (firstFrameSocket != INVALID_SOCKET) { @@ -193,12 +146,10 @@ void stopVideoStream(void) { PltJoinThread(&udpPingThread); PltJoinThread(&receiveThread); - PltJoinThread(&depacketizerThread); PltJoinThread(&decoderThread); PltCloseThread(&udpPingThread); PltCloseThread(&receiveThread); - PltCloseThread(&depacketizerThread); PltCloseThread(&decoderThread); } @@ -225,11 +176,6 @@ int startVideoStream(void* rendererContext, int drFlags) { return err; } - err = PltCreateThread(DepacketizerThreadProc, NULL, &depacketizerThread); - if (err != 0) { - return err; - } - err = PltCreateThread(DecoderThreadProc, NULL, &decoderThread); if (err != 0) { return err;