Newer
Older
/*
This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course.
© Copyright Utrecht University (Department of Information and Computing Sciences)
*/
"query-service/internal/entity"
"query-service/internal/usecases/trackerproducer"
"git.science.uu.nl/graphpolaris/broker"
"git.science.uu.nl/graphpolaris/query-conversion"
HandleMessage is the tracking wrapper for handleMessageHelper
func (s *Service) HandleMessage(msg *broker.Message) {
// Grab the headers from the message
sessionID, clientID, queryID, err := getHeaders(&msg.Headers)
if err != nil {
log.Println("no headers in message")
return
}
// Defer a panic intercepting function
defer func(sessionID, queryID *string) {
if err := recover(); err != nil {
log.Printf("PANIC: %s", string(debug.Stack()))
s.publishStatus("Critical error", sessionID, queryID)
}
}(sessionID, queryID)
s.publishStatus("Received", sessionID, queryID)
var tracker trackerproducer.UseCase
tracker = trackerproducer.NewService(s.brokerDriver, "query-handling", *sessionID, *clientID, *queryID)
println("start tracking in QS")
tracker.StartTracking()
s.handleMessageHelper(msg, sessionID, clientID, queryID)
println("stop tracking in handlemessage.go")
tracker.StopTracking()
}
// handleMessageHelper is the actual implementation, which is nested inside the tracker wrapper in HandleMessage
func (s *Service) handleMessageHelper(msg *broker.Message, sessionID *string, clientID *string, queryID *string) {
// Unmarshall the incoming message into an IncomingJSONQuery object
JSONQuery, err := query.UnmarshalJSON(&msg.Body)
s.publishError(sessionID, queryID, entity.MalformedRequestError, "Bad Request: failed to unmarshall the incoming JSON query")
// Check if there was a database name
if JSONQuery.DatabaseName == "" {
// If not, there is no way to proceed past this point
s.publishError(sessionID, queryID, entity.MalformedRequestError, "Bad request: no database name")
s.publishStatus("Translating", sessionID, queryID)
// Convert the json byte msg to a query string
query, _, err := s.queryConverter.ConvertQuery(JSONQuery)
s.publishError(sessionID, queryID, entity.TranslationError, "Translation error")
// Send the resulting query back to the client
translationMsg := entity.MessageStruct{
Type: "query_translation_result",
Value: *query,
translationMsgBytes, _ := json.Marshal(translationMsg)
s.producer.PublishMessage(&translationMsgBytes, sessionID)
s.publishStatus("Getting database credentials", sessionID, queryID)
// Fetch database credentials from the user service
databaseInfo, err := s.databaseInfoService.GetDatabaseInfo(clientID, &JSONQuery.DatabaseName)
s.publishError(sessionID, queryID, entity.DatabaseCredentialsError, "Bad credentials")
s.publishStatus("Processing on database", sessionID, queryID)
result, err := s.requestSender.ExecuteQuery(*query, databaseInfo.Username, databaseInfo.Password, databaseInfo.URL, databaseInfo.Port, databaseInfo.InternalDatabaseName)
s.publishError(sessionID, queryID, entity.DatabaseExecutionError, "Database error")
s.publishStatus("Completed", sessionID, queryID)
// Add the MachineLearning params and information to the result if it exists
var resultMsg entity.MessageStruct
if len(JSONQuery.MachineLearning) == 0 {
resultMsg = entity.MessageStruct{
QueryID: *queryID,
Type: "query_result",
Value: res,
}
} else if len(JSONQuery.MachineLearning) == 1 {
resultMsg = entity.MessageStruct{
QueryID: *queryID,
Type: "query_result",
Value: res,
MLAttributes: JSONQuery.MachineLearning[0].Parameters,
}
}
resultMsgBytes, err := json.Marshal(resultMsg)
if err != nil {
errorhandler.LogError(err, "Marshalling query_result went wrong!") // TODO: send error message to client instead
return
}
// If MachineLearning object is of length 0 we do regular query processing
// If MachineLearning object is of length 1 we just take the first ML item queuename
// If there are more than items in the list (current implementation does not support this but future might) we should do something different
if len(JSONQuery.MachineLearning) == 0 {
s.producer.PublishMessage(&resultMsgBytes, sessionID)
} else if len(JSONQuery.MachineLearning) == 1 {
s.producer.PublishMLMessage(&resultMsgBytes, sessionID, &JSONQuery.MachineLearning[0].Queuename)
}
cacheQueryResultContext, cancelCacheQueryResult := context.WithTimeout(context.Background(), time.Second*20)
defer cancelCacheQueryResult()
s.objectStore.Put(cacheQueryResultContext, "cached-queries", fmt.Sprintf("%s-%s", *clientID, *queryID), int64(len(resultMsgBytes)), bytes.NewReader(resultMsgBytes))