diff --git a/src/readers/insightProcessor.ts b/src/readers/insightProcessor.ts index ad605156628208ff86e991caca03c68560082589..50bd7befe13b3715439cc73bb10ee53710800bb9 100644 --- a/src/readers/insightProcessor.ts +++ b/src/readers/insightProcessor.ts @@ -91,10 +91,10 @@ export const insightProcessor = async () => { const visualQuery = ss.queryStates.openQueryArray[queryIndex].graph; const queryBuilderSettings = ss.queryStates.openQueryArray[queryIndex].settings; const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, []); - const query = query2Cypher(convertedQuery); - if (query == null) return; + const cypher = query2Cypher(convertedQuery); + if (cypher == null) return; try { - const result = await queryService(ss.dbConnections[0], query); + const result = await queryService(ss.dbConnections[0], cypher); insight.status = false; diff --git a/src/readers/queryService.ts b/src/readers/queryService.ts index 21799877b628ab8a27f84484f6bee11f96140ae8..c8a18e543ef7c877580b3e30095c1854e243548e 100644 --- a/src/readers/queryService.ts +++ b/src/readers/queryService.ts @@ -4,27 +4,22 @@ import { QUERY_CACHE_DURATION, rabbitMq, redis, ums, type QueryExecutionTypes } import { log } from '../logger'; import { QueryPublisher } from '../utils/queryPublisher'; import { query2Cypher } from '../utils/cypher/converter'; -import { parseCypherQuery } from '../utils/cypher/queryParser'; +import { parseCountCypherQuery, parseCypherQuery } from '../utils/cypher/queryParser'; import { formatTimeDifference } from 'ts-common/src/logger/logger'; import { Query2BackendQuery } from '../utils/reactflow/query2backend'; import type { GraphQueryResultFromBackend, GraphQueryResultMetaFromBackend } from 'ts-common/src/model/webSocket/graphResult'; import { RabbitMqBroker } from 'ts-common/rabbitMq'; import { Neo4jConnection } from 'ts-common/neo4j'; +import type { QueryCypher } from '../utils/cypher/converter/queryConverter'; -export const queryService = async (db: DbConnection, query: string): Promise<GraphQueryResultMetaFromBackend> => { - let index = 0; - const disambiguatedQuery = query.replace(/\d{13}/g, () => (index++).toString()); - const cacheKey = Bun.hash(JSON.stringify({ db: db, query: disambiguatedQuery })).toString(); - +async function cacheCheck(cacheKey: string): Promise<GraphQueryResultMetaFromBackend | undefined> { if (QUERY_CACHE_DURATION === '') { log.info('Query cache disabled, skipping cache check'); } else { - // check for cached results log.debug('Checking cache for query, with cache ttl', QUERY_CACHE_DURATION, 'seconds'); const cached = await redis.client.get(cacheKey); if (cached) { log.info('Cache hit for query'); - log.debug('Cache hit for query', disambiguatedQuery); const buf = Buffer.from(cached, 'base64'); const inflated = Bun.gunzipSync(new Uint8Array(buf)); const dec = new TextDecoder(); @@ -32,15 +27,29 @@ export const queryService = async (db: DbConnection, query: string): Promise<Gra return cachedMessage; } } +} + +export const queryService = async (db: DbConnection, cypher: QueryCypher): Promise<GraphQueryResultMetaFromBackend> => { + let index = 0; + const disambiguatedQuery = cypher.query.replace(/\d{13}/g, () => (index++).toString()); + const cacheKey = Bun.hash(JSON.stringify({ db: db, query: disambiguatedQuery })).toString(); + + const cachedMessage = await cacheCheck(cacheKey); + if (cachedMessage) { + log.debug('Cache hit for query', disambiguatedQuery); + return cachedMessage; + } // TODO: only neo4j is supported for now const connection = new Neo4jConnection(db); try { - const [neo4jResult] = await connection.run([query]); + const [neo4jResult, neo4jCountResult] = await connection.run([cypher.query, cypher.countQuery]); const graph = parseCypherQuery(neo4jResult.records); + const countGraph = parseCountCypherQuery(neo4jCountResult.records); // calculate metadata - const result = graphQueryBackend2graphQuery(graph); + const result = graphQueryBackend2graphQuery(graph, countGraph); + result.nodeCounts.updatedAt = Date.now(); // cache result const compressedMessage = Bun.gzipSync(JSON.stringify(result)); @@ -53,7 +62,7 @@ export const queryService = async (db: DbConnection, query: string): Promise<Gra return result; } catch (error) { - log.error('Error parsing query result:', query, error); + log.error('Error parsing query result:', cypher, error); throw new Error('Error parsing query result'); } finally { connection.close(); @@ -135,6 +144,7 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu publisher.publishResultToFrontend({ nodes: [], edges: [], + nodeCounts: { updatedAt: 0 }, metaData: { topological: { density: 0, self_loops: 0 }, nodes: { count: 0, labels: [], types: {} }, @@ -151,7 +161,8 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu log.debug('translating query:', convertedQuery); publisher.publishStatusToFrontend('Translating'); - const query = query2Cypher(convertedQuery); + const cypher = query2Cypher(convertedQuery); + const query = cypher.query; if (query == null) { log.error('Error translating query:', convertedQuery); publisher.publishErrorToFrontend('Error translating query'); @@ -160,31 +171,41 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu log.debug('Translated query FROM:', convertedQuery); log.info('Translated query:', query); + log.info('Translated query:', cypher.countQuery); publisher.publishTranslationResultToFrontend(query); for (let i = 0; i < ss.dbConnections.length; i++) { - queryService(ss.dbConnections[i], query) - .then(result => { - publisher.publishResultToFrontend(result); - log.debug('Query result!'); - log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`); - - if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) { - for (let i = 0; i < convertedQuery.machineLearning.length; i++) { - try { - publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers); - log.debug('Published machine learning request', convertedQuery.machineLearning[i]); - } catch (error) { - log.error('Error publishing machine learning request', error); - publisher.publishErrorToFrontend('Error publishing machine learning request'); - } + const result = await queryService(ss.dbConnections[i], cypher); + + try { + // Cache nodeCounts such that we can display differentiation for each query + await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, { + ...activeQueryInfo, + graph: { + ...activeQueryInfo.graph, + nodeCounts: result.nodeCounts, + }, + }); + + publisher.publishResultToFrontend(result); + log.debug('Query result!'); + log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`); + + if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) { + for (let i = 0; i < convertedQuery.machineLearning.length; i++) { + try { + publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers); + log.debug('Published machine learning request', convertedQuery.machineLearning[i]); + } catch (error) { + log.error('Error publishing machine learning request', error); + publisher.publishErrorToFrontend('Error publishing machine learning request'); } } - }) - .catch(error => { - log.error('Error querying database', error); - publisher.publishErrorToFrontend('Error querying database'); - }); + } + } catch (error) { + log.error('Error querying database', error); + publisher.publishErrorToFrontend('Error querying database'); + } } }); }; diff --git a/src/utils/cypher/converter/queryConverter.test.ts b/src/utils/cypher/converter/queryConverter.test.ts index f65b5c5815c445f0127cb1892fbbe0e75fe5ee20..eb561d33deea702e621fb6efd0cbba3330d4c205 100644 --- a/src/utils/cypher/converter/queryConverter.test.ts +++ b/src/utils/cypher/converter/queryConverter.test.ts @@ -59,8 +59,12 @@ describe('query2Cypher', () => { const expectedCypher = `MATCH path1 = ((p1:Person)-[:DIRECTED*1..1]->(m1:Movie)) MATCH path2 = ((p1:Person)-[:IN_GENRE*1..1]->(g1:Genre)) RETURN * LIMIT 5000`; + const expectedCypherCount = `MATCH path1 = ((p1:Person)-[:DIRECTED*1..1]->(m1:Movie)) + MATCH path2 = ((p1:Person)-[:IN_GENRE*1..1]->(g1:Genre)) + RETURN COUNT(p1) as p1_count, COUNT(m1) as m1_count, COUNT(g1) as g1_count`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.countQuery)).toBe(fixCypherSpaces(expectedCypherCount)); }); it('should return correctly on a complex query with logic', () => { @@ -111,7 +115,7 @@ describe('query2Cypher', () => { WHERE (p1.name <> "Raymond Campbell") RETURN * LIMIT 5000`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with group by logic', () => { @@ -154,7 +158,7 @@ describe('query2Cypher', () => { WHERE ((movie.imdbRating < 7.5) and (p2.age = p1.age)) RETURN path2 LIMIT 5000`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with no label', () => { @@ -188,7 +192,7 @@ describe('query2Cypher', () => { WHERE ((movie.year - p1.year) < 10) RETURN * LIMIT 5000`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with no depth', () => { @@ -227,7 +231,7 @@ describe('query2Cypher', () => { WHERE ((movie.imdbRating < 7.5) and (p2.age = p1.age)) RETURN * LIMIT 5000`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with average calculation', () => { @@ -264,7 +268,7 @@ describe('query2Cypher', () => { WHERE (p1.age < p1_age_avg) RETURN * LIMIT 5000`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with average calculation and multiple paths', () => { @@ -319,7 +323,7 @@ describe('query2Cypher', () => { MATCH path2 = ((p2:Person)-[acted:ACTED_IN*1..1]->(movie:Movie)) WHERE (p1.age < p1_age_avg) RETURN * LIMIT 5000`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a single entity query with lower like logic', () => { @@ -344,7 +348,7 @@ describe('query2Cypher', () => { WHERE (toLower(p1.name) =~ (".*" + "john" + ".*")) RETURN * LIMIT 5000`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with like logic', () => { @@ -375,7 +379,7 @@ describe('query2Cypher', () => { WHERE (id_1691576718400.title =~ (".*" + "ale" + ".*")) RETURN * LIMIT 500`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with both direction relation', () => { @@ -406,7 +410,7 @@ describe('query2Cypher', () => { WHERE (id_1691576718400.title =~ (".*" + "ale" + ".*")) RETURN * LIMIT 500`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with relation logic', () => { @@ -436,7 +440,7 @@ describe('query2Cypher', () => { WHERE ALL(path_0_rel_id_1698231933579 in id_1698231933579 WHERE (path_0_rel_id_1698231933579.unitPrice < 10)) RETURN * LIMIT 500`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with count logic', () => { @@ -472,7 +476,7 @@ describe('query2Cypher', () => { WHERE (p1_count > 1) RETURN * LIMIT 5000`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with empty relation', () => { @@ -503,7 +507,7 @@ describe('query2Cypher', () => { const expectedCypher = `MATCH path_0 = ((id_1730483610947:Movie)) RETURN * LIMIT 500`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with upper case logic', () => { @@ -529,7 +533,7 @@ describe('query2Cypher', () => { RETURN * LIMIT 500`; - expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher)); }); it('should return correctly on a query with boolean logic in relation', () => { @@ -624,6 +628,6 @@ describe('query2Cypher', () => { WHERE ALL(path_2_rel_id_1737115886717 in id_1737115886717 WHERE ((id_1737115397423.GO = "852349") and (path_2_rel_id_1737115886717.goodFlow = false))) RETURN * LIMIT 100`; - expect(fixCypherSpaces(cypher)).toEqual(fixCypherSpaces(expectedCypher)); + expect(fixCypherSpaces(cypher.query)).toEqual(fixCypherSpaces(expectedCypher)); }); }); diff --git a/src/utils/cypher/converter/queryConverter.ts b/src/utils/cypher/converter/queryConverter.ts index b127f76f5481aacd8fa109329f50e893ae2ffb46..d46071bef8af99b9baffbf53ae2f0ea2b1daa8f8 100644 --- a/src/utils/cypher/converter/queryConverter.ts +++ b/src/utils/cypher/converter/queryConverter.ts @@ -8,9 +8,15 @@ import { extractLogicCypher } from './logic'; import { extractExportCypher } from './export'; import type { QueryCacheData } from './model'; import { getNodeCypher } from './node'; +import { log } from 'ts-common/src/logger/logger'; + +export type QueryCypher = { + query: string; + countQuery: string; +}; // formQuery uses the hierarchy to create cypher for each part of the query in the right order -export function query2Cypher(JSONQuery: BackendQueryFormat): string | null { +export function query2Cypher(JSONQuery: BackendQueryFormat): QueryCypher { let totalQuery = ''; let matchQuery = ''; let cacheData: QueryCacheData = { entities: [], relations: [], unwinds: [] }; @@ -27,6 +33,10 @@ export function query2Cypher(JSONQuery: BackendQueryFormat): string | null { for (let i = 0; i < cacheData.relations.length; i++) { cacheData.relations[i].queryId = query.id; } + // force contents of cacheData to be unique + cacheData.entities = [...new Map(cacheData.entities.map(item => [item.id, item])).values()]; + cacheData.relations = [...new Map(cacheData.relations.map(item => [item.id, item])).values()]; + cacheData.unwinds = [...new Set(cacheData.unwinds)]; // Generate cypher query for path if (query.id) { @@ -69,6 +79,8 @@ export function query2Cypher(JSONQuery: BackendQueryFormat): string | null { totalQuery += `WHERE ${logic}\n`; } + let countQuery = totalQuery; + // RETURN block if (JSONQuery.return.length === 0 || JSONQuery.return[0] === '*') { totalQuery += 'RETURN *'; @@ -79,5 +91,14 @@ export function query2Cypher(JSONQuery: BackendQueryFormat): string | null { // LIMIT block totalQuery += `\nLIMIT ${JSONQuery.limit}`; - return totalQuery; + countQuery += 'RETURN '; + countQuery += Object.values(cacheData.entities) + .map(e => `COUNT(DISTINCT ${e.id}) as ${e.id}_count`) + .join(', '); + countQuery += Object.values(cacheData.relations).length > 0 ? ', ' : ''; + countQuery += Object.values(cacheData.relations) + .map(r => `COUNT(DISTINCT ${r.id}) as ${r.id}_count`) + .join(', '); + + return { query: totalQuery, countQuery }; } diff --git a/src/utils/cypher/queryParser.ts b/src/utils/cypher/queryParser.ts index 9b61108980f0272cd408df17ccb351fa4280edae..4f25cdd7d23a432cb6d63f4ad18bc221e115dc43 100644 --- a/src/utils/cypher/queryParser.ts +++ b/src/utils/cypher/queryParser.ts @@ -13,7 +13,7 @@ import { type RecordShape, } from 'neo4j-driver'; import { log } from '../../logger'; -import type { EdgeQueryResult, NodeQueryResult } from 'ts-common'; +import type { CountQueryResultFromBackend, EdgeQueryResult, NodeQueryResult } from 'ts-common'; import type { GraphQueryResultFromBackend } from 'ts-common'; export function parseCypherQuery(result: RecordShape[], returnType: 'nodelink' | 'table' = 'nodelink'): GraphQueryResultFromBackend { @@ -40,6 +40,23 @@ export function parseCypherQuery(result: RecordShape[], returnType: 'nodelink' | throw err; } } +export function parseCountCypherQuery(result: RecordShape[]): CountQueryResultFromBackend { + try { + const countResult: CountQueryResultFromBackend = {}; + for (let i = 0; i < result.length; i++) { + const r = result[i]; + for (let j = 0; j < r.keys.length; j++) { + const k = r.keys[j]; + countResult[k] = r.get(k).toNumber(); + } + } + return countResult; + } catch (err) { + log.error(`Error executing query`, err); + throw err; + } +} + function parseNodeLinkQuery(results: RecordShape[]): GraphQueryResultFromBackend { const nodes: NodeQueryResult[] = []; const edges: EdgeQueryResult[] = [];