From e9ab5ca7fdd225877cad67cda0780e73c2580237 Mon Sep 17 00:00:00 2001
From: Dennis Collaris <d.a.c.collaris@uu.nl>
Date: Mon, 17 Feb 2025 09:45:01 +0000
Subject: [PATCH] feat: differentiation

---
 src/readers/insightProcessor.ts               |  6 +-
 src/readers/queryService.ts                   | 85 ++++++++++++-------
 .../cypher/converter/queryConverter.test.ts   | 34 ++++----
 src/utils/cypher/converter/queryConverter.ts  | 25 +++++-
 src/utils/cypher/queryParser.ts               | 19 ++++-
 5 files changed, 116 insertions(+), 53 deletions(-)

diff --git a/src/readers/insightProcessor.ts b/src/readers/insightProcessor.ts
index ad60515..50bd7be 100644
--- a/src/readers/insightProcessor.ts
+++ b/src/readers/insightProcessor.ts
@@ -91,10 +91,10 @@ export const insightProcessor = async () => {
       const visualQuery = ss.queryStates.openQueryArray[queryIndex].graph;
       const queryBuilderSettings = ss.queryStates.openQueryArray[queryIndex].settings;
       const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, []);
-      const query = query2Cypher(convertedQuery);
-      if (query == null) return;
+      const cypher = query2Cypher(convertedQuery);
+      if (cypher == null) return;
       try {
-        const result = await queryService(ss.dbConnections[0], query);
+        const result = await queryService(ss.dbConnections[0], cypher);
 
         insight.status = false;
 
diff --git a/src/readers/queryService.ts b/src/readers/queryService.ts
index 2179987..c8a18e5 100644
--- a/src/readers/queryService.ts
+++ b/src/readers/queryService.ts
@@ -4,27 +4,22 @@ import { QUERY_CACHE_DURATION, rabbitMq, redis, ums, type QueryExecutionTypes }
 import { log } from '../logger';
 import { QueryPublisher } from '../utils/queryPublisher';
 import { query2Cypher } from '../utils/cypher/converter';
-import { parseCypherQuery } from '../utils/cypher/queryParser';
+import { parseCountCypherQuery, parseCypherQuery } from '../utils/cypher/queryParser';
 import { formatTimeDifference } from 'ts-common/src/logger/logger';
 import { Query2BackendQuery } from '../utils/reactflow/query2backend';
 import type { GraphQueryResultFromBackend, GraphQueryResultMetaFromBackend } from 'ts-common/src/model/webSocket/graphResult';
 import { RabbitMqBroker } from 'ts-common/rabbitMq';
 import { Neo4jConnection } from 'ts-common/neo4j';
+import type { QueryCypher } from '../utils/cypher/converter/queryConverter';
 
-export const queryService = async (db: DbConnection, query: string): Promise<GraphQueryResultMetaFromBackend> => {
-  let index = 0;
-  const disambiguatedQuery = query.replace(/\d{13}/g, () => (index++).toString());
-  const cacheKey = Bun.hash(JSON.stringify({ db: db, query: disambiguatedQuery })).toString();
-
+async function cacheCheck(cacheKey: string): Promise<GraphQueryResultMetaFromBackend | undefined> {
   if (QUERY_CACHE_DURATION === '') {
     log.info('Query cache disabled, skipping cache check');
   } else {
-    // check for cached results
     log.debug('Checking cache for query, with cache ttl', QUERY_CACHE_DURATION, 'seconds');
     const cached = await redis.client.get(cacheKey);
     if (cached) {
       log.info('Cache hit for query');
-      log.debug('Cache hit for query', disambiguatedQuery);
       const buf = Buffer.from(cached, 'base64');
       const inflated = Bun.gunzipSync(new Uint8Array(buf));
       const dec = new TextDecoder();
@@ -32,15 +27,29 @@ export const queryService = async (db: DbConnection, query: string): Promise<Gra
       return cachedMessage;
     }
   }
+}
+
+export const queryService = async (db: DbConnection, cypher: QueryCypher): Promise<GraphQueryResultMetaFromBackend> => {
+  let index = 0;
+  const disambiguatedQuery = cypher.query.replace(/\d{13}/g, () => (index++).toString());
+  const cacheKey = Bun.hash(JSON.stringify({ db: db, query: disambiguatedQuery })).toString();
+
+  const cachedMessage = await cacheCheck(cacheKey);
+  if (cachedMessage) {
+    log.debug('Cache hit for query', disambiguatedQuery);
+    return cachedMessage;
+  }
 
   // TODO: only neo4j is supported for now
   const connection = new Neo4jConnection(db);
   try {
-    const [neo4jResult] = await connection.run([query]);
+    const [neo4jResult, neo4jCountResult] = await connection.run([cypher.query, cypher.countQuery]);
     const graph = parseCypherQuery(neo4jResult.records);
+    const countGraph = parseCountCypherQuery(neo4jCountResult.records);
 
     // calculate metadata
-    const result = graphQueryBackend2graphQuery(graph);
+    const result = graphQueryBackend2graphQuery(graph, countGraph);
+    result.nodeCounts.updatedAt = Date.now();
 
     // cache result
     const compressedMessage = Bun.gzipSync(JSON.stringify(result));
@@ -53,7 +62,7 @@ export const queryService = async (db: DbConnection, query: string): Promise<Gra
 
     return result;
   } catch (error) {
-    log.error('Error parsing query result:', query, error);
+    log.error('Error parsing query result:', cypher, error);
     throw new Error('Error parsing query result');
   } finally {
     connection.close();
@@ -135,6 +144,7 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
       publisher.publishResultToFrontend({
         nodes: [],
         edges: [],
+        nodeCounts: { updatedAt: 0 },
         metaData: {
           topological: { density: 0, self_loops: 0 },
           nodes: { count: 0, labels: [], types: {} },
@@ -151,7 +161,8 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
     log.debug('translating query:', convertedQuery);
     publisher.publishStatusToFrontend('Translating');
 
-    const query = query2Cypher(convertedQuery);
+    const cypher = query2Cypher(convertedQuery);
+    const query = cypher.query;
     if (query == null) {
       log.error('Error translating query:', convertedQuery);
       publisher.publishErrorToFrontend('Error translating query');
@@ -160,31 +171,41 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
 
     log.debug('Translated query FROM:', convertedQuery);
     log.info('Translated query:', query);
+    log.info('Translated query:', cypher.countQuery);
     publisher.publishTranslationResultToFrontend(query);
 
     for (let i = 0; i < ss.dbConnections.length; i++) {
-      queryService(ss.dbConnections[i], query)
-        .then(result => {
-          publisher.publishResultToFrontend(result);
-          log.debug('Query result!');
-          log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
-
-          if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
-            for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
-              try {
-                publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
-                log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
-              } catch (error) {
-                log.error('Error publishing machine learning request', error);
-                publisher.publishErrorToFrontend('Error publishing machine learning request');
-              }
+      try {
+        // Await inside the try so a failed query is reported to the frontend instead of escaping the handler.
+        const result = await queryService(ss.dbConnections[i], cypher);
+        // Cache nodeCounts such that we can display differentiation for each query
+        await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, {
+          ...activeQueryInfo,
+          graph: {
+            ...activeQueryInfo.graph,
+            nodeCounts: result.nodeCounts,
+          },
+        });
+
+        publisher.publishResultToFrontend(result);
+        log.debug('Query result!');
+        log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
+
+        if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
+          for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
+            try {
+              publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
+              log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
+            } catch (error) {
+              log.error('Error publishing machine learning request', error);
+              publisher.publishErrorToFrontend('Error publishing machine learning request');
             }
           }
-        })
-        .catch(error => {
-          log.error('Error querying database', error);
-          publisher.publishErrorToFrontend('Error querying database');
-        });
+        }
+      } catch (error) {
+        log.error('Error querying database', error);
+        publisher.publishErrorToFrontend('Error querying database');
+      }
     }
   });
 };
diff --git a/src/utils/cypher/converter/queryConverter.test.ts b/src/utils/cypher/converter/queryConverter.test.ts
index f65b5c5..eb561d3 100644
--- a/src/utils/cypher/converter/queryConverter.test.ts
+++ b/src/utils/cypher/converter/queryConverter.test.ts
@@ -59,8 +59,12 @@ describe('query2Cypher', () => {
     const expectedCypher = `MATCH path1 = ((p1:Person)-[:DIRECTED*1..1]->(m1:Movie))
         MATCH path2 = ((p1:Person)-[:IN_GENRE*1..1]->(g1:Genre))
         RETURN * LIMIT 5000`;
+    const expectedCypherCount = `MATCH path1 = ((p1:Person)-[:DIRECTED*1..1]->(m1:Movie))
+        MATCH path2 = ((p1:Person)-[:IN_GENRE*1..1]->(g1:Genre))
+        RETURN COUNT(DISTINCT p1) as p1_count, COUNT(DISTINCT m1) as m1_count, COUNT(DISTINCT g1) as g1_count`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.countQuery)).toBe(fixCypherSpaces(expectedCypherCount));
   });
 
   it('should return correctly on a complex query with logic', () => {
@@ -111,7 +115,7 @@ describe('query2Cypher', () => {
         WHERE (p1.name <> "Raymond Campbell")
         RETURN * LIMIT 5000`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with group by logic', () => {
@@ -154,7 +158,7 @@ describe('query2Cypher', () => {
         WHERE ((movie.imdbRating < 7.5) and (p2.age = p1.age))
         RETURN path2 LIMIT 5000`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with no label', () => {
@@ -188,7 +192,7 @@ describe('query2Cypher', () => {
         WHERE ((movie.year - p1.year) < 10)
         RETURN * LIMIT 5000`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with no depth', () => {
@@ -227,7 +231,7 @@ describe('query2Cypher', () => {
         WHERE ((movie.imdbRating < 7.5) and (p2.age = p1.age))
         RETURN * LIMIT 5000`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with average calculation', () => {
@@ -264,7 +268,7 @@ describe('query2Cypher', () => {
         WHERE (p1.age <  p1_age_avg)
         RETURN * LIMIT 5000`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with average calculation and multiple paths', () => {
@@ -319,7 +323,7 @@ describe('query2Cypher', () => {
       MATCH path2 = ((p2:Person)-[acted:ACTED_IN*1..1]->(movie:Movie))
       WHERE (p1.age < p1_age_avg) RETURN * LIMIT 5000`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a single entity query with lower like logic', () => {
@@ -344,7 +348,7 @@ describe('query2Cypher', () => {
         WHERE (toLower(p1.name) =~ (".*" + "john" + ".*"))
         RETURN * LIMIT 5000`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with like logic', () => {
@@ -375,7 +379,7 @@ describe('query2Cypher', () => {
         WHERE (id_1691576718400.title =~ (".*" + "ale" + ".*")) 
         RETURN * LIMIT 500`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with both direction relation', () => {
@@ -406,7 +410,7 @@ describe('query2Cypher', () => {
         WHERE (id_1691576718400.title =~ (".*" + "ale" + ".*")) 
         RETURN * LIMIT 500`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with relation logic', () => {
@@ -436,7 +440,7 @@ describe('query2Cypher', () => {
         WHERE ALL(path_0_rel_id_1698231933579 in id_1698231933579 WHERE (path_0_rel_id_1698231933579.unitPrice < 10))
         RETURN * LIMIT 500`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with count logic', () => {
@@ -472,7 +476,7 @@ describe('query2Cypher', () => {
         WHERE (p1_count > 1)
         RETURN * LIMIT 5000`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with empty relation', () => {
@@ -503,7 +507,7 @@ describe('query2Cypher', () => {
     const expectedCypher = `MATCH path_0 = ((id_1730483610947:Movie))
         RETURN * LIMIT 500`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with upper case logic', () => {
@@ -529,7 +533,7 @@ describe('query2Cypher', () => {
         RETURN *
         LIMIT 500`;
 
-    expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
   });
 
   it('should return correctly on a query with boolean logic in relation', () => {
@@ -624,6 +628,6 @@ describe('query2Cypher', () => {
     WHERE ALL(path_2_rel_id_1737115886717 in id_1737115886717 WHERE ((id_1737115397423.GO = "852349") and
     (path_2_rel_id_1737115886717.goodFlow = false))) RETURN * LIMIT 100`;
 
-    expect(fixCypherSpaces(cypher)).toEqual(fixCypherSpaces(expectedCypher));
+    expect(fixCypherSpaces(cypher.query)).toEqual(fixCypherSpaces(expectedCypher));
   });
 });
diff --git a/src/utils/cypher/converter/queryConverter.ts b/src/utils/cypher/converter/queryConverter.ts
index b127f76..d46071b 100644
--- a/src/utils/cypher/converter/queryConverter.ts
+++ b/src/utils/cypher/converter/queryConverter.ts
@@ -8,9 +8,15 @@ import { extractLogicCypher } from './logic';
 import { extractExportCypher } from './export';
 import type { QueryCacheData } from './model';
 import { getNodeCypher } from './node';
+import { log } from 'ts-common/src/logger/logger';
+
+export type QueryCypher = {
+  query: string;
+  countQuery: string;
+};
 
 // formQuery uses the hierarchy to create cypher for each part of the query in the right order
-export function query2Cypher(JSONQuery: BackendQueryFormat): string | null {
+export function query2Cypher(JSONQuery: BackendQueryFormat): QueryCypher {
   let totalQuery = '';
   let matchQuery = '';
   let cacheData: QueryCacheData = { entities: [], relations: [], unwinds: [] };
@@ -27,6 +33,10 @@ export function query2Cypher(JSONQuery: BackendQueryFormat): string | null {
     for (let i = 0; i < cacheData.relations.length; i++) {
       cacheData.relations[i].queryId = query.id;
     }
+    // force contents of cacheData to be unique
+    cacheData.entities = [...new Map(cacheData.entities.map(item => [item.id, item])).values()];
+    cacheData.relations = [...new Map(cacheData.relations.map(item => [item.id, item])).values()];
+    cacheData.unwinds = [...new Set(cacheData.unwinds)];
 
     // Generate cypher query for path
     if (query.id) {
@@ -69,6 +79,8 @@ export function query2Cypher(JSONQuery: BackendQueryFormat): string | null {
     totalQuery += `WHERE ${logic}\n`;
   }
 
+  let countQuery = totalQuery;
+
   // RETURN block
   if (JSONQuery.return.length === 0 || JSONQuery.return[0] === '*') {
     totalQuery += 'RETURN *';
@@ -79,5 +91,14 @@ export function query2Cypher(JSONQuery: BackendQueryFormat): string | null {
   // LIMIT block
   totalQuery += `\nLIMIT ${JSONQuery.limit}`;
 
-  return totalQuery;
+  countQuery += 'RETURN ';
+  countQuery += Object.values(cacheData.entities)
+    .map(e => `COUNT(DISTINCT ${e.id}) as ${e.id}_count`)
+    .join(', ');
+  countQuery += Object.values(cacheData.relations).length > 0 ? ', ' : '';
+  countQuery += Object.values(cacheData.relations)
+    .map(r => `COUNT(DISTINCT ${r.id}) as ${r.id}_count`)
+    .join(', ');
+
+  return { query: totalQuery, countQuery };
 }
diff --git a/src/utils/cypher/queryParser.ts b/src/utils/cypher/queryParser.ts
index 9b61108..4f25cdd 100644
--- a/src/utils/cypher/queryParser.ts
+++ b/src/utils/cypher/queryParser.ts
@@ -13,7 +13,7 @@ import {
   type RecordShape,
 } from 'neo4j-driver';
 import { log } from '../../logger';
-import type { EdgeQueryResult, NodeQueryResult } from 'ts-common';
+import type { CountQueryResultFromBackend, EdgeQueryResult, NodeQueryResult } from 'ts-common';
 import type { GraphQueryResultFromBackend } from 'ts-common';
 
 export function parseCypherQuery(result: RecordShape[], returnType: 'nodelink' | 'table' = 'nodelink'): GraphQueryResultFromBackend {
@@ -40,6 +40,23 @@ export function parseCypherQuery(result: RecordShape[], returnType: 'nodelink' |
     throw err;
   }
 }
+/** Collects every column of the count-query records into one alias -> count map (later records overwrite). */
+export function parseCountCypherQuery(result: RecordShape[]): CountQueryResultFromBackend {
+  try {
+    const countResult: CountQueryResultFromBackend = {};
+    for (const record of result) {
+      for (const key of record.keys) {
+        const value = record.get(key);
+        countResult[key] = typeof value === 'number' ? value : value.toNumber();
+      }
+    }
+    return countResult;
+  } catch (err) {
+    log.error('Error parsing count query result', err);
+    throw err;
+  }
+}
+
 function parseNodeLinkQuery(results: RecordShape[]): GraphQueryResultFromBackend {
   const nodes: NodeQueryResult[] = [];
   const edges: EdgeQueryResult[] = [];
-- 
GitLab