From 4906aa55986831619575946f1a18e489edd4cc5a Mon Sep 17 00:00:00 2001 From: Dennis Collaris <d.collaris@me.com> Date: Thu, 13 Mar 2025 11:24:53 +0100 Subject: [PATCH] fix: force garbage collection --- src/readers/queryService.ts | 198 +++++++++++++++++++----------------- 1 file changed, 104 insertions(+), 94 deletions(-) diff --git a/src/readers/queryService.ts b/src/readers/queryService.ts index e972fab..4609e93 100644 --- a/src/readers/queryService.ts +++ b/src/readers/queryService.ts @@ -53,6 +53,10 @@ export const queryService = async (db: DbConnection, cypher: QueryCypher, useCac const result = graphQueryBackend2graphQuery(graph, countGraph); result.nodeCounts.updatedAt = Date.now(); + // Force garbage collection + neo4jResult.records = []; + Bun.gc(true); + // cache result if (QUERY_CACHE_DURATION !== '') { @@ -100,118 +104,124 @@ export const queryServiceReader = async (frontendPublisher: RabbitMqBroker, mlPu const publisher = new QueryPublisher(frontendPublisher, mlPublisher, headers, message.queryID); - const startTime = Date.now(); - const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID); - - if (!ss) { - log.error('Invalid SaveState received in queryServiceConsumer:', ss); - publisher.publishErrorToFrontend('Invalid SaveState'); - return; - } + try { + const startTime = Date.now(); + const ss = await ums.getUserSaveState(headers.message.sessionData.userID, message.saveStateID); - log.debug('Received query request:', message, headers, ss); - log.debug('Received routing key:', headers.routingKey); + if (!ss) { + log.error('Invalid SaveState received in queryServiceConsumer:', ss); + publisher.publishErrorToFrontend('Invalid SaveState'); + return; + } - publisher.publishStatusToFrontend('Received'); + log.debug('Received query request:', message, headers, ss); + log.debug('Received routing key:', headers.routingKey); - if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) { - log.error('Invalid SaveState received in queryServiceConsumer:', ss); - publisher.publishErrorToFrontend('Invalid SaveState'); - return; - } + publisher.publishStatusToFrontend('Received'); - let activeQuery = ss.queryStates.activeQueryId; - if (message.queryID) { - if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) { - log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray); - publisher.publishErrorToFrontend('Query not found'); + if (ss == null || ss.dbConnections == null || ss.dbConnections[0] == null || ss.dbConnections.length === 0) { + log.error('Invalid SaveState received in queryServiceConsumer:', ss); + publisher.publishErrorToFrontend('Invalid SaveState'); return; } - activeQuery = message.queryID; - } - if (activeQuery == null || activeQuery == -1) { - log.error('No active query found in SaveState:', ss); - publisher.publishErrorToFrontend('No active query found'); - return; - } + let activeQuery = ss.queryStates.activeQueryId; + if (message.queryID) { + if (ss.queryStates.openQueryArray.find(q => q.id === message.queryID) == null) { + log.error('Query not found in SaveState:', message.queryID, ss.queryStates.openQueryArray); + publisher.publishErrorToFrontend('Query not found'); + return; + } + activeQuery = message.queryID; + } - const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery); - if (activeQueryInfo == null) { - log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray); - publisher.publishErrorToFrontend('Active query not found'); - return; - } + if (activeQuery == null || activeQuery == -1) { + log.error('No active query found in SaveState:', ss); + publisher.publishErrorToFrontend('No active query found'); + return; + } - const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph; - log.debug('Received query request:', message, headers, visualQuery); - if (visualQuery.nodes.length === 0) { - log.info('Empty query received'); - publisher.publishResultToFrontend({ - nodes: [], - edges: [], - nodeCounts: { updatedAt: 0 }, - metaData: { - topological: { density: 0, self_loops: 0 }, - nodes: { count: 0, labels: [], types: {} }, - edges: { count: 0, labels: [], types: {} }, - }, - }); - return; - } + const activeQueryInfo = ss.queryStates.openQueryArray.find(q => q.id === activeQuery); + if (activeQueryInfo == null) { + log.error('Active query not found in SaveState:', ss.queryStates.activeQueryId, ss.queryStates.openQueryArray); + publisher.publishErrorToFrontend('Active query not found'); + return; + } - const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings; - const ml = message.ml; - const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml); + const visualQuery = activeQueryInfo.graph; //ss.queries[0].graph; + log.debug('Received query request:', message, headers, visualQuery); + if (visualQuery.nodes.length === 0) { + log.info('Empty query received'); + publisher.publishResultToFrontend({ + nodes: [], + edges: [], + nodeCounts: { updatedAt: 0 }, + metaData: { + topological: { density: 0, self_loops: 0 }, + nodes: { count: 0, labels: [], types: {} }, + edges: { count: 0, labels: [], types: {} }, + }, + }); + return; + } - log.debug('translating query:', convertedQuery); - publisher.publishStatusToFrontend('Translating'); + const queryBuilderSettings = activeQueryInfo.settings; //ss.queries[0].settings; + const ml = message.ml; + const convertedQuery = Query2BackendQuery(ss.id, visualQuery, queryBuilderSettings, ml); - const cypher = query2Cypher(convertedQuery); - const query = cypher.query; - if (query == null) { - log.error('Error translating query:', convertedQuery); - publisher.publishErrorToFrontend('Error translating query'); - return; - } + log.debug('translating query:', convertedQuery); + publisher.publishStatusToFrontend('Translating'); - log.debug('Translated query FROM:', convertedQuery); - log.info('Translated query:', query); - log.info('Translated query:', cypher.countQuery); - publisher.publishTranslationResultToFrontend(query); - - for (let i = 0; i < ss.dbConnections.length; i++) { - try { - const result = await queryService(ss.dbConnections[i], cypher, message.useCached); - - // Cache nodeCounts such that we can display differentiation for each query - await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, { - ...activeQueryInfo, - graph: { - ...activeQueryInfo.graph, - nodeCounts: result.nodeCounts, - }, - }); + const cypher = query2Cypher(convertedQuery); + const query = cypher.query; + if (query == null) { + log.error('Error translating query:', convertedQuery); + publisher.publishErrorToFrontend('Error translating query'); + return; + } - publisher.publishResultToFrontend(result); - log.debug('Query result!'); - log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`); - - if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) { - for (let i = 0; i < convertedQuery.machineLearning.length; i++) { - try { - publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers); - log.debug('Published machine learning request', convertedQuery.machineLearning[i]); - } catch (error) { - log.error('Error publishing machine learning request', error); - publisher.publishErrorToFrontend('Error publishing machine learning request'); + log.debug('Translated query FROM:', convertedQuery); + log.info('Translated query:', query); + log.info('Translated query:', cypher.countQuery); + publisher.publishTranslationResultToFrontend(query); + + for (let i = 0; i < ss.dbConnections.length; i++) { + try { + const result = await queryService(ss.dbConnections[i], cypher, message.useCached); + + // Cache nodeCounts such that we can display differentiation for each query + await ums.updateQuery(headers.message.sessionData.userID, message.saveStateID, { + ...activeQueryInfo, + graph: { + ...activeQueryInfo.graph, + nodeCounts: result.nodeCounts, + }, + }); + + publisher.publishResultToFrontend(result); + log.debug('Query result!'); + log.info(`Query executed in ${formatTimeDifference(Date.now() - startTime)}`); + + if (convertedQuery.machineLearning && convertedQuery.machineLearning.length > 0) { + for (let i = 0; i < convertedQuery.machineLearning.length; i++) { + try { + publisher.publishMachineLearningRequest(result, convertedQuery.machineLearning[i], headers); + log.debug('Published machine learning request', convertedQuery.machineLearning[i]); + } catch (error) { + log.error('Error publishing machine learning request', error); + publisher.publishErrorToFrontend('Error publishing machine learning request'); + } } } + } catch (error) { + log.error('Error querying database', error); + publisher.publishErrorToFrontend('Error querying database'); } - } catch (error) { - log.error('Error querying database', error); - publisher.publishErrorToFrontend('Error querying database'); + + Bun.gc(true); } + } finally { + setTimeout(() => Bun.gc(true), 100); } }); }; -- GitLab