Merge pull request #9 from irtimmer/combineReceiveDecodeThread

Combine receive and decode thread
Cameron Gutman 2015-08-11 19:12:14 -07:00
commit bc57d54e54
4 changed files with 100 additions and 46 deletions
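The change adds a CAPABILITY_DIRECT_SUBMIT flag to both renderer callback structs. When a renderer advertises it, the library no longer creates a separate decoder thread (or its blocking queue) for that stream; the audio receive thread and the video depacketizer decode and submit data inline instead. The four touched files are the audio stream, the public callback header, the video depacketizer, and the video stream (file names are not shown in this view). Below is a minimal sketch of how a client might opt in on the audio side; the MyDecodeAndPlaySample name, the "Limelight.h" include, and the int length parameter are assumptions, while the struct fields, CAPABILITY_DIRECT_SUBMIT, and the (sample pointer, length) call shape are taken from this diff.

/*
 * Sketch only: opting the audio renderer into direct submit.
 */
#include <string.h>
#include "Limelight.h" /* assumed header name */

/* With CAPABILITY_DIRECT_SUBMIT set, this now runs on the receive thread
 * itself, so it should decode and return quickly rather than block.
 * A NULL sample with length 0 signals a gap in the stream (see
 * decodeInputData() in the audio stream hunks below). */
static void MyDecodeAndPlaySample(char* sampleData, int sampleLength) {
    (void) sampleData;
    (void) sampleLength;
}

static AUDIO_RENDERER_CALLBACKS audioCallbacks;

static void registerAudioCallbacks(void) {
    memset(&audioCallbacks, 0, sizeof(audioCallbacks));
    /* init and cleanup would be filled in as before */
    audioCallbacks.decodeAndPlaySample = MyDecodeAndPlaySample;
    audioCallbacks.capabilities = CAPABILITY_DIRECT_SUBMIT;
}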

View File

@@ -13,6 +13,8 @@ static PLT_THREAD udpPingThread;
static PLT_THREAD receiveThread;
static PLT_THREAD decoderThread;
static unsigned short lastSeq = 0;
#define RTP_PORT 48000
#define MAX_PACKET_SIZE 100
@@ -30,7 +32,9 @@ typedef struct _QUEUED_AUDIO_PACKET {
/* Initialize the audio stream */
void initializeAudioStream(void) {
LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
LbqInitializeLinkedBlockingQueue(&packetQueue, 30);
}
RtpqInitializeQueue(&rtpReorderQueue, RTPQ_DEFAULT_MAX_SIZE, RTPQ_DEFUALT_QUEUE_TIME);
}
@@ -49,7 +53,9 @@ static void freePacketList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
/* Tear down the audio stream once we're done with it */
void destroyAudioStream(void) {
freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue));
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
freePacketList(LbqDestroyLinkedBlockingQueue(&packetQueue));
}
RtpqCleanupQueue(&rtpReorderQueue);
}
@@ -95,6 +101,21 @@ static int queuePacketToLbq(PQUEUED_AUDIO_PACKET *packet) {
return 1;
}
static void decodeInputData(PQUEUED_AUDIO_PACKET packet) {
PRTP_PACKET rtp;
rtp = (PRTP_PACKET) &packet->data[0];
if (lastSeq != 0 && (unsigned short) (lastSeq + 1) != rtp->sequenceNumber) {
Limelog("Received OOS audio data (expected %d, but got %d)\n", lastSeq + 1, rtp->sequenceNumber);
AudioCallbacks.decodeAndPlaySample(NULL, 0);
}
lastSeq = rtp->sequenceNumber;
AudioCallbacks.decodeAndPlaySample((char *) (rtp + 1), packet->size - sizeof(*rtp));
}
static void ReceiveThreadProc(void* context) {
PRTP_PACKET rtp;
PQUEUED_AUDIO_PACKET packet;
@@ -136,9 +157,13 @@ static void ReceiveThreadProc(void* context) {
queueStatus = RtpqAddPacket(&rtpReorderQueue, (PRTP_PACKET) packet, &packet->q.rentry);
if (queueStatus == RTPQ_RET_HANDLE_IMMEDIATELY) {
if (!queuePacketToLbq(&packet)) {
// An exit signal was received
return;
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
if (!queuePacketToLbq(&packet)) {
// An exit signal was received
return;
}
} else {
decodeInputData(packet);
}
}
else {
@@ -150,9 +175,13 @@ static void ReceiveThreadProc(void* context) {
if (queueStatus == RTPQ_RET_QUEUED_PACKETS_READY) {
// If packets are ready, pull them and send them to the decoder
while ((packet = (PQUEUED_AUDIO_PACKET) RtpqGetQueuedPacket(&rtpReorderQueue)) != NULL) {
if (!queuePacketToLbq(&packet)) {
// An exit signal was received
return;
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
if (!queuePacketToLbq(&packet)) {
// An exit signal was received
return;
}
} else {
decodeInputData(packet);
}
}
}
@@ -164,7 +193,6 @@ static void DecoderThreadProc(void* context) {
PRTP_PACKET rtp;
int err;
PQUEUED_AUDIO_PACKET packet;
unsigned short lastSeq = 0;
while (!PltIsThreadInterrupted(&decoderThread)) {
err = LbqWaitForQueueElement(&packetQueue, (void**) &packet);
@@ -173,16 +201,7 @@ static void DecoderThreadProc(void* context) {
return;
}
rtp = (PRTP_PACKET) &packet->data[0];
if (lastSeq != 0 && (unsigned short) (lastSeq + 1) != rtp->sequenceNumber) {
Limelog("Received OOS audio data (expected %d, but got %d)\n", lastSeq + 1, rtp->sequenceNumber);
AudioCallbacks.decodeAndPlaySample(NULL, 0);
}
lastSeq = rtp->sequenceNumber;
AudioCallbacks.decodeAndPlaySample((char *) (rtp + 1), packet->size - sizeof(*rtp));
decodeInputData(packet);
free(packet);
}
@@ -191,7 +210,9 @@ static void DecoderThreadProc(void* context) {
void stopAudioStream(void) {
PltInterruptThread(&udpPingThread);
PltInterruptThread(&receiveThread);
PltInterruptThread(&decoderThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltInterruptThread(&decoderThread);
}
if (rtpSocket != INVALID_SOCKET) {
closesocket(rtpSocket);
@@ -200,11 +221,15 @@ void stopAudioStream(void) {
PltJoinThread(&udpPingThread);
PltJoinThread(&receiveThread);
PltJoinThread(&decoderThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltJoinThread(&decoderThread);
}
PltCloseThread(&udpPingThread);
PltCloseThread(&receiveThread);
PltCloseThread(&decoderThread);
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltCloseThread(&decoderThread);
}
AudioCallbacks.cleanup();
}
@@ -229,9 +254,11 @@ int startAudioStream(void) {
return err;
}
err = PltCreateThread(DecoderThreadProc, NULL, &decoderThread);
if (err != 0) {
return err;
if ((AudioCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
err = PltCreateThread(DecoderThreadProc, NULL, &decoderThread);
if (err != 0) {
return err;
}
}
return 0;
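
With the decoder thread now optional, the out-of-sequence check moved from DecoderThreadProc into the shared decodeInputData() helper, with lastSeq promoted to file scope. The cast back to unsigned short is what makes the comparison wraparound-safe; here is a standalone demo of that check (not library code):

#include <stdio.h>

int main(void) {
    unsigned short lastSeq = 65535;   /* sequence counter about to wrap */
    unsigned short rtpSeq = 0;        /* next packet's sequence number */

    /* Same test decodeInputData() uses: truncating lastSeq + 1 to
     * unsigned short makes 65535 -> 0 count as in order. */
    if (lastSeq != 0 && (unsigned short) (lastSeq + 1) != rtpSeq) {
        printf("gap: expected %hu, got %hu\n", (unsigned short) (lastSeq + 1), rtpSeq);
    } else {
        printf("in order: %hu follows %hu\n", rtpSeq, lastSeq);
    }
    return 0;
}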

View File

@@ -52,6 +52,8 @@ typedef struct _DECODE_UNIT {
PLENTRY bufferList;
} DECODE_UNIT, *PDECODE_UNIT;
#define CAPABILITY_DIRECT_SUBMIT 0x1
// This callback is invoked to provide details about the video stream and allow configuration of the decoder
typedef void(*DecoderRendererSetup)(int width, int height, int redrawRate, void* context, int drFlags);
@@ -69,6 +71,7 @@ typedef struct _DECODER_RENDERER_CALLBACKS {
DecoderRendererSetup setup;
DecoderRendererCleanup cleanup;
DecoderRendererSubmitDecodeUnit submitDecodeUnit;
int capabilities;
} DECODER_RENDERER_CALLBACKS, *PDECODER_RENDERER_CALLBACKS;
// This callback initializes the audio renderer
@@ -84,6 +87,7 @@ typedef struct _AUDIO_RENDERER_CALLBACKS {
AudioRendererInit init;
AudioRendererCleanup cleanup;
AudioRendererDecodeAndPlaySample decodeAndPlaySample;
int capabilities;
} AUDIO_RENDERER_CALLBACKS, *PAUDIO_RENDERER_CALLBACKS;
// Subject to change in future releases
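
Because the new capabilities field is additive and every check is written as (capabilities & CAPABILITY_DIRECT_SUBMIT) == 0, existing clients keep the old threaded behavior without any changes. A sketch of that default (struct and macro names from this header; everything else assumed):

#include "Limelight.h" /* assumed header name for this file */

/* Zero-initialized callback structs leave capabilities == 0, so the library
 * still creates the separate audio and video decoder threads as before. */
static DECODER_RENDERER_CALLBACKS legacyVideoCallbacks = { 0 };
static AUDIO_RENDERER_CALLBACKS legacyAudioCallbacks = { 0 };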

View File

@@ -29,7 +29,9 @@ typedef struct _BUFFER_DESC {
/* Init */
void initializeVideoDepacketizer(int pktSize) {
LbqInitializeLinkedBlockingQueue(&decodeUnitQueue, 15);
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
LbqInitializeLinkedBlockingQueue(&decodeUnitQueue, 15);
}
nominalPacketDataLength = pktSize - sizeof(NV_VIDEO_PACKET);
nextFrameNumber = 1;
@@ -91,7 +93,9 @@ static void freeDecodeUnitList(PLINKED_BLOCKING_QUEUE_ENTRY entry) {
/* Cleanup video depacketizer and free malloced memory */
void destroyVideoDepacketizer(void) {
freeDecodeUnitList(LbqDestroyLinkedBlockingQueue(&decodeUnitQueue));
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
freeDecodeUnitList(LbqDestroyLinkedBlockingQueue(&decodeUnitQueue));
}
cleanupAvcFrameState();
}
@@ -183,23 +187,34 @@ static void reassembleAvcFrame(int frameNumber) {
nalChainHead = NULL;
nalChainDataLength = 0;
if (LbqOfferQueueItem(&decodeUnitQueue, qdu, &qdu->entry) == LBQ_BOUND_EXCEEDED) {
Limelog("Video decode unit queue overflow\n");
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
if (LbqOfferQueueItem(&decodeUnitQueue, qdu, &qdu->entry) == LBQ_BOUND_EXCEEDED) {
Limelog("Video decode unit queue overflow\n");
// Clear frame state and wait for an IDR
nalChainHead = qdu->decodeUnit.bufferList;
nalChainDataLength = qdu->decodeUnit.fullLength;
dropAvcFrameState();
// Clear frame state and wait for an IDR
nalChainHead = qdu->decodeUnit.bufferList;
nalChainDataLength = qdu->decodeUnit.fullLength;
dropAvcFrameState();
// Free the DU
free(qdu);
// Free the DU
free(qdu);
// Flush the decode unit queue
freeDecodeUnitList(LbqFlushQueueItems(&decodeUnitQueue));
// Flush the decode unit queue
freeDecodeUnitList(LbqFlushQueueItems(&decodeUnitQueue));
// FIXME: Get proper lower bound
connectionSinkTooSlow(0, frameNumber);
return;
// FIXME: Get proper lower bound
connectionSinkTooSlow(0, frameNumber);
return;
}
} else {
int ret = VideoCallbacks.submitDecodeUnit(&qdu->decodeUnit);
freeQueuedDecodeUnit(qdu);
if (ret == DR_NEED_IDR) {
Limelog("Request IDR frame on behalf of DR\n");
requestIdrOnDemand();
}
}
// Notify the control connection
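
In the direct-submit branch above, reassembleAvcFrame() calls submitDecodeUnit on the depacketizer thread and requests an IDR frame whenever the callback returns DR_NEED_IDR. A hedged sketch of such a callback follows; the feedPlatformDecoder helper is hypothetical, while PDECODE_UNIT, DR_NEED_IDR, and the int return type come from this diff:

#include "Limelight.h" /* assumed header name */

/* Hypothetical platform hook: returns nonzero when the decode unit was accepted. */
static int feedPlatformDecoder(PDECODE_UNIT decodeUnit) {
    (void) decodeUnit;
    return 1;
}

static int MySubmitDecodeUnit(PDECODE_UNIT decodeUnit) {
    if (!feedPlatformDecoder(decodeUnit)) {
        /* Returning DR_NEED_IDR makes the hunk above log
         * "Request IDR frame on behalf of DR" and call requestIdrOnDemand(). */
        return DR_NEED_IDR;
    }
    /* Any other return value is treated as success by this path. */
    return 0;
}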

View File

@@ -150,7 +150,9 @@ int readFirstFrame(void) {
void stopVideoStream(void) {
PltInterruptThread(&udpPingThread);
PltInterruptThread(&receiveThread);
PltInterruptThread(&decoderThread);
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltInterruptThread(&decoderThread);
}
if (firstFrameSocket != INVALID_SOCKET) {
closesocket(firstFrameSocket);
@@ -163,11 +165,15 @@ void stopVideoStream(void) {
PltJoinThread(&udpPingThread);
PltJoinThread(&receiveThread);
PltJoinThread(&decoderThread);
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltJoinThread(&decoderThread);
}
PltCloseThread(&udpPingThread);
PltCloseThread(&receiveThread);
PltCloseThread(&decoderThread);
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
PltCloseThread(&decoderThread);
}
VideoCallbacks.cleanup();
}
@@ -191,9 +197,11 @@ int startVideoStream(void* rendererContext, int drFlags) {
return err;
}
err = PltCreateThread(DecoderThreadProc, NULL, &decoderThread);
if (err != 0) {
return err;
if ((VideoCallbacks.capabilities & CAPABILITY_DIRECT_SUBMIT) == 0) {
err = PltCreateThread(DecoderThreadProc, NULL, &decoderThread);
if (err != 0) {
return err;
}
}
if (ServerMajorVersion == 3) {