Remove the depacketizer thread

This commit is contained in:
Cameron Gutman 2014-06-29 00:06:54 -07:00
parent 8c8fcc7cbe
commit 97710478de

View File

@ -16,11 +16,8 @@ static PCONNECTION_LISTENER_CALLBACKS listenerCallbacks;
static SOCKET rtpSocket = INVALID_SOCKET; static SOCKET rtpSocket = INVALID_SOCKET;
static SOCKET firstFrameSocket = INVALID_SOCKET; static SOCKET firstFrameSocket = INVALID_SOCKET;
static LINKED_BLOCKING_QUEUE packetQueue;
static PLT_THREAD udpPingThread; static PLT_THREAD udpPingThread;
static PLT_THREAD receiveThread; static PLT_THREAD receiveThread;
static PLT_THREAD depacketizerThread;
static PLT_THREAD decoderThread; static PLT_THREAD decoderThread;
void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks, void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig, PDECODER_RENDERER_CALLBACKS drCallbacks,
@ -30,26 +27,13 @@ void initializeVideoStream(IP_ADDRESS host, PSTREAM_CONFIGURATION streamConfig,
remoteHost = host; remoteHost = host;
listenerCallbacks = clCallbacks; listenerCallbacks = clCallbacks;
LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
initializeVideoDepacketizer(configuration.packetSize); initializeVideoDepacketizer(configuration.packetSize);
} }
void destroyVideoStream(void) { void destroyVideoStream(void) {
PLINKED_BLOCKING_QUEUE_ENTRY entry, nextEntry;
callbacks.release(); callbacks.release();
destroyVideoDepacketizer(); destroyVideoDepacketizer();
entry = LbqDestroyLinkedBlockingQueue(&packetQueue);
while (entry != NULL) {
nextEntry = entry->flink;
free(entry->data);
free(entry);
entry = nextEntry;
}
} }
static void UdpPingThreadProc(void *context) { static void UdpPingThreadProc(void *context) {
@ -77,56 +61,26 @@ static void UdpPingThreadProc(void *context) {
static void ReceiveThreadProc(void* context) { static void ReceiveThreadProc(void* context) {
int err; int err;
while (!PltIsThreadInterrupted(&receiveThread)) { char* buffer = (char*) malloc(configuration.packetSize);
char* buffer = (char*) malloc(1500 + sizeof(int)); if (buffer == NULL) {
if (buffer == NULL) { Limelog("Receive thread terminating\n");
Limelog("Receive thread terminating\n"); listenerCallbacks->connectionTerminated(-1);
listenerCallbacks->connectionTerminated(-1); return;
return; }
}
err = recv(rtpSocket, &buffer[sizeof(int)], 1500, 0); while (!PltIsThreadInterrupted(&receiveThread)) {
err = recv(rtpSocket, buffer, configuration.packetSize, 0);
if (err <= 0) { if (err <= 0) {
Limelog("Receive thread terminating #2\n"); Limelog("Receive thread terminating #2\n");
free(buffer);
listenerCallbacks->connectionTerminated(LastSocketError()); listenerCallbacks->connectionTerminated(LastSocketError());
return; break;
}
memcpy(buffer, &err, sizeof(err));
err = LbqOfferQueueItem(&packetQueue, buffer);
if (err != LBQ_SUCCESS) {
free(buffer);
} }
if (err == LBQ_BOUND_EXCEEDED) { // queueRtpPacket() copies the data it needs to we can reuse the buffer
Limelog("Video packet queue overflow\n"); queueRtpPacket((PRTP_PACKET) buffer, err);
}
else if (err == LBQ_INTERRUPTED) {
Limelog("Receive thread terminating #2\n");
return;
}
} }
}
static void DepacketizerThreadProc(void* context) { free(buffer);
int length;
int err;
char *data;
while (!PltIsThreadInterrupted(&depacketizerThread)) {
err = LbqWaitForQueueElement(&packetQueue, (void**)&data);
if (err != LBQ_SUCCESS) {
Limelog("Depacketizer thread terminating\n");
return;
}
memcpy(&length, data, sizeof(int));
queueRtpPacket((PRTP_PACKET) &data[sizeof(int)], length);
free(data);
}
} }
static void DecoderThreadProc(void* context) { static void DecoderThreadProc(void* context) {
@ -179,7 +133,6 @@ void stopVideoStream(void) {
PltInterruptThread(&udpPingThread); PltInterruptThread(&udpPingThread);
PltInterruptThread(&receiveThread); PltInterruptThread(&receiveThread);
PltInterruptThread(&depacketizerThread);
PltInterruptThread(&decoderThread); PltInterruptThread(&decoderThread);
if (firstFrameSocket != INVALID_SOCKET) { if (firstFrameSocket != INVALID_SOCKET) {
@ -193,12 +146,10 @@ void stopVideoStream(void) {
PltJoinThread(&udpPingThread); PltJoinThread(&udpPingThread);
PltJoinThread(&receiveThread); PltJoinThread(&receiveThread);
PltJoinThread(&depacketizerThread);
PltJoinThread(&decoderThread); PltJoinThread(&decoderThread);
PltCloseThread(&udpPingThread); PltCloseThread(&udpPingThread);
PltCloseThread(&receiveThread); PltCloseThread(&receiveThread);
PltCloseThread(&depacketizerThread);
PltCloseThread(&decoderThread); PltCloseThread(&decoderThread);
} }
@ -225,11 +176,6 @@ int startVideoStream(void* rendererContext, int drFlags) {
return err; return err;
} }
err = PltCreateThread(DepacketizerThreadProc, NULL, &depacketizerThread);
if (err != 0) {
return err;
}
err = PltCreateThread(DecoderThreadProc, NULL, &decoderThread); err = PltCreateThread(DecoderThreadProc, NULL, &decoderThread);
if (err != 0) { if (err != 0) {
return err; return err;