diff --git a/src/ControlStream.c b/src/ControlStream.c index c8e21b5..3476d71 100644 --- a/src/ControlStream.c +++ b/src/ControlStream.c @@ -38,6 +38,34 @@ typedef struct _QUEUED_FRAME_FEC_STATUS { LINKED_BLOCKING_QUEUE_ENTRY entry; } QUEUED_FRAME_FEC_STATUS, *PQUEUED_FRAME_FEC_STATUS; +typedef struct _QUEUED_ASYNC_CALLBACK { + int typeIndex; + union { + struct { + uint16_t controllerNumber; + uint16_t lowFreqRumble; + uint16_t highFreqRumble; + } rumble; + struct { + uint16_t controllerNumber; + uint16_t leftTriggerMotor; + uint16_t rightTriggerMotor; + } rumbleTriggers; + struct { + uint16_t controllerNumber; + uint16_t reportRateHz; + uint8_t motionType; + } setMotionEventState; + struct { + uint16_t controllerNumber; + uint8_t r; + uint8_t g; + uint8_t b; + } setControllerLed; + } data; + LINKED_BLOCKING_QUEUE_ENTRY entry; +} QUEUED_ASYNC_CALLBACK, *PQUEUED_ASYNC_CALLBACK; + static SOCKET ctlSock = INVALID_SOCKET; static ENetHost* client; static ENetPeer* peer; @@ -48,6 +76,7 @@ static PLT_THREAD lossStatsThread; static PLT_THREAD invalidateRefFramesThread; static PLT_THREAD requestIdrFrameThread; static PLT_THREAD controlReceiveThread; +static PLT_THREAD asyncCallbackThread; static int lossCountSinceLastReport; static int lastGoodFrame; static int lastSeenFrame; @@ -67,6 +96,7 @@ static uint64_t firstFrameTimeMs; static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples; static LINKED_BLOCKING_QUEUE frameFecStatusQueue; +static LINKED_BLOCKING_QUEUE asyncCallbackQueue; static PLT_EVENT idrFrameRequiredEvent; static PPLT_CRYPTO_CONTEXT encryptionCtx; @@ -251,6 +281,7 @@ int initializeControlStream(void) { PltCreateEvent(&idrFrameRequiredEvent); LbqInitializeLinkedBlockingQueue(&invalidReferenceFrameTuples, 20); LbqInitializeLinkedBlockingQueue(&frameFecStatusQueue, 8); // Limits number of frame status reports per periodic ping interval + LbqInitializeLinkedBlockingQueue(&asyncCallbackQueue, 30); PltCreateMutex(&enetMutex); encryptedControlStream = APP_VERSION_AT_LEAST(7, 1, 431); @@ -326,7 +357,8 @@ void destroyControlStream(void) { PltCloseEvent(&idrFrameRequiredEvent); freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples)); freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&frameFecStatusQueue)); - + freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&asyncCallbackQueue)); + PltDeleteMutex(&enetMutex); } @@ -786,6 +818,118 @@ static int ignoreDisconnectIntercept(ENetHost* host, ENetEvent* event) { return 0; } +static void asyncCallbackThreadFunc(void* context) { + PQUEUED_ASYNC_CALLBACK queuedCb; + + while (LbqWaitForQueueElement(&asyncCallbackQueue, (void**)&queuedCb) == LBQ_SUCCESS) { + switch (queuedCb->typeIndex) { + case IDX_RUMBLE_DATA: + ListenerCallbacks.rumble(queuedCb->data.rumble.controllerNumber, + queuedCb->data.rumble.lowFreqRumble, + queuedCb->data.rumble.highFreqRumble); + break; + case IDX_RUMBLE_TRIGGER_DATA: + ListenerCallbacks.rumbleTriggers(queuedCb->data.rumbleTriggers.controllerNumber, + queuedCb->data.rumbleTriggers.leftTriggerMotor, + queuedCb->data.rumbleTriggers.rightTriggerMotor); + break; + case IDX_SET_MOTION_EVENT: + ListenerCallbacks.setMotionEventState(queuedCb->data.setMotionEventState.controllerNumber, + queuedCb->data.setMotionEventState.motionType, + queuedCb->data.setMotionEventState.reportRateHz); + break; + case IDX_SET_RGB_LED: + ListenerCallbacks.setControllerLED(queuedCb->data.setControllerLed.controllerNumber, + queuedCb->data.setControllerLed.r, + queuedCb->data.setControllerLed.g, + queuedCb->data.setControllerLed.b); + break; + case IDX_HDR_INFO: + // HDR state is maintained globally, so we just invoke the client callback here. + // If the state has changed multiple times, the client could receive duplicate + // callbacks but that is fine. + ListenerCallbacks.setHdrMode(hdrEnabled); + break; + default: + // Unhandled packet type from queueAsyncCallback() + LC_ASSERT(false); + break; + } + + free(queuedCb); + } +} + +static bool needsAsyncCallback(unsigned short packetType) { + return packetType == packetTypes[IDX_RUMBLE_DATA] || + packetType == packetTypes[IDX_RUMBLE_TRIGGER_DATA] || + packetType == packetTypes[IDX_SET_MOTION_EVENT] || + packetType == packetTypes[IDX_SET_RGB_LED] || + packetType == packetTypes[IDX_HDR_INFO]; +} + +static void queueAsyncCallback(PNVCTL_ENET_PACKET_HEADER_V1 ctlHdr, int packetLength) { + BYTE_BUFFER bb; + PQUEUED_ASYNC_CALLBACK queuedCb; + int err; + + LC_ASSERT(needsAsyncCallback(ctlHdr->type)); + + queuedCb = malloc(sizeof(*queuedCb)); + if (!queuedCb) { + return; + } + + BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE); + + if (ctlHdr->type == packetTypes[IDX_RUMBLE_DATA]) { + BbAdvanceBuffer(&bb, 4); + + BbGet16(&bb, &queuedCb->data.rumble.controllerNumber); + BbGet16(&bb, &queuedCb->data.rumble.lowFreqRumble); + BbGet16(&bb, &queuedCb->data.rumble.highFreqRumble); + + queuedCb->typeIndex = IDX_RUMBLE_DATA; + } + else if (ctlHdr->type == packetTypes[IDX_RUMBLE_TRIGGER_DATA]) { + BbGet16(&bb, &queuedCb->data.rumbleTriggers.controllerNumber); + BbGet16(&bb, &queuedCb->data.rumbleTriggers.leftTriggerMotor); + BbGet16(&bb, &queuedCb->data.rumbleTriggers.rightTriggerMotor); + + queuedCb->typeIndex = IDX_RUMBLE_TRIGGER_DATA; + } + else if (ctlHdr->type == packetTypes[IDX_SET_MOTION_EVENT]) { + BbGet16(&bb, &queuedCb->data.setMotionEventState.controllerNumber); + BbGet16(&bb, &queuedCb->data.setMotionEventState.reportRateHz); + BbGet8(&bb, &queuedCb->data.setMotionEventState.motionType); + + queuedCb->typeIndex = IDX_SET_MOTION_EVENT; + } + else if (ctlHdr->type == packetTypes[IDX_SET_RGB_LED]) { + BbGet16(&bb, &queuedCb->data.setControllerLed.controllerNumber); + BbGet8(&bb, &queuedCb->data.setControllerLed.r); + BbGet8(&bb, &queuedCb->data.setControllerLed.g); + BbGet8(&bb, &queuedCb->data.setControllerLed.b); + + queuedCb->typeIndex = IDX_SET_RGB_LED; + } + else if (ctlHdr->type == packetTypes[IDX_HDR_INFO]) { + queuedCb->typeIndex = IDX_HDR_INFO; + } + else { + // Unhandled packet type from needsAsyncCallback() + LC_ASSERT(false); + free(queuedCb); + return; + } + + err = LbqOfferQueueItem(&asyncCallbackQueue, queuedCb, &queuedCb->entry); + if (err != LBQ_SUCCESS) { + Limelog("Failed to queue async callback: %d\n", err); + free(queuedCb); + } +} + static void controlReceiveThreadFunc(void* context) { int err; @@ -929,68 +1073,9 @@ static void controlReceiveThreadFunc(void* context) { // All below codepaths must free ctlHdr!!! - if (ctlHdr->type == packetTypes[IDX_RUMBLE_DATA]) { - BYTE_BUFFER bb; - - BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE); - BbAdvanceBuffer(&bb, 4); - - uint16_t controllerNumber; - uint16_t lowFreqRumble; - uint16_t highFreqRumble; - - BbGet16(&bb, &controllerNumber); - BbGet16(&bb, &lowFreqRumble); - BbGet16(&bb, &highFreqRumble); - - ListenerCallbacks.rumble(controllerNumber, lowFreqRumble, highFreqRumble); - } - else if (ctlHdr->type == packetTypes[IDX_RUMBLE_TRIGGER_DATA]) { - BYTE_BUFFER bb; - - BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE); - - uint16_t controllerNumber; - uint16_t leftTriggerMotor; - uint16_t rightTriggerMotor; - - BbGet16(&bb, &controllerNumber); - BbGet16(&bb, &leftTriggerMotor); - BbGet16(&bb, &rightTriggerMotor); - - ListenerCallbacks.rumbleTriggers(controllerNumber, leftTriggerMotor, rightTriggerMotor); - } - else if (ctlHdr->type == packetTypes[IDX_SET_MOTION_EVENT]) { - BYTE_BUFFER bb; - - BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE); - - uint16_t controllerNumber; - uint16_t reportRateHz; - uint8_t motionType; - - BbGet16(&bb, &controllerNumber); - BbGet16(&bb, &reportRateHz); - BbGet8(&bb, &motionType); - - ListenerCallbacks.setMotionEventState(controllerNumber, motionType, reportRateHz); - } - else if (ctlHdr->type == packetTypes[IDX_SET_RGB_LED]) { - BYTE_BUFFER bb; - - BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE); - - uint16_t controllerNumber; - uint8_t r, g, b; - - BbGet16(&bb, &controllerNumber); - BbGet8(&bb, &r); - BbGet8(&bb, &g); - BbGet8(&bb, &b); - - ListenerCallbacks.setControllerLED(controllerNumber, r, g, b); - } - else if (ctlHdr->type == packetTypes[IDX_HDR_INFO]) { + // Process HDR data immediately to update global HDR enabled state and HDR metadata. + // The actual client callback will be invoked in the async callback thread. + if (ctlHdr->type == packetTypes[IDX_HDR_INFO]) { BYTE_BUFFER bb; uint8_t enableByte; @@ -1016,7 +1101,11 @@ static void controlReceiveThreadFunc(void* context) { } hdrEnabled = (enableByte != 0); - ListenerCallbacks.setHdrMode(hdrEnabled); + } + + // Process client callbacks in a separate thread + if (needsAsyncCallback(ctlHdr->type)) { + queueAsyncCallback(ctlHdr, packetLength); } else if (ctlHdr->type == packetTypes[IDX_TERMINATION]) { BYTE_BUFFER bb; @@ -1336,6 +1425,7 @@ int stopControlStream(void) { stopping = true; LbqSignalQueueShutdown(&invalidReferenceFrameTuples); LbqSignalQueueShutdown(&frameFecStatusQueue); + LbqSignalQueueDrain(&asyncCallbackQueue); PltSetEvent(&idrFrameRequiredEvent); // This must be set to stop in a timely manner @@ -1348,14 +1438,17 @@ int stopControlStream(void) { PltInterruptThread(&lossStatsThread); PltInterruptThread(&requestIdrFrameThread); PltInterruptThread(&controlReceiveThread); + PltInterruptThread(&asyncCallbackThread); PltJoinThread(&lossStatsThread); PltJoinThread(&requestIdrFrameThread); PltJoinThread(&controlReceiveThread); + PltJoinThread(&asyncCallbackThread); PltCloseThread(&lossStatsThread); PltCloseThread(&requestIdrFrameThread); PltCloseThread(&controlReceiveThread); + PltCloseThread(&asyncCallbackThread); // We will only have an RFI thread if RFI is enabled if (isReferenceFrameInvalidationEnabled()) { @@ -1673,12 +1766,51 @@ int startControlStream(void) { return err; } + err = PltCreateThread("CtrlAsyncCb", asyncCallbackThreadFunc, NULL, &asyncCallbackThread); + if (err != 0) { + stopping = true; + PltSetEvent(&idrFrameRequiredEvent); + + if (ctlSock != INVALID_SOCKET) { + shutdownTcpSocket(ctlSock); + } + else { + ConnectionInterrupted = true; + } + + PltInterruptThread(&lossStatsThread); + PltJoinThread(&lossStatsThread); + PltCloseThread(&lossStatsThread); + + PltInterruptThread(&controlReceiveThread); + PltJoinThread(&controlReceiveThread); + PltCloseThread(&controlReceiveThread); + + PltInterruptThread(&requestIdrFrameThread); + PltJoinThread(&requestIdrFrameThread); + PltCloseThread(&requestIdrFrameThread); + + if (ctlSock != INVALID_SOCKET) { + closeSocket(ctlSock); + ctlSock = INVALID_SOCKET; + } + else { + enet_peer_disconnect_now(peer, 0); + peer = NULL; + enet_host_destroy(client); + client = NULL; + } + + return err; + } + // Only create the reference frame invalidation thread if RFI is enabled if (isReferenceFrameInvalidationEnabled()) { err = PltCreateThread("InvRefFrames", invalidateRefFramesFunc, NULL, &invalidateRefFramesThread); if (err != 0) { stopping = true; PltSetEvent(&idrFrameRequiredEvent); + LbqSignalQueueShutdown(&asyncCallbackQueue); if (ctlSock != INVALID_SOCKET) { shutdownTcpSocket(ctlSock); @@ -1699,6 +1831,10 @@ int startControlStream(void) { PltJoinThread(&requestIdrFrameThread); PltCloseThread(&requestIdrFrameThread); + PltInterruptThread(&asyncCallbackThread); + PltJoinThread(&asyncCallbackThread); + PltCloseThread(&asyncCallbackThread); + if (ctlSock != INVALID_SOCKET) { closeSocket(ctlSock); ctlSock = INVALID_SOCKET;