From 1762b548894fd06a3d34ffafee0d34fc6f793877 Mon Sep 17 00:00:00 2001 From: Sjoerd Vink <sjoerdvink@Sjoerds-MacBook-Pro.local> Date: Thu, 12 Dec 2024 15:45:30 -0500 Subject: [PATCH] feat(reportOnNewData): initial setup report on new data --- src/index.ts | 3 +- src/readers/queryService.ts | 112 +++++++++++++++++++++++++++++++++++- 2 files changed, 113 insertions(+), 2 deletions(-) diff --git a/src/index.ts b/src/index.ts index af32b26..c01f718 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,7 +1,7 @@ import { RabbitMqBroker, RedisConnector } from "ts-common"; import { REDIS_PASSWORD, REDIS_HOST, REDIS_PORT, RABBIT_USER, RABBIT_PASSWORD, RABBIT_PORT, RABBIT_HOST } from "./variables"; import { log } from "./logger"; -import { queryServiceReader, queryServiceReaderStatsCheck } from "./readers/queryService"; +import { queryServiceReader, queryServiceReaderStatsCheck, queryServiceReaderDiffCheck } from "./readers/queryService"; async function main() { log.info("Starting query-service..."); @@ -39,6 +39,7 @@ async function testStatsCheck() { log.info("Starting query-service-statsCheck ..."); await queryServiceReaderStatsCheck("neo4j"); + await queryServiceReaderDiffCheck("neo4j"); } main(); diff --git a/src/readers/queryService.ts b/src/readers/queryService.ts index 0501943..be55e4c 100644 --- a/src/readers/queryService.ts +++ b/src/readers/queryService.ts @@ -1,11 +1,12 @@ import { getUserSaveState, - //getUserSaveStateInsight, + getUserSaveStateInsight, type BackendQueryFormat, type QueryRequest, type GraphQueryResultFromBackend, type GraphQueryResultMetaFromBackend, mlDefaultState, + updateUserSaveStateInsight, } from "ts-common"; import { Neo4jConnection, type DbConnection, RabbitMqBroker, RedisConnector } from "ts-common"; import { @@ -257,3 +258,112 @@ export const queryServiceReaderStatsCheck = async (type: QueryExecutionTypes) => } }); }; + +export const queryServiceReaderDiffCheck = async (type: QueryExecutionTypes) => { + if (type == null) { + log.error("Unsupported query execution type:", type); + throw new Error("Unsupported query execution type"); + } + + // TODO: change requests-exchange -> requests-exchange-stats + const queryServiceConsumer = new RabbitMqBroker("requests-exchange", `${type}-query-queue`, `${type}-query-request`); + + await queryServiceConsumer.connect({ + protocol: "amqp", + hostname: RABBIT_HOST, + port: RABBIT_PORT, + username: RABBIT_USER, + password: RABBIT_PASSWORD, + }); + + log.info("Connected to RabbitMQ ST!"); + + // query-service to query-service-insight-reader + await queryServiceConsumer.startConsuming<QueryRequest>("query-service-insight-reader", async (message, headers) => { + const startTime = Date.now(); + const ss = await getUserSaveState( + headers.message.sessionData.userID, + headers.message.sessionData.saveStateID, + USER_MANAGEMENT_SERVICE_API + ); + + // const ssInsight = await getUserSaveStateInsight( + // headers.message.sessionData.userID, + // headers.message.sessionData.saveStateID, + // USER_MANAGEMENT_SERVICE_API + // ); + + const previousQueryResult = { + nodes: [], + edges: [], + }; + + log.debug("Received query request:", ss); + + if (!headers.queryID) { + log.error("QueryID not set in headers:", headers); + return; + } + + log.debug("Received check stat request:", message, headers); + + if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) { + log.error("Invalid SaveState received in queryServiceConsumer:", ss); + return; + } + + const visualQuery = ss.queries[0].graph; + const queryBuilderSettings = ss.queries[0].settings; + const ml = message.ml; + const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml); + + log.debug("translating query:", convertedQuery); + + const query = query2Cypher(convertedQuery); + if (query == null) { + log.error("Error translating query:", convertedQuery); + return; + } + + for (let i = 0; i < ss.dbConnections.length; i++) { + queryService(ss.dbConnections[i], query) + .then((result) => { + log.debug("Query result!"); + log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`); + + const queryResult = { + nodes: result.nodes.map((node) => node._id), + edges: result.edges.map((edge) => edge._id), + }; + + // Compare query results + const nodesDifferent = queryResult.nodes.some((node, index) => node !== previousQueryResult.nodes[index]); + const edgesDifferent = queryResult.edges.some((edge, index) => edge !== previousQueryResult.edges[index]); + + if ( + queryResult.nodes.length !== previousQueryResult.nodes.length || + queryResult.edges.length !== previousQueryResult.edges.length || + nodesDifferent || + edgesDifferent + ) { + log.info("Different results, use Dennis code..."); + } else { + log.info("No difference in result sets"); + } + + // Save query result for future reference + updateUserSaveStateInsight( + headers.message.sessionData.userID, + headers.message.sessionData.saveStateID, + USER_MANAGEMENT_SERVICE_API, + queryResult + ); + + log.info("Updated node and edge ids in SaveState"); + }) + .catch((error) => { + log.error("Error querying database", error); + }); + } + }); +}; -- GitLab