Skip to content
Snippets Groups Projects
Commit 11371eed authored by sivan's avatar sivan
Browse files

refactoring kinda done

parent 5479b2dc
No related branches found
No related tags found
No related merge requests found
Showing
with 299 additions and 336 deletions
...@@ -35,16 +35,14 @@ run: ...@@ -35,16 +35,14 @@ run:
./builds/main ./builds/main
develop: develop:
# RabbitMQ env variables
$(eval export RABBIT_USER := guest) $(eval export RABBIT_USER := haha-test)
$(eval export RABBIT_PASSWORD := guest) $(eval export RABBIT_PASSWORD := dikkedraak)
$(eval export RABBIT_HOST := localhost) $(eval export RABBIT_HOST := 192.168.178.158)
$(eval export RABBIT_PORT := 5672) $(eval export RABBIT_PORT := 5672)
# Whether to log
$(eval export LOG_MESSAGES := true) $(eval export LOG_MESSAGES := true)
# Redis env variables
$(eval export REDIS_ADDRESS := localhost:6379) $(eval export REDIS_ADDRESS := localhost:6379)
@go run cmd/query-service/main.go @go run cmd/query-service/main.go
......
package main package main
import ( import (
"context" "query-service/internal/adapters/brokeradapter"
"encoding/json" "query-service/internal/drivers/brokerdriver"
"fmt" "query-service/internal/drivers/keyvaluedriver"
"log" "query-service/internal/usecases/consume"
"query-service/internal/aql" "query-service/internal/usecases/convertquery"
"query-service/internal/entity" "query-service/internal/usecases/produce"
"query-service/internal/errorhandler" "query-service/internal/usecases/request"
"query-service/internal/messagequeue" "query-service/pkg/logger"
"query-service/internal/redisclient"
"query-service/internal/usecases/redisclient"
"query-service/internal/request"
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
) )
var producer alice.Producer
func main() { func main() {
//FORGLORY() logger.Start()
// Initialize redis client
entity.NewRedisClient()
// MARK: Create relevant services // MARK: Create relevant services
redisService := redisclient.NewService() redisService := keyvaluedriver.NewRedisDriver()
exchangeID := "query-requests"
routingKey := "aql-user-request"
broker := messagequeue.Create() // MARK: Create alice RabbitMQ services
messagequeue.StartConsumer(broker, &exchangeID, &routingKey, onMessageReceived) brokerGateway := brokeradapter.CreateGateway()
aliceBroker := brokerdriver.CreateAliceBroker(brokerGateway)
producer = messagequeue.StartProducer(broker) // Instantiate an implementation of the produce UseCase
produceService := produce.NewService(aliceBroker, redisService)
redisclient.Start() // MARK: Create relevant services for consuming a message
convertQueryService := convertquery.NewService()
select {} requestSenderService := request.NewService()
// file, _ := ioutil.ReadFile("./internal/data/jsonQuery.json")
// query, _ := aql.ConvertJSONToAQL(&file)
// result, _ := request.SendAQLQuery(*query)
// //fmt.Println(string(*result))
// querymap := make(map[string]interface{})
// querymap["type"] = "query_result"
// querymap["values"] = string(*result)
// fmt.Println(querymap)
}
func onMessageReceived(msg amqp.Delivery) { consumeService := consume.NewService(aliceBroker, produceService, convertQueryService, requestSenderService)
// Retrieve JSON formatted string payload from msg
// Bypass MSQ // MARK: Start services
// jsonQuery, _ := ioutil.ReadFile("./internal/data/jsonQuery.json") produceService.Start()
aqlQuery, err := aql.ConvertJSONToAQL(&msg.Body)
// Convert the json byte msg to an aql query string go consumeService.Start()
//aqlQuery, err := aql.ConvertJSONToAQL(&msg.Body)
if err != nil {
errorhandler.LogError(err, "failed to parse incoming msg to AQL") // TODO: don't panic on error, send error message to client instead
return
}
fmt.Println("Query: " + *aqlQuery)
// Get the queueID for this sessionID select {}
sessionID, ok := msg.Headers["sessionID"].(string)
if !ok {
log.Println("No sessionID passed in message")
return
}
// Get queueID for this client
clientUpdaterID := redisclient.Conn.Get(context.Background(), sessionID).Val()
log.Println(clientUpdaterID)
// producer.PublishMessage(aqlQuery, , &amqp.Table{})
// TODO : Generate database seperately
// execute and retrieve result
// convert result to general (node-link (?)) format
result, err := request.SendAQLQuery(*aqlQuery)
if err != nil {
return // TODO: Send message in queue notifying of error
}
// TODO: Test MQ result
querymap := make(map[string]interface{})
querymap["type"] = "query_result"
querymap["values"] = *result
querybyte, err := json.Marshal(querymap)
//fmt.Println(querymap)
// Publish converted result
headers := amqp.Table{}
headers["sessionID"] = sessionID
headers["type"] = "schemaResult"
log.Println("publishing message")
producer.PublishMessage(querybyte, &clientUpdaterID, &headers)
msg.Ack(true)
} }
// func FORGLORY() {
// s := `{
// "Return": {
// "Entities": [
// 0,
// 1
// ],
// "Relations": [
// 0
// ]
// },
// "Entities": [
// {
// "Type": "airports",
// "Constraints": [
// {
// "Attribute": "country",
// "Value": "USA",
// "DataType": "text",
// "MatchType": "exact"
// }
// ]
// },
// {
// "Type": "airports",
// "Constraints": [
// {
// "Attribute": "city",
// "Value": "New York",
// "DataType": "text",
// "MatchType": "exact"
// },
// {
// "Attribute": "vip",
// "Value": "true",
// "DataType": "bool",
// "MatchType": "exact"
// }
// ]
// }
// ],
// "Relations": [
// {
// "Type": "flights",
// "Depth": {
// "min": 1,
// "max": 1
// },
// "EntityFrom": 0,
// "EntityTo": 1,
// "Constraints": [
// {
// "Attribute": "Month",
// "Value": "1",
// "DataType": "number",
// "MatchType": "exact"
// },
// {
// "Attribute": "Day",
// "Value": "15",
// "DataType": "number",
// "MatchType": "exact"
// }
// ]
// }
// ]
// }`
// s3 := []byte(s)
// yeet, _ := aql.ConvertJSONToAQL(&s3)
// fmt.Print(*yeet)
// }
...@@ -4,8 +4,8 @@ import ( ...@@ -4,8 +4,8 @@ import (
"os" "os"
"query-service/internal/adapters/brokeradapter" "query-service/internal/adapters/brokeradapter"
"query-service/pkg/errorhandler" "query-service/pkg/errorhandler"
"strconv" "strconv"
"time" "time"
"github.com/thijsheijden/alice" "github.com/thijsheijden/alice"
...@@ -63,9 +63,11 @@ func (d *Driver) CreateConsumer() Consumer { ...@@ -63,9 +63,11 @@ func (d *Driver) CreateConsumer() Consumer {
return consumer return consumer
} }
// CreateProducer creates an AliceProducer on a certain exchange // CreateAliceProducer creates an AliceProducer on a certain exchange
func (d *Driver) CreateProducer() Producer { func (d *Driver) CreateProducer() Producer {
exchange, err := alice.CreateDefaultExchange("ui-direct-exchange", alice.Direct) exchangeID := "ui-direct-exchange"
exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct)
if err != nil { if err != nil {
errorhandler.FailWithError(err, "failed to create exchange for producer") errorhandler.FailWithError(err, "failed to create exchange for producer")
} }
......
...@@ -14,7 +14,7 @@ type AliceConsumer struct { ...@@ -14,7 +14,7 @@ type AliceConsumer struct {
messageHandler func(msg *brokeradapter.Message) messageHandler func(msg *brokeradapter.Message)
} }
// ConsumeMessages starts the consumer // ConsumeMessages starts the consumer using an alice consumer
func (ac *AliceConsumer) ConsumeMessages() { func (ac *AliceConsumer) ConsumeMessages() {
go ac.consumer.ConsumeMessages(nil, false, ac.handleMessage) go ac.consumer.ConsumeMessages(nil, false, ac.handleMessage)
} }
......
...@@ -20,5 +20,5 @@ type Consumer interface { ...@@ -20,5 +20,5 @@ type Consumer interface {
// A Producer belongs to a broker and publishes messages to a queue // A Producer belongs to a broker and publishes messages to a queue
type Producer interface { type Producer interface {
PublishMessage(body []byte, queueID *string, headers amqp.Table) PublishMessage(body *[]byte, queueID *string, headers *amqp.Table)
} }
...@@ -12,6 +12,6 @@ type AliceProducer struct { ...@@ -12,6 +12,6 @@ type AliceProducer struct {
} }
// PublishMessage will publish a message to the specified queue id // PublishMessage will publish a message to the specified queue id
func (ap *AliceProducer) PublishMessage(body []byte, queueID *string, headers *amqp.Table) { func (ap *AliceProducer) PublishMessage(body *[]byte, queueID *string, headers *amqp.Table) {
ap.producer.PublishMessage(body, queueID, headers) ap.producer.PublishMessage(*body, queueID, headers)
} }
package keyvaluedriver
// KeyValueStore is an interface for a key value storage
type KeyValueStore interface {
Get(key *string) *string
Set(key *string, value interface{}) error
}
package keyvaluedriver
import (
"context"
"fmt"
"os"
"query-service/pkg/logger"
"github.com/go-redis/redis/v8"
)
// RedisDriver models the redis driver
type RedisDriver struct {
client *redis.Client
}
// NewRedisDriver creates and returns a redis driver
func NewRedisDriver() *RedisDriver {
return &RedisDriver{}
}
// Start starts the redis driver
func (d *RedisDriver) Start() {
// Grab the redis host and port from environment vars
redisAddress := os.Getenv("REDIS_ADDRESS")
// redisPassword := os.Getenv("REDIS_PASSWORD")
// Create redis client
d.client = redis.NewClient(&redis.Options{
Addr: redisAddress,
})
pong := d.client.Ping(context.Background())
logger.Log(fmt.Sprintf("%v", pong))
}
// Get retrieves the value from the redis store that belongs to the given key
func (d *RedisDriver) Get(key *string) *string {
value := d.client.Get(context.Background(), *key).Val()
return &value
}
// Set sets the key value pair in the redis store
func (d *RedisDriver) Set(key *string, value interface{}) error {
status := d.client.Set(context.Background(), *key, value, 0)
return status.Err()
}
...@@ -14,15 +14,5 @@ var RedisClient *redis.Client ...@@ -14,15 +14,5 @@ var RedisClient *redis.Client
// NewRedisClient creates a new redis client // NewRedisClient creates a new redis client
func NewRedisClient() { func NewRedisClient() {
// Grab the redis host and port from environment vars
redisAddress := os.Getenv("REDIS_ADDRESS")
// redisPassword := os.Getenv("REDIS_PASSWORD")
// Create redis client
RedisClient = redis.NewClient(&redis.Options{
Addr: redisAddress,
})
pong := RedisClient.Ping(context.Background())
logger.Log(fmt.Sprintf("%v", pong))
} }
package consume package consume
import ( import (
"errors" "query-service/internal/drivers/brokerdriver"
"query-service/internal/errorhandler" "query-service/internal/usecases/convertquery"
"query-service/pkg/logger" "query-service/internal/usecases/produce"
"query-service/internal/usecases/request"
) )
// Service wraps consumer methods // Service wraps consumer methods
// broker is Alice broker created in brockerdriver driver
type Service struct { type Service struct {
broker brokerdriver.Broker broker brokerdriver.Broker
producer produce.UseCase
queryConverter convertquery.UseCase
requestSender request.UseCase
} }
// NewService creates a new service // NewService creates a new service
func NewService(broker brokerdriver.Broker) *Service { func NewService(broker brokerdriver.Broker, produceService produce.UseCase, converQueryService convertquery.UseCase, requestSenderService request.UseCase) *Service {
return &Service{ return &Service{
broker: broker, broker: broker,
producer: produceService,
queryConverter: converQueryService,
requestSender: requestSenderService,
} }
} }
...@@ -28,51 +36,3 @@ func (s *Service) Start() { ...@@ -28,51 +36,3 @@ func (s *Service) Start() {
// Start consuming messages // Start consuming messages
consumer.ConsumeMessages() consumer.ConsumeMessages()
} }
func (s *Service) HandleMessage(msg *brokeradapter.Message) {
// Grab sessionID from the headers
sessionID, ok := msg.Headers["sessionID"].(string)
if !ok {
// TODO: Handle error where there is no session ID supplied
}
// Look at the result type in the headers
switch msg.Headers["type"] {
case "schemaResult": // A schema result
logger.Log("Received schema result for session " + sessionID)
// Send the result into the websocket
s.socketService.WriteToClient(&sessionID, msg.Body)
case "queryResult": // A query result
logger.Log("Received query result for session " + sessionID)
// Send the result into the websocket
s.socketService.WriteToClient(&sessionID, msg.Body)
default:
logger.Log("Received untyped message for " + sessionID)
errorhandler.LogError(errors.New("undefined message type sent"), "error while consuming")
}
}
// // ConsumeMessageFunc is a function type to be called when a message is consumed
// type ConsumeMessageFunc func(amqp.Delivery)
// // StartConsumer will start a consumer
// // When a message is received the consumeMessage function will be called
// func StartConsumer(broker *alice.RabbitBroker, exchangeID *string, routingKey *string, consumeMessage ConsumeMessageFunc) {
// // Declare the exchange we want to bind to
// exchange, err := alice.CreateDefaultExchange(*exchangeID, alice.Direct)
// if err != nil {
// errorhandler.FailWithError(err, "failed to create exchange")
// }
// // Declare the queue we will consume from
// queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil)
// // Create the consumer
// c, err := broker.CreateConsumer(queue, *routingKey, alice.DefaultConsumerErrorHandler)
// if err != nil {
// errorhandler.FailWithError(err, "failed to create consumer")
// }
// // Start consuming messages
// go c.ConsumeMessages(nil, false, func(msg amqp.Delivery) { consumeMessage(msg) })
// }
package consume
import (
"encoding/json"
"fmt"
"query-service/internal/adapters/brokeradapter"
"query-service/pkg/errorhandler"
"query-service/pkg/logger"
)
// HandleMessage gets called when a message is received
func (s *Service) HandleMessage(msg *brokeradapter.Message) {
// Grab sessionID from the headers
sessionID, ok := msg.Headers["sessionID"].(string)
if !ok {
// TODO: Handle error where there is no session ID supplied
}
// Convert the json byte msg to a query string
query, err := s.queryConverter.ConvertQuery(&msg.Body)
if err != nil {
errorhandler.LogError(err, "failed to parse incoming msg to query language") // TODO: don't panic on error, send error message to client instead
return
}
fmt.Println("Query: " + *query)
// Make request to database
// TODO : Generate database seperately
// execute and retrieve result
// convert result to general (node-link (?)) format
result, err := s.requestSender.SendAQLQuery(*query)
if err != nil {
logger.Log(err.Error())
return // TODO: Send message in queue notifying of error
}
// Add type indicator to result from database
querymap := make(map[string]interface{})
querymap["type"] = "query_result"
querymap["values"] = *result
querybyte, err := json.Marshal(querymap)
//fmt.Println(querymap)
s.producer.PublishMessage(&querybyte, &sessionID)
}
...@@ -3,5 +3,4 @@ package consume ...@@ -3,5 +3,4 @@ package consume
// UseCase is an interface describing the socket usecases // UseCase is an interface describing the socket usecases
type UseCase interface { type UseCase interface {
Start() Start()
ConsumeMessages()
} }
package queryconversion package convertquery
import ( import (
"encoding/json" "encoding/json"
...@@ -7,6 +7,15 @@ import ( ...@@ -7,6 +7,15 @@ import (
"strings" "strings"
) )
// Service is a model for the convertquery use case
type Service struct {
}
// NewService creates a new convertquery service
func NewService() *Service {
return &Service{}
}
/* /*
// Query format for exporting to JSON // Query format for exporting to JSON
export type JSONFormat = { export type JSONFormat = {
...@@ -123,7 +132,7 @@ type constraintStruct struct { ...@@ -123,7 +132,7 @@ type constraintStruct struct {
} }
// ConvertQuery converts a json string to an AQL query // ConvertQuery converts a json string to an AQL query
func ConvertQuery(jsonMsg *[]byte) (*string, error) { func (s *Service) ConvertQuery(jsonMsg *[]byte) (*string, error) {
jsonStruct, err := convertJSONToStruct(jsonMsg) jsonStruct, err := convertJSONToStruct(jsonMsg)
if err != nil { if err != nil {
......
package convertquery
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestMock(t *testing.T) {
assert.True(t, true, true)
}
// func TestHugeQuery(t *testing.T) {
// s := `{
// "Return": {
// "Entities": [
// 0,
// 1
// ],
// "Relations": [
// 0
// ]
// },
// "Entities": [
// {
// "Type": "airports",
// "Constraints": [
// {
// "Attribute": "country",
// "Value": "USA",
// "DataType": "text",
// "MatchType": "exact"
// }
// ]
// },
// {
// "Type": "airports",
// "Constraints": [
// {
// "Attribute": "city",
// "Value": "New York",
// "DataType": "text",
// "MatchType": "exact"
// },
// {
// "Attribute": "vip",
// "Value": "true",
// "DataType": "bool",
// "MatchType": "exact"
// }
// ]
// }
// ],
// "Relations": [
// {
// "Type": "flights",
// "Depth": {
// "min": 1,
// "max": 1
// },
// "EntityFrom": 0,
// "EntityTo": 1,
// "Constraints": [
// {
// "Attribute": "Month",
// "Value": "1",
// "DataType": "number",
// "MatchType": "exact"
// },
// {
// "Attribute": "Day",
// "Value": "15",
// "DataType": "number",
// "MatchType": "exact"
// }
// ]
// }
// ]
// }`
// s3 := []byte(s)
// j, _ := ConvertQuery(&s3)
// fmt.Print(j)
// assert.True(t, true, true)
// }
package queryconversion package convertquery
// UseCase is an interface describing a function for converting a visual query // UseCase is an interface describing a function for converting a visual query
// to a query of the database language // to a query of the database language
......
package produce package produce
// UseCase is an interface describing the produce usecases
type UseCase interface {
PublishMessage(data *[]byte, sessionID *string)
}
package produce package produce
import (
"query-service/internal/drivers/brokerdriver"
"query-service/internal/drivers/keyvaluedriver"
"github.com/streadway/amqp"
)
// Service wraps consumer methods
type Service struct {
brokerDriver brokerdriver.Broker
keyValueStore keyvaluedriver.KeyValueStore
producerDriver brokerdriver.Producer
}
// NewService creates a new service
func NewService(broker brokerdriver.Broker, keyValueStore keyvaluedriver.KeyValueStore) *Service {
return &Service{
brokerDriver: broker,
keyValueStore: keyValueStore,
}
}
// Start starts the producer
func (s *Service) Start() {
// Create producer
p := s.brokerDriver.CreateProducer()
s.producerDriver = p
// // Start consuming messages
// p.ConsumeMessages()
}
// PublishMessage will publish the message to the queue retrieved from the key value store, with the given sessionID
func (s *Service) PublishMessage(data *[]byte, sessionID *string) {
clientUpdaterID := s.keyValueStore.Get(sessionID)
headers := amqp.Table{}
headers["sessionID"] = sessionID
headers["type"] = "queryResult"
s.producerDriver.PublishMessage(data, clientUpdaterID, &headers)
}
package queryconversion
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func TestMock(t *testing.T) {
assert.True(t, true, true)
}
func TestHugeQuery(t *testing.T) {
s := `{
"Return": {
"Entities": [
0,
1
],
"Relations": [
0
]
},
"Entities": [
{
"Type": "airports",
"Constraints": [
{
"Attribute": "country",
"Value": "USA",
"DataType": "text",
"MatchType": "exact"
}
]
},
{
"Type": "airports",
"Constraints": [
{
"Attribute": "city",
"Value": "New York",
"DataType": "text",
"MatchType": "exact"
},
{
"Attribute": "vip",
"Value": "true",
"DataType": "bool",
"MatchType": "exact"
}
]
}
],
"Relations": [
{
"Type": "flights",
"Depth": {
"min": 1,
"max": 1
},
"EntityFrom": 0,
"EntityTo": 1,
"Constraints": [
{
"Attribute": "Month",
"Value": "1",
"DataType": "number",
"MatchType": "exact"
},
{
"Attribute": "Day",
"Value": "15",
"DataType": "number",
"MatchType": "exact"
}
]
}
]
}`
s3 := []byte(s)
j, _ := ConvertQuery(&s3)
fmt.Print(j)
assert.True(t, true, true)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment