From 064a7af7d6ef6bb5e5d814cfeadf2528bd0f51fe Mon Sep 17 00:00:00 2001 From: Dennis Collaris <d.collaris@me.com> Date: Thu, 6 Mar 2025 12:20:17 +0100 Subject: [PATCH 1/2] fix: create new querypublisher for each message Every user was using the same global queryPublisher, with the risk of one users message setting headers that were then used for another users response. --- src/readers/queryService.ts | 5 ++--- src/utils/queryPublisher.ts | 18 +++--------------- 2 files changed, 5 insertions(+), 18 deletions(-) diff --git a/src/readers/queryService.ts b/src/readers/queryService.ts index 1f5fe97..984bbc3 100644 --- a/src/readers/queryService.ts +++ b/src/readers/queryService.ts @@ -78,8 +78,6 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu } log.info('Starting query reader for', type); - const publisher = new QueryPublisher(frontendPublisher, mlPublisher); - const queryServiceConsumer = await new RabbitMqBroker( rabbitMq, 'requests-exchange', @@ -90,6 +88,8 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu log.info('Connected to RabbitMQ ST!'); await queryServiceConsumer.startConsuming<QueryRequest>('query-service', async (message, headers) => { + const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers); + const startTime = Date.now(); const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID); @@ -107,7 +107,6 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu return; } - publisher.withHeaders(headers).withRoutingKey(headers.routingKey).withQueryID(headers.callID); publisher.publishStatusToFrontend('Received'); if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) { diff --git a/src/utils/queryPublisher.ts b/src/utils/queryPublisher.ts index ca17e2f..03e0737 100644 --- a/src/utils/queryPublisher.ts +++ b/src/utils/queryPublisher.ts @@ -10,24 +10,12 @@ export class QueryPublisher { private headers?: BackendMessageHeader; private queryID?: string; - constructor(frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker) { + constructor(frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker, headers: BackendMessageHeader) { this.frontendPublisher = frontendPublisher; this.mlPublisher = mlPublisher; - } - - withHeaders(headers?: BackendMessageHeader) { this.headers = headers; - return this; - } - - withRoutingKey(routingKey?: string) { - this.routingKey = routingKey; - return this; - } - - withQueryID(queryID?: string) { - this.queryID = queryID; - return this; + this.routingKey = headers.routingKey; + this.queryID = headers.callID; // FIXME: WTF? } publishStatusToFrontend(status: string) { -- GitLab From 2ad009384b443410c7857c8b4f57f639fec7f3c9 Mon Sep 17 00:00:00 2001 From: Dennis Collaris <d.collaris@me.com> Date: Thu, 6 Mar 2025 13:12:39 +0100 Subject: [PATCH 2/2] fix: set query ID to actually be the query ID, not incorrectly the call ID Note that this query ID is currently not used in the frontend anyway --- src/readers/queryService.ts | 12 ++++++------ src/utils/queryPublisher.ts | 34 +++++++--------------------------- 2 files changed, 13 insertions(+), 33 deletions(-) diff --git a/src/readers/queryService.ts b/src/readers/queryService.ts index 984bbc3..56d4455 100644 --- a/src/readers/queryService.ts +++ b/src/readers/queryService.ts @@ -88,7 +88,12 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu log.info('Connected to RabbitMQ ST!'); await queryServiceConsumer.startConsuming<QueryRequest>('query-service', async (message, headers) => { - const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers); + if (message.queryID == null) { + log.error('QueryID not set in message:', message.queryID); + return; + } + + const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers, message.queryID); const startTime = Date.now(); const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID); @@ -102,11 +107,6 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu log.debug('Received query request:', message, headers, ss); log.debug('Received routing key:', headers.routingKey); - if (!headers.callID) { - log.error('QueryID not set in headers:', headers); - return; - } - publisher.publishStatusToFrontend('Received'); if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) { diff --git a/src/utils/queryPublisher.ts b/src/utils/queryPublisher.ts index 03e0737..d10bea7 100644 --- a/src/utils/queryPublisher.ts +++ b/src/utils/queryPublisher.ts @@ -6,23 +6,19 @@ import type { RabbitMqBroker } from 'ts-common/rabbitMq'; export class QueryPublisher { private frontendPublisher: RabbitMqBroker; private mlPublisher: RabbitMqBroker; - private routingKey?: string; - private headers?: BackendMessageHeader; - private queryID?: string; + private routingKey: string; + private headers: BackendMessageHeader; + private queryID: number; - constructor(frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker, headers: BackendMessageHeader) { + constructor(frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker, headers: BackendMessageHeader, queryID: number) { this.frontendPublisher = frontendPublisher; this.mlPublisher = mlPublisher; this.headers = headers; this.routingKey = headers.routingKey; - this.queryID = headers.callID; // FIXME: WTF? + this.queryID = queryID; } publishStatusToFrontend(status: string) { - if (!this.headers || !this.routingKey || !this.queryID) { - throw new Error('Headers or RoutingKey or queryID not set'); - } - this.frontendPublisher.publishMessageToFrontend( { type: wsReturnKey.queryStatusUpdate, @@ -36,10 +32,6 @@ export class QueryPublisher { } publishErrorToFrontend(reason: string) { - if (!this.headers || !this.routingKey || !this.queryID) { - throw new Error('Headers or RoutingKey or queryID not set'); - } - this.frontendPublisher.publishMessageToFrontend( { type: wsReturnKey.queryStatusError, @@ -53,17 +45,13 @@ export class QueryPublisher { } publishTranslationResultToFrontend(query: string) { - if (!this.headers || !this.routingKey || !this.queryID) { - throw new Error('Headers or RoutingKey or queryID not set'); - } - this.frontendPublisher.publishMessageToFrontend( { type: wsReturnKey.queryStatusTranslationResult, callID: this.headers.callID, value: { result: query, - queryID: this.headers.callID, + queryID: this.queryID, }, status: 'success', }, @@ -73,10 +61,6 @@ export class QueryPublisher { } publishResultToFrontend(result: GraphQueryResultMetaFromBackend) { - if (!this.headers || !this.routingKey || !this.queryID) { - throw new Error('Headers or RoutingKey or queryID not set'); - } - this.frontendPublisher.publishMessageToFrontend( { type: wsReturnKey.queryStatusResult, @@ -86,7 +70,7 @@ export class QueryPublisher { type: 'nodelink', payload: result, }, - queryID: this.headers.callID, + queryID: this.queryID, }, status: 'success', }, @@ -96,10 +80,6 @@ export class QueryPublisher { } publishMachineLearningRequest(result: GraphQueryResultFromBackend, mlAttributes: MachineLearning, headers: BackendMessageHeader) { - if (!this.headers || !this.routingKey) { - throw new Error('Headers or RoutingKey or queryID not set'); - } - // FIXME: Change ML to use the same message format that the frontend uses const toMlResult = { nodes: result.nodes.map(node => ({ ...node, id: node._id })), -- GitLab