Skip to content
Snippets Groups Projects
handlemessage.go 5.24 KiB
Newer Older
/*
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
*/

sivan's avatar
sivan committed
package consume

import (
	"bytes"
	"context"
sivan's avatar
sivan committed
	"encoding/json"
	"fmt"
	"query-service/internal/entity"
	"query-service/internal/usecases/trackerproducer"
sivan's avatar
sivan committed
	"query-service/pkg/errorhandler"
	"runtime/debug"
	"time"
thijsheijden's avatar
thijsheijden committed
	"git.science.uu.nl/graphpolaris/broker"
	"git.science.uu.nl/graphpolaris/query-conversion"
sivan's avatar
sivan committed
)

HandleMessage is the tracking wrapper for handleMessageHelper
func (s *Service) HandleMessage(msg *broker.Message) {
	// Grab the headers from the message
	sessionID, clientID, queryID, err := getHeaders(&msg.Headers)
	if err != nil {
		log.Println("no headers in message")
		return
	}

	// Defer a panic intercepting function
	defer func(sessionID, queryID *string) {
		if err := recover(); err != nil {
			log.Printf("PANIC: %s", string(debug.Stack()))

			s.publishStatus("Critical error", sessionID, queryID)
		}
	}(sessionID, queryID)
	s.publishStatus("Received", sessionID, queryID)
	var tracker trackerproducer.UseCase
	tracker = trackerproducer.NewService(s.brokerDriver, "query-handling", *sessionID, *clientID, *queryID)
	println("start tracking in QS")
	tracker.StartTracking()
	s.handleMessageHelper(msg, sessionID, clientID, queryID)

	println("stop tracking in handlemessage.go")
	tracker.StopTracking()
}

// handleMessageHelper is the actual implementation, which is nested inside the tracker wrapper in HandleMessage
func (s *Service) handleMessageHelper(msg *broker.Message, sessionID *string, clientID *string, queryID *string) {
	// Unmarshall the incoming message into an IncomingJSONQuery object
	JSONQuery, err := query.UnmarshalJSON(&msg.Body)
		s.publishError(sessionID, queryID, entity.MalformedRequestError, "Bad Request: failed to unmarshall the incoming JSON query")
	// Check if there was a database name
	if JSONQuery.DatabaseName == "" {
		println("no database name")
		// If not, there is no way to proceed past this point
		s.publishError(sessionID, queryID, entity.MalformedRequestError, "Bad request: no database name")
sivan's avatar
sivan committed
		return
	}
	s.publishStatus("Translating", sessionID, queryID)
	// Convert the json byte msg to a query string
	query, _, err := s.queryConverter.ConvertQuery(JSONQuery)
	if err != nil {
		s.publishError(sessionID, queryID, entity.TranslationError, "Translation error")
	// Send the resulting query back to the client
	translationMsg := entity.MessageStruct{
		QueryID: *queryID,
		Type:    "query_translation_result",
		Value:   *query,
	translationMsgBytes, _ := json.Marshal(translationMsg)
	s.producer.PublishMessage(&translationMsgBytes, sessionID)

	s.publishStatus("Getting database credentials", sessionID, queryID)
	// Fetch database credentials from the user service
	databaseInfo, err := s.databaseInfoService.GetDatabaseInfo(clientID, &JSONQuery.DatabaseName)
	if err != nil {
		s.publishError(sessionID, queryID, entity.DatabaseCredentialsError, "Bad credentials")
	s.publishStatus("Processing on database", sessionID, queryID)
sivan's avatar
sivan committed
	// execute and retrieve result
Lelieveld,J.R.J. (Joris)'s avatar
Lelieveld,J.R.J. (Joris) committed
	// convert result to general (node-link) format
	result, err := s.requestSender.ExecuteQuery(*query, databaseInfo.Username, databaseInfo.Password, databaseInfo.URL, databaseInfo.Port, databaseInfo.InternalDatabaseName)
sivan's avatar
sivan committed
	if err != nil {
		s.publishError(sessionID, queryID, entity.DatabaseExecutionError, "Database error")
sivan's avatar
sivan committed
	}

	s.publishStatus("Completed", sessionID, queryID)
sivan's avatar
sivan committed
	// Add type indicator to result from database
Bouma,C.J. (Chris)'s avatar
Bouma,C.J. (Chris) committed
	var res interface{}
	json.Unmarshal(*result, &res)
	// Add the MachineLearning params and information to the result if it exists
	var resultMsg entity.MessageStruct
	if len(JSONQuery.MachineLearning) == 0 {
		resultMsg = entity.MessageStruct{
			QueryID: *queryID,
			Type:    "query_result",
			Value:   res,
		}
	} else if len(JSONQuery.MachineLearning) == 1 {
		resultMsg = entity.MessageStruct{
			QueryID:      *queryID,
			Type:         "query_result",
			Value:        res,
			MLAttributes: JSONQuery.MachineLearning[0].Parameters,
		}
	}
	resultMsgBytes, err := json.Marshal(resultMsg)
	if err != nil {
		errorhandler.LogError(err, "Marshalling query_result went wrong!") // TODO: send error message to client instead
		return
	}
sivan's avatar
sivan committed

	// Publish the result
	// If MachineLearning object is of length 0 we do regular query processing
	// If MachineLearning object is of length 1 we just take the first ML item queuename
	// If there are more than items in the list (current implementation does not support this but future might) we should do something different
	if len(JSONQuery.MachineLearning) == 0 {
		s.producer.PublishMessage(&resultMsgBytes, sessionID)
	} else if len(JSONQuery.MachineLearning) == 1 {
		s.producer.PublishMLMessage(&resultMsgBytes, sessionID, &JSONQuery.MachineLearning[0].Queuename)
	}
	// Caching message in object store
	if *queryID != "" {
		cacheQueryResultContext, cancelCacheQueryResult := context.WithTimeout(context.Background(), time.Second*20)
		defer cancelCacheQueryResult()
		s.objectStore.Put(cacheQueryResultContext, "cached-queries", fmt.Sprintf("%s-%s", *clientID, *queryID), int64(len(resultMsgBytes)), bytes.NewReader(resultMsgBytes))
sivan's avatar
sivan committed
}