Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • graphpolaris/microservices/query-service
1 result
Show changes
Commits on Source (1)
......@@ -53,6 +53,10 @@ export const queryService = async (db: DbConnection, cypher: QueryCypher, useCac
const result = graphQueryBackend2graphQuery(graph, countGraph);
result.nodeCounts.updatedAt = Date.now();
// Force garbage collection
neo4jResult.records = [];
Bun.gc(true);
// cache result
if (QUERY_CACHE_DURATION !== '') {
......@@ -100,118 +104,124 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers, message.queryID);
const startTime = Date.now();
const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID);
if (!ss) {
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
try {
const startTime = Date.now();
const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID);
log.debug('Received query request:', message, headers, ss);
log.debug('Received routing key:', headers.routingKey);
if (!ss) {
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
publisher.publishStatusToFrontend('Received');
log.debug('Received query request:', message, headers, ss);
log.debug('Received routing key:', headers.routingKey);
if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) {
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
publisher.publishStatusToFrontend('Received');
let activeQuery = ss.queryStates.activeQueryId;
if (message.queryID) {
if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) {
log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray);
publisher.publishErrorToFrontend('Query not found');
if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) {
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
activeQuery = message.queryID;
}
if (activeQuery == null || activeQuery == -1) {
log.error('No active query found in SaveState:', ss);
publisher.publishErrorToFrontend('No active query found');
return;
}
let activeQuery = ss.queryStates.activeQueryId;
if (message.queryID) {
if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) {
log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray);
publisher.publishErrorToFrontend('Query not found');
return;
}
activeQuery = message.queryID;
}
const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery);
if (activeQueryInfo == null) {
log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray);
publisher.publishErrorToFrontend('Active query not found');
return;
}
if (activeQuery == null || activeQuery == -1) {
log.error('No active query found in SaveState:', ss);
publisher.publishErrorToFrontend('No active query found');
return;
}
const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph;
log.debug('Received query request:', message, headers, visualQuery);
if (visualQuery.nodes.length === 0) {
log.info('Empty query received');
publisher.publishResultToFrontend({
nodes: [],
edges: [],
nodeCounts: { updatedAt: 0 },
metaData: {
topological: { density: 0, self_loops: 0 },
nodes: { count: 0, labels: [], types: {} },
edges: { count: 0, labels: [], types: {} },
},
});
return;
}
const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery);
if (activeQueryInfo == null) {
log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray);
publisher.publishErrorToFrontend('Active query not found');
return;
}
const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings;
const ml = message.ml;
const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml);
const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph;
log.debug('Received query request:', message, headers, visualQuery);
if (visualQuery.nodes.length === 0) {
log.info('Empty query received');
publisher.publishResultToFrontend({
nodes: [],
edges: [],
nodeCounts: { updatedAt: 0 },
metaData: {
topological: { density: 0, self_loops: 0 },
nodes: { count: 0, labels: [], types: {} },
edges: { count: 0, labels: [], types: {} },
},
});
return;
}
log.debug('translating query:', convertedQuery);
publisher.publishStatusToFrontend('Translating');
const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings;
const ml = message.ml;
const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml);
const cypher = query2Cypher(convertedQuery);
const query = cypher.query;
if (query == null) {
log.error('Error translating query:', convertedQuery);
publisher.publishErrorToFrontend('Error translating query');
return;
}
log.debug('translating query:', convertedQuery);
publisher.publishStatusToFrontend('Translating');
log.debug('Translated query FROM:', convertedQuery);
log.info('Translated query:', query);
log.info('Translated query:', cypher.countQuery);
publisher.publishTranslationResultToFrontend(query);
for (let i = 0; i < ss.dbConnections.length; i++) {
try {
const result = await queryService(ss.dbConnections[i], cypher, message.useCached);
// Cache nodeCounts such that we can display differentiation for each query
await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, {
...activeQueryInfo,
graph: {
...activeQueryInfo.graph,
nodeCounts: result.nodeCounts,
},
});
const cypher = query2Cypher(convertedQuery);
const query = cypher.query;
if (query == null) {
log.error('Error translating query:', convertedQuery);
publisher.publishErrorToFrontend('Error translating query');
return;
}
publisher.publishResultToFrontend(result);
log.debug('Query result!');
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
try {
publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
} catch (error) {
log.error('Error publishing machine learning request', error);
publisher.publishErrorToFrontend('Error publishing machine learning request');
log.debug('Translated query FROM:', convertedQuery);
log.info('Translated query:', query);
log.info('Translated query:', cypher.countQuery);
publisher.publishTranslationResultToFrontend(query);
for (let i = 0; i < ss.dbConnections.length; i++) {
try {
const result = await queryService(ss.dbConnections[i], cypher, message.useCached);
// Cache nodeCounts such that we can display differentiation for each query
await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, {
...activeQueryInfo,
graph: {
...activeQueryInfo.graph,
nodeCounts: result.nodeCounts,
},
});
publisher.publishResultToFrontend(result);
log.debug('Query result!');
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
try {
publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
} catch (error) {
log.error('Error publishing machine learning request', error);
publisher.publishErrorToFrontend('Error publishing machine learning request');
}
}
}
} catch (error) {
log.error('Error querying database', error);
publisher.publishErrorToFrontend('Error querying database');
}
} catch (error) {
log.error('Error querying database', error);
publisher.publishErrorToFrontend('Error querying database');
Bun.gc(true);
}
} finally {
setTimeout(() => Bun.gc(true), 100);
}
});
};