Skip to content
Snippets Groups Projects
Commit fefc0ff0 authored by thijsheijden's avatar thijsheijden
Browse files

Cleanup of error handling in consumemessage func

parent ebb62686
Branches
Tags
No related merge requests found
package entity
/*
ServiceError defines how internal service errors look
*/
type ServiceError struct {
errorMessage string
code ErrorCode
}
/*
ErrorCode is an error code
*/
type ErrorCode int
const (
// TranslationError is thrown if there is an error translating
TranslationError ErrorCode = iota
// MalformedRequestError is thrown if the query request is malformed
MalformedRequestError
// DatabaseCredentialsError is thrown if the database credentials can't be retrieved
DatabaseCredentialsError
// DatabaseExecutionError is thrown if there is an error while executing the query on the database
DatabaseExecutionError
// MessageHeaderError is thrown if headers are missing from request
MessageHeaderError
)
/*
NewError creates a new ServiceError
errorMessage: string, the error message
code: int, the error code
Return: error, a created error
*/
func NewError(errorMessage string, code ErrorCode) error {
return &ServiceError{
errorMessage: errorMessage,
code: TranslationError,
}
}
func (se *ServiceError) Error() string {
return se.errorMessage
}
...@@ -18,3 +18,13 @@ type Status struct { ...@@ -18,3 +18,13 @@ type Status struct {
Status string `json:"status"` Status string `json:"status"`
QueryID string `json:"queryID"` QueryID string `json:"queryID"`
} }
/*
ErrorMessage describes an error message which is sent to the frontend
*/
type ErrorMessage struct {
Type string `json:"type"`
QueryID string `json:"queryID"`
Code int `json:"errorCode"`
Value string `json:"value"`
}
package consume
import "query-service/internal/entity"
// getHeaders retrieves the headers from the message
func getHeaders(headers *map[string]interface{}) (*string, *string, *string, error) {
sessionID, ok := (*headers)["sessionID"].(string)
if !ok {
return nil, nil, nil, entity.NewError("missing sessionID in header", entity.MessageHeaderError)
}
clientID, ok := (*headers)["clientID"].(string)
if !ok {
return nil, nil, nil, entity.NewError("missing clientID in header", entity.MessageHeaderError)
}
var queryID string
queryID, ok = (*headers)["queryID"].(string)
if !ok {
return nil, nil, nil, entity.NewError("missing queryID in header", entity.MessageHeaderError)
}
return &sessionID, &clientID, &queryID, nil
}
...@@ -5,6 +5,7 @@ import ( ...@@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"log"
"query-service/internal/entity" "query-service/internal/entity"
"query-service/pkg/errorhandler" "query-service/pkg/errorhandler"
"time" "time"
...@@ -15,116 +16,74 @@ import ( ...@@ -15,116 +16,74 @@ import (
// HandleMessage gets called when a message is received // HandleMessage gets called when a message is received
func (s *Service) HandleMessage(msg *broker.Message) { func (s *Service) HandleMessage(msg *broker.Message) {
// Grab sessionID and clientID from the headers // Grab the headers from the message
sessionID, ok := msg.Headers["sessionID"].(string) sessionID, clientID, queryID, err := getHeaders(&msg.Headers)
if !ok { if err != nil {
log.Println("no headers in message")
return return
} }
clientID, ok := msg.Headers["clientID"].(string) s.publishStatus("Received", sessionID, queryID)
if !ok {
return
}
var queryID string
queryID, ok = msg.Headers["queryID"].(string)
if !ok {
queryID = ""
}
s.sendStatus("Received", &sessionID, &queryID)
// Unmarshall the incoming message into an IncomingJSONQuery object // Unmarshall the incoming message into an IncomingJSONQuery object
JSONQuery, err := query.UnmarshalJSON(&msg.Body) JSONQuery, err := query.UnmarshalJSON(&msg.Body)
if err != nil { if err != nil {
errorMsg := entity.MessageStruct{ s.publishError(sessionID, queryID, entity.MalformedRequestError, "malformed request")
QueryID: queryID,
Type: "query_translation_error",
Value: "failed to unmarshall the incoming JSON query",
}
errorMsgBytes, _ := json.Marshal(errorMsg)
s.producer.PublishMessage(&errorMsgBytes, &sessionID)
return return
} }
// Check if there was a database name // Check if there was a database name
if JSONQuery.DatabaseName == "" { if JSONQuery.DatabaseName == "" {
errorMsg := entity.MessageStruct{ // If not, there is no way to proceed past this point
QueryID: queryID, s.publishError(sessionID, queryID, entity.MalformedRequestError, "malformed request - no database name")
Type: "query_malformed_request_error",
Value: "no database name supplied",
}
errorMsgBytes, _ := json.Marshal(errorMsg)
s.producer.PublishMessage(&errorMsgBytes, &sessionID)
return return
} }
s.sendStatus("Translating", &sessionID, &queryID) s.publishStatus("Translating", sessionID, queryID)
// Convert the json byte msg to a query string // Convert the json byte msg to a query string
query, err := s.queryConverter.ConvertQuery(JSONQuery) query, err := s.queryConverter.ConvertQuery(JSONQuery)
if err != nil { if err != nil {
errorMsg := entity.MessageStruct{ s.publishError(sessionID, queryID, entity.TranslationError, "failed to translate query")
QueryID: queryID,
Type: "query_translation_error",
Value: err.Error(),
}
errorMsgBytes, _ := json.Marshal(errorMsg)
s.producer.PublishMessage(&errorMsgBytes, &sessionID)
return return
} }
// Send the resulting query back to the client // Send the resulting query back to the client
translationMsg := entity.MessageStruct{ translationMsg := entity.MessageStruct{
QueryID: queryID, QueryID: *queryID,
Type: "query_translation_result", Type: "query_translation_result",
Value: *query, Value: *query,
} }
translationMsgBytes, err := json.Marshal(translationMsg) translationMsgBytes, _ := json.Marshal(translationMsg)
if err != nil {
errorhandler.LogError(err, "Marshalling query_translation_result went wrong!") // TODO: send error message to client instead
return
}
s.producer.PublishMessage(&translationMsgBytes, &sessionID) // TODO: should this be a go routine?
s.sendStatus("Getting database credentials", &sessionID, &queryID) s.producer.PublishMessage(&translationMsgBytes, sessionID)
s.publishStatus("Getting database credentials", sessionID, queryID)
// Fetch database credentials from the user service // Fetch database credentials from the user service
databaseInfo, err := s.databaseInfoService.GetDatabaseInfo(&clientID, &JSONQuery.DatabaseName) databaseInfo, err := s.databaseInfoService.GetDatabaseInfo(clientID, &JSONQuery.DatabaseName)
if err != nil { if err != nil {
errorMsg := entity.MessageStruct{ s.publishError(sessionID, queryID, entity.DatabaseCredentialsError, "failed to retrieve database credentials")
QueryID: queryID,
Type: "query_database_not_found",
Value: err.Error(),
}
errorMsgBytes, _ := json.Marshal(errorMsg)
s.producer.PublishMessage(&errorMsgBytes, &sessionID)
return return
} }
s.sendStatus("Processing on database", &sessionID, &queryID) s.publishStatus("Processing on database", sessionID, queryID)
// execute and retrieve result // execute and retrieve result
// convert result to general (node-link) format // convert result to general (node-link) format
result, err := s.requestSender.ExecuteQuery(*query, databaseInfo.Username, databaseInfo.Password, databaseInfo.URL, databaseInfo.Port, databaseInfo.InternalDatabaseName) result, err := s.requestSender.ExecuteQuery(*query, databaseInfo.Username, databaseInfo.Password, databaseInfo.URL, databaseInfo.Port, databaseInfo.InternalDatabaseName)
if err != nil { if err != nil {
errorMsg := entity.MessageStruct{ s.publishError(sessionID, queryID, entity.DatabaseExecutionError, "query failed to execute on database")
QueryID: queryID,
Type: "query_database_error",
Value: err.Error(),
}
errorMsgBytes, _ := json.Marshal(errorMsg)
s.producer.PublishMessage(&errorMsgBytes, &sessionID)
return return
} }
s.sendStatus("Completed", &sessionID, &queryID) s.publishStatus("Completed", sessionID, queryID)
// Add type indicator to result from database // Add type indicator to result from database
var res interface{} var res interface{}
json.Unmarshal(*result, &res) json.Unmarshal(*result, &res)
resultMsg := entity.MessageStruct{ resultMsg := entity.MessageStruct{
QueryID: queryID, QueryID: *queryID,
Type: "query_result", Type: "query_result",
Value: res, Value: res,
} }
...@@ -135,30 +94,12 @@ func (s *Service) HandleMessage(msg *broker.Message) { ...@@ -135,30 +94,12 @@ func (s *Service) HandleMessage(msg *broker.Message) {
} }
// Publish the result // Publish the result
s.producer.PublishMessage(&resultMsgBytes, &sessionID) s.producer.PublishMessage(&resultMsgBytes, sessionID)
// Caching message in object store // Caching message in object store
if queryID != "" { if *queryID != "" {
cacheQueryResultContext, cancelCacheQueryResult := context.WithTimeout(context.Background(), time.Second*20) cacheQueryResultContext, cancelCacheQueryResult := context.WithTimeout(context.Background(), time.Second*20)
defer cancelCacheQueryResult() defer cancelCacheQueryResult()
s.objectStore.Put(cacheQueryResultContext, "cached-queries", fmt.Sprintf("%s-%s", sessionID, queryID), int64(len(resultMsgBytes)), bytes.NewReader(resultMsgBytes)) s.objectStore.Put(cacheQueryResultContext, "cached-queries", fmt.Sprintf("%s-%s", *sessionID, *queryID), int64(len(resultMsgBytes)), bytes.NewReader(resultMsgBytes))
} }
} }
func (s *Service) sendStatus(status string, sessionID *string, queryID *string) {
msg := entity.StatusMessageStruct{
Type: "query_status_update",
Value: entity.Status{
Status: status,
QueryID: *queryID,
},
}
msgBytes, err := json.Marshal(msg)
if err != nil {
errorhandler.FailWithError(err, "Marshalling status update message went wrong")
return
}
s.producer.PublishMessage(&msgBytes, sessionID)
}
package consume
import (
"encoding/json"
"query-service/internal/entity"
)
func (s *Service) publishError(sessionID *string, queryID *string, errorCode entity.ErrorCode, reason string) {
errorMsg := entity.ErrorMessage{
Type: "query_error",
QueryID: *queryID,
Code: int(errorCode),
Value: reason,
}
errorMsgBytes, _ := json.Marshal(errorMsg)
s.producer.PublishMessage(&errorMsgBytes, sessionID)
return
}
package consume
import (
"encoding/json"
"query-service/internal/entity"
"query-service/pkg/errorhandler"
)
func (s *Service) publishStatus(status string, sessionID *string, queryID *string) {
msg := entity.StatusMessageStruct{
Type: "query_status_update",
Value: entity.Status{
Status: status,
QueryID: *queryID,
},
}
msgBytes, err := json.Marshal(msg)
if err != nil {
errorhandler.FailWithError(err, "Marshalling status update message went wrong")
return
}
s.producer.PublishMessage(&msgBytes, sessionID)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment