Skip to content
Snippets Groups Projects

test: adds tests for query service

Open Marcos Pieras requested to merge test/queryTests into main
+ 115
99
@@ -53,13 +53,22 @@ export const queryService = async (db: DbConnection, cypher: QueryCypher, useCac
const result = graphQueryBackend2graphQuery(graph, countGraph);
result.nodeCounts.updatedAt = Date.now();
// Force garbage collection
neo4jResult.records = [];
Bun.gc(true);
// cache result
const compressedMessage = Bun.gzipSync(JSON.stringify(result));
const base64Message = Buffer.from(compressedMessage).toString('base64');
if (QUERY_CACHE_DURATION !== '') {
log.info('Started gzipping...');
const compressedMessage = Bun.gzipSync(JSON.stringify(result));
log.info('Done gzipping, started encoding to base64...');
const base64Message = Buffer.from(compressedMessage).toString('base64');
log.info('Done encoding, sending to redis...');
// if cache enabled, cache the result
await redis.setWithExpire(cacheKey, base64Message, QUERY_CACHE_DURATION); // ttl in seconds
log.info('cached in redis');
}
return result;
@@ -90,124 +99,131 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
log.info('Connected to RabbitMQ ST!');
await queryServiceConsumer.startConsuming<QueryRequest>('query-service', async (message, headers) => {
const startTime = Date.now();
const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID);
if (!ss) {
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
if (message.queryID == null) {
log.error('QueryID not set in message:', message.queryID);
return;
}
log.debug('Received query request:', message, headers, ss);
log.debug('Received routing key:', headers.routingKey);
const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers, message.queryID);
if (!headers.callID) {
log.error('QueryID not set in headers:', headers);
return;
}
publisher.withHeaders(headers).withRoutingKey(headers.routingKey).withQueryID(headers.callID);
publisher.publishStatusToFrontend('Received');
if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) {
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
try {
const startTime = Date.now();
const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID);
let activeQuery = ss.queryStates.activeQueryId;
if (message.queryID) {
if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) {
log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray);
publisher.publishErrorToFrontend('Query not found');
if (!ss) {
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
activeQuery = message.queryID;
}
if (activeQuery == null || activeQuery == -1) {
log.error('No active query found in SaveState:', ss);
publisher.publishErrorToFrontend('No active query found');
return;
}
log.debug('Received query request:', message, headers, ss);
log.debug('Received routing key:', headers.routingKey);
const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery);
if (activeQueryInfo == null) {
log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray);
publisher.publishErrorToFrontend('Active query not found');
return;
}
publisher.publishStatusToFrontend('Received');
const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph;
log.debug('Received query request:', message, headers, visualQuery);
if (visualQuery.nodes.length === 0) {
log.info('Empty query received');
publisher.publishResultToFrontend({
nodes: [],
edges: [],
nodeCounts: { updatedAt: 0 },
metaData: {
topological: { density: 0, self_loops: 0 },
nodes: { count: 0, labels: [], types: {} },
edges: { count: 0, labels: [], types: {} },
},
});
return;
}
if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) {
log.error('Invalid SaveState received in queryServiceConsumer:', ss);
publisher.publishErrorToFrontend('Invalid SaveState');
return;
}
const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings;
const ml = message.ml;
const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml);
let activeQuery = ss.queryStates.activeQueryId;
if (message.queryID) {
if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) {
log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray);
publisher.publishErrorToFrontend('Query not found');
return;
}
activeQuery = message.queryID;
}
log.debug('translating query:', convertedQuery);
publisher.publishStatusToFrontend('Translating');
if (activeQuery == null || activeQuery == -1) {
log.error('No active query found in SaveState:', ss);
publisher.publishErrorToFrontend('No active query found');
return;
}
const cypher = query2Cypher(convertedQuery);
const query = cypher.query;
if (query == null) {
log.error('Error translating query:', convertedQuery);
publisher.publishErrorToFrontend('Error translating query');
return;
}
const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery);
if (activeQueryInfo == null) {
log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray);
publisher.publishErrorToFrontend('Active query not found');
return;
}
log.debug('Translated query FROM:', convertedQuery);
log.info('Translated query:', query);
log.info('Translated query:', cypher.countQuery);
publisher.publishTranslationResultToFrontend(query);
for (let i = 0; i < ss.dbConnections.length; i++) {
try {
const result = await queryService(ss.dbConnections[i], cypher, message.useCached);
// Cache nodeCounts such that we can display differentiation for each query
await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, {
...activeQueryInfo,
graph: {
...activeQueryInfo.graph,
nodeCounts: result.nodeCounts,
const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph;
log.debug('Received query request:', message, headers, visualQuery);
if (visualQuery.nodes.length === 0) {
log.info('Empty query received');
publisher.publishResultToFrontend({
nodes: [],
edges: [],
nodeCounts: { updatedAt: 0 },
metaData: {
topological: { density: 0, self_loops: 0 },
nodes: { count: 0, labels: [], types: {} },
edges: { count: 0, labels: [], types: {} },
},
});
return;
}
const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings;
const ml = message.ml;
const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml);
log.debug('translating query:', convertedQuery);
publisher.publishStatusToFrontend('Translating');
publisher.publishResultToFrontend(result);
log.debug('Query result!');
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
try {
publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
} catch (error) {
log.error('Error publishing machine learning request', error);
publisher.publishErrorToFrontend('Error publishing machine learning request');
const cypher = query2Cypher(convertedQuery);
const query = cypher.query;
if (query == null) {
log.error('Error translating query:', convertedQuery);
publisher.publishErrorToFrontend('Error translating query');
return;
}
log.debug('Translated query FROM:', convertedQuery);
log.info('Translated query:', query);
log.info('Translated query:', cypher.countQuery);
publisher.publishTranslationResultToFrontend(query);
for (let i = 0; i < ss.dbConnections.length; i++) {
try {
const result = await queryService(ss.dbConnections[i], cypher, message.useCached);
// Cache nodeCounts such that we can display differentiation for each query
await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, {
...activeQueryInfo,
graph: {
...activeQueryInfo.graph,
nodeCounts: result.nodeCounts,
},
});
publisher.publishResultToFrontend(result);
log.debug('Query result!');
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
try {
publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
} catch (error) {
log.error('Error publishing machine learning request', error);
publisher.publishErrorToFrontend('Error publishing machine learning request');
}
}
}
} catch (error) {
log.error('Error querying database', error);
publisher.publishErrorToFrontend('Error querying database');
}
} catch (error) {
log.error('Error querying database', error);
publisher.publishErrorToFrontend('Error querying database');
Bun.gc(true);
}
} finally {
setTimeout(() => Bun.gc(true), 100);
}
});
};
Loading