Newer
Older
/*
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
*/
"query-service/internal/entity"
"git.science.uu.nl/graphpolaris/broker"
"git.science.uu.nl/graphpolaris/query-conversion"
"github.com/rs/zerolog/log"
HandleMessage is the tracking wrapper for handleMessageHelper
func (s *Service) HandleMessage(msg *broker.Message) {
// Grab the headers from the message
sessionID, userID, queryID, err := getHeaders(&msg.Headers)
log.Error().Err(err).Msg("missing headers in message")
// Defer a panic intercepting function
defer func(sessionID, queryID string) {
if err := recover(); err != nil {
s.publishStatus("Critical error", sessionID, queryID)
}
}(sessionID, queryID)
s.publishStatus("Received", sessionID, queryID)
// var tracker trackerproducer.UseCase
// tracker = trackerproducer.NewService(s.brokerDriver, "query-handling", sessionID, userID, queryID)
// println("start tracking in QS")
// tracker.StartTracking()
s.handleMessageHelper(msg, sessionID, userID, queryID)
// println("stop tracking in handlemessage.go")
// tracker.StopTracking()
}
// handleMessageHelper is the actual implementation, which is nested inside the tracker wrapper in HandleMessage
func (s *Service) handleMessageHelper(msg *broker.Message, sessionID, clientID, queryID string) {
// Unmarshall the incoming message into an IncomingJSONQuery object
JSONQuery, err := query.UnmarshalJSON(&msg.Body)
s.publishError(sessionID, queryID, entity.MalformedRequestError, "Bad Request: failed to unmarshall the incoming JSON query")
// Check if there was a database name
if JSONQuery.DatabaseName == "" {
// If not, there is no way to proceed past this point
s.publishError(sessionID, queryID, entity.MalformedRequestError, "Bad request: no database name")
s.publishStatus("Translating", sessionID, queryID)
// Convert the json byte msg to a query string
query, _, err := s.queryConverter.ConvertQuery(JSONQuery)
Leonardo Christino
committed
log.Error().AnErr("Err", err).Msg("Error translating query json")
s.publishError(sessionID, queryID, entity.TranslationError, "Translation error")
// Send the resulting query back to the client
translationMsg := entity.MessageStruct{
QueryID: queryID,
Type: "query_translation_result",
Value: *query,
translationMsgBytes, _ := json.Marshal(translationMsg)
s.producer.PublishMessage(&translationMsgBytes, sessionID)
s.publishStatus("Getting database credentials", sessionID, queryID)
// Fetch database credentials from the user service
databaseInfo, err := s.databaseInfoService.GetDatabaseInfo(clientID, JSONQuery.DatabaseName)
s.publishError(sessionID, queryID, entity.DatabaseCredentialsError, "Bad credentials")
s.publishStatus("Processing on database", sessionID, queryID)
result, err := s.requestSender.ExecuteQuery(*query, databaseInfo.Username, databaseInfo.Password, databaseInfo.URL, databaseInfo.Port, databaseInfo.InternalDatabaseName)
log.Error().AnErr("Err", err).Str("sessionID", sessionID).Str("queryID", queryID).Str("query", *query).Msg("Error querying the database")
s.publishError(sessionID, queryID, entity.DatabaseExecutionError, "Database error")
s.publishStatus("Completed", sessionID, queryID)
// Add the MachineLearning params and information to the result if it exists
var resultMsg entity.MessageStruct
if len(JSONQuery.MachineLearning) == 0 {
resultMsg = entity.MessageStruct{
QueryID: queryID,
Type: "query_result",
Value: res,
}
} else if len(JSONQuery.MachineLearning) == 1 {
resultMsg = entity.MessageStruct{
QueryID: queryID,
Type: "query_result",
Value: res,
MLAttributes: JSONQuery.MachineLearning[0].Parameters,
}
}
resultMsgBytes, err := json.Marshal(resultMsg)
log.Error().Err(err).Msg("error during result marshalling")
// If MachineLearning object is of length 0 we do regular query processing
// If MachineLearning object is of length 1 we just take the first ML item queuename
// If there are more than items in the list (current implementation does not support this but future might) we should do something different
if len(JSONQuery.MachineLearning) == 0 {
s.producer.PublishMessage(&resultMsgBytes, sessionID)
} else if len(JSONQuery.MachineLearning) == 1 {
s.producer.PublishMLMessage(&resultMsgBytes, sessionID, JSONQuery.MachineLearning[0].Queuename)
if queryID != "" {
cacheQueryResultContext, cancelCacheQueryResult := context.WithTimeout(context.Background(), time.Second*20)
defer cancelCacheQueryResult()
s.objectStore.Put(cacheQueryResultContext, "cached-queries", fmt.Sprintf("%s-%s", clientID, queryID), int64(len(resultMsgBytes)), bytes.NewReader(resultMsgBytes))