diff --git a/bun.lockb b/bun.lockb index 2207e737365016760d1a78a9f35c8d1a457ac494..60c32177a430fc1b9b24ea51dc61fd09c778caf1 100755 Binary files a/bun.lockb and b/bun.lockb differ diff --git a/src/readers/queryService.ts b/src/readers/queryService.ts index 4609e9363341d77ad898702ae7ee2ab4cb955347..765a995e01b8c4b37126cd917a62ad5f4de574ba 100644 --- a/src/readers/queryService.ts +++ b/src/readers/queryService.ts @@ -87,6 +87,8 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu } log.info('Starting query reader for', type); + const publisher = new QueryPublisher(frontendPublisher, mlPublisher); + const queryServiceConsumer = await new RabbitMqBroker( rabbitMq, 'requests-exchange', diff --git a/src/utils/queryPublisher.ts b/src/utils/queryPublisher.ts index d10bea719a1387114ca8c66ff08a5f42bc2b0930..ca17e2f169c136a5f7f73df401a68fe086749eb0 100644 --- a/src/utils/queryPublisher.ts +++ b/src/utils/queryPublisher.ts @@ -6,19 +6,35 @@ import type { RabbitMqBroker } from 'ts-common/rabbitMq'; export class QueryPublisher { private frontendPublisher: RabbitMqBroker; private mlPublisher: RabbitMqBroker; - private routingKey: string; - private headers: BackendMessageHeader; - private queryID: number; + private routingKey?: string; + private headers?: BackendMessageHeader; + private queryID?: string; - constructor(frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker, headers: BackendMessageHeader, queryID: number) { + constructor(frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker) { this.frontendPublisher = frontendPublisher; this.mlPublisher = mlPublisher; + } + + withHeaders(headers?: BackendMessageHeader) { this.headers = headers; - this.routingKey = headers.routingKey; + return this; + } + + withRoutingKey(routingKey?: string) { + this.routingKey = routingKey; + return this; + } + + withQueryID(queryID?: string) { this.queryID = queryID; + return this; } publishStatusToFrontend(status: string) { + if (!this.headers || !this.routingKey || !this.queryID) { + throw new Error('Headers or RoutingKey or queryID not set'); + } + this.frontendPublisher.publishMessageToFrontend( { type: wsReturnKey.queryStatusUpdate, @@ -32,6 +48,10 @@ export class QueryPublisher { } publishErrorToFrontend(reason: string) { + if (!this.headers || !this.routingKey || !this.queryID) { + throw new Error('Headers or RoutingKey or queryID not set'); + } + this.frontendPublisher.publishMessageToFrontend( { type: wsReturnKey.queryStatusError, @@ -45,13 +65,17 @@ export class QueryPublisher { } publishTranslationResultToFrontend(query: string) { + if (!this.headers || !this.routingKey || !this.queryID) { + throw new Error('Headers or RoutingKey or queryID not set'); + } + this.frontendPublisher.publishMessageToFrontend( { type: wsReturnKey.queryStatusTranslationResult, callID: this.headers.callID, value: { result: query, - queryID: this.queryID, + queryID: this.headers.callID, }, status: 'success', }, @@ -61,6 +85,10 @@ export class QueryPublisher { } publishResultToFrontend(result: GraphQueryResultMetaFromBackend) { + if (!this.headers || !this.routingKey || !this.queryID) { + throw new Error('Headers or RoutingKey or queryID not set'); + } + this.frontendPublisher.publishMessageToFrontend( { type: wsReturnKey.queryStatusResult, @@ -70,7 +98,7 @@ export class QueryPublisher { type: 'nodelink', payload: result, }, - queryID: this.queryID, + queryID: this.headers.callID, }, status: 'success', }, @@ -80,6 +108,10 @@ export class QueryPublisher { } publishMachineLearningRequest(result: GraphQueryResultFromBackend, mlAttributes: MachineLearning, headers: BackendMessageHeader) { + if (!this.headers || !this.routingKey) { + throw new Error('Headers or RoutingKey or queryID not set'); + } + // FIXME: Change ML to use the same message format that the frontend uses const toMlResult = { nodes: result.nodes.map(node => ({ ...node, id: node._id })),