// handlemessage.go
/*
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
*/

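// Package consume handles messages consumed from the broker: it translates incoming JSON queries, executes them on the user's database and publishes the results.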
package consume

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"query-service/internal/entity"
	"runtime/debug"
	"time"

	"git.science.uu.nl/graphpolaris/broker"
	"git.science.uu.nl/graphpolaris/query-conversion"
	"github.com/rs/zerolog/log"
)

HandleMessage is the tracking wrapper for handleMessageHelper
func (s *Service) HandleMessage(msg *broker.Message) {
	// Grab the headers from the message
	sessionID, userID, queryID, err := getHeaders(&msg.Headers)
	if err != nil {
		log.Error().Err(err).Msg("missing headers in message")
	// Defer a panic intercepting function
	defer func(sessionID, queryID string) {
		if err := recover(); err != nil {
			log.Error().Msg(string(debug.Stack()))

			s.publishStatus("Critical error", sessionID, queryID)
		}
	}(sessionID, queryID)
	s.publishStatus("Received", sessionID, queryID)
	// var tracker trackerproducer.UseCase
	// tracker = trackerproducer.NewService(s.brokerDriver, "query-handling", sessionID, userID, queryID)
	// println("start tracking in QS")
	// tracker.StartTracking()
	s.handleMessageHelper(msg, sessionID, userID, queryID)
	// println("stop tracking in handlemessage.go")
	// tracker.StopTracking()
}

// handleMessageHelper is the actual implementation, which is nested inside the tracker wrapper in HandleMessage
func (s *Service) handleMessageHelper(msg *broker.Message, sessionID, clientID, queryID string) {
	// Unmarshall the incoming message into an IncomingJSONQuery object
	JSONQuery, err := query.UnmarshalJSON(&msg.Body)
		s.publishError(sessionID, queryID, entity.MalformedRequestError, "Bad Request: failed to unmarshall the incoming JSON query")
	// Check if there was a database name
	if JSONQuery.DatabaseName == "" {
		println("no database name")
		// If not, there is no way to proceed past this point
		s.publishError(sessionID, queryID, entity.MalformedRequestError, "Bad request: no database name")
sivan's avatar
sivan committed
		return
	}
	s.publishStatus("Translating", sessionID, queryID)
	// Convert the json byte msg to a query string
	query, _, err := s.queryConverter.ConvertQuery(JSONQuery)
	if err != nil {
		log.Error().AnErr("Err", err).Msg("Error translating query json")
		s.publishError(sessionID, queryID, entity.TranslationError, "Translation error")
	// Send the resulting query back to the client
	translationMsg := entity.MessageStruct{
		Type:    "query_translation_result",
		Value:   *query,
	translationMsgBytes, _ := json.Marshal(translationMsg)
	s.producer.PublishMessage(&translationMsgBytes, sessionID)

	s.publishStatus("Getting database credentials", sessionID, queryID)
	// Fetch database credentials from the user service
	databaseInfo, err := s.databaseInfoService.GetDatabaseInfo(clientID, JSONQuery.DatabaseName)
	if err != nil {
		s.publishError(sessionID, queryID, entity.DatabaseCredentialsError, "Bad credentials")
	s.publishStatus("Processing on database", sessionID, queryID)
sivan's avatar
sivan committed
	// execute and retrieve result
Lelieveld,J.R.J. (Joris)'s avatar
Lelieveld,J.R.J. (Joris) committed
	// convert result to general (node-link) format
	result, err := s.requestSender.ExecuteQuery(*query, databaseInfo.Username, databaseInfo.Password, databaseInfo.URL, databaseInfo.Port, databaseInfo.InternalDatabaseName)
sivan's avatar
sivan committed
	if err != nil {
		log.Error().AnErr("Err", err).Str("sessionID", sessionID).Str("queryID", queryID).Str("query", *query).Msg("Error querying the database")
		s.publishError(sessionID, queryID, entity.DatabaseExecutionError, "Database error")
sivan's avatar
sivan committed
	}

	s.publishStatus("Completed", sessionID, queryID)
sivan's avatar
sivan committed
	// Add type indicator to result from database
Bouma,C.J. (Chris)'s avatar
Bouma,C.J. (Chris) committed
	var res interface{}
	json.Unmarshal(*result, &res)
	// Add the MachineLearning params and information to the result if it exists
	var resultMsg entity.MessageStruct
	if len(JSONQuery.MachineLearning) == 0 {
		resultMsg = entity.MessageStruct{
			Type:    "query_result",
			Value:   res,
		}
	} else if len(JSONQuery.MachineLearning) == 1 {
		resultMsg = entity.MessageStruct{
			Type:         "query_result",
			Value:        res,
			MLAttributes: JSONQuery.MachineLearning[0].Parameters,
		}
	}
	resultMsgBytes, err := json.Marshal(resultMsg)
		log.Error().Err(err).Msg("error during result marshalling")
sivan's avatar
sivan committed

	// Publish the result
	// If MachineLearning object is of length 0 we do regular query processing
	// If MachineLearning object is of length 1 we just take the first ML item queuename
	// If there are more than items in the list (current implementation does not support this but future might) we should do something different
	if len(JSONQuery.MachineLearning) == 0 {
		s.producer.PublishMessage(&resultMsgBytes, sessionID)
	} else if len(JSONQuery.MachineLearning) == 1 {
		s.producer.PublishMLMessage(&resultMsgBytes, sessionID, JSONQuery.MachineLearning[0].Queuename)
	// Caching message in object store
		cacheQueryResultContext, cancelCacheQueryResult := context.WithTimeout(context.Background(), time.Second*20)
		defer cancelCacheQueryResult()
		s.objectStore.Put(cacheQueryResultContext, "cached-queries", fmt.Sprintf("%s-%s", clientID, queryID), int64(len(resultMsgBytes)), bytes.NewReader(resultMsgBytes))
sivan's avatar
sivan committed
}