Invoke control stream callbacks on a separate thread

Time spent processing client callbacks would contribute to increased control stream latency,
which means higher RTTs, more lengthy waits before retransmissions, and higher client-reported
network latency stats.
This commit is contained in:
Cameron Gutman 2023-09-17 14:49:06 -05:00
parent b2528faa02
commit 20130e210b

View File

@ -38,6 +38,34 @@ typedef struct _QUEUED_FRAME_FEC_STATUS {
LINKED_BLOCKING_QUEUE_ENTRY entry;
} QUEUED_FRAME_FEC_STATUS, *PQUEUED_FRAME_FEC_STATUS;
typedef struct _QUEUED_ASYNC_CALLBACK {
int typeIndex;
union {
struct {
uint16_t controllerNumber;
uint16_t lowFreqRumble;
uint16_t highFreqRumble;
} rumble;
struct {
uint16_t controllerNumber;
uint16_t leftTriggerMotor;
uint16_t rightTriggerMotor;
} rumbleTriggers;
struct {
uint16_t controllerNumber;
uint16_t reportRateHz;
uint8_t motionType;
} setMotionEventState;
struct {
uint16_t controllerNumber;
uint8_t r;
uint8_t g;
uint8_t b;
} setControllerLed;
} data;
LINKED_BLOCKING_QUEUE_ENTRY entry;
} QUEUED_ASYNC_CALLBACK, *PQUEUED_ASYNC_CALLBACK;
static SOCKET ctlSock = INVALID_SOCKET;
static ENetHost* client;
static ENetPeer* peer;
@ -48,6 +76,7 @@ static PLT_THREAD lossStatsThread;
static PLT_THREAD invalidateRefFramesThread;
static PLT_THREAD requestIdrFrameThread;
static PLT_THREAD controlReceiveThread;
static PLT_THREAD asyncCallbackThread;
static int lossCountSinceLastReport;
static int lastGoodFrame;
static int lastSeenFrame;
@ -67,6 +96,7 @@ static uint64_t firstFrameTimeMs;
static LINKED_BLOCKING_QUEUE invalidReferenceFrameTuples;
static LINKED_BLOCKING_QUEUE frameFecStatusQueue;
static LINKED_BLOCKING_QUEUE asyncCallbackQueue;
static PLT_EVENT idrFrameRequiredEvent;
static PPLT_CRYPTO_CONTEXT encryptionCtx;
@ -251,6 +281,7 @@ int initializeControlStream(void) {
PltCreateEvent(&idrFrameRequiredEvent);
LbqInitializeLinkedBlockingQueue(&invalidReferenceFrameTuples, 20);
LbqInitializeLinkedBlockingQueue(&frameFecStatusQueue, 8); // Limits number of frame status reports per periodic ping interval
LbqInitializeLinkedBlockingQueue(&asyncCallbackQueue, 30);
PltCreateMutex(&enetMutex);
encryptedControlStream = APP_VERSION_AT_LEAST(7, 1, 431);
@ -326,7 +357,8 @@ void destroyControlStream(void) {
PltCloseEvent(&idrFrameRequiredEvent);
freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&invalidReferenceFrameTuples));
freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&frameFecStatusQueue));
freeBasicLbqList(LbqDestroyLinkedBlockingQueue(&asyncCallbackQueue));
PltDeleteMutex(&enetMutex);
}
@ -786,6 +818,118 @@ static int ignoreDisconnectIntercept(ENetHost* host, ENetEvent* event) {
return 0;
}
static void asyncCallbackThreadFunc(void* context) {
PQUEUED_ASYNC_CALLBACK queuedCb;
while (LbqWaitForQueueElement(&asyncCallbackQueue, (void**)&queuedCb) == LBQ_SUCCESS) {
switch (queuedCb->typeIndex) {
case IDX_RUMBLE_DATA:
ListenerCallbacks.rumble(queuedCb->data.rumble.controllerNumber,
queuedCb->data.rumble.lowFreqRumble,
queuedCb->data.rumble.highFreqRumble);
break;
case IDX_RUMBLE_TRIGGER_DATA:
ListenerCallbacks.rumbleTriggers(queuedCb->data.rumbleTriggers.controllerNumber,
queuedCb->data.rumbleTriggers.leftTriggerMotor,
queuedCb->data.rumbleTriggers.rightTriggerMotor);
break;
case IDX_SET_MOTION_EVENT:
ListenerCallbacks.setMotionEventState(queuedCb->data.setMotionEventState.controllerNumber,
queuedCb->data.setMotionEventState.motionType,
queuedCb->data.setMotionEventState.reportRateHz);
break;
case IDX_SET_RGB_LED:
ListenerCallbacks.setControllerLED(queuedCb->data.setControllerLed.controllerNumber,
queuedCb->data.setControllerLed.r,
queuedCb->data.setControllerLed.g,
queuedCb->data.setControllerLed.b);
break;
case IDX_HDR_INFO:
// HDR state is maintained globally, so we just invoke the client callback here.
// If the state has changed multiple times, the client could receive duplicate
// callbacks but that is fine.
ListenerCallbacks.setHdrMode(hdrEnabled);
break;
default:
// Unhandled packet type from queueAsyncCallback()
LC_ASSERT(false);
break;
}
free(queuedCb);
}
}
static bool needsAsyncCallback(unsigned short packetType) {
return packetType == packetTypes[IDX_RUMBLE_DATA] ||
packetType == packetTypes[IDX_RUMBLE_TRIGGER_DATA] ||
packetType == packetTypes[IDX_SET_MOTION_EVENT] ||
packetType == packetTypes[IDX_SET_RGB_LED] ||
packetType == packetTypes[IDX_HDR_INFO];
}
static void queueAsyncCallback(PNVCTL_ENET_PACKET_HEADER_V1 ctlHdr, int packetLength) {
BYTE_BUFFER bb;
PQUEUED_ASYNC_CALLBACK queuedCb;
int err;
LC_ASSERT(needsAsyncCallback(ctlHdr->type));
queuedCb = malloc(sizeof(*queuedCb));
if (!queuedCb) {
return;
}
BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE);
if (ctlHdr->type == packetTypes[IDX_RUMBLE_DATA]) {
BbAdvanceBuffer(&bb, 4);
BbGet16(&bb, &queuedCb->data.rumble.controllerNumber);
BbGet16(&bb, &queuedCb->data.rumble.lowFreqRumble);
BbGet16(&bb, &queuedCb->data.rumble.highFreqRumble);
queuedCb->typeIndex = IDX_RUMBLE_DATA;
}
else if (ctlHdr->type == packetTypes[IDX_RUMBLE_TRIGGER_DATA]) {
BbGet16(&bb, &queuedCb->data.rumbleTriggers.controllerNumber);
BbGet16(&bb, &queuedCb->data.rumbleTriggers.leftTriggerMotor);
BbGet16(&bb, &queuedCb->data.rumbleTriggers.rightTriggerMotor);
queuedCb->typeIndex = IDX_RUMBLE_TRIGGER_DATA;
}
else if (ctlHdr->type == packetTypes[IDX_SET_MOTION_EVENT]) {
BbGet16(&bb, &queuedCb->data.setMotionEventState.controllerNumber);
BbGet16(&bb, &queuedCb->data.setMotionEventState.reportRateHz);
BbGet8(&bb, &queuedCb->data.setMotionEventState.motionType);
queuedCb->typeIndex = IDX_SET_MOTION_EVENT;
}
else if (ctlHdr->type == packetTypes[IDX_SET_RGB_LED]) {
BbGet16(&bb, &queuedCb->data.setControllerLed.controllerNumber);
BbGet8(&bb, &queuedCb->data.setControllerLed.r);
BbGet8(&bb, &queuedCb->data.setControllerLed.g);
BbGet8(&bb, &queuedCb->data.setControllerLed.b);
queuedCb->typeIndex = IDX_SET_RGB_LED;
}
else if (ctlHdr->type == packetTypes[IDX_HDR_INFO]) {
queuedCb->typeIndex = IDX_HDR_INFO;
}
else {
// Unhandled packet type from needsAsyncCallback()
LC_ASSERT(false);
free(queuedCb);
return;
}
err = LbqOfferQueueItem(&asyncCallbackQueue, queuedCb, &queuedCb->entry);
if (err != LBQ_SUCCESS) {
Limelog("Failed to queue async callback: %d\n", err);
free(queuedCb);
}
}
static void controlReceiveThreadFunc(void* context) {
int err;
@ -929,68 +1073,9 @@ static void controlReceiveThreadFunc(void* context) {
// All below codepaths must free ctlHdr!!!
if (ctlHdr->type == packetTypes[IDX_RUMBLE_DATA]) {
BYTE_BUFFER bb;
BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE);
BbAdvanceBuffer(&bb, 4);
uint16_t controllerNumber;
uint16_t lowFreqRumble;
uint16_t highFreqRumble;
BbGet16(&bb, &controllerNumber);
BbGet16(&bb, &lowFreqRumble);
BbGet16(&bb, &highFreqRumble);
ListenerCallbacks.rumble(controllerNumber, lowFreqRumble, highFreqRumble);
}
else if (ctlHdr->type == packetTypes[IDX_RUMBLE_TRIGGER_DATA]) {
BYTE_BUFFER bb;
BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE);
uint16_t controllerNumber;
uint16_t leftTriggerMotor;
uint16_t rightTriggerMotor;
BbGet16(&bb, &controllerNumber);
BbGet16(&bb, &leftTriggerMotor);
BbGet16(&bb, &rightTriggerMotor);
ListenerCallbacks.rumbleTriggers(controllerNumber, leftTriggerMotor, rightTriggerMotor);
}
else if (ctlHdr->type == packetTypes[IDX_SET_MOTION_EVENT]) {
BYTE_BUFFER bb;
BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE);
uint16_t controllerNumber;
uint16_t reportRateHz;
uint8_t motionType;
BbGet16(&bb, &controllerNumber);
BbGet16(&bb, &reportRateHz);
BbGet8(&bb, &motionType);
ListenerCallbacks.setMotionEventState(controllerNumber, motionType, reportRateHz);
}
else if (ctlHdr->type == packetTypes[IDX_SET_RGB_LED]) {
BYTE_BUFFER bb;
BbInitializeWrappedBuffer(&bb, (char*)ctlHdr, sizeof(*ctlHdr), packetLength - sizeof(*ctlHdr), BYTE_ORDER_LITTLE);
uint16_t controllerNumber;
uint8_t r, g, b;
BbGet16(&bb, &controllerNumber);
BbGet8(&bb, &r);
BbGet8(&bb, &g);
BbGet8(&bb, &b);
ListenerCallbacks.setControllerLED(controllerNumber, r, g, b);
}
else if (ctlHdr->type == packetTypes[IDX_HDR_INFO]) {
// Process HDR data immediately to update global HDR enabled state and HDR metadata.
// The actual client callback will be invoked in the async callback thread.
if (ctlHdr->type == packetTypes[IDX_HDR_INFO]) {
BYTE_BUFFER bb;
uint8_t enableByte;
@ -1016,7 +1101,11 @@ static void controlReceiveThreadFunc(void* context) {
}
hdrEnabled = (enableByte != 0);
ListenerCallbacks.setHdrMode(hdrEnabled);
}
// Process client callbacks in a separate thread
if (needsAsyncCallback(ctlHdr->type)) {
queueAsyncCallback(ctlHdr, packetLength);
}
else if (ctlHdr->type == packetTypes[IDX_TERMINATION]) {
BYTE_BUFFER bb;
@ -1336,6 +1425,7 @@ int stopControlStream(void) {
stopping = true;
LbqSignalQueueShutdown(&invalidReferenceFrameTuples);
LbqSignalQueueShutdown(&frameFecStatusQueue);
LbqSignalQueueDrain(&asyncCallbackQueue);
PltSetEvent(&idrFrameRequiredEvent);
// This must be set to stop in a timely manner
@ -1348,14 +1438,17 @@ int stopControlStream(void) {
PltInterruptThread(&lossStatsThread);
PltInterruptThread(&requestIdrFrameThread);
PltInterruptThread(&controlReceiveThread);
PltInterruptThread(&asyncCallbackThread);
PltJoinThread(&lossStatsThread);
PltJoinThread(&requestIdrFrameThread);
PltJoinThread(&controlReceiveThread);
PltJoinThread(&asyncCallbackThread);
PltCloseThread(&lossStatsThread);
PltCloseThread(&requestIdrFrameThread);
PltCloseThread(&controlReceiveThread);
PltCloseThread(&asyncCallbackThread);
// We will only have an RFI thread if RFI is enabled
if (isReferenceFrameInvalidationEnabled()) {
@ -1673,12 +1766,51 @@ int startControlStream(void) {
return err;
}
err = PltCreateThread("CtrlAsyncCb", asyncCallbackThreadFunc, NULL, &asyncCallbackThread);
if (err != 0) {
stopping = true;
PltSetEvent(&idrFrameRequiredEvent);
if (ctlSock != INVALID_SOCKET) {
shutdownTcpSocket(ctlSock);
}
else {
ConnectionInterrupted = true;
}
PltInterruptThread(&lossStatsThread);
PltJoinThread(&lossStatsThread);
PltCloseThread(&lossStatsThread);
PltInterruptThread(&controlReceiveThread);
PltJoinThread(&controlReceiveThread);
PltCloseThread(&controlReceiveThread);
PltInterruptThread(&requestIdrFrameThread);
PltJoinThread(&requestIdrFrameThread);
PltCloseThread(&requestIdrFrameThread);
if (ctlSock != INVALID_SOCKET) {
closeSocket(ctlSock);
ctlSock = INVALID_SOCKET;
}
else {
enet_peer_disconnect_now(peer, 0);
peer = NULL;
enet_host_destroy(client);
client = NULL;
}
return err;
}
// Only create the reference frame invalidation thread if RFI is enabled
if (isReferenceFrameInvalidationEnabled()) {
err = PltCreateThread("InvRefFrames", invalidateRefFramesFunc, NULL, &invalidateRefFramesThread);
if (err != 0) {
stopping = true;
PltSetEvent(&idrFrameRequiredEvent);
LbqSignalQueueShutdown(&asyncCallbackQueue);
if (ctlSock != INVALID_SOCKET) {
shutdownTcpSocket(ctlSock);
@ -1699,6 +1831,10 @@ int startControlStream(void) {
PltJoinThread(&requestIdrFrameThread);
PltCloseThread(&requestIdrFrameThread);
PltInterruptThread(&asyncCallbackThread);
PltJoinThread(&asyncCallbackThread);
PltCloseThread(&asyncCallbackThread);
if (ctlSock != INVALID_SOCKET) {
closeSocket(ctlSock);
ctlSock = INVALID_SOCKET;