Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • graphpolaris/microservices/query-service
1 result
Show changes
Commits on Source (2)
......@@ -91,10 +91,10 @@ export const insightProcessor = async () => {
const visualQuery = ss.queryStates.openQueryArray[queryIndex].graph;
const queryBuilderSettings = ss.queryStates.openQueryArray[queryIndex].settings;
const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, []);
const query = query2Cypher(convertedQuery);
if (query == null) return;
const cypher = query2Cypher(convertedQuery);
if (cypher == null) return;
try {
const result = await queryService(ss.dbConnections[0], query);
const result = await queryService(ss.dbConnections[0], cypher);
insight.status = false;
......
......@@ -4,27 +4,22 @@ import { QUERY_CACHE_DURATION, rabbitMq, redis, ums, type QueryExecutionTypes }
import { log } from '../logger';
import { QueryPublisher } from '../utils/queryPublisher';
import { query2Cypher } from '../utils/cypher/converter';
import { parseCypherQuery } from '../utils/cypher/queryParser';
import { parseCountCypherQuery, parseCypherQuery } from '../utils/cypher/queryParser';
import { formatTimeDifference } from 'ts-common/src/logger/logger';
import { Query2BackendQuery } from '../utils/reactflow/query2backend';
import type { GraphQueryResultFromBackend, GraphQueryResultMetaFromBackend } from 'ts-common/src/model/webSocket/graphResult';
import { RabbitMqBroker } from 'ts-common/rabbitMq';
import { Neo4jConnection } from 'ts-common/neo4j';
import type { QueryCypher } from '../utils/cypher/converter/queryConverter';
export const queryService = async (db: DbConnection, query: string): Promise<GraphQueryResultMetaFromBackend> => {
let index = 0;
const disambiguatedQuery = query.replace(/\d{13}/g, () => (index++).toString());
const cacheKey = Bun.hash(JSON.stringify({ db: db, query: disambiguatedQuery })).toString();
async function cacheCheck(cacheKey: string): Promise<GraphQueryResultMetaFromBackend | undefined> {
if (QUERY_CACHE_DURATION === '') {
log.info('Query cache disabled, skipping cache check');
} else {
// check for cached results
log.debug('Checking cache for query, with cache ttl', QUERY_CACHE_DURATION, 'seconds');
const cached = await redis.client.get(cacheKey);
if (cached) {
log.info('Cache hit for query');
log.debug('Cache hit for query', disambiguatedQuery);
const buf = Buffer.from(cached, 'base64');
const inflated = Bun.gunzipSync(new Uint8Array(buf));
const dec = new TextDecoder();
......@@ -32,15 +27,29 @@ export const queryService = async (db: DbConnection, query: string): Promise<Gra
return cachedMessage;
}
}
}
export const queryService = async (db: DbConnection, cypher: QueryCypher): Promise<GraphQueryResultMetaFromBackend> => {
let index = 0;
const disambiguatedQuery = cypher.query.replace(/\d{13}/g, () => (index++).toString());
const cacheKey = Bun.hash(JSON.stringify({ db: db, query: disambiguatedQuery })).toString();
const cachedMessage = await cacheCheck(cacheKey);
if (cachedMessage) {
log.debug('Cache hit for query', disambiguatedQuery);
return cachedMessage;
}
// TODO: only neo4j is supported for now
const connection = new Neo4jConnection(db);
try {
const [neo4jResult] = await connection.run([query]);
const [neo4jResult, neo4jCountResult] = await connection.run([cypher.query, cypher.countQuery]);
const graph = parseCypherQuery(neo4jResult.records);
const countGraph = parseCountCypherQuery(neo4jCountResult.records);
// calculate metadata
const result = graphQueryBackend2graphQuery(graph);
const result = graphQueryBackend2graphQuery(graph, countGraph);
result.nodeCounts.updatedAt = Date.now();
// cache result
const compressedMessage = Bun.gzipSync(JSON.stringify(result));
......@@ -53,7 +62,7 @@ export const queryService = async (db: DbConnection, query: string): Promise<Gra
return result;
} catch (error) {
log.error('Error parsing query result:', query, error);
log.error('Error parsing query result:', cypher, error);
throw new Error('Error parsing query result');
} finally {
connection.close();
......@@ -135,6 +144,7 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
publisher.publishResultToFrontend({
nodes: [],
edges: [],
nodeCounts: { updatedAt: 0 },
metaData: {
topological: { density: 0, self_loops: 0 },
nodes: { count: 0, labels: [], types: {} },
......@@ -151,7 +161,8 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
log.debug('translating query:', convertedQuery);
publisher.publishStatusToFrontend('Translating');
const query = query2Cypher(convertedQuery);
const cypher = query2Cypher(convertedQuery);
const query = cypher.query;
if (query == null) {
log.error('Error translating query:', convertedQuery);
publisher.publishErrorToFrontend('Error translating query');
......@@ -160,31 +171,41 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
log.debug('Translated query FROM:', convertedQuery);
log.info('Translated query:', query);
log.info('Translated query:', cypher.countQuery);
publisher.publishTranslationResultToFrontend(query);
for (let i = 0; i < ss.dbConnections.length; i++) {
queryService(ss.dbConnections[i], query)
.then(result => {
publisher.publishResultToFrontend(result);
log.debug('Query result!');
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
try {
publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
} catch (error) {
log.error('Error publishing machine learning request', error);
publisher.publishErrorToFrontend('Error publishing machine learning request');
}
const result = await queryService(ss.dbConnections[i], cypher);
try {
// Cache nodeCounts such that we can display differentiation for each query
await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, {
...activeQueryInfo,
graph: {
...activeQueryInfo.graph,
nodeCounts: result.nodeCounts,
},
});
publisher.publishResultToFrontend(result);
log.debug('Query result!');
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
try {
publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
} catch (error) {
log.error('Error publishing machine learning request', error);
publisher.publishErrorToFrontend('Error publishing machine learning request');
}
}
})
.catch(error => {
log.error('Error querying database', error);
publisher.publishErrorToFrontend('Error querying database');
});
}
} catch (error) {
log.error('Error querying database', error);
publisher.publishErrorToFrontend('Error querying database');
}
}
});
};
......@@ -59,8 +59,12 @@ describe('query2Cypher', () => {
const expectedCypher = `MATCH path1 = ((p1:Person)-[:DIRECTED*1..1]->(m1:Movie))
MATCH path2 = ((p1:Person)-[:IN_GENRE*1..1]->(g1:Genre))
RETURN * LIMIT 5000`;
const expectedCypherCount = `MATCH path1 = ((p1:Person)-[:DIRECTED*1..1]->(m1:Movie))
MATCH path2 = ((p1:Person)-[:IN_GENRE*1..1]->(g1:Genre))
RETURN COUNT(p1) as p1_count, COUNT(m1) as m1_count, COUNT(g1) as g1_count`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.countQuery)).toBe(fixCypherSpaces(expectedCypherCount));
});
it('should return correctly on a complex query with logic', () => {
......@@ -111,7 +115,7 @@ describe('query2Cypher', () => {
WHERE (p1.name <> "Raymond Campbell")
RETURN * LIMIT 5000`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with group by logic', () => {
......@@ -154,7 +158,7 @@ describe('query2Cypher', () => {
WHERE ((movie.imdbRating < 7.5) and (p2.age = p1.age))
RETURN path2 LIMIT 5000`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with no label', () => {
......@@ -188,7 +192,7 @@ describe('query2Cypher', () => {
WHERE ((movie.year - p1.year) < 10)
RETURN * LIMIT 5000`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with no depth', () => {
......@@ -227,7 +231,7 @@ describe('query2Cypher', () => {
WHERE ((movie.imdbRating < 7.5) and (p2.age = p1.age))
RETURN * LIMIT 5000`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with average calculation', () => {
......@@ -264,7 +268,7 @@ describe('query2Cypher', () => {
WHERE (p1.age < p1_age_avg)
RETURN * LIMIT 5000`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with average calculation and multiple paths', () => {
......@@ -319,7 +323,7 @@ describe('query2Cypher', () => {
MATCH path2 = ((p2:Person)-[acted:ACTED_IN*1..1]->(movie:Movie))
WHERE (p1.age < p1_age_avg) RETURN * LIMIT 5000`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a single entity query with lower like logic', () => {
......@@ -344,7 +348,7 @@ describe('query2Cypher', () => {
WHERE (toLower(p1.name) =~ (".*" + "john" + ".*"))
RETURN * LIMIT 5000`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with like logic', () => {
......@@ -375,7 +379,7 @@ describe('query2Cypher', () => {
WHERE (id_1691576718400.title =~ (".*" + "ale" + ".*"))
RETURN * LIMIT 500`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with both direction relation', () => {
......@@ -406,7 +410,7 @@ describe('query2Cypher', () => {
WHERE (id_1691576718400.title =~ (".*" + "ale" + ".*"))
RETURN * LIMIT 500`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with relation logic', () => {
......@@ -436,7 +440,7 @@ describe('query2Cypher', () => {
WHERE ALL(path_0_rel_id_1698231933579 in id_1698231933579 WHERE (path_0_rel_id_1698231933579.unitPrice < 10))
RETURN * LIMIT 500`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with count logic', () => {
......@@ -472,7 +476,7 @@ describe('query2Cypher', () => {
WHERE (p1_count > 1)
RETURN * LIMIT 5000`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with empty relation', () => {
......@@ -503,7 +507,7 @@ describe('query2Cypher', () => {
const expectedCypher = `MATCH path_0 = ((id_1730483610947:Movie))
RETURN * LIMIT 500`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with upper case logic', () => {
......@@ -529,7 +533,7 @@ describe('query2Cypher', () => {
RETURN *
LIMIT 500`;
expect(fixCypherSpaces(cypher)).toBe(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toBe(fixCypherSpaces(expectedCypher));
});
it('should return correctly on a query with boolean logic in relation', () => {
......@@ -624,6 +628,6 @@ describe('query2Cypher', () => {
WHERE ALL(path_2_rel_id_1737115886717 in id_1737115886717 WHERE ((id_1737115397423.GO = "852349") and
(path_2_rel_id_1737115886717.goodFlow = false))) RETURN * LIMIT 100`;
expect(fixCypherSpaces(cypher)).toEqual(fixCypherSpaces(expectedCypher));
expect(fixCypherSpaces(cypher.query)).toEqual(fixCypherSpaces(expectedCypher));
});
});
......@@ -8,9 +8,15 @@ import { extractLogicCypher } from './logic';
import { extractExportCypher } from './export';
import type { QueryCacheData } from './model';
import { getNodeCypher } from './node';
import { log } from 'ts-common/src/logger/logger';
export type QueryCypher = {
query: string;
countQuery: string;
};
// formQuery uses the hierarchy to create cypher for each part of the query in the right order
export function query2Cypher(JSONQuery: BackendQueryFormat): string | null {
export function query2Cypher(JSONQuery: BackendQueryFormat): QueryCypher {
let totalQuery = '';
let matchQuery = '';
let cacheData: QueryCacheData = { entities: [], relations: [], unwinds: [] };
......@@ -27,6 +33,10 @@ export function query2Cypher(JSONQuery: BackendQueryFormat): string | null {
for (let i = 0; i < cacheData.relations.length; i++) {
cacheData.relations[i].queryId = query.id;
}
// force contents of cacheData to be unique
cacheData.entities = [...new Map(cacheData.entities.map(item => [item.id, item])).values()];
cacheData.relations = [...new Map(cacheData.relations.map(item => [item.id, item])).values()];
cacheData.unwinds = [...new Set(cacheData.unwinds)];
// Generate cypher query for path
if (query.id) {
......@@ -69,6 +79,8 @@ export function query2Cypher(JSONQuery: BackendQueryFormat): string | null {
totalQuery += `WHERE ${logic}\n`;
}
let countQuery = totalQuery;
// RETURN block
if (JSONQuery.return.length === 0 || JSONQuery.return[0] === '*') {
totalQuery += 'RETURN *';
......@@ -79,5 +91,14 @@ export function query2Cypher(JSONQuery: BackendQueryFormat): string | null {
// LIMIT block
totalQuery += `\nLIMIT ${JSONQuery.limit}`;
return totalQuery;
countQuery += 'RETURN ';
countQuery += Object.values(cacheData.entities)
.map(e => `COUNT(DISTINCT ${e.id}) as ${e.id}_count`)
.join(', ');
countQuery += Object.values(cacheData.relations).length > 0 ? ', ' : '';
countQuery += Object.values(cacheData.relations)
.map(r => `COUNT(DISTINCT ${r.id}) as ${r.id}_count`)
.join(', ');
return { query: totalQuery, countQuery };
}
......@@ -13,7 +13,7 @@ import {
type RecordShape,
} from 'neo4j-driver';
import { log } from '../../logger';
import type { EdgeQueryResult, NodeQueryResult } from 'ts-common';
import type { CountQueryResultFromBackend, EdgeQueryResult, NodeQueryResult } from 'ts-common';
import type { GraphQueryResultFromBackend } from 'ts-common';
export function parseCypherQuery(result: RecordShape[], returnType: 'nodelink' | 'table' = 'nodelink'): GraphQueryResultFromBackend {
......@@ -40,6 +40,23 @@ export function parseCypherQuery(result: RecordShape[], returnType: 'nodelink' |
throw err;
}
}
export function parseCountCypherQuery(result: RecordShape[]): CountQueryResultFromBackend {
try {
const countResult: CountQueryResultFromBackend = {};
for (let i = 0; i < result.length; i++) {
const r = result[i];
for (let j = 0; j < r.keys.length; j++) {
const k = r.keys[j];
countResult[k] = r.get(k).toNumber();
}
}
return countResult;
} catch (err) {
log.error(`Error executing query`, err);
throw err;
}
}
function parseNodeLinkQuery(results: RecordShape[]): GraphQueryResultFromBackend {
const nodes: NodeQueryResult[] = [];
const edges: EdgeQueryResult[] = [];
......