diff --git a/internal/usecases/consume/handlemessage.go b/internal/usecases/consume/handlemessage.go index da2e1fbad0ec5f16c4a2d245018c51cf1fef6c8e..8752c3843c0b8544d74efe15d5d09a00649ad743 100644 --- a/internal/usecases/consume/handlemessage.go +++ b/internal/usecases/consume/handlemessage.go @@ -23,6 +23,8 @@ func (s *Service) HandleMessage(msg *broker.Message) { return } + s.sendStatus("Received", &sessionID) + // Unmarshall the incoming message into an IncomingJSONQuery object JSONQuery, err := query.UnmarshalJSON(&msg.Body) if err != nil { @@ -46,6 +48,8 @@ func (s *Service) HandleMessage(msg *broker.Message) { return } + s.sendStatus("Translating", &sessionID) + // Convert the json byte msg to a query string query, err := s.queryConverter.ConvertQuery(JSONQuery) if err != nil { @@ -75,6 +79,8 @@ func (s *Service) HandleMessage(msg *broker.Message) { } s.producer.PublishMessage(&translationMsgBytes, &sessionID) // TODO: should this be a go routine? + s.sendStatus("Getting database credentials", &sessionID) + // Fetch database credentials from the user service databaseInfo, err := s.databaseInfoService.GetDatabaseInfo(&clientID, &JSONQuery.DatabaseName) if err != nil { @@ -87,6 +93,8 @@ func (s *Service) HandleMessage(msg *broker.Message) { return } + s.sendStatus("Processing on database", &sessionID) + // execute and retrieve result // convert result to general (node-link) format result, err := s.requestSender.ExecuteQuery(*query, databaseInfo.Username, databaseInfo.Password, databaseInfo.URL, databaseInfo.Port, databaseInfo.InternalDatabaseName) @@ -100,6 +108,8 @@ func (s *Service) HandleMessage(msg *broker.Message) { return } + s.sendStatus("Done", &sessionID) + // Add type indicator to result from database // TODO: Change key 'values' to key 'value' var res interface{} @@ -116,3 +126,18 @@ func (s *Service) HandleMessage(msg *broker.Message) { s.producer.PublishMessage(&resultMsgBytes, &sessionID) } + +func (s *Service) sendStatus(status string, sessionID *string) { + msg := entity.MessageStruct{ + Type: "query_status_update", + Value: status, + } + + msgBytes, err := json.Marshal(msg) + if err != nil { + errorhandler.FailWithError(err, "Marshalling status update message went wrong") + return + } + + s.producer.PublishMessage(&msgBytes, sessionID) +}