diff --git a/src/index.ts b/src/index.ts index 40397daf2ba90fcd5be8741fa7bde296326d4398..a9e94039550d733f5e56206700888d329d294818 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,7 @@ import { RabbitMqBroker } from 'ts-common/rabbitMq'; import { rabbitMq, redis } from './variables'; import { log } from './logger'; -import { queryServiceReader } from './readers/queryService'; +import { queryServiceReader, queryServiceReaderTest } from './readers/queryService'; import { insightProcessor } from './readers/insightProcessor'; async function main() { @@ -23,9 +23,12 @@ async function main() { await redis.connect(); log.info('Connected to Redis!'); - queryServiceReader(frontendPublisher, mlPublisher, 'neo4j'); + //queryServiceReader(frontendPublisher, mlPublisher, 'neo4j'); insightProcessor(); + //queryServiceReader(frontendPublisher, mlPublisher, 'neo4j'); + // TODO: other query services for other databases + //queryServiceReaderTest(frontendPublisher, mlPublisher, 'neo4j'); log.info('Connected to RabbitMQ!'); } diff --git a/src/readers/appearanceCheck.ts b/src/readers/appearanceCheck.ts index 9a280b47fa20045eb0be062da2dc6ac8f8a2e71b..69a7e9cf7f48655885d9e85fe6a3362b8111ea6b 100644 --- a/src/readers/appearanceCheck.ts +++ b/src/readers/appearanceCheck.ts @@ -1,54 +1,49 @@ import type { InsightModel } from 'ts-common'; import type { GraphQueryResultMetaFromBackend } from 'ts-common/src/model/webSocket/graphResult'; import type { SaveState } from 'ts-common'; +import { log } from '../logger'; -export type AppearanceMap = { - [key: string]: { count: number; queries: string[] }; +export type AppearanceEntry = { + nodeID: string; + count: number; + queries: string[]; }; +export type AppearanceMap = AppearanceEntry[]; + export function appearanceCheck( result: GraphQueryResultMetaFromBackend, insight: InsightModel, ss: SaveState, queryIndex: number, - appeareanceResult: { - nodeAppearance: AppearanceMap; - edgeAppearances: AppearanceMap; - }, -) { + nodeAppearance: AppearanceMap, +): AppearanceMap { const nameQuery = ss.queryStates.openQueryArray[queryIndex].name; - const queryNodesID = result.nodes + const queryNodes = result.nodes .filter(node => !insight.entitiesAppearances?.nodeLabel || node.label === insight.entitiesAppearances.nodeLabel) .map(node => node._id); - const queryEdgesID = result.edges - .filter(edge => !insight.entitiesAppearances?.edgeLabel || edge.label === insight.entitiesAppearances.edgeLabel) - .map(edge => edge._id); - - trackEntityAppearances( - queryNodesID, - appeareanceResult.nodeAppearance, - nameQuery ? nameQuery : ss.queryStates.openQueryArray[queryIndex].id?.toString() || 'default-id', - ); - - trackEntityAppearances( - queryEdgesID, - appeareanceResult.edgeAppearances, - nameQuery ? nameQuery : ss.queryStates.openQueryArray[queryIndex].id?.toString() || 'default-id', - ); + trackEntityAppearances(queryNodes, nodeAppearance, nameQuery || ss.queryStates.openQueryArray[queryIndex].id?.toString() || 'default-id'); + + const sortedAppearance = [...nodeAppearance].sort((a, b) => b.count - a.count); + + // TODO: filter by threshold returns empty array + //const threshold = insight.entitiesAppearances?.threshold ?? 0; + //sortedAppearance.filter(entry => entry.count >= threshold); + + return sortedAppearance; } -export const trackEntityAppearances = ( - entities: string[], - appearances: AppearanceMap, - queryId: string, // Changed to string -) => { +export function trackEntityAppearances(entities: string[], appearances: AppearanceMap, queryId: string) { for (const entity of entities) { - if (!appearances[entity]) { - appearances[entity] = { count: 0, queries: [] }; + const existing = appearances.find(entry => entry.nodeID === entity); + + if (existing) { + existing.count++; + existing.queries.push(queryId); + } else { + appearances.push({ nodeID: entity, count: 1, queries: [queryId] }); } - appearances[entity].count++; - appearances[entity].queries.push(queryId); } -}; +} diff --git a/src/readers/insightProcessor.ts b/src/readers/insightProcessor.ts index abf44129986d4308ad67d01ef36aa192d986f535..bdc1f8e739b065a4a04fc9d14120aa438c28b9da 100644 --- a/src/readers/insightProcessor.ts +++ b/src/readers/insightProcessor.ts @@ -88,13 +88,14 @@ export const insightProcessor = async () => { const queries = ss.queryStates.openQueryArray; const visualizations = ss.visualizations.openVisualizationArray; - //const nodeAppearances: AppearanceMap = {}; - const edgeAppearances: AppearanceMap = {}; + const nodeAppearances: AppearanceMap = []; + /* const nodeAppearances: AppearanceMap = { NodeA: { count: 5, queries: ['Query1', 'Query2', 'Query3'] }, NodeB: { count: 3, queries: ['Query4', 'Query5'] }, }; + */ for (const queryIndex in queries) { const visualQuery = ss.queryStates.openQueryArray[queryIndex].graph; const queryBuilderSettings = ss.queryStates.openQueryArray[queryIndex].settings; @@ -113,8 +114,8 @@ export const insightProcessor = async () => { } else if (insight.alarmMode === 'conditional' && insight.conditionsCheck && insight.conditionsCheck.length > 0) { insight = statCheck(insight, result); } else if (insight.alarmMode === 'entitiesAppearances') { - //appearanceCheck(result, insight, ss, Number(queryIndex), { nodeAppearance: nodeAppearances, edgeAppearances: edgeAppearances }); - log.info('LogicSet resultSet:', nodeAppearances); + appearanceCheck(result, insight, ss, Number(queryIndex), nodeAppearances); + log.info('LogicSet insight resultSet:', nodeAppearances); } if (insight.userId == null) return; // fixes ts but never is the case @@ -126,13 +127,12 @@ export const insightProcessor = async () => { editor.read(async () => { const cleanUpDom = setUpDom(); - //let html = $generateHtmlFromNodes(editor); + let html = $generateHtmlFromNodes(editor); cleanUpDom(); - //html = await populateTemplate(html, result, visualizations); - - // + html = await populateTemplate(html, result, visualizations); + /* const nodeAppearancesArray = Object.entries(nodeAppearances).map(([node, { count, queries }]) => ({ Node: node, 'Appearance Count': count, @@ -142,7 +142,8 @@ export const insightProcessor = async () => { const headers = ['Node', 'Appearance Count', 'Queries']; const html = await generateTable(nodeAppearancesArray, headers); - + log.info('dsadasd:', html); + */ // for (const recipient of insight.recipients) { diff --git a/src/readers/queryService.ts b/src/readers/queryService.ts index c8a18e543ef7c877580b3e30095c1854e243548e..b44a106dae04e2a494d53662f2b6b3092569ced5 100644 --- a/src/readers/queryService.ts +++ b/src/readers/queryService.ts @@ -11,6 +11,7 @@ import type { GraphQueryResultFromBackend, GraphQueryResultMetaFromBackend } fro import { RabbitMqBroker } from 'ts-common/rabbitMq'; import { Neo4jConnection } from 'ts-common/neo4j'; import type { QueryCypher } from '../utils/cypher/converter/queryConverter'; +import { appearanceCheck, type AppearanceMap } from './appearanceCheck'; async function cacheCheck(cacheKey: string): Promise<GraphQueryResultMetaFromBackend | undefined> { if (QUERY_CACHE_DURATION === '') { @@ -209,3 +210,163 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu } }); }; + +export const queryServiceReaderTest = async (frontendPublisher: RabbitMqBroker, mlPublisher: RabbitMqBroker, type: QueryExecutionTypes) => { + if (type == null) { + log.error('Unsupported query execution type:', type); + throw new Error('Unsupported query execution type'); + } + log.info('Starting query reader for', type); + + const publisher = new QueryPublisher(frontendPublisher, mlPublisher); + + const queryServiceConsumer = await new RabbitMqBroker( + rabbitMq, + 'requests-exchange', + `${type}-query-queue`, + `${type}-query-request`, + ).connect(); + + log.info('Connected to RabbitMQ ST!'); + + await queryServiceConsumer.startConsuming<QueryRequest>('query-service', async (message, headers) => { + const startTime = Date.now(); + const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID); + const insight = { + id: 1, + entitiesAppearances: { + nodeLabel: 'Movie', + threshold: 2, + }, + description: 'Test Insight', + }; + + if (!ss) { + log.error('Invalid SaveState received in queryServiceConsumer:', ss); + publisher.publishErrorToFrontend('Invalid SaveState'); + return; + } + + log.debug('Received query request:', message, headers, ss); + log.debug('Received routing key:', headers.routingKey); + + if (!headers.callID) { + log.error('QueryID not set in headers:', headers); + return; + } + + publisher.withHeaders(headers).withRoutingKey(headers.routingKey).withQueryID(headers.callID); + publisher.publishStatusToFrontend('Received'); + + if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) { + log.error('Invalid SaveState received in queryServiceConsumer:', ss); + publisher.publishErrorToFrontend('Invalid SaveState'); + return; + } + + let activeQuery = ss.queryStates.activeQueryId; + if (message.queryID) { + if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) { + log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray); + publisher.publishErrorToFrontend('Query not found'); + return; + } + activeQuery = message.queryID; + } + + if (activeQuery == null || activeQuery == -1) { + log.error('No active query found in SaveState:', ss); + publisher.publishErrorToFrontend('No active query found'); + return; + } + + const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery); + if (activeQueryInfo == null) { + log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray); + publisher.publishErrorToFrontend('Active query not found'); + return; + } + + const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph; + log.debug('Received query request:', message, headers, visualQuery); + if (visualQuery.nodes.length === 0) { + log.info('Empty query received'); + publisher.publishResultToFrontend({ + nodes: [], + edges: [], + metaData: { + topological: { density: 0, self_loops: 0 }, + nodes: { count: 0, labels: [], types: {} }, + edges: { count: 0, labels: [], types: {} }, + }, + }); + return; + } + + const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings; + const ml = message.ml; + const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml); + + log.debug('translating query:', convertedQuery); + publisher.publishStatusToFrontend('Translating'); + + const query = query2Cypher(convertedQuery); + if (query == null) { + log.error('Error translating query:', convertedQuery); + publisher.publishErrorToFrontend('Error translating query'); + return; + } + + log.debug('Translated query FROM:', convertedQuery); + log.info('Translated query:', query); + publisher.publishTranslationResultToFrontend(query); + + for (let i = 0; i < ss.dbConnections.length; i++) { + let nodeAppearances: AppearanceMap = []; + + // + log.info('------openquereeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeey---------:', ss.queryStates.openQueryArray); + + for (let j = 0; j < ss.queryStates.openQueryArray.length; j++) { + const visualQueryTest = ss.queryStates.openQueryArray[j].graph; + const convertedQueryTest = Query2BackendQuery(ss.id, visualQueryTest, queryBuilderSettings, ml); + const nameQuery = ss.queryStates.openQueryArray[j].name; + + const queryTest = query2Cypher(convertedQueryTest); + + if (queryTest == null) { + log.error('Error translating query:', convertedQueryTest); + } + + queryService(ss.dbConnections[i], queryTest) + .then(result => { + publisher.publishResultToFrontend(result); + + // + nodeAppearances = appearanceCheck(result, insight, ss, Number(j), nodeAppearances); + + log.info('LogicSet resultSet:', nodeAppearances); + + // + + if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) { + for (let i = 0; i < convertedQuery.machineLearning.length; i++) { + try { + //publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers); + //log.debug('Published machine learning request', convertedQuery.machineLearning[i]); + } catch (error) { + //log.error('Error publishing machine learning request', error); + //publisher.publishErrorToFrontend('Error publishing machine learning request'); + } + } + } + }) + .catch(error => { + log.error('Error querying database', error); + publisher.publishErrorToFrontend('Error querying database'); + }); + } + // + } + }); +}; diff --git a/src/utils/insights.ts b/src/utils/insights.ts index bb43b6d3eefadccd58420444a858556f9d5be114..ff5abeca5931842d0bf1e8142d2b890e7ec6480d 100644 --- a/src/utils/insights.ts +++ b/src/utils/insights.ts @@ -151,6 +151,29 @@ export async function generateTable(data: any[], headers: string[]): Promise<str return tableHTML; } +export async function generateList(data: any[], headers: string[]): Promise<string> { + if (!data || data.length === 0) { + return '<p>No data available</p>'; + } + + let listHTML = `<ul style="margin-top: 20px;">`; + + data.forEach(row => { + listHTML += `<li><strong>${headers[0]}:</strong> ${row[headers[0]]}`; + for (let i = 1; i < headers.length; i++) { + let value = row[headers[i]]; + if (Array.isArray(value)) { + value = value.join(', '); // Convert array to string + } + listHTML += `, <strong>${headers[i]}:</strong> ${value || ''}`; + } + listHTML += `</li>`; + }); + + listHTML += `</ul>`; + return listHTML; +} + const svgToBase64 = (svgString: string) => { return new Promise((resolve, reject) => { svg2img(svgString, (error: any, buffer: Buffer) => {