Skip to content
Snippets Groups Projects
Commit 1762b548 authored by Sjoerd Vink's avatar Sjoerd Vink
Browse files

feat(reportOnNewData): initial setup report on new data

parent 9b3a71e3
No related branches found
No related tags found
No related merge requests found
Pipeline #143110 failed
import { RabbitMqBroker, RedisConnector } from "ts-common";
import { REDIS_PASSWORD, REDIS_HOST, REDIS_PORT, RABBIT_USER, RABBIT_PASSWORD, RABBIT_PORT, RABBIT_HOST } from "./variables";
import { log } from "./logger";
import { queryServiceReader, queryServiceReaderStatsCheck } from "./readers/queryService";
import { queryServiceReader, queryServiceReaderStatsCheck, queryServiceReaderDiffCheck } from "./readers/queryService";
async function main() {
log.info("Starting query-service...");
......@@ -39,6 +39,7 @@ async function testStatsCheck() {
log.info("Starting query-service-statsCheck ...");
await queryServiceReaderStatsCheck("neo4j");
await queryServiceReaderDiffCheck("neo4j");
}
main();
......
import {
getUserSaveState,
//getUserSaveStateInsight,
getUserSaveStateInsight,
type BackendQueryFormat,
type QueryRequest,
type GraphQueryResultFromBackend,
type GraphQueryResultMetaFromBackend,
mlDefaultState,
updateUserSaveStateInsight,
} from "ts-common";
import { Neo4jConnection, type DbConnection, RabbitMqBroker, RedisConnector } from "ts-common";
import {
......@@ -257,3 +258,112 @@ export const queryServiceReaderStatsCheck = async (type: QueryExecutionTypes) =>
}
});
};
export const queryServiceReaderDiffCheck = async (type: QueryExecutionTypes) => {
if (type == null) {
log.error("Unsupported query execution type:", type);
throw new Error("Unsupported query execution type");
}
// TODO: change requests-exchange -> requests-exchange-stats
const queryServiceConsumer = new RabbitMqBroker("requests-exchange", `${type}-query-queue`, `${type}-query-request`);
await queryServiceConsumer.connect({
protocol: "amqp",
hostname: RABBIT_HOST,
port: RABBIT_PORT,
username: RABBIT_USER,
password: RABBIT_PASSWORD,
});
log.info("Connected to RabbitMQ ST!");
// query-service to query-service-insight-reader
await queryServiceConsumer.startConsuming<QueryRequest>("query-service-insight-reader", async (message, headers) => {
const startTime = Date.now();
const ss = await getUserSaveState(
headers.message.sessionData.userID,
headers.message.sessionData.saveStateID,
USER_MANAGEMENT_SERVICE_API
);
// const ssInsight = await getUserSaveStateInsight(
// headers.message.sessionData.userID,
// headers.message.sessionData.saveStateID,
// USER_MANAGEMENT_SERVICE_API
// );
const previousQueryResult = {
nodes: [],
edges: [],
};
log.debug("Received query request:", ss);
if (!headers.queryID) {
log.error("QueryID not set in headers:", headers);
return;
}
log.debug("Received check stat request:", message, headers);
if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) {
log.error("Invalid SaveState received in queryServiceConsumer:", ss);
return;
}
const visualQuery = ss.queries[0].graph;
const queryBuilderSettings = ss.queries[0].settings;
const ml = message.ml;
const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml);
log.debug("translating query:", convertedQuery);
const query = query2Cypher(convertedQuery);
if (query == null) {
log.error("Error translating query:", convertedQuery);
return;
}
for (let i = 0; i < ss.dbConnections.length; i++) {
queryService(ss.dbConnections[i], query)
.then((result) => {
log.debug("Query result!");
log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`);
const queryResult = {
nodes: result.nodes.map((node) => node._id),
edges: result.edges.map((edge) => edge._id),
};
// Compare query results
const nodesDifferent = queryResult.nodes.some((node, index) => node !== previousQueryResult.nodes[index]);
const edgesDifferent = queryResult.edges.some((edge, index) => edge !== previousQueryResult.edges[index]);
if (
queryResult.nodes.length !== previousQueryResult.nodes.length ||
queryResult.edges.length !== previousQueryResult.edges.length ||
nodesDifferent ||
edgesDifferent
) {
log.info("Different results, use Dennis code...");
} else {
log.info("No difference in result sets");
}
// Save query result for future reference
updateUserSaveStateInsight(
headers.message.sessionData.userID,
headers.message.sessionData.saveStateID,
USER_MANAGEMENT_SERVICE_API,
queryResult
);
log.info("Updated node and edge ids in SaveState");
})
.catch((error) => {
log.error("Error querying database", error);
});
}
});
};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment