diff --git a/Makefile b/Makefile index 4b791381fe258b7c08e5e62ab8a6682a5ed2c3f2..36118875197d3c15d4823fba5effe3ad946a471e 100644 --- a/Makefile +++ b/Makefile @@ -35,16 +35,14 @@ run: ./builds/main develop: - # RabbitMQ env variables - $(eval export RABBIT_USER := guest) - $(eval export RABBIT_PASSWORD := guest) - $(eval export RABBIT_HOST := localhost) + + $(eval export RABBIT_USER := haha-test) + $(eval export RABBIT_PASSWORD := dikkedraak) + $(eval export RABBIT_HOST := 192.168.178.158) $(eval export RABBIT_PORT := 5672) - # Whether to log $(eval export LOG_MESSAGES := true) - # Redis env variables $(eval export REDIS_ADDRESS := localhost:6379) @go run cmd/query-service/main.go diff --git a/cmd/query-service/main.go b/cmd/query-service/main.go index 58bfa1ae40ed0a3308cc1d7ddd3169051e14eac6..a992bbe81e02b607db0122f780bfb134afa87631 100644 --- a/cmd/query-service/main.go +++ b/cmd/query-service/main.go @@ -1,171 +1,39 @@ package main import ( - "context" - "encoding/json" - "fmt" - "log" - "query-service/internal/aql" - "query-service/internal/errorhandler" - "query-service/internal/messagequeue" - "query-service/internal/redisclient" - - "query-service/internal/request" - - "github.com/streadway/amqp" - "github.com/thijsheijden/alice" + "query-service/internal/adapters/brokeradapter" + "query-service/internal/drivers/brokerdriver" + "query-service/internal/drivers/keyvaluedriver" + "query-service/internal/usecases/consume" + "query-service/internal/usecases/convertquery" + "query-service/internal/usecases/produce" + "query-service/internal/usecases/request" + "query-service/pkg/logger" ) -var producer alice.Producer - func main() { - //FORGLORY() - - exchangeID := "query-requests" - routingKey := "aql-user-request" - - broker := messagequeue.Create() - messagequeue.StartConsumer(broker, &exchangeID, &routingKey, onMessageReceived) + logger.Start() - producer = messagequeue.StartProducer(broker) + // MARK: Create relevant services + redisService := keyvaluedriver.NewRedisDriver() - redisclient.Start() + // MARK: Create alice RabbitMQ services + brokerGateway := brokeradapter.CreateGateway() + aliceBroker := brokerdriver.CreateAliceBroker(brokerGateway) - select {} - - // file, _ := ioutil.ReadFile("./internal/data/jsonQuery.json") - // query, _ := aql.ConvertJSONToAQL(&file) - - // result, _ := request.SendAQLQuery(*query) - // //fmt.Println(string(*result)) - // querymap := make(map[string]interface{}) - // querymap["type"] = "query_result" - // querymap["values"] = string(*result) - // fmt.Println(querymap) -} + // Instantiate an implementation of the produce UseCase + produceService := produce.NewService(aliceBroker, redisService) -func onMessageReceived(msg amqp.Delivery) { - // Retrieve JSON formatted string payload from msg + // MARK: Create relevant services for consuming a message + convertQueryService := convertquery.NewService() + requestSenderService := request.NewService() - // Bypass MSQ - // jsonQuery, _ := ioutil.ReadFile("./internal/data/jsonQuery.json") - aqlQuery, err := aql.ConvertJSONToAQL(&msg.Body) + consumeService := consume.NewService(aliceBroker, produceService, convertQueryService, requestSenderService) - // Convert the json byte msg to an aql query string - //aqlQuery, err := aql.ConvertJSONToAQL(&msg.Body) - if err != nil { - errorhandler.LogError(err, "failed to parse incoming msg to AQL") // TODO: don't panic on error, send error message to client instead - return - } - fmt.Println("Query: " + *aqlQuery) + // MARK: Start services + produceService.Start() - // Get the queueID for this sessionID - sessionID, ok := msg.Headers["sessionID"].(string) - if !ok { - log.Println("No sessionID passed in message") - return - } + go consumeService.Start() - // Get queueID for this client - queueID := redisclient.Conn.Get(context.Background(), sessionID).Val() - log.Println(queueID) - - // producer.PublishMessage(aqlQuery, , &amqp.Table{}) - - // TODO : Generate databse seperatly - // execute and retrieve result - // convert result to general (node-link (?)) format - result, err := request.SendAQLQuery(*aqlQuery) - if err != nil { - return // TODO: Send message in queue notifying of error - } - - // TODO: Test MQ result - querymap := make(map[string]interface{}) - querymap["type"] = "query_result" - querymap["values"] = *result - querybyte, err := json.Marshal(querymap) - //fmt.Println(querymap) - - // publish converted result - headers := amqp.Table{} - headers["sessionID"] = sessionID - headers["type"] = "schemaResult" - log.Println("publishing message") - producer.PublishMessage(querybyte, &queueID, &headers) - msg.Ack(true) + select {} } - -// func FORGLORY() { -// s := `{ -// "Return": { -// "Entities": [ -// 0, -// 1 -// ], -// "Relations": [ -// 0 -// ] -// }, -// "Entities": [ -// { -// "Type": "airports", -// "Constraints": [ -// { -// "Attribute": "country", -// "Value": "USA", -// "DataType": "text", -// "MatchType": "exact" -// } -// ] -// }, -// { -// "Type": "airports", -// "Constraints": [ -// { -// "Attribute": "city", -// "Value": "New York", -// "DataType": "text", -// "MatchType": "exact" -// }, -// { -// "Attribute": "vip", -// "Value": "true", -// "DataType": "bool", -// "MatchType": "exact" -// } -// ] -// } -// ], -// "Relations": [ -// { -// "Type": "flights", -// "Depth": { -// "min": 1, -// "max": 1 -// }, -// "EntityFrom": 0, -// "EntityTo": 1, -// "Constraints": [ -// { -// "Attribute": "Month", -// "Value": "1", -// "DataType": "number", -// "MatchType": "exact" -// }, -// { -// "Attribute": "Day", -// "Value": "15", -// "DataType": "number", -// "MatchType": "exact" -// } -// ] -// } -// ] -// }` - -// s3 := []byte(s) - -// yeet, _ := aql.ConvertJSONToAQL(&s3) -// fmt.Print(*yeet) -// } diff --git a/go.sum b/go.sum index df4f2cab688029c43b6e227d614edd76ccda7a50..8d6d3a0e1942650c8c43cec06c4c6ebbf65bf261 100644 --- a/go.sum +++ b/go.sum @@ -52,7 +52,6 @@ github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/zerolog v1.19.0/go.mod h1:IzD0RJ65iWH0w97OQQebJEvTZYvsCUm9WVLWBQrJRjo= github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= -github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= diff --git a/internal/adapters/brokeradapter/brokeradapter.go b/internal/adapters/brokeradapter/brokeradapter.go new file mode 100644 index 0000000000000000000000000000000000000000..724836ca611da7406f023d35dd82ce81c2ad46cc --- /dev/null +++ b/internal/adapters/brokeradapter/brokeradapter.go @@ -0,0 +1,26 @@ +package brokeradapter + +import "github.com/streadway/amqp" + +// A Message describes a standard message queue message within the service +type Message struct { + Headers map[string]interface{} + Body []byte +} + +// An Gateway converts AMQP formats to universal formats +type Gateway struct { +} + +// CreateGateway creates a gateway +func CreateGateway() *Gateway { + return &Gateway{} +} + +// TransformMessage transforms an AMQP delivery into a general format +func (a *Gateway) TransformMessage(msg amqp.Delivery) *Message { + return &Message{ + Headers: (map[string]interface{})(msg.Headers), + Body: msg.Body, + } +} diff --git a/internal/adapters/brokeradapter/interface.go b/internal/adapters/brokeradapter/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..d67d8a9c09e620525725049aae5faaa85b92aac3 --- /dev/null +++ b/internal/adapters/brokeradapter/interface.go @@ -0,0 +1,8 @@ +package brokeradapter + +import "github.com/streadway/amqp" + +// GatewayInterface is an interface describing a GateWay +type GatewayInterface interface { + TransformMessage(msg amqp.Delivery) *Message +} diff --git a/internal/aql/aql_test.go b/internal/aql/aql_test.go deleted file mode 100644 index 2a0c3c9e1b5dec119b5bb8f32ccf02ba9e4d3fa8..0000000000000000000000000000000000000000 --- a/internal/aql/aql_test.go +++ /dev/null @@ -1,90 +0,0 @@ -package aql - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestMock(t *testing.T) { - - assert.True(t, true, true) -} - -func TestHugeQuery(t *testing.T) { - - s := `{ - "Return": { - "Entities": [ - 0, - 1 - ], - "Relations": [ - 0 - ] - }, - "Entities": [ - { - "Type": "airports", - "Constraints": [ - { - "Attribute": "country", - "Value": "USA", - "DataType": "text", - "MatchType": "exact" - } - ] - }, - { - "Type": "airports", - "Constraints": [ - { - "Attribute": "city", - "Value": "New York", - "DataType": "text", - "MatchType": "exact" - }, - { - "Attribute": "vip", - "Value": "true", - "DataType": "bool", - "MatchType": "exact" - } - ] - } - ], - "Relations": [ - { - "Type": "flights", - "Depth": { - "min": 1, - "max": 1 - }, - "EntityFrom": 0, - "EntityTo": 1, - "Constraints": [ - { - "Attribute": "Month", - "Value": "1", - "DataType": "number", - "MatchType": "exact" - }, - { - "Attribute": "Day", - "Value": "15", - "DataType": "number", - "MatchType": "exact" - } - ] - } - ] - }` - - s3 := []byte(s) - j, _ := ConvertJSONToAQL(&s3) - - fmt.Print(j) - - assert.True(t, true, true) -} diff --git a/internal/drivers/brokerdriver/broker.go b/internal/drivers/brokerdriver/broker.go new file mode 100644 index 0000000000000000000000000000000000000000..abef043fe84c928879f35fe62903eeee52ba36a2 --- /dev/null +++ b/internal/drivers/brokerdriver/broker.go @@ -0,0 +1,86 @@ +package brokerdriver + +import ( + "os" + "query-service/internal/adapters/brokeradapter" + "query-service/pkg/errorhandler" + "strconv" + + "time" + + "github.com/thijsheijden/alice" +) + +// Driver models an Alice RabbitMQ broker +type Driver struct { + broker alice.Broker + gateway brokeradapter.GatewayInterface +} + +// CreateAliceBroker creates an Alice broker +func CreateAliceBroker(gateway brokeradapter.GatewayInterface) *Driver { + + // Create connection config using environment variables + rabbitUser := os.Getenv("RABBIT_USER") + rabbitPassword := os.Getenv("RABBIT_PASSWORD") + rabbitHost := os.Getenv("RABBIT_HOST") + rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT")) + errorhandler.FailWithError(err, "invalid rabbitmq port given") + + config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler) + + return &Driver{ + broker: alice.CreateBroker(config), + gateway: gateway, + } +} + +// CreateConsumer creates an AliceConsumer on a certain exchange and queue +func (d *Driver) CreateConsumer() Consumer { + exchangeID := "query-requests" + routingKey := "aql-user-request" + + // Declare the exchange we want to bind to + exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct) + if err != nil { + errorhandler.FailWithError(err, "failed to create exchange") + } + + // Declare the queue we will consume from + queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil) + + // Create the consumer + c, err := d.broker.CreateConsumer(queue, routingKey, alice.DefaultConsumerErrorHandler) + if err != nil { + errorhandler.FailWithError(err, "failed to create consumer") + } + + consumer := &AliceConsumer{ + broker: d, + consumer: c, + } + + return consumer +} + +// CreateProducer creates an AliceProducer on a certain exchange +func (d *Driver) CreateProducer() Producer { + exchangeID := "ui-direct-exchange" + + exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct) + if err != nil { + errorhandler.FailWithError(err, "failed to create exchange for producer") + } + + p, err := d.broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler) + if err != nil { + errorhandler.FailWithError(err, "failed to created producer") + } + + producer := &AliceProducer{ + broker: d, + producer: p, + } + + return producer +} diff --git a/internal/drivers/brokerdriver/consumer.go b/internal/drivers/brokerdriver/consumer.go new file mode 100644 index 0000000000000000000000000000000000000000..194b2ad217e79868a423f2a6d17b27bfad5805ba --- /dev/null +++ b/internal/drivers/brokerdriver/consumer.go @@ -0,0 +1,34 @@ +package brokerdriver + +import ( + "query-service/internal/adapters/brokeradapter" + + "github.com/streadway/amqp" + "github.com/thijsheijden/alice" +) + +// AliceConsumer models a RabbitMQ consumer in Alice +type AliceConsumer struct { + broker *Driver + consumer alice.Consumer + messageHandler func(msg *brokeradapter.Message) +} + +// ConsumeMessages starts the consumer using an alice consumer +func (ac *AliceConsumer) ConsumeMessages() { + go ac.consumer.ConsumeMessages(nil, false, ac.handleMessage) +} + +func (ac *AliceConsumer) handleMessage(msg amqp.Delivery) { + // Convert message using the gateway + // Pass message to the message handler + ac.messageHandler(ac.broker.gateway.TransformMessage(msg)) + + // Acknowledge the message was received + msg.Ack(true) +} + +// SetMessageHandler sets the message handler to the supplied function +func (ac *AliceConsumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) { + ac.messageHandler = handler +} diff --git a/internal/drivers/brokerdriver/interface.go b/internal/drivers/brokerdriver/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..3aacf64fea45b25f2eb1dd9c89792ceecbf8192f --- /dev/null +++ b/internal/drivers/brokerdriver/interface.go @@ -0,0 +1,24 @@ +package brokerdriver + +import ( + "query-service/internal/adapters/brokeradapter" + + "github.com/streadway/amqp" +) + +// Broker models a message broker +type Broker interface { + CreateConsumer() Consumer + CreateProducer() Producer +} + +// A Consumer belongs to a broker and consumes messages from a queue +type Consumer interface { + ConsumeMessages() + SetMessageHandler(handler func(msg *brokeradapter.Message)) +} + +// A Producer belongs to a broker and publishes messages to a queue +type Producer interface { + PublishMessage(body *[]byte, queueID *string, headers *amqp.Table) +} diff --git a/internal/drivers/brokerdriver/mock/broker.go b/internal/drivers/brokerdriver/mock/broker.go new file mode 100644 index 0000000000000000000000000000000000000000..8483d8fdc1f2b4d2d25061596854daf041d79393 --- /dev/null +++ b/internal/drivers/brokerdriver/mock/broker.go @@ -0,0 +1,25 @@ +package mockbrokerdriver + +import ( + "query-service/internal/adapters/brokeradapter" + "query-service/internal/drivers/brokerdriver" +) + +// Driver is mock gateway +type Driver struct { + gateway brokeradapter.GatewayInterface +} + +// CreateBroker is a creates a mock driver +func CreateBroker(gateway brokeradapter.GatewayInterface) *Driver { + return &Driver{ + gateway: gateway, + } +} + +// CreateConsumer creates a mock consumer +func (d *Driver) CreateConsumer() brokerdriver.Consumer { + return &Consumer{ + broker: d, + } +} diff --git a/internal/drivers/brokerdriver/mock/consumer.go b/internal/drivers/brokerdriver/mock/consumer.go new file mode 100644 index 0000000000000000000000000000000000000000..b3391265198146245752370f2f559524e73df65b --- /dev/null +++ b/internal/drivers/brokerdriver/mock/consumer.go @@ -0,0 +1,18 @@ +package mockbrokerdriver + +import "query-service/internal/adapters/brokeradapter" + +// Consumer is a mock consumer +type Consumer struct { + broker *Driver +} + +// ConsumeMessages mocks the consume messages func +func (c *Consumer) ConsumeMessages() { + +} + +// SetMessageHandler mocks the setting of a message handler +func (c *Consumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) { + +} diff --git a/internal/drivers/brokerdriver/producer.go b/internal/drivers/brokerdriver/producer.go new file mode 100644 index 0000000000000000000000000000000000000000..a3df373ec86e691c6a675389921a037fa7ae6ac7 --- /dev/null +++ b/internal/drivers/brokerdriver/producer.go @@ -0,0 +1,17 @@ +package brokerdriver + +import ( + "github.com/streadway/amqp" + "github.com/thijsheijden/alice" +) + +// AliceProducer models a RabbitMQ producer in Alice +type AliceProducer struct { + broker *Driver + producer alice.Producer +} + +// PublishMessage will publish a message to the specified queue id +func (ap *AliceProducer) PublishMessage(body *[]byte, queueID *string, headers *amqp.Table) { + ap.producer.PublishMessage(*body, queueID, headers) +} diff --git a/internal/drivers/keyvaluedriver/interface.go b/internal/drivers/keyvaluedriver/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..ad465b3f7f4212470076b09d9edaa38edab9892d --- /dev/null +++ b/internal/drivers/keyvaluedriver/interface.go @@ -0,0 +1,7 @@ +package keyvaluedriver + +// KeyValueStore is an interface for a key value storage +type KeyValueStore interface { + Get(key *string) *string + Set(key *string, value interface{}) error +} diff --git a/internal/drivers/keyvaluedriver/redisdriver.go b/internal/drivers/keyvaluedriver/redisdriver.go new file mode 100644 index 0000000000000000000000000000000000000000..c3420dd52d3196bdced96d97e8de3d30dccfd028 --- /dev/null +++ b/internal/drivers/keyvaluedriver/redisdriver.go @@ -0,0 +1,47 @@ +package keyvaluedriver + +import ( + "context" + "fmt" + "os" + "query-service/pkg/logger" + + "github.com/go-redis/redis/v8" +) + +// RedisDriver models the redis driver +type RedisDriver struct { + client *redis.Client +} + +// NewRedisDriver creates and returns a redis driver +func NewRedisDriver() *RedisDriver { + return &RedisDriver{} +} + +// Start starts the redis driver +func (d *RedisDriver) Start() { + // Grab the redis host and port from environment vars + redisAddress := os.Getenv("REDIS_ADDRESS") + // redisPassword := os.Getenv("REDIS_PASSWORD") + + // Create redis client + d.client = redis.NewClient(&redis.Options{ + Addr: redisAddress, + }) + + pong := d.client.Ping(context.Background()) + logger.Log(fmt.Sprintf("%v", pong)) +} + +// Get retrieves the value from the redis store that belongs to the given key +func (d *RedisDriver) Get(key *string) *string { + value := d.client.Get(context.Background(), *key).Val() + return &value +} + +// Set sets the key value pair in the redis store +func (d *RedisDriver) Set(key *string, value interface{}) error { + status := d.client.Set(context.Background(), *key, value, 0) + return status.Err() +} diff --git a/internal/messagequeue/broker.go b/internal/messagequeue/broker.go deleted file mode 100644 index 58e578bc95045ec5d45b8424a7d7374a86d8012e..0000000000000000000000000000000000000000 --- a/internal/messagequeue/broker.go +++ /dev/null @@ -1,27 +0,0 @@ -package messagequeue - -import ( - "os" - "query-service/internal/errorhandler" - "strconv" - "time" - - "github.com/thijsheijden/alice" -) - -// Create creates a broker -func Create() *alice.RabbitBroker { - // Create connection config using environment variables - rabbitUser := os.Getenv("RABBIT_USER") - rabbitPassword := os.Getenv("RABBIT_PASSWORD") - rabbitHost := os.Getenv("RABBIT_HOST") - rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT")) - if err != nil { - errorhandler.FailWithError(err, "port should be a number") - } - - config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler) - - broker := alice.CreateBroker(config) - return broker -} diff --git a/internal/messagequeue/consumer.go b/internal/messagequeue/consumer.go deleted file mode 100644 index 8c1e5a279de010eeb8cf02d41205076fac64ba14..0000000000000000000000000000000000000000 --- a/internal/messagequeue/consumer.go +++ /dev/null @@ -1,33 +0,0 @@ -package messagequeue - -import ( - "query-service/internal/errorhandler" - - "github.com/streadway/amqp" - "github.com/thijsheijden/alice" -) - -// ConsumeMessageFunc is a function type to be called when a message is consumed -type ConsumeMessageFunc func(amqp.Delivery) - -// StartConsumer will start a consumer -// When a message is received the consumeMessage function will be called -func StartConsumer(broker *alice.RabbitBroker, exchangeID *string, routingKey *string, consumeMessage ConsumeMessageFunc) { - // Declare the exchange we want to bind to - exchange, err := alice.CreateDefaultExchange(*exchangeID, alice.Direct) - if err != nil { - errorhandler.FailWithError(err, "failed to create exchange") - } - - // Declare the queue we will consume from - queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil) - - // Create the consumer - c, err := broker.CreateConsumer(queue, *routingKey, alice.DefaultConsumerErrorHandler) - if err != nil { - errorhandler.FailWithError(err, "failed to create consumer") - } - - // Start consuming messages - go c.ConsumeMessages(nil, false, func(msg amqp.Delivery) { consumeMessage(msg) }) -} diff --git a/internal/messagequeue/producer.go b/internal/messagequeue/producer.go deleted file mode 100644 index 7bcd9c09372670791c95fe20283a9749eb15e50e..0000000000000000000000000000000000000000 --- a/internal/messagequeue/producer.go +++ /dev/null @@ -1,22 +0,0 @@ -package messagequeue - -import ( - "query-service/internal/errorhandler" - - "github.com/thijsheijden/alice" -) - -// StartProducer starts a producer and returns it -func StartProducer(broker *alice.RabbitBroker) alice.Producer { - exchange, err := alice.CreateDefaultExchange("ui-direct-exchange", alice.Direct) - if err != nil { - errorhandler.FailWithError(err, "failed to create exchange for producer") - } - - producer, err := broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler) - if err != nil { - errorhandler.FailWithError(err, "failed to created producer") - } - - return producer -} diff --git a/internal/redisclient/redis.go b/internal/redisclient/redis.go deleted file mode 100644 index 420f89dddc1ba728ffd37b746aaef090f75d44f3..0000000000000000000000000000000000000000 --- a/internal/redisclient/redis.go +++ /dev/null @@ -1,28 +0,0 @@ -package redisclient - -import ( - "context" - "fmt" - "log" - "os" - - "github.com/go-redis/redis/v8" -) - -// Conn is the redis connection -var Conn *redis.Client - -// Start starts the redis client -func Start() { - // Grab the redis host and port from environment vars - redisAddress := os.Getenv("REDIS_ADDRESS") - // redisPassword := os.Getenv("REDIS_PASSWORD") - - // Create redis client - Conn = redis.NewClient(&redis.Options{ - Addr: redisAddress, - }) - - pong := Conn.Ping(context.Background()) - log.Printf(fmt.Sprintf("%v", pong)) -} diff --git a/internal/usecases/consume/consume.go b/internal/usecases/consume/consume.go new file mode 100644 index 0000000000000000000000000000000000000000..a2426fe427b3a2e2f221da4c753d2051f29c8863 --- /dev/null +++ b/internal/usecases/consume/consume.go @@ -0,0 +1,38 @@ +package consume + +import ( + "query-service/internal/drivers/brokerdriver" + "query-service/internal/usecases/convertquery" + "query-service/internal/usecases/produce" + "query-service/internal/usecases/request" +) + +// Service wraps consumer methods +// broker is Alice broker created in brockerdriver driver +type Service struct { + broker brokerdriver.Broker + producer produce.UseCase + queryConverter convertquery.UseCase + requestSender request.UseCase +} + +// NewService creates a new service +func NewService(broker brokerdriver.Broker, produceService produce.UseCase, converQueryService convertquery.UseCase, requestSenderService request.UseCase) *Service { + return &Service{ + broker: broker, + producer: produceService, + queryConverter: converQueryService, + requestSender: requestSenderService, + } +} + +// Start starts consuming +func (s *Service) Start() { + // Create consumer + consumer := s.broker.CreateConsumer() + + consumer.SetMessageHandler(s.HandleMessage) + + // Start consuming messages + consumer.ConsumeMessages() +} diff --git a/internal/usecases/consume/handlemessage.go b/internal/usecases/consume/handlemessage.go new file mode 100644 index 0000000000000000000000000000000000000000..ba9787b8356b44f4e2cf5b14896c96771362afd3 --- /dev/null +++ b/internal/usecases/consume/handlemessage.go @@ -0,0 +1,45 @@ +package consume + +import ( + "encoding/json" + "fmt" + "query-service/internal/adapters/brokeradapter" + "query-service/pkg/errorhandler" + "query-service/pkg/logger" +) + +// HandleMessage gets called when a message is received +func (s *Service) HandleMessage(msg *brokeradapter.Message) { + // Grab sessionID from the headers + sessionID, ok := msg.Headers["sessionID"].(string) + if !ok { + // TODO: Handle error where there is no session ID supplied + } + + // Convert the json byte msg to a query string + query, err := s.queryConverter.ConvertQuery(&msg.Body) + if err != nil { + errorhandler.LogError(err, "failed to parse incoming msg to query language") // TODO: don't panic on error, send error message to client instead + return + } + fmt.Println("Query: " + *query) + + // Make request to database + // TODO : Generate database seperately + // execute and retrieve result + // convert result to general (node-link (?)) format + result, err := s.requestSender.SendAQLQuery(*query) + if err != nil { + logger.Log(err.Error()) + return // TODO: Send message in queue notifying of error + } + + // Add type indicator to result from database + querymap := make(map[string]interface{}) + querymap["type"] = "query_result" + querymap["values"] = *result + querybyte, err := json.Marshal(querymap) + //fmt.Println(querymap) + + s.producer.PublishMessage(&querybyte, &sessionID) +} diff --git a/internal/usecases/consume/interface.go b/internal/usecases/consume/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..f2bb8d785dfe95e1124b392adca15ecabedc1438 --- /dev/null +++ b/internal/usecases/consume/interface.go @@ -0,0 +1,6 @@ +package consume + +// UseCase is an interface describing the socket usecases +type UseCase interface { + Start() +} diff --git a/internal/aql/aql.go b/internal/usecases/convertquery/aql.go similarity index 97% rename from internal/aql/aql.go rename to internal/usecases/convertquery/aql.go index 95f8e9699b6523d22ee29ff2d7d563ce68ec6895..dd776e223e749bf1251b2be9e3cb2f42196379d0 100644 --- a/internal/aql/aql.go +++ b/internal/usecases/convertquery/aql.go @@ -1,4 +1,4 @@ -package aql +package convertquery import ( "encoding/json" @@ -7,6 +7,15 @@ import ( "strings" ) +// Service is a model for the convertquery use case +type Service struct { +} + +// NewService creates a new convertquery service +func NewService() *Service { + return &Service{} +} + /* // Query format for exporting to JSON export type JSONFormat = { @@ -78,7 +87,6 @@ type Constraint = { ] } -&{{[0] [0]} [] [{Flight 0 -1 {1 3} []}]} */ // Constraint datatypes @@ -123,8 +131,8 @@ type constraintStruct struct { MatchType string } -// ConvertJSONToAQL converts a json string to an AQL query -func ConvertJSONToAQL(jsonMsg *[]byte) (*string, error) { +// ConvertQuery converts a json string to an AQL query +func (s *Service) ConvertQuery(jsonMsg *[]byte) (*string, error) { jsonStruct, err := convertJSONToStruct(jsonMsg) if err != nil { diff --git a/internal/usecases/convertquery/aql_test.go b/internal/usecases/convertquery/aql_test.go new file mode 100644 index 0000000000000000000000000000000000000000..8563c2d657ba7d04bb86daf064ad8bc29412ee52 --- /dev/null +++ b/internal/usecases/convertquery/aql_test.go @@ -0,0 +1,89 @@ +package convertquery + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestMock(t *testing.T) { + + assert.True(t, true, true) +} + +// func TestHugeQuery(t *testing.T) { + +// s := `{ +// "Return": { +// "Entities": [ +// 0, +// 1 +// ], +// "Relations": [ +// 0 +// ] +// }, +// "Entities": [ +// { +// "Type": "airports", +// "Constraints": [ +// { +// "Attribute": "country", +// "Value": "USA", +// "DataType": "text", +// "MatchType": "exact" +// } +// ] +// }, +// { +// "Type": "airports", +// "Constraints": [ +// { +// "Attribute": "city", +// "Value": "New York", +// "DataType": "text", +// "MatchType": "exact" +// }, +// { +// "Attribute": "vip", +// "Value": "true", +// "DataType": "bool", +// "MatchType": "exact" +// } +// ] +// } +// ], +// "Relations": [ +// { +// "Type": "flights", +// "Depth": { +// "min": 1, +// "max": 1 +// }, +// "EntityFrom": 0, +// "EntityTo": 1, +// "Constraints": [ +// { +// "Attribute": "Month", +// "Value": "1", +// "DataType": "number", +// "MatchType": "exact" +// }, +// { +// "Attribute": "Day", +// "Value": "15", +// "DataType": "number", +// "MatchType": "exact" +// } +// ] +// } +// ] +// }` + +// s3 := []byte(s) +// j, _ := ConvertQuery(&s3) + +// fmt.Print(j) + +// assert.True(t, true, true) +// } diff --git a/internal/aql/benchmark.txt b/internal/usecases/convertquery/benchmark.txt similarity index 100% rename from internal/aql/benchmark.txt rename to internal/usecases/convertquery/benchmark.txt diff --git a/internal/usecases/convertquery/interface.go b/internal/usecases/convertquery/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..5155e7bce52b1941de5726a18c1d6f7133ca6970 --- /dev/null +++ b/internal/usecases/convertquery/interface.go @@ -0,0 +1,7 @@ +package convertquery + +// UseCase is an interface describing a function for converting a visual query +// to a query of the database language +type UseCase interface { + ConvertQuery(jsonMsg *[]byte) (*string, error) +} diff --git a/internal/usecases/produce/interface.go b/internal/usecases/produce/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..f3b75e8d57fa6b5edb5c1b79699b6c5267f2f9c0 --- /dev/null +++ b/internal/usecases/produce/interface.go @@ -0,0 +1,6 @@ +package produce + +// UseCase is an interface describing the produce usecases +type UseCase interface { + PublishMessage(data *[]byte, sessionID *string) +} diff --git a/internal/usecases/produce/produce.go b/internal/usecases/produce/produce.go new file mode 100644 index 0000000000000000000000000000000000000000..cd1e0c9b6351f555ef8d86b80c71b64db873a38d --- /dev/null +++ b/internal/usecases/produce/produce.go @@ -0,0 +1,44 @@ +package produce + +import ( + "query-service/internal/drivers/brokerdriver" + "query-service/internal/drivers/keyvaluedriver" + + "github.com/streadway/amqp" +) + +// Service wraps consumer methods +type Service struct { + brokerDriver brokerdriver.Broker + keyValueStore keyvaluedriver.KeyValueStore + producerDriver brokerdriver.Producer +} + +// NewService creates a new service +func NewService(broker brokerdriver.Broker, keyValueStore keyvaluedriver.KeyValueStore) *Service { + return &Service{ + brokerDriver: broker, + keyValueStore: keyValueStore, + } +} + +// Start starts the producer +func (s *Service) Start() { + // Create producer + p := s.brokerDriver.CreateProducer() + + s.producerDriver = p + + // // Start consuming messages + // p.ConsumeMessages() +} + +// PublishMessage will publish the message to the queue retrieved from the key value store, with the given sessionID +func (s *Service) PublishMessage(data *[]byte, sessionID *string) { + clientUpdaterID := s.keyValueStore.Get(sessionID) + + headers := amqp.Table{} + headers["sessionID"] = sessionID + headers["type"] = "queryResult" + s.producerDriver.PublishMessage(data, clientUpdaterID, &headers) +} diff --git a/internal/usecases/request/interface.go b/internal/usecases/request/interface.go new file mode 100644 index 0000000000000000000000000000000000000000..e8847b81e2001e2770445db49c22fb978c270a60 --- /dev/null +++ b/internal/usecases/request/interface.go @@ -0,0 +1,6 @@ +package request + +// UseCase is an interface describing the request usecases +type UseCase interface { + SendAQLQuery(query string) (*[]Document, error) +} diff --git a/internal/request/request.go b/internal/usecases/request/request.go similarity index 91% rename from internal/request/request.go rename to internal/usecases/request/request.go index ea6058b4c7b06cb457cbf8b9e426949d17a44a63..51b451600aa95af457884e1678edab0a097cc21b 100644 --- a/internal/request/request.go +++ b/internal/usecases/request/request.go @@ -13,6 +13,15 @@ import ( "github.com/arangodb/go-driver/http" ) +// Service is a struct used to store this use case in +type Service struct { +} + +// NewService creates a new instantion of this use case +func NewService() *Service { + return &Service{} +} + // Document with Empty struct to retrieve all data from the DB Document type Document map[string]interface{} @@ -24,7 +33,7 @@ type GeneralFormat map[string][]Document //map[1 , 2 , 3 map [ .. ]] // SendAQLQuery send AQL string query to database and returns a JSON object in a general format -func SendAQLQuery(AQLQuery string) (*[]Document, error) { +func (s *Service) SendAQLQuery(AQLQuery string) (*[]Document, error) { var queryResult []Document conn, err := http.NewConnection(http.ConnectionConfig{ Endpoints: []string{"https://aae8f5c054da.arangodb.cloud:8529"}, diff --git a/main.exe b/main.exe deleted file mode 100644 index cf37186e615e25dabd0ee61f39baf8b95aa572c0..0000000000000000000000000000000000000000 Binary files a/main.exe and /dev/null differ diff --git a/internal/errorhandler/errorhandler.go b/pkg/errorhandler/errorhandler.go similarity index 100% rename from internal/errorhandler/errorhandler.go rename to pkg/errorhandler/errorhandler.go diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go new file mode 100644 index 0000000000000000000000000000000000000000..2b795b20a0d99c8ebee5649b898cc136f09e5ff5 --- /dev/null +++ b/pkg/logger/logger.go @@ -0,0 +1,26 @@ +package logger + +import ( + "log" + "os" + "query-service/pkg/errorhandler" + "strconv" +) + +var logMessages bool + +// Start error logging +func Start() { + var err error + logMessages, err = strconv.ParseBool(os.Getenv("LOG_MESSAGES")) + if err != nil { + errorhandler.FailWithError(err, "invalid LOG_MESSAGES env variable") + } +} + +// Log logs a message to the console if logging is turned on +func Log(msg string) { + if logMessages { + log.Println(msg) + } +} diff --git a/result.json b/result.json index 4e4af0a83944a020ecf8aee9af950e7ad52a437a..b3d8db87d40f6145077b86578c6281f62e5eaeae 100644 --- a/result.json +++ b/result.json @@ -82,19 +82,5 @@ "state": "NY", "vip": false } - }, - { - "_id": "airports/01J", - "_key": "01J", - "_rev": "_cIYKbri--H", - "attributes": { - "city": "Hilliard", - "country": "USA", - "lat": 30.6880125, - "long": -81.90594389, - "name": "Hilliard Airpark", - "state": "FL", - "vip": false - } } ] \ No newline at end of file