Skip to content
Snippets Groups Projects
Commit 95e4069a authored by Dennis Collaris's avatar Dennis Collaris
Browse files

feat: run queries fully in the backend

parent 517b7ea4
No related branches found
No related tags found
1 merge request!15feat: run queries fully in the backend
Pipeline #142749 passed
No preview for this file type
import { getUserSaveState, type BackendQueryFormat, type GraphQueryResultFromBackend, type GraphQueryResultMetaFromBackend } from "ts-common";
import { getUserSaveState, type BackendQueryFormat, type QueryRequest, type GraphQueryResultFromBackend, type GraphQueryResultMetaFromBackend, mlDefaultState } from "ts-common";
import {
Neo4jConnection, type DbConnection,
RabbitMqBroker, RedisConnector
......@@ -10,6 +10,7 @@ import { query2Cypher } from "../utils/cypher/converter";
import { parseCypherQuery } from "../utils/cypher/queryParser";
import { formatTimeDifference } from "ts-common/src/logger/logger";
import { graphQueryBackend2graphQuery } from "../frontend/statistics";
import { Query2BackendQuery } from "../utils/reactflow/query2backend";
export const queryService = async (db: DbConnection, query: string): Promise<GraphQueryResultMetaFromBackend> => {
......@@ -52,7 +53,7 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
log.info('Connected to RabbitMQ ST!');
await queryServiceConsumer.startConsuming<BackendQueryFormat>("query-service", async (message, headers) => {
await queryServiceConsumer.startConsuming<QueryRequest>("query-service", async (message, headers) => {
const startTime = Date.now();
const ss = await getUserSaveState(headers.message.sessionData.userID, headers.message.sessionData.saveStateID, USER_MANAGEMENT_SERVICE_API);
const routingKey = await redis.getRouting(headers.message.sessionData.sessionID)
......@@ -67,7 +68,6 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
log.debug('Received query request:', message, headers);
publisher.withHeaders(headers).withRoutingKey(routingKey.QueueID).withQueryID(headers.queryID);
publisher.publishStatusToFrontend('Received');
......@@ -77,12 +77,17 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
return;
}
log.debug('translating query:', message.query);
const visualQuery = ss.queries[0].graph;
const queryBuilderSettings = ss.queries[0].settings;
const ml = message.ml;
const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml);
log.debug('translating query:', convertedQuery);
publisher.publishStatusToFrontend('Translating');
const query = query2Cypher(message);
const query = query2Cypher(convertedQuery);
if (query == null) {
log.error('Error translating query:', message.query);
log.error('Error translating query:', convertedQuery);
publisher.publishErrorToFrontend('Error translating query');
return;
}
......@@ -96,11 +101,11 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu
log.debug('Query result!');
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
if (message.machineLearning && message.machineLearning.length > 0) {
for (let i = 0; i < message.machineLearning.length; i++) {
if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) {
for (let i = 0; i < convertedQuery.machineLearning.length; i++) {
try {
publisher.publishMachineLearningRequest(result, message.machineLearning[i], headers);
log.debug('Published machine learning request', message.machineLearning[i]);
publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers);
log.debug('Published machine learning request', convertedQuery.machineLearning[i]);
} catch (error) {
log.error('Error publishing machine learning request', error);
publisher.publishErrorToFrontend('Error publishing machine learning request');
......
import type { SerializedNode } from 'graphology-types';
import { hasCycle } from 'graphology-dag';
import Graph from 'graphology';
import { allSimplePaths } from 'graphology-simple-path';
import type { BackendQueryFormat, NodeStruct, QueryStruct, RelationStruct } from 'ts-common';
import type { MachineLearning } from 'ts-common/src/model/query/queryRequestModel';
import type { QueryMultiGraph, EntityNodeAttributes, LogicNodeAttributes, QueryGraphNodes, RelationNodeAttributes } from 'ts-common/src/model/graphology';
import type { AllLogicStatement } from 'ts-common/src/model/query/logic/general';
import { QueryElementTypes } from 'ts-common/src/model/reactflow';
import { type QueryBuilderSettings, QueryUnionType } from 'ts-common/src/model/query/queryBuilderModel';
// Chunk extraction: traverse graph to find all paths of logic between relations and entities
const traverseEntityRelationPaths = (
node: SerializedNode<QueryGraphNodes>,
paths: QueryGraphNodes[][],
currentIdx: number,
graph: QueryMultiGraph,
entities: SerializedNode<EntityNodeAttributes>[],
relations: SerializedNode<RelationNodeAttributes>[],
settings: QueryBuilderSettings,
): number => {
if (!node?.attributes) throw Error('Malformed Graph! Node has no attributes');
if (!paths?.[currentIdx]) {
paths.push([]);
if (node.attributes.type === QueryElementTypes.Relation) {
paths[currentIdx].push({ type: QueryElementTypes.Entity, x: node.attributes.x, y: node.attributes.y, attributes: [] });
}
} else if (paths[currentIdx].length > 0) {
const lastNode = paths[currentIdx][paths[currentIdx].length - 1];
if (lastNode.type === node.attributes.type) {
if (lastNode.type === QueryElementTypes.Entity) {
paths[currentIdx].push({
type: QueryElementTypes.Relation,
collection: node.key,
x: node.attributes.x,
y: node.attributes.x,
depth: { min: settings.depth.min, max: settings.depth.max },
direction: 'both',
attributes: [],
});
} else {
paths[currentIdx].push({ type: QueryElementTypes.Entity, x: node.attributes.x, y: node.attributes.x, attributes: [] });
}
}
}
paths[currentIdx].push(node.attributes);
const edges = graph.edges.filter(
(n) =>
n?.attributes?.sourceHandleData.nodeType !== QueryElementTypes.Logic &&
n?.attributes?.targetHandleData.nodeType !== QueryElementTypes.Logic,
);
let connections = edges.filter((e) => e.source === node.key);
if (connections.length === 0) {
if (node.attributes.type === QueryElementTypes.Relation) {
paths[currentIdx].push({ type: QueryElementTypes.Entity, x: node.attributes.x, y: node.attributes.x, attributes: [] });
}
return 0;
}
const nodesToRight = connections
.map((c, i) => {
const rightNodeHandleData = c.attributes?.targetHandleData;
if (!rightNodeHandleData) throw Error('Malformed Graph! One or more edges of a relation node do not exist');
if (paths[currentIdx].length > 10 || currentIdx > 10)
throw Error('Malformed Graph! One or more edges of a relation node do not exist');
const rightNode =
rightNodeHandleData.nodeType === QueryElementTypes.Entity
? entities.find((r) => r.key === c.target)
: relations.find((r) => r.key === c.target);
return rightNode;
})
.filter((n) => n !== undefined) as SerializedNode<QueryGraphNodes>[];
let chunkOffset = 0;
let pathBeforeTraversing = [...paths[currentIdx]];
nodesToRight.forEach((rightNode, i) => {
if (i > 0) {
paths.push([...pathBeforeTraversing]); // clone previous path in case of branching
}
chunkOffset += traverseEntityRelationPaths(rightNode, paths, currentIdx + i + chunkOffset, graph, entities, relations, settings);
});
return chunkOffset + nodesToRight.length - 1; // offset
};
function calculateQueryLogic(
node: SerializedNode<LogicNodeAttributes>,
graph: QueryMultiGraph,
logics: SerializedNode<LogicNodeAttributes>[],
): AllLogicStatement {
if (!node?.attributes) throw Error('Malformed Graph! Node has no attributes');
let connectionsToLeft = graph.edges.filter((e) => e.target === node.key);
let ret = [...node.attributes.logic.logic].map((l) => {
if (typeof l !== 'string') throw Error('Malformed Graph! Logic node has no logic');
if (!node.attributes) throw Error('Malformed Graph! Logic node has no attributes');
if (l.includes('@')) {
// @ means it needs to fetch data from connection
let inputRef = node.attributes.logic.input;
if (l !== '@i' && node.attributes.logic.inputs.length > 0) {
const inputRefIdx = node.attributes.logic.inputs.findIndex((input, i) => input.name === l.replace('@', '')); // fetches the corresponding element from input definition
inputRef = node.attributes.logic.inputs[inputRefIdx];
}
if (!inputRef) {
throw Error('Malformed Graph! Logic node has incorrect input reference');
}
const connectionToInputRef = connectionsToLeft.find((c) => c?.attributes?.targetHandleData.attributeName === inputRef.name);
if (!connectionToInputRef) {
// Not connected, search for set or default value
let val = node.attributes.inputs?.[inputRef.name] || inputRef.default;
if (inputRef.type === 'string') {
if (val) {
val = `\"${val}\"`;
} else {
val = `\".*\"`; // Empty means allow anything
}
}
return val;
} else if (connectionToInputRef.attributes?.sourceHandleData.nodeType === QueryElementTypes.Logic) {
// Is connected to another logic node
const leftLogic = logics.find((r) => r.key === connectionToInputRef.attributes?.sourceHandleData.nodeId);
if (!leftLogic) throw Error('Malformed Graph! Logic node is connected but has no logic data');
return calculateQueryLogic(leftLogic, graph, logics);
} else {
if (!connectionToInputRef.attributes?.sourceHandleData)
throw Error('Malformed Graph! Logic node is connected but has no sourceHandleData');
// Is connected to entity or relation node
if (connectionToInputRef.attributes.sourceHandleData.attributeName === '(# Connection)') {
return ['Count', `@${connectionToInputRef.attributes.sourceHandleData.nodeId}`];
}
return `@${connectionToInputRef.attributes.sourceHandleData.nodeId}.${connectionToInputRef.attributes.sourceHandleData.attributeName}`;
}
} else {
return l;
}
});
return ret as AllLogicStatement;
}
function queryLogicUnion(
nodes: SerializedNode<LogicNodeAttributes>[],
graph: QueryMultiGraph,
logics: SerializedNode<LogicNodeAttributes>[],
unionTypes: { [node_id: string]: QueryUnionType },
): AllLogicStatement | undefined {
let graphLogicChunks = nodes.map((node) => calculateQueryLogic(node, graph, logics));
if (graphLogicChunks.length === 0) return undefined;
if (graphLogicChunks.length === 1) return graphLogicChunks[0];
const constraintNodeId = nodes[0].key;
const entityNodeId = graph.edges.filter((x) => x.target == constraintNodeId)[0].source;
const unionType = unionTypes[entityNodeId] || QueryUnionType.AND;
return [unionType, graphLogicChunks[0], queryLogicUnion(nodes.slice(1), graph, logics, unionTypes) || '0'];
}
/**
* Converts the ReactFlow query to a json data structure to send the query to the backend.
* @returns {BackendQueryFormat} A JSON object in the `JSONFormat`.
*/
export function Query2BackendQuery(
saveStateID: string,
graph: QueryMultiGraph,
settings: QueryBuilderSettings,
ml: MachineLearning[]
): BackendQueryFormat {
let query: BackendQueryFormat = {
saveStateID: saveStateID,
query: [],
machineLearning: ml,
limit: settings.limit,
return: ['*'], // TODO
cached: false,
};
let entities = graph.nodes.filter((n) => n?.attributes?.type === QueryElementTypes.Entity) as SerializedNode<EntityNodeAttributes>[];
let relations = graph.nodes.filter((n) => n?.attributes?.type === QueryElementTypes.Relation) as SerializedNode<RelationNodeAttributes>[];
const graphologyQuery = Graph.from(graph);
graphologyQuery
.filterNodes((n, att) => att?.type == QueryElementTypes.Logic)
.forEach((n) => {
graphologyQuery.dropNode(n);
}); // Remove all logic nodes from the graph for cycle test
if (hasCycle(graphologyQuery)) {
const cycles = entities.map((entity, i) => {
return allSimplePaths(graphologyQuery, entity.key, entity.key);
});
for (let i = 0; i < cycles.length; i++) {
for (let j = 0; j < cycles[i].length; j++) {
const cycle = cycles[i][j];
const origin = cycle[0];
const target = cycle[cycle.length - 2];
const edges = graphologyQuery.edges(target, origin);
if (edges.length > 0) {
const edge = edges[edges.length - 1];
const newOrigin = graphologyQuery.addNode(origin + 'cycle' + edge, graphologyQuery.getNodeAttributes(origin));
const edgeAttributes = graphologyQuery.getEdgeAttributes(edge);
graphologyQuery.dropEdge(edge);
graphologyQuery.addEdge(target, newOrigin, edgeAttributes);
}
break; // only do one cycle
}
break; // only do one cycle
}
return Query2BackendQuery(saveStateID, graphologyQuery.export(), settings, ml);
}
// Chunk extraction: traverse graph to find all paths of logic between relations and entities
let graphSequenceChunks: QueryGraphNodes[][] = [];
let graphSequenceLogicChunks: QueryGraphNodes[][] = [];
let graphSequenceChunksIdMap: Record<string, [number, number]> = {};
let chunkOffset = 0;
let entitiesEmptyLeftHandle = entities.filter((n) => !graph.edges.some((e) => e.target === n.key));
let relationsEmptyLeftHandle = relations.filter((n) => !graph.edges.some((e) => e.target === n.key));
// let entitiesEmptyRightHandle = entities.filter((n) => !n?.attributes?.rightRelationHandleId);
entitiesEmptyLeftHandle.map((entity, i) => {
// start with all entities that have no left handle, which means it "starts" a logic
chunkOffset += traverseEntityRelationPaths(entity, graphSequenceChunks, i + chunkOffset, graph, entities, relations, settings);
});
if (entitiesEmptyLeftHandle.length > 0) chunkOffset += entitiesEmptyLeftHandle.length;
relationsEmptyLeftHandle.map((relation, i) => {
// then, for all relations that have no left handle, since they weren't accounted by the loop above
chunkOffset += traverseEntityRelationPaths(relation, graphSequenceChunks, i + chunkOffset, graph, entities, relations, settings);
});
graphSequenceChunks.forEach((chunkSequence, i) => {
chunkSequence.forEach((chunk, j) => {
graphSequenceChunksIdMap[chunk.id || chunk.name || ''] = [i, j];
});
});
// Logic pathways extraction: now traverse the graph again though the logic components to construct the logic chunks
// The traversal is done in reverse order, starting from the logic pill's right handle connected to an entity or relation, and going backwards
let logics = graph.nodes.filter((n) => n?.attributes?.type === QueryElementTypes.Logic) as SerializedNode<LogicNodeAttributes>[];
let logicsRightHandleConnectedOutside = logics.filter((n) => {
return graph.edges.some((e) => e.source === n.key && e.attributes?.targetHandleData.nodeType === QueryElementTypes.Entity);
});
let logicsRightHandleFinal = logics.filter((n) => {
return !graph.edges.some((e) => e.source === n.key);
});
query.logic = queryLogicUnion(logicsRightHandleFinal, graph, logics, settings.unionTypes);
if (!graphSequenceChunks || graphSequenceChunks.length === 0 || graphSequenceChunks?.[0].length === 0) return query;
const processConnection = (chunk: QueryGraphNodes[], position: number): RelationStruct | NodeStruct => {
const currNode = chunk[position];
if (currNode.type === QueryElementTypes.Relation) {
const _currNode = currNode as RelationNodeAttributes;
const ret: RelationStruct = {
id: _currNode.id,
label: _currNode.name || undefined,
depth: _currNode.depth,
direction: !_currNode.direction || _currNode.direction === 'right' ? 'TO' : _currNode.direction === 'left' ? 'FROM' : 'BOTH',
node: chunk.length === position + 1 ? undefined : (processConnection(chunk, position + 1) as NodeStruct),
};
return ret;
} else if (currNode.type === QueryElementTypes.Entity) {
const ret: NodeStruct = {
id: currNode?.id,
label: currNode?.name,
// logic: LogicStruct[];
// subQuery?: QueryStruct;
// export: ExportNodeStruct[];
relation: chunk.length === position + 1 ? undefined : (processConnection(chunk, position + 1) as RelationStruct),
};
return ret;
}
throw Error('Malformed Chunks! ' + chunk + position);
};
query.query = graphSequenceChunks.map((chunk, i) => {
const ret: QueryStruct = {
id: 'path_' + i, //TODO: chunk[0].id ||
node: processConnection(chunk, 0),
};
return ret;
});
console.debug('%cNew processed query', 'color: aquamarine', graph, query);
return query;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment