diff --git a/Makefile b/Makefile index 4b791381fe258b7c08e5e62ab8a6682a5ed2c3f2..36118875197d3c15d4823fba5effe3ad946a471e 100644 --- a/Makefile +++ b/Makefile @@ -35,16 +35,14 @@ run: ./builds/main develop: - # RabbitMQ env variables - $(eval export RABBIT_USER := guest) - $(eval export RABBIT_PASSWORD := guest) - $(eval export RABBIT_HOST := localhost) + + $(eval export RABBIT_USER := haha-test) + $(eval export RABBIT_PASSWORD := dikkedraak) + $(eval export RABBIT_HOST := 192.168.178.158) $(eval export RABBIT_PORT := 5672) - # Whether to log $(eval export LOG_MESSAGES := true) - # Redis env variables $(eval export REDIS_ADDRESS := localhost:6379) @go run cmd/query-service/main.go diff --git a/cmd/query-service/main.go b/cmd/query-service/main.go index bb6200a6b09f345f9fc0983195ce9a64e2cc8da6..a992bbe81e02b607db0122f780bfb134afa87631 100644 --- a/cmd/query-service/main.go +++ b/cmd/query-service/main.go @@ -1,179 +1,39 @@ package main import ( - "context" - "encoding/json" - "fmt" - "log" - "query-service/internal/aql" - "query-service/internal/entity" - "query-service/internal/errorhandler" - "query-service/internal/messagequeue" - "query-service/internal/redisclient" - "query-service/internal/usecases/redisclient" - - "query-service/internal/request" - - "github.com/streadway/amqp" - "github.com/thijsheijden/alice" + "query-service/internal/adapters/brokeradapter" + "query-service/internal/drivers/brokerdriver" + "query-service/internal/drivers/keyvaluedriver" + "query-service/internal/usecases/consume" + "query-service/internal/usecases/convertquery" + "query-service/internal/usecases/produce" + "query-service/internal/usecases/request" + "query-service/pkg/logger" ) -var producer alice.Producer - func main() { - //FORGLORY() - - // Initialize redis client - entity.NewRedisClient() + logger.Start() // MARK: Create relevant services - redisService := redisclient.NewService() - - exchangeID := "query-requests" - routingKey := "aql-user-request" + redisService := keyvaluedriver.NewRedisDriver() - broker := messagequeue.Create() - messagequeue.StartConsumer(broker, &exchangeID, &routingKey, onMessageReceived) + // MARK: Create alice RabbitMQ services + brokerGateway := brokeradapter.CreateGateway() + aliceBroker := brokerdriver.CreateAliceBroker(brokerGateway) - producer = messagequeue.StartProducer(broker) + // Instantiate an implementation of the produce UseCase + produceService := produce.NewService(aliceBroker, redisService) - redisclient.Start() - - select {} - - // file, _ := ioutil.ReadFile("./internal/data/jsonQuery.json") - // query, _ := aql.ConvertJSONToAQL(&file) - - // result, _ := request.SendAQLQuery(*query) - // //fmt.Println(string(*result)) - // querymap := make(map[string]interface{}) - // querymap["type"] = "query_result" - // querymap["values"] = string(*result) - // fmt.Println(querymap) -} + // MARK: Create relevant services for consuming a message + convertQueryService := convertquery.NewService() + requestSenderService := request.NewService() -func onMessageReceived(msg amqp.Delivery) { - // Retrieve JSON formatted string payload from msg + consumeService := consume.NewService(aliceBroker, produceService, convertQueryService, requestSenderService) - // Bypass MSQ - // jsonQuery, _ := ioutil.ReadFile("./internal/data/jsonQuery.json") - aqlQuery, err := aql.ConvertJSONToAQL(&msg.Body) + // MARK: Start services + produceService.Start() - // Convert the json byte msg to an aql query string - //aqlQuery, err := aql.ConvertJSONToAQL(&msg.Body) - if err != nil { - errorhandler.LogError(err, "failed to parse incoming msg to AQL") // TODO: don't panic on error, send error message to client instead - return - } - fmt.Println("Query: " + *aqlQuery) + go consumeService.Start() - // Get the queueID for this sessionID - sessionID, ok := msg.Headers["sessionID"].(string) - if !ok { - log.Println("No sessionID passed in message") - return - } - - // Get queueID for this client - clientUpdaterID := redisclient.Conn.Get(context.Background(), sessionID).Val() - log.Println(clientUpdaterID) - - // producer.PublishMessage(aqlQuery, , &amqp.Table{}) - - // TODO : Generate database seperately - // execute and retrieve result - // convert result to general (node-link (?)) format - result, err := request.SendAQLQuery(*aqlQuery) - if err != nil { - return // TODO: Send message in queue notifying of error - } - - // TODO: Test MQ result - querymap := make(map[string]interface{}) - querymap["type"] = "query_result" - querymap["values"] = *result - querybyte, err := json.Marshal(querymap) - //fmt.Println(querymap) - - // Publish converted result - headers := amqp.Table{} - headers["sessionID"] = sessionID - headers["type"] = "schemaResult" - log.Println("publishing message") - producer.PublishMessage(querybyte, &clientUpdaterID, &headers) - msg.Ack(true) + select {} } - -// func FORGLORY() { -// s := `{ -// "Return": { -// "Entities": [ -// 0, -// 1 -// ], -// "Relations": [ -// 0 -// ] -// }, -// "Entities": [ -// { -// "Type": "airports", -// "Constraints": [ -// { -// "Attribute": "country", -// "Value": "USA", -// "DataType": "text", -// "MatchType": "exact" -// } -// ] -// }, -// { -// "Type": "airports", -// "Constraints": [ -// { -// "Attribute": "city", -// "Value": "New York", -// "DataType": "text", -// "MatchType": "exact" -// }, -// { -// "Attribute": "vip", -// "Value": "true", -// "DataType": "bool", -// "MatchType": "exact" -// } -// ] -// } -// ], -// "Relations": [ -// { -// "Type": "flights", -// "Depth": { -// "min": 1, -// "max": 1 -// }, -// "EntityFrom": 0, -// "EntityTo": 1, -// "Constraints": [ -// { -// "Attribute": "Month", -// "Value": "1", -// "DataType": "number", -// "MatchType": "exact" -// }, -// { -// "Attribute": "Day", -// "Value": "15", -// "DataType": "number", -// "MatchType": "exact" -// } -// ] -// } -// ] -// }` - -// s3 := []byte(s) - -// yeet, _ := aql.ConvertJSONToAQL(&s3) -// fmt.Print(*yeet) -// } diff --git a/go.sum b/go.sum index df4f2cab688029c43b6e227d614edd76ccda7a50..8d6d3a0e1942650c8c43cec06c4c6ebbf65bf261 100644 --- a/go.sum +++ b/go.sum @@ -52,7 +52,6 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= -github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= diff --git a/internal/drivers/brokerdriver/broker.go b/internal/drivers/brokerdriver/broker.go index a37e8cabed02d68be8ab58b1e0403c4a016f4be3..40726144e6ea285ea0ca110ddb201aaa216906df 100644 --- a/internal/drivers/brokerdriver/broker.go +++ b/internal/drivers/brokerdriver/broker.go @@ -4,8 +4,8 @@ import ( "os" "query-service/internal/adapters/brokeradapter" "query-service/pkg/errorhandler" - "strconv" + "time" "github.com/thijsheijden/alice" @@ -63,9 +63,11 @@ func (d *Driver) CreateConsumer() Consumer { return consumer } -// CreateProducer creates an AliceProducer on a certain exchange +// CreateAliceProducer creates an AliceProducer on a certain exchange func (d *Driver) CreateProducer() Producer { - exchange, err := alice.CreateDefaultExchange("ui-direct-exchange", alice.Direct) + exchangeID := "ui-direct-exchange" + + exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct) if err != nil { errorhandler.FailWithError(err, "failed to create exchange for producer") } diff --git a/internal/drivers/brokerdriver/consumer.go b/internal/drivers/brokerdriver/consumer.go index d233015d49f0953425573d7628d6ff97ffe1e9d2..194b2ad217e79868a423f2a6d17b27bfad5805ba 100644 --- a/internal/drivers/brokerdriver/consumer.go +++ b/internal/drivers/brokerdriver/consumer.go @@ -14,7 +14,7 @@ type AliceConsumer struct { messageHandler func(msg *brokeradapter.Message) } -// ConsumeMessages starts the consumer +// ConsumeMessages starts the consumer using an alice consumer func (ac *AliceConsumer) ConsumeMessages() { go ac.consumer.ConsumeMessages(nil, false, ac.handleMessage) } diff --git a/internal/drivers/brokerdriver/interface.go b/internal/drivers/brokerdriver/interface.go index 211ea2ec5cdbbfa180638e1e20fd9532cc2ab2c8..3aacf64fea45b25f2eb1dd9c89792ceecbf8192f 100644 --- a/internal/drivers/brokerdriver/interface.go +++ b/internal/drivers/brokerdriver/interface.go @@ -20,5 +20,5 @@ type Consumer interface { // A Producer belongs to a broker and publishes messages to a queue type Producer interface { - PublishMessage(body []byte, queueID *string, headers amqp.Table) + PublishMessage(body *[]byte, queueID *string, headers *amqp.Table) } diff --git a/internal/drivers/brokerdriver/producer.go b/internal/drivers/brokerdriver/producer.go index 00bb70e08151c1c23155edda22ee87e8f27293d1..a3df373ec86e691c6a675389921a037fa7ae6ac7 100644 --- a/internal/drivers/brokerdriver/producer.go +++ b/internal/drivers/brokerdriver/producer.go @@ -12,6 +12,6 @@ type AliceProducer struct { } // PublishMessage will publish a message to the specified queue id -func (ap *AliceProducer) PublishMessage(body []byte, queueID *string, headers *amqp.Table) { - ap.producer.PublishMessage(body, queueID, headers) +func (ap *AliceProducer) PublishMessage(body *[]byte, queueID *string, headers *amqp.Table) { + ap.producer.PublishMessage(*body, queueID, headers) } diff --git a/internal/drivers/keyvaluedriver/interface.go b/internal/drivers/keyvaluedriver/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..ad465b3f7f4212470076b09d9edaa38edab9892d --- /dev/null +++ b/internal/drivers/keyvaluedriver/interface.go @@ -0,0 +1,7 @@ +package keyvaluedriver + +// KeyValueStore is an interface for a key value storage +type KeyValueStore interface { + Get(key *string) *string + Set(key *string, value interface{}) error +} diff --git a/internal/drivers/keyvaluedriver/redisdriver.go b/internal/drivers/keyvaluedriver/redisdriver.go new file mode 100644 index 0000000000000000000000000000000000000000..c3420dd52d3196bdced96d97e8de3d30dccfd028 --- /dev/null +++ b/internal/drivers/keyvaluedriver/redisdriver.go @@ -0,0 +1,47 @@ +package keyvaluedriver + +import ( + "context" + "fmt" + "os" + "query-service/pkg/logger" + + "github.com/go-redis/redis/v8" +) + +// RedisDriver models the redis driver +type RedisDriver struct { + client *redis.Client +} + +// NewRedisDriver creates and returns a redis driver +func NewRedisDriver() *RedisDriver { + return &RedisDriver{} +} + +// Start starts the redis driver +func (d *RedisDriver) Start() { + // Grab the redis host and port from environment vars + redisAddress := os.Getenv("REDIS_ADDRESS") + // redisPassword := os.Getenv("REDIS_PASSWORD") + + // Create redis client + d.client = redis.NewClient(&redis.Options{ + Addr: redisAddress, + }) + + pong := d.client.Ping(context.Background()) + logger.Log(fmt.Sprintf("%v", pong)) +} + +// Get retrieves the value from the redis store that belongs to the given key +func (d *RedisDriver) Get(key *string) *string { + value := d.client.Get(context.Background(), *key).Val() + return &value +} + +// Set sets the key value pair in the redis store +func (d *RedisDriver) Set(key *string, value interface{}) error { + status := d.client.Set(context.Background(), *key, value, 0) + return status.Err() +} diff --git a/internal/entity/redis.go b/internal/entity/redis.go index cbceeedd6f25dbc00f2fca0702b14e17d1972300..30fd277bb23cc4326b9b63ef3509c3b8f0a16804 100644 --- a/internal/entity/redis.go +++ b/internal/entity/redis.go @@ -14,15 +14,5 @@ var RedisClient *redis.Client // NewRedisClient creates a new redis client func NewRedisClient() { - // Grab the redis host and port from environment vars - redisAddress := os.Getenv("REDIS_ADDRESS") - // redisPassword := os.Getenv("REDIS_PASSWORD") - - // Create redis client - RedisClient = redis.NewClient(&redis.Options{ - Addr: redisAddress, - }) - - pong := RedisClient.Ping(context.Background()) - logger.Log(fmt.Sprintf("%v", pong)) + } diff --git a/internal/usecases/consume/consume.go b/internal/usecases/consume/consume.go index 4dbd4680d06bbc39bc9dfc1ba4900e0cbac06970..a2426fe427b3a2e2f221da4c753d2051f29c8863 100644 --- a/internal/usecases/consume/consume.go +++ b/internal/usecases/consume/consume.go @@ -1,20 +1,28 @@ package consume import ( - "errors" - "query-service/internal/errorhandler" - "query-service/pkg/logger" + "query-service/internal/drivers/brokerdriver" + "query-service/internal/usecases/convertquery" + "query-service/internal/usecases/produce" + "query-service/internal/usecases/request" ) // Service wraps consumer methods +// broker is Alice broker created in brockerdriver driver type Service struct { - broker brokerdriver.Broker + broker brokerdriver.Broker + producer produce.UseCase + queryConverter convertquery.UseCase + requestSender request.UseCase } // NewService creates a new service -func NewService(broker brokerdriver.Broker) *Service { +func NewService(broker brokerdriver.Broker, produceService produce.UseCase, converQueryService convertquery.UseCase, requestSenderService request.UseCase) *Service { return &Service{ - broker: broker, + broker: broker, + producer: produceService, + queryConverter: converQueryService, + requestSender: requestSenderService, } } @@ -28,51 +36,3 @@ func (s *Service) Start() { // Start consuming messages consumer.ConsumeMessages() } - -func (s *Service) HandleMessage(msg *brokeradapter.Message) { - // Grab sessionID from the headers - sessionID, ok := msg.Headers["sessionID"].(string) - if !ok { - // TODO: Handle error where there is no session ID supplied - } - - // Look at the result type in the headers - switch msg.Headers["type"] { - case "schemaResult": // A schema result - logger.Log("Received schema result for session " + sessionID) - // Send the result into the websocket - s.socketService.WriteToClient(&sessionID, msg.Body) - case "queryResult": // A query result - logger.Log("Received query result for session " + sessionID) - // Send the result into the websocket - s.socketService.WriteToClient(&sessionID, msg.Body) - default: - logger.Log("Received untyped message for " + sessionID) - errorhandler.LogError(errors.New("undefined message type sent"), "error while consuming") - } -} - -// // ConsumeMessageFunc is a function type to be called when a message is consumed -// type ConsumeMessageFunc func(amqp.Delivery) - -// // StartConsumer will start a consumer -// // When a message is received the consumeMessage function will be called -// func StartConsumer(broker *alice.RabbitBroker, exchangeID *string, routingKey *string, consumeMessage ConsumeMessageFunc) { -// // Declare the exchange we want to bind to -// exchange, err := alice.CreateDefaultExchange(*exchangeID, alice.Direct) -// if err != nil { -// errorhandler.FailWithError(err, "failed to create exchange") -// } - -// // Declare the queue we will consume from -// queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil) - -// // Create the consumer -// c, err := broker.CreateConsumer(queue, *routingKey, alice.DefaultConsumerErrorHandler) -// if err != nil { -// errorhandler.FailWithError(err, "failed to create consumer") -// } - -// // Start consuming messages -// go c.ConsumeMessages(nil, false, func(msg amqp.Delivery) { consumeMessage(msg) }) -// } diff --git a/internal/usecases/consume/handlemessage.go b/internal/usecases/consume/handlemessage.go new file mode 100644 index 0000000000000000000000000000000000000000..ba9787b8356b44f4e2cf5b14896c96771362afd3 --- /dev/null +++ b/internal/usecases/consume/handlemessage.go @@ -0,0 +1,45 @@ +package consume + +import ( + "encoding/json" + "fmt" + "query-service/internal/adapters/brokeradapter" + "query-service/pkg/errorhandler" + "query-service/pkg/logger" +) + +// HandleMessage gets called when a message is received +func (s *Service) HandleMessage(msg *brokeradapter.Message) { + // Grab sessionID from the headers + sessionID, ok := msg.Headers["sessionID"].(string) + if !ok { + // TODO: Handle error where there is no session ID supplied + } + + // Convert the json byte msg to a query string + query, err := s.queryConverter.ConvertQuery(&msg.Body) + if err != nil { + errorhandler.LogError(err, "failed to parse incoming msg to query language") // TODO: don't panic on error, send error message to client instead + return + } + fmt.Println("Query: " + *query) + + // Make request to database + // TODO : Generate database seperately + // execute and retrieve result + // convert result to general (node-link (?)) format + result, err := s.requestSender.SendAQLQuery(*query) + if err != nil { + logger.Log(err.Error()) + return // TODO: Send message in queue notifying of error + } + + // Add type indicator to result from database + querymap := make(map[string]interface{}) + querymap["type"] = "query_result" + querymap["values"] = *result + querybyte, err := json.Marshal(querymap) + //fmt.Println(querymap) + + s.producer.PublishMessage(&querybyte, &sessionID) +} diff --git a/internal/usecases/consume/interface.go b/internal/usecases/consume/interface.go index f0abaf4178bd8b6a14e8bd4d80a6d52a13d0d544..f2bb8d785dfe95e1124b392adca15ecabedc1438 100644 --- a/internal/usecases/consume/interface.go +++ b/internal/usecases/consume/interface.go @@ -3,5 +3,4 @@ package consume // UseCase is an interface describing the socket usecases type UseCase interface { Start() - ConsumeMessages() } diff --git a/internal/usecases/queryconversion/aql.go b/internal/usecases/convertquery/aql.go similarity index 97% rename from internal/usecases/queryconversion/aql.go rename to internal/usecases/convertquery/aql.go index 8b5cb0aba71c3355aaa214455aa3909fd05390c0..dd776e223e749bf1251b2be9e3cb2f42196379d0 100644 --- a/internal/usecases/queryconversion/aql.go +++ b/internal/usecases/convertquery/aql.go @@ -1,4 +1,4 @@ -package queryconversion +package convertquery import ( "encoding/json" @@ -7,6 +7,15 @@ import ( "strings" ) +// Service is a model for the convertquery use case +type Service struct { +} + +// NewService creates a new convertquery service +func NewService() *Service { + return &Service{} +} + /* // Query format for exporting to JSON export type JSONFormat = { @@ -123,7 +132,7 @@ type constraintStruct struct { } // ConvertQuery converts a json string to an AQL query -func ConvertQuery(jsonMsg *[]byte) (*string, error) { +func (s *Service) ConvertQuery(jsonMsg *[]byte) (*string, error) { jsonStruct, err := convertJSONToStruct(jsonMsg) if err != nil { diff --git a/internal/usecases/convertquery/aql_test.go b/internal/usecases/convertquery/aql_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8563c2d657ba7d04bb86daf064ad8bc29412ee52 --- /dev/null +++ b/internal/usecases/convertquery/aql_test.go @@ -0,0 +1,89 @@ +package convertquery + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMock(t *testing.T) { + + assert.True(t, true, true) +} + +// func TestHugeQuery(t *testing.T) { + +// s := `{ +// "Return": { +// "Entities": [ +// 0, +// 1 +// ], +// "Relations": [ +// 0 +// ] +// }, +// "Entities": [ +// { +// "Type": "airports", +// "Constraints": [ +// { +// "Attribute": "country", +// "Value": "USA", +// "DataType": "text", +// "MatchType": "exact" +// } +// ] +// }, +// { +// "Type": "airports", +// "Constraints": [ +// { +// "Attribute": "city", +// "Value": "New York", +// "DataType": "text", +// "MatchType": "exact" +// }, +// { +// "Attribute": "vip", +// "Value": "true", +// "DataType": "bool", +// "MatchType": "exact" +// } +// ] +// } +// ], +// "Relations": [ +// { +// "Type": "flights", +// "Depth": { +// "min": 1, +// "max": 1 +// }, +// "EntityFrom": 0, +// "EntityTo": 1, +// "Constraints": [ +// { +// "Attribute": "Month", +// "Value": "1", +// "DataType": "number", +// "MatchType": "exact" +// }, +// { +// "Attribute": "Day", +// "Value": "15", +// "DataType": "number", +// "MatchType": "exact" +// } +// ] +// } +// ] +// }` + +// s3 := []byte(s) +// j, _ := ConvertQuery(&s3) + +// fmt.Print(j) + +// assert.True(t, true, true) +// } diff --git a/internal/usecases/queryconversion/benchmark.txt b/internal/usecases/convertquery/benchmark.txt similarity index 100% rename from internal/usecases/queryconversion/benchmark.txt rename to internal/usecases/convertquery/benchmark.txt diff --git a/internal/usecases/queryconversion/interface.go b/internal/usecases/convertquery/interface.go similarity index 88% rename from internal/usecases/queryconversion/interface.go rename to internal/usecases/convertquery/interface.go index 9990e897c00ecba3f44ad77e5fed37836df596a0..5155e7bce52b1941de5726a18c1d6f7133ca6970 100644 --- a/internal/usecases/queryconversion/interface.go +++ b/internal/usecases/convertquery/interface.go @@ -1,4 +1,4 @@ -package queryconversion +package convertquery // UseCase is an interface describing a function for converting a visual query // to a query of the database language diff --git a/internal/usecases/produce/interface.go b/internal/usecases/produce/interface.go index e1a44cda0545131e8a78639c7632c9bca2cb089a..f3b75e8d57fa6b5edb5c1b79699b6c5267f2f9c0 100644 --- a/internal/usecases/produce/interface.go +++ b/internal/usecases/produce/interface.go @@ -1 +1,6 @@ package produce + +// UseCase is an interface describing the produce usecases +type UseCase interface { + PublishMessage(data *[]byte, sessionID *string) +} diff --git a/internal/usecases/produce/produce.go b/internal/usecases/produce/produce.go index e1a44cda0545131e8a78639c7632c9bca2cb089a..cd1e0c9b6351f555ef8d86b80c71b64db873a38d 100644 --- a/internal/usecases/produce/produce.go +++ b/internal/usecases/produce/produce.go @@ -1 +1,44 @@ package produce + +import ( + "query-service/internal/drivers/brokerdriver" + "query-service/internal/drivers/keyvaluedriver" + + "github.com/streadway/amqp" +) + +// Service wraps consumer methods +type Service struct { + brokerDriver brokerdriver.Broker + keyValueStore keyvaluedriver.KeyValueStore + producerDriver brokerdriver.Producer +} + +// NewService creates a new service +func NewService(broker brokerdriver.Broker, keyValueStore keyvaluedriver.KeyValueStore) *Service { + return &Service{ + brokerDriver: broker, + keyValueStore: keyValueStore, + } +} + +// Start starts the producer +func (s *Service) Start() { + // Create producer + p := s.brokerDriver.CreateProducer() + + s.producerDriver = p + + // // Start consuming messages + // p.ConsumeMessages() +} + +// PublishMessage will publish the message to the queue retrieved from the key value store, with the given sessionID +func (s *Service) PublishMessage(data *[]byte, sessionID *string) { + clientUpdaterID := s.keyValueStore.Get(sessionID) + + headers := amqp.Table{} + headers["sessionID"] = sessionID + headers["type"] = "queryResult" + s.producerDriver.PublishMessage(data, clientUpdaterID, &headers) +} diff --git a/internal/usecases/queryconversion/aql_test.go b/internal/usecases/queryconversion/aql_test.go deleted file mode 100644 index a61c75f2f9a4129a7a3660bcb230e6617afd86b4..0000000000000000000000000000000000000000 --- a/internal/usecases/queryconversion/aql_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package queryconversion - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMock(t *testing.T) { - - assert.True(t, true, true) -} - -func TestHugeQuery(t *testing.T) { - - s := `{ - "Return": { - "Entities": [ - 0, - 1 - ], - "Relations": [ - 0 - ] - }, - "Entities": [ - { - "Type": "airports", - "Constraints": [ - { - "Attribute": "country", - "Value": "USA", - "DataType": "text", - "MatchType": "exact" - } - ] - }, - { - "Type": "airports", - "Constraints": [ - { - "Attribute": "city", - "Value": "New York", - "DataType": "text", - "MatchType": "exact" - }, - { - "Attribute": "vip", - "Value": "true", - "DataType": "bool", - "MatchType": "exact" - } - ] - } - ], - "Relations": [ - { - "Type": "flights", - "Depth": { - "min": 1, - "max": 1 - }, - "EntityFrom": 0, - "EntityTo": 1, - "Constraints": [ - { - "Attribute": "Month", - "Value": "1", - "DataType": "number", - "MatchType": "exact" - }, - { - "Attribute": "Day", - "Value": "15", - "DataType": "number", - "MatchType": "exact" - } - ] - } - ] - }` - - s3 := []byte(s) - j, _ := ConvertQuery(&s3) - - fmt.Print(j) - - assert.True(t, true, true) -} diff --git a/internal/usecases/redisclient/interface.go b/internal/usecases/redisclient/interface.go deleted file mode 100644 index 299504a301ee60a549a8dfd84dc30ae3c7b34b82..0000000000000000000000000000000000000000 --- a/internal/usecases/redisclient/interface.go +++ /dev/null @@ -1,12 +0,0 @@ -package redisclient - -import "context" - -// UseCase is an interface describing the redis service -type UseCase interface { - - // GetRouting will retrieve the client updated service ID - // to publish the result to - // Requires the client request sessionID - GetRouting(context.Context, *string) -} diff --git a/internal/usecases/redisclient/redis.go b/internal/usecases/redisclient/redis.go deleted file mode 100644 index d62cd3a90ea6527cdca3da5643133950ec4c776c..0000000000000000000000000000000000000000 --- a/internal/usecases/redisclient/redis.go +++ /dev/null @@ -1,22 +0,0 @@ -package redisclient - -import ( - "context" - "query-service/internal/entity" -) - -// Service wraps redis usecases -type Service struct { -} - -// NewService creates a new redis service and returns it -func NewService() *Service { - return &Service{} -} - -// GetRouting get the client update service queue id -// Takes the context and the sessionID -func (s *Service) GetRouting(ctx context.Context, sessionID *string) *string { - clientUpdaterID := entity.RedisClient.Get(ctx, *sessionID).Val() - return &clientUpdaterID -} diff --git a/internal/usecases/request/interface.go b/internal/usecases/request/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..e8847b81e2001e2770445db49c22fb978c270a60 --- /dev/null +++ b/internal/usecases/request/interface.go @@ -0,0 +1,6 @@ +package request + +// UseCase is an interface describing the request usecases +type UseCase interface { + SendAQLQuery(query string) (*[]Document, error) +} diff --git a/internal/request/request.go b/internal/usecases/request/request.go similarity index 95% rename from internal/request/request.go rename to internal/usecases/request/request.go index ea6058b4c7b06cb457cbf8b9e426949d17a44a63..e146a5497c12f05bd3d3ae6b7a925acf67509add 100644 --- a/internal/request/request.go +++ b/internal/usecases/request/request.go @@ -13,6 +13,13 @@ import ( "github.com/arangodb/go-driver/http" ) +type Service struct { +} + +func NewService() *Service { + return &Service{} +} + // Document with Empty struct to retrieve all data from the DB Document type Document map[string]interface{} @@ -24,7 +31,7 @@ type GeneralFormat map[string][]Document //map[1 , 2 , 3 map [ .. ]] // SendAQLQuery send AQL string query to database and returns a JSON object in a general format -func SendAQLQuery(AQLQuery string) (*[]Document, error) { +func (s *Service) SendAQLQuery(AQLQuery string) (*[]Document, error) { var queryResult []Document conn, err := http.NewConnection(http.ConnectionConfig{ Endpoints: []string{"https://aae8f5c054da.arangodb.cloud:8529"}, diff --git a/main.exe b/main.exe deleted file mode 100644 index cf37186e615e25dabd0ee61f39baf8b95aa572c0..0000000000000000000000000000000000000000 Binary files a/main.exe and /dev/null differ diff --git a/result.json b/result.json index 4e4af0a83944a020ecf8aee9af950e7ad52a437a..b3d8db87d40f6145077b86578c6281f62e5eaeae 100644 --- a/result.json +++ b/result.json @@ -82,19 +82,5 @@ "state": "NY", "vip": false } - }, - { - "_id": "airports/01J", - "_key": "01J", - "_rev": "_cIYKbri--H", - "attributes": { - "city": "Hilliard", - "country": "USA", - "lat": 30.6880125, - "long": -81.90594389, - "name": "Hilliard Airpark", - "state": "FL", - "vip": false - } } ] \ No newline at end of file