import { graphQueryBackend2graphQuery, type DbConnection, type QueryRequest } from 'ts-common';
import { QUERY_CACHE_DURATION, rabbitMq, redis, ums, type QueryExecutionTypes } from '../variables';
import { log } from '../logger';
import { QueryPublisher } from '../utils/queryPublisher';
import { query2Cypher } from '../utils/cypher/converter';
import { parseCountCypherQuery, parseCypherQuery } from '../utils/cypher/queryParser';
import { formatTimeDifference } from 'ts-common/src/logger/logger';
import { Query2BackendQuery } from '../utils/reactflow/query2backend';
import type { GraphQueryResultFromBackend, GraphQueryResultMetaFromBackend } from 'ts-common/src/model/webSocket/graphResult';
import { RabbitMqBroker } from 'ts-common/rabbitMq';
import { Neo4jConnection } from 'ts-common/neo4j';
import type { QueryCypher } from '../utils/cypher/converter/queryConverter';

/**
 * Looks up a previously cached query result in Redis.
 *
 * Cached values are stored as base64-encoded, gzip-compressed JSON (see the
 * cache write in `queryService`), so a hit is decoded in the reverse order.
 *
 * The cache is treated as best-effort: if Redis cannot be reached or the stored
 * entry cannot be decoded, the failure is logged and reported as a cache miss so
 * the caller can still answer the query from the database.
 *
 * @param cacheKey Key under which the result was stored.
 * @returns The cached result, or `undefined` on a miss or an unreadable entry.
 */
async function cacheCheck(cacheKey: string): Promise<GraphQueryResultMetaFromBackend | undefined> {
  log.debug('Checking cache for query, with cache ttl', QUERY_CACHE_DURATION, 'seconds');

  try {
    const cached = await redis.client.get(cacheKey);
    if (!cached) {
      return undefined;
    }

    log.info('Cache hit for query');
    const compressed = Buffer.from(cached, 'base64');
    const inflated = Bun.gunzipSync(new Uint8Array(compressed));
    const decoder = new TextDecoder();
    return JSON.parse(decoder.decode(inflated)) as GraphQueryResultMetaFromBackend;
  } catch (error) {
    // A broken cache must not make the query itself fail; fall back to the database.
    log.error('Error reading query result from cache, treating as cache miss:', error);
    return undefined;
  }
}

/**
 * Executes a translated Cypher query (and its count query) against a database,
 * optionally serving the result from / storing it in the Redis cache.
 *
 * @param db Connection details of the database to query. Also part of the cache key.
 * @param cypher The query and count query to execute.
 * @param useCached When `false`, the cache lookup is skipped (the fresh result is
 *   still written to the cache when caching is enabled).
 * @returns The parsed query result including metadata and node counts.
 * @throws Error with message `'Error parsing query result'` when executing the
 *   queries or parsing their records fails.
 */
export const queryService = async (db: DbConnection, cypher: QueryCypher, useCached: boolean): Promise<GraphQueryResultMetaFromBackend> => {
  // Every run of 13 digits is replaced by a running index before hashing, so that
  // queries differing only in those digit runs share a cache key.
  // NOTE(review): presumably these are millisecond timestamps embedded in generated ids — confirm.
  let index = 0;
  const disambiguatedQuery = cypher.query.replace(/\d{13}/g, () => (index++).toString());
  const cacheKey = Bun.hash(JSON.stringify({ db: db, query: disambiguatedQuery })).toString();

  if (QUERY_CACHE_DURATION === '') {
    log.info('Query cache disabled, skipping cache check');
  } else if (!useCached) {
    log.info('Skipping cache check for query due to parameter', useCached);
  } else {
    const cachedMessage = await cacheCheck(cacheKey);
    if (cachedMessage) {
      log.debug('Cache hit for query', disambiguatedQuery);
      return cachedMessage;
    }
  }

  // TODO: only neo4j is supported for now
  const connection = new Neo4jConnection(db);
  try {
    const [neo4jResult, neo4jCountResult] = await connection.run([cypher.query, cypher.countQuery]);
    const graph = parseCypherQuery(neo4jResult.records);
    const countGraph = parseCountCypherQuery(neo4jCountResult.records);

    // calculate metadata
    const result = graphQueryBackend2graphQuery(graph, countGraph);
    result.nodeCounts.updatedAt = Date.now();

    // cache result (only if the cache is enabled)
    if (QUERY_CACHE_DURATION !== '') {
      // Caching is best-effort: the result is already computed, so a failure while
      // compressing or writing to Redis is logged instead of failing the query.
      try {
        log.info('Started gzipping...');
        const compressedMessage = Bun.gzipSync(JSON.stringify(result));
        log.info('Done gzipping, started encoding to base64...');
        const base64Message = Buffer.from(compressedMessage).toString('base64');
        log.info('Done encoding, sending to redis...');
        await redis.setWithExpire(cacheKey, base64Message, QUERY_CACHE_DURATION); // ttl in seconds
        log.info('cached in redis');
      } catch (cacheError) {
        log.error('Error caching query result, returning uncached result:', cacheError);
      }
    }

    return result;
  } catch (error) {
    log.error('Error parsing query result:', cypher, error);
    throw new Error('Error parsing query result');
  } finally {
    connection.close();
  }
};

/**
 * Connects a RabbitMQ consumer for the given query execution type and handles
 * every incoming query request: it loads the user's SaveState, resolves the
 * requested query, translates it to Cypher, runs it against each database
 * connection of the SaveState, and publishes status, results and errors through
 * a `QueryPublisher`.
 *
 * @param frontendPublisher Broker handed to the `QueryPublisher` for frontend messages.
 * @param mlPublisher Broker handed to the `QueryPublisher` for machine-learning requests.
 * @param type Query execution type; used to build the queue name and routing key.
 * @throws Error with message `'Unsupported query execution type'` when `type` is null/undefined.
 */
export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker, type: QueryExecutionTypes) => {
  if (type == null) {
    log.error('Unsupported query execution type:', type);
    throw new Error('Unsupported query execution type');
  }
  log.info('Starting query reader for', type);

  const queryServiceConsumer = await new RabbitMqBroker(
    rabbitMq,
    'requests-exchange',
    `${type}-query-queue`,
    `${type}-query-request`,
  ).connect();

  log.info('Connected to RabbitMQ ST!');

  await queryServiceConsumer.startConsuming<QueryRequest>('query-service', async (message, headers) => {
    if (message.queryID == null) {
      log.error('QueryID not set in message:', message.queryID);
      return;
    }

    const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers, message.queryID);
    const startTime = Date.now();

    const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID);
    if (!ss) {
      log.error('Invalid SaveState received in queryServiceConsumer:', ss);
      publisher.publishErrorToFrontend('Invalid SaveState');
      return;
    }

    log.debug('Received query request:', message, headers, ss);
    log.debug('Received routing key:', headers.routingKey);
    publisher.publishStatusToFrontend('Received');

    // `ss` itself is known to be set here; only its database connections remain to be validated.
    if (ss.dbConnections == null || ss.dbConnections.length === 0 || ss.dbConnections[0] == null) {
      log.error('Invalid SaveState received in queryServiceConsumer:', ss);
      publisher.publishErrorToFrontend('Invalid SaveState');
      return;
    }

    // Resolve which query of the SaveState to run: the one named in the message,
    // otherwise the SaveState's active query.
    let activeQuery = ss.queryStates.activeQueryId;
    // NOTE(review): this is a truthiness test, so a queryID of 0 (or '') passes the
    // null check above but falls back to the SaveState's active query — confirm that is intended.
    if (message.queryID) {
      if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) {
        log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray);
        publisher.publishErrorToFrontend('Query not found');
        return;
      }
      activeQuery = message.queryID;
    }

    if (activeQuery == null || activeQuery == -1) {
      log.error('No active query found in SaveState:', ss);
      publisher.publishErrorToFrontend('No active query found');
      return;
    }

    const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery);
    if (activeQueryInfo == null) {
      log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray);
      publisher.publishErrorToFrontend('Active query not found');
      return;
    }

    const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph;
    log.debug('Received query request:', message, headers, visualQuery);

    // A query without nodes is answered immediately with an empty result.
    if (visualQuery.nodes.length === 0) {
      log.info('Empty query received');
      publisher.publishResultToFrontend({
        nodes: [],
        edges: [],
        nodeCounts: { updatedAt: 0 },
        metaData: {
          topological: { density: 0, self_loops: 0 },
          nodes: { count: 0, labels: [], types: {} },
          edges: { count: 0, labels: [], types: {} },
        },
      });
      return;
    }

    const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings;
    const ml = message.ml;
    const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml);

    log.debug('translating query:', convertedQuery);
    publisher.publishStatusToFrontend('Translating');

    const cypher = query2Cypher(convertedQuery);
    const query = cypher.query;
    if (query == null) {
      log.error('Error translating query:', convertedQuery);
      publisher.publishErrorToFrontend('Error translating query');
      return;
    }

    log.debug('Translated query FROM:', convertedQuery);
    log.info('Translated query:', query);
    log.info('Translated query:', cypher.countQuery);
    publisher.publishTranslationResultToFrontend(query);

    // Run the query against every database connection; a failure on one connection
    // is reported to the frontend and does not stop the remaining connections.
    for (const dbConnection of ss.dbConnections) {
      try {
        const result = await queryService(dbConnection, cypher, message.useCached);

        // Cache nodeCounts such that we can display differentiation for each query
        await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, {
          ...activeQueryInfo,
          graph: {
            ...activeQueryInfo.graph,
            nodeCounts: result.nodeCounts,
          },
        });

        publisher.publishResultToFrontend(result);
        log.debug('Query result!');
        log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);

        if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
          for (const mlRequest of convertedQuery.machineLearning) {
            try {
              publisher.publishMachineLearningRequest(result, mlRequest, headers);
              log.debug('Published machine learning request', mlRequest);
            } catch (error) {
              log.error('Error publishing machine learning request', error);
              publisher.publishErrorToFrontend('Error publishing machine learning request');
            }
          }
        }
      } catch (error) {
        log.error('Error querying database', error);
        publisher.publishErrorToFrontend('Error querying database');
      }
    }
  });
};