Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • graphpolaris/microservices/query-service
1 result
Show changes
Commits on Source (1)
...@@ -53,6 +53,10 @@ export const queryService = async (db: DbConnection, cypher: QueryCypher, useCac ...@@ -53,6 +53,10 @@ export const queryService = async (db: DbConnection, cypher: QueryCypher, useCac
const result = graphQueryBackend2graphQuery(graph, countGraph); const result = graphQueryBackend2graphQuery(graph, countGraph);
result.nodeCounts.updatedAt = Date.now(); result.nodeCounts.updatedAt = Date.now();
// Force garbage collection
neo4jResult.records = [];
Bun.gc(true);
// cache result // cache result
if (QUERY_CACHE_DURATION !== '') { if (QUERY_CACHE_DURATION !== '') {
...@@ -100,118 +104,124 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu ...@@ -100,118 +104,124 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers, message.queryID); const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers, message.queryID);
const startTime = Date.now(); try {
const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID); const startTime = Date.now();
const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID);
if (!ss) {
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
log.debug('Received query request:', message, headers, ss); if (!ss) {
log.debug('Received routing key:', headers.routingKey); log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
publisher.publishStatusToFrontend('Received'); log.debug('Received query request:', message, headers, ss);
log.debug('Received routing key:', headers.routingKey);
if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) { publisher.publishStatusToFrontend('Received');
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
let activeQuery = ss.queryStates.activeQueryId; if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) {
if (message.queryID) { log.error('Invalid SaveState received in queryServiceConsumer:', ss);
if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) { publisher.publishErrorToFrontend('Invalid SaveState');
log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray);
publisher.publishErrorToFrontend('Query not found');
return; return;
} }
activeQuery = message.queryID;
}
if (activeQuery == null || activeQuery == -1) { let activeQuery = ss.queryStates.activeQueryId;
log.error('No active query found in SaveState:', ss); if (message.queryID) {
publisher.publishErrorToFrontend('No active query found'); if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) {
return; log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray);
} publisher.publishErrorToFrontend('Query not found');
return;
}
activeQuery = message.queryID;
}
const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery); if (activeQuery == null || activeQuery == -1) {
if (activeQueryInfo == null) { log.error('No active query found in SaveState:', ss);
log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray); publisher.publishErrorToFrontend('No active query found');
publisher.publishErrorToFrontend('Active query not found'); return;
return; }
}
const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph; const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery);
log.debug('Received query request:', message, headers, visualQuery); if (activeQueryInfo == null) {
if (visualQuery.nodes.length === 0) { log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray);
log.info('Empty query received'); publisher.publishErrorToFrontend('Active query not found');
publisher.publishResultToFrontend({ return;
nodes: [], }
edges: [],
nodeCounts: { updatedAt: 0 },
metaData: {
topological: { density: 0, self_loops: 0 },
nodes: { count: 0, labels: [], types: {} },
edges: { count: 0, labels: [], types: {} },
},
});
return;
}
const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings; const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph;
const ml = message.ml; log.debug('Received query request:', message, headers, visualQuery);
const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml); if (visualQuery.nodes.length === 0) {
log.info('Empty query received');
publisher.publishResultToFrontend({
nodes: [],
edges: [],
nodeCounts: { updatedAt: 0 },
metaData: {
topological: { density: 0, self_loops: 0 },
nodes: { count: 0, labels: [], types: {} },
edges: { count: 0, labels: [], types: {} },
},
});
return;
}
log.debug('translating query:', convertedQuery); const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings;
publisher.publishStatusToFrontend('Translating'); const ml = message.ml;
const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml);
const cypher = query2Cypher(convertedQuery); log.debug('translating query:', convertedQuery);
const query = cypher.query; publisher.publishStatusToFrontend('Translating');
if (query == null) {
log.error('Error translating query:', convertedQuery);
publisher.publishErrorToFrontend('Error translating query');
return;
}
log.debug('Translated query FROM:', convertedQuery); const cypher = query2Cypher(convertedQuery);
log.info('Translated query:', query); const query = cypher.query;
log.info('Translated query:', cypher.countQuery); if (query == null) {
publisher.publishTranslationResultToFrontend(query); log.error('Error translating query:', convertedQuery);
publisher.publishErrorToFrontend('Error translating query');
for (let i = 0; i < ss.dbConnections.length; i++) { return;
try { }
const result = await queryService(ss.dbConnections[i], cypher, message.useCached);
// Cache nodeCounts such that we can display differentiation for each query
await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, {
...activeQueryInfo,
graph: {
...activeQueryInfo.graph,
nodeCounts: result.nodeCounts,
},
});
publisher.publishResultToFrontend(result); log.debug('Translated query FROM:', convertedQuery);
log.debug('Query result!'); log.info('Translated query:', query);
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`); log.info('Translated query:', cypher.countQuery);
publisher.publishTranslationResultToFrontend(query);
if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
for (let i = 0; i < convertedQuery.machineLearning.length; i++) { for (let i = 0; i < ss.dbConnections.length; i++) {
try { try {
publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers); const result = await queryService(ss.dbConnections[i], cypher, message.useCached);
log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
} catch (error) { // Cache nodeCounts such that we can display differentiation for each query
log.error('Error publishing machine learning request', error); await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, {
publisher.publishErrorToFrontend('Error publishing machine learning request'); ...activeQueryInfo,
graph: {
...activeQueryInfo.graph,
nodeCounts: result.nodeCounts,
},
});
publisher.publishResultToFrontend(result);
log.debug('Query result!');
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
try {
publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
} catch (error) {
log.error('Error publishing machine learning request', error);
publisher.publishErrorToFrontend('Error publishing machine learning request');
}
} }
} }
} catch (error) {
log.error('Error querying database', error);
publisher.publishErrorToFrontend('Error querying database');
} }
} catch (error) {
log.error('Error querying database', error); Bun.gc(true);
publisher.publishErrorToFrontend('Error querying database');
} }
} finally {
setTimeout(() => Bun.gc(true), 100);
} }
}); });
}; };