Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • graphpolaris/microservices/query-service
1 result
Show changes
Commits on Source (1)
...@@ -78,8 +78,6 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu ...@@ -78,8 +78,6 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
} }
log.info('Starting query reader for', type); log.info('Starting query reader for', type);
const publisher = new QueryPublisher(frontendPublisher, mlPublisher);
const queryServiceConsumer = await new RabbitMqBroker( const queryServiceConsumer = await new RabbitMqBroker(
rabbitMq, rabbitMq,
'requests-exchange', 'requests-exchange',
...@@ -90,6 +88,13 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu ...@@ -90,6 +88,13 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
log.info('Connected to RabbitMQ ST!'); log.info('Connected to RabbitMQ ST!');
await queryServiceConsumer.startConsuming<QueryRequest>('query-service', async (message, headers) => { await queryServiceConsumer.startConsuming<QueryRequest>('query-service', async (message, headers) => {
if (message.queryID == null) {
log.error('QueryID not set in message:', message.queryID);
return;
}
const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers, message.queryID);
const startTime = Date.now(); const startTime = Date.now();
const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID); const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID);
...@@ -102,12 +107,6 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu ...@@ -102,12 +107,6 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
log.debug('Received query request:', message, headers, ss); log.debug('Received query request:', message, headers, ss);
log.debug('Received routing key:', headers.routingKey); log.debug('Received routing key:', headers.routingKey);
if (!headers.callID) {
log.error('QueryID not set in headers:', headers);
return;
}
publisher.withHeaders(headers).withRoutingKey(headers.routingKey).withQueryID(headers.callID);
publisher.publishStatusToFrontend('Received'); publisher.publishStatusToFrontend('Received');
if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) { if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) {
......
...@@ -6,35 +6,19 @@ import type { RabbitMqBroker } from 'ts-common/rabbitMq'; ...@@ -6,35 +6,19 @@ import type { RabbitMqBroker } from 'ts-common/rabbitMq';
export class QueryPublisher { export class QueryPublisher {
private frontendPublisher: RabbitMqBroker; private frontendPublisher: RabbitMqBroker;
private mlPublisher: RabbitMqBroker; private mlPublisher: RabbitMqBroker;
private routingKey?: string; private routingKey: string;
private headers?: BackendMessageHeader; private headers: BackendMessageHeader;
private queryID?: string; private queryID: number;
constructor(frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker) { constructor(frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker, headers: BackendMessageHeader, queryID: number) {
this.frontendPublisher = frontendPublisher; this.frontendPublisher = frontendPublisher;
this.mlPublisher = mlPublisher; this.mlPublisher = mlPublisher;
}
withHeaders(headers?: BackendMessageHeader) {
this.headers = headers; this.headers = headers;
return this; this.routingKey = headers.routingKey;
}
withRoutingKey(routingKey?: string) {
this.routingKey = routingKey;
return this;
}
withQueryID(queryID?: string) {
this.queryID = queryID; this.queryID = queryID;
return this;
} }
publishStatusToFrontend(status: string) { publishStatusToFrontend(status: string) {
if (!this.headers || !this.routingKey || !this.queryID) {
throw new Error('Headers or RoutingKey or queryID not set');
}
this.frontendPublisher.publishMessageToFrontend( this.frontendPublisher.publishMessageToFrontend(
{ {
type: wsReturnKey.queryStatusUpdate, type: wsReturnKey.queryStatusUpdate,
...@@ -48,10 +32,6 @@ export class QueryPublisher { ...@@ -48,10 +32,6 @@ export class QueryPublisher {
} }
publishErrorToFrontend(reason: string) { publishErrorToFrontend(reason: string) {
if (!this.headers || !this.routingKey || !this.queryID) {
throw new Error('Headers or RoutingKey or queryID not set');
}
this.frontendPublisher.publishMessageToFrontend( this.frontendPublisher.publishMessageToFrontend(
{ {
type: wsReturnKey.queryStatusError, type: wsReturnKey.queryStatusError,
...@@ -65,17 +45,13 @@ export class QueryPublisher { ...@@ -65,17 +45,13 @@ export class QueryPublisher {
} }
publishTranslationResultToFrontend(query: string) { publishTranslationResultToFrontend(query: string) {
if (!this.headers || !this.routingKey || !this.queryID) {
throw new Error('Headers or RoutingKey or queryID not set');
}
this.frontendPublisher.publishMessageToFrontend( this.frontendPublisher.publishMessageToFrontend(
{ {
type: wsReturnKey.queryStatusTranslationResult, type: wsReturnKey.queryStatusTranslationResult,
callID: this.headers.callID, callID: this.headers.callID,
value: { value: {
result: query, result: query,
queryID: this.headers.callID, queryID: this.queryID,
}, },
status: 'success', status: 'success',
}, },
...@@ -85,10 +61,6 @@ export class QueryPublisher { ...@@ -85,10 +61,6 @@ export class QueryPublisher {
} }
publishResultToFrontend(result: GraphQueryResultMetaFromBackend) { publishResultToFrontend(result: GraphQueryResultMetaFromBackend) {
if (!this.headers || !this.routingKey || !this.queryID) {
throw new Error('Headers or RoutingKey or queryID not set');
}
this.frontendPublisher.publishMessageToFrontend( this.frontendPublisher.publishMessageToFrontend(
{ {
type: wsReturnKey.queryStatusResult, type: wsReturnKey.queryStatusResult,
...@@ -98,7 +70,7 @@ export class QueryPublisher { ...@@ -98,7 +70,7 @@ export class QueryPublisher {
type: 'nodelink', type: 'nodelink',
payload: result, payload: result,
}, },
queryID: this.headers.callID, queryID: this.queryID,
}, },
status: 'success', status: 'success',
}, },
...@@ -108,10 +80,6 @@ export class QueryPublisher { ...@@ -108,10 +80,6 @@ export class QueryPublisher {
} }
publishMachineLearningRequest(result: GraphQueryResultFromBackend, mlAttributes: MachineLearning, headers: BackendMessageHeader) { publishMachineLearningRequest(result: GraphQueryResultFromBackend, mlAttributes: MachineLearning, headers: BackendMessageHeader) {
if (!this.headers || !this.routingKey) {
throw new Error('Headers or RoutingKey or queryID not set');
}
// FIXME: Change ML to use the same message format that the frontend uses // FIXME: Change ML to use the same message format that the frontend uses
const toMlResult = { const toMlResult = {
nodes: result.nodes.map(node => ({ ...node, id: node._id })), nodes: result.nodes.map(node => ({ ...node, id: node._id })),
......