Skip to content
Snippets Groups Projects
Commit 5479b2dc authored by sivan's avatar sivan
Browse files

refactoring to clean, work in progress

parent 05dc5fed
No related branches found
No related tags found
No related merge requests found
Showing
with 361 additions and 93 deletions
......@@ -6,9 +6,11 @@ import (
"fmt"
"log"
"query-service/internal/aql"
"query-service/internal/entity"
"query-service/internal/errorhandler"
"query-service/internal/messagequeue"
"query-service/internal/redisclient"
"query-service/internal/usecases/redisclient"
"query-service/internal/request"
......@@ -21,6 +23,12 @@ var producer alice.Producer
func main() {
//FORGLORY()
// Initialize redis client
entity.NewRedisClient()
// MARK: Create relevant services
redisService := redisclient.NewService()
exchangeID := "query-requests"
routingKey := "aql-user-request"
......@@ -67,12 +75,12 @@ func onMessageReceived(msg amqp.Delivery) {
}
// Get queueID for this client
queueID := redisclient.Conn.Get(context.Background(), sessionID).Val()
log.Println(queueID)
clientUpdaterID := redisclient.Conn.Get(context.Background(), sessionID).Val()
log.Println(clientUpdaterID)
// producer.PublishMessage(aqlQuery, , &amqp.Table{})
// TODO : Generate databse seperatly
// TODO : Generate database seperately
// execute and retrieve result
// convert result to general (node-link (?)) format
result, err := request.SendAQLQuery(*aqlQuery)
......@@ -87,12 +95,12 @@ func onMessageReceived(msg amqp.Delivery) {
querybyte, err := json.Marshal(querymap)
//fmt.Println(querymap)
// publish converted result
// Publish converted result
headers := amqp.Table{}
headers["sessionID"] = sessionID
headers["type"] = "schemaResult"
log.Println("publishing message")
producer.PublishMessage(querybyte, &queueID, &headers)
producer.PublishMessage(querybyte, &clientUpdaterID, &headers)
msg.Ack(true)
}
......
package brokeradapter
import "github.com/streadway/amqp"
// A Message describes a standard message queue message within the service
type Message struct {
Headers map[string]interface{}
Body []byte
}
// An Gateway converts AMQP formats to universal formats
type Gateway struct {
}
func CreateGateway() *Gateway {
return &Gateway{}
}
// TransformMessage transforms an AMQP delivery into a general format
func (a *Gateway) TransformMessage(msg amqp.Delivery) *Message {
return &Message{
Headers: (map[string]interface{})(msg.Headers),
Body: msg.Body,
}
}
package brokeradapter
import "github.com/streadway/amqp"
type GatewayInterface interface {
TransformMessage(msg amqp.Delivery) *Message
}
package brokerdriver
import (
"os"
"query-service/internal/adapters/brokeradapter"
"query-service/pkg/errorhandler"
"strconv"
"time"
"github.com/thijsheijden/alice"
)
// Driver models an Alice RabbitMQ broker
type Driver struct {
broker alice.Broker
gateway brokeradapter.GatewayInterface
}
// CreateAliceBroker creates an Alice broker
func CreateAliceBroker(gateway brokeradapter.GatewayInterface) *Driver {
// Create connection config using environment variables
rabbitUser := os.Getenv("RABBIT_USER")
rabbitPassword := os.Getenv("RABBIT_PASSWORD")
rabbitHost := os.Getenv("RABBIT_HOST")
rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
errorhandler.FailWithError(err, "invalid rabbitmq port given")
config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)
return &Driver{
broker: alice.CreateBroker(config),
gateway: gateway,
}
}
// CreateConsumer creates an AliceConsumer on a certain exchange and queue
func (d *Driver) CreateConsumer() Consumer {
exchangeID := "query-requests"
routingKey := "aql-user-request"
// Declare the exchange we want to bind to
exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange")
}
// Declare the queue we will consume from
queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil)
// Create the consumer
c, err := d.broker.CreateConsumer(queue, routingKey, alice.DefaultConsumerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to create consumer")
}
consumer := &AliceConsumer{
broker: d,
consumer: c,
}
return consumer
}
// CreateProducer creates an AliceProducer on a certain exchange
func (d *Driver) CreateProducer() Producer {
exchange, err := alice.CreateDefaultExchange("ui-direct-exchange", alice.Direct)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange for producer")
}
p, err := d.broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to created producer")
}
producer := &AliceProducer{
broker: d,
producer: p,
}
return producer
}
package brokerdriver
import (
"query-service/internal/adapters/brokeradapter"
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
)
// AliceConsumer models a RabbitMQ consumer in Alice
type AliceConsumer struct {
broker *Driver
consumer alice.Consumer
messageHandler func(msg *brokeradapter.Message)
}
// ConsumeMessages starts the consumer
func (ac *AliceConsumer) ConsumeMessages() {
go ac.consumer.ConsumeMessages(nil, false, ac.handleMessage)
}
func (ac *AliceConsumer) handleMessage(msg amqp.Delivery) {
// Convert message using the gateway
// Pass message to the message handler
ac.messageHandler(ac.broker.gateway.TransformMessage(msg))
// Acknowledge the message was received
msg.Ack(true)
}
// SetMessageHandler sets the message handler to the supplied function
func (ac *AliceConsumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) {
ac.messageHandler = handler
}
package brokerdriver
import (
"query-service/internal/adapters/brokeradapter"
"github.com/streadway/amqp"
)
// Broker models a message broker
type Broker interface {
CreateConsumer() Consumer
CreateProducer() Producer
}
// A Consumer belongs to a broker and consumes messages from a queue
type Consumer interface {
ConsumeMessages()
SetMessageHandler(handler func(msg *brokeradapter.Message))
}
// A Producer belongs to a broker and publishes messages to a queue
type Producer interface {
PublishMessage(body []byte, queueID *string, headers amqp.Table)
}
package mockbrokerdriver
import (
"query-service/internal/adapters/brokeradapter"
"query-service/internal/drivers/brokerdriver"
)
type Driver struct {
gateway brokeradapter.GatewayInterface
}
func CreateBroker(gateway brokeradapter.GatewayInterface) *Driver {
return &Driver{
gateway: gateway,
}
}
func (d *Driver) CreateConsumer() brokerdriver.Consumer {
return &Consumer{
broker: d,
}
}
package mockbrokerdriver
import "query-service/internal/adapters/brokeradapter"
type Consumer struct {
broker *Driver
}
func (c *Consumer) ConsumeMessages() {
}
func (c *Consumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) {
}
package brokerdriver
import (
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
)
// AliceProducer models a RabbitMQ producer in Alice
type AliceProducer struct {
broker *Driver
producer alice.Producer
}
// PublishMessage will publish a message to the specified queue id
func (ap *AliceProducer) PublishMessage(body []byte, queueID *string, headers *amqp.Table) {
ap.producer.PublishMessage(body, queueID, headers)
}
package redisclient
package entity
import (
"context"
"fmt"
"log"
"os"
"query-service/pkg/logger"
"github.com/go-redis/redis/v8"
)
// Conn is the redis connection
var Conn *redis.Client
// RedisClient is the Redis client used throughout the service
var RedisClient *redis.Client
// Start starts the redis client
func Start() {
// NewRedisClient creates a new redis client
func NewRedisClient() {
// Grab the redis host and port from environment vars
redisAddress := os.Getenv("REDIS_ADDRESS")
// redisPassword := os.Getenv("REDIS_PASSWORD")
// Create redis client
Conn = redis.NewClient(&redis.Options{
RedisClient = redis.NewClient(&redis.Options{
Addr: redisAddress,
})
pong := Conn.Ping(context.Background())
log.Printf(fmt.Sprintf("%v", pong))
pong := RedisClient.Ping(context.Background())
logger.Log(fmt.Sprintf("%v", pong))
}
package messagequeue
import (
"os"
"query-service/internal/errorhandler"
"strconv"
"time"
"github.com/thijsheijden/alice"
)
// Create creates a broker
func Create() *alice.RabbitBroker {
// Create connection config using environment variables
rabbitUser := os.Getenv("RABBIT_USER")
rabbitPassword := os.Getenv("RABBIT_PASSWORD")
rabbitHost := os.Getenv("RABBIT_HOST")
rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
if err != nil {
errorhandler.FailWithError(err, "port should be a number")
}
config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)
broker := alice.CreateBroker(config)
return broker
}
package messagequeue
import (
"query-service/internal/errorhandler"
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
)
// ConsumeMessageFunc is a function type to be called when a message is consumed
type ConsumeMessageFunc func(amqp.Delivery)
// StartConsumer will start a consumer
// When a message is received the consumeMessage function will be called
func StartConsumer(broker *alice.RabbitBroker, exchangeID *string, routingKey *string, consumeMessage ConsumeMessageFunc) {
// Declare the exchange we want to bind to
exchange, err := alice.CreateDefaultExchange(*exchangeID, alice.Direct)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange")
}
// Declare the queue we will consume from
queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil)
// Create the consumer
c, err := broker.CreateConsumer(queue, *routingKey, alice.DefaultConsumerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to create consumer")
}
// Start consuming messages
go c.ConsumeMessages(nil, false, func(msg amqp.Delivery) { consumeMessage(msg) })
}
package messagequeue
import (
"query-service/internal/errorhandler"
"github.com/thijsheijden/alice"
)
// StartProducer starts a producer and returns it
func StartProducer(broker *alice.RabbitBroker) alice.Producer {
exchange, err := alice.CreateDefaultExchange("ui-direct-exchange", alice.Direct)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange for producer")
}
producer, err := broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to created producer")
}
return producer
}
package consume
import (
"errors"
"query-service/internal/errorhandler"
"query-service/pkg/logger"
)
// Service wraps consumer methods
type Service struct {
broker brokerdriver.Broker
}
// NewService creates a new service
func NewService(broker brokerdriver.Broker) *Service {
return &Service{
broker: broker,
}
}
// Start starts consuming
func (s *Service) Start() {
// Create consumer
consumer := s.broker.CreateConsumer()
consumer.SetMessageHandler(s.HandleMessage)
// Start consuming messages
consumer.ConsumeMessages()
}
func (s *Service) HandleMessage(msg *brokeradapter.Message) {
// Grab sessionID from the headers
sessionID, ok := msg.Headers["sessionID"].(string)
if !ok {
// TODO: Handle error where there is no session ID supplied
}
// Look at the result type in the headers
switch msg.Headers["type"] {
case "schemaResult": // A schema result
logger.Log("Received schema result for session " + sessionID)
// Send the result into the websocket
s.socketService.WriteToClient(&sessionID, msg.Body)
case "queryResult": // A query result
logger.Log("Received query result for session " + sessionID)
// Send the result into the websocket
s.socketService.WriteToClient(&sessionID, msg.Body)
default:
logger.Log("Received untyped message for " + sessionID)
errorhandler.LogError(errors.New("undefined message type sent"), "error while consuming")
}
}
// // ConsumeMessageFunc is a function type to be called when a message is consumed
// type ConsumeMessageFunc func(amqp.Delivery)
// // StartConsumer will start a consumer
// // When a message is received the consumeMessage function will be called
// func StartConsumer(broker *alice.RabbitBroker, exchangeID *string, routingKey *string, consumeMessage ConsumeMessageFunc) {
// // Declare the exchange we want to bind to
// exchange, err := alice.CreateDefaultExchange(*exchangeID, alice.Direct)
// if err != nil {
// errorhandler.FailWithError(err, "failed to create exchange")
// }
// // Declare the queue we will consume from
// queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil)
// // Create the consumer
// c, err := broker.CreateConsumer(queue, *routingKey, alice.DefaultConsumerErrorHandler)
// if err != nil {
// errorhandler.FailWithError(err, "failed to create consumer")
// }
// // Start consuming messages
// go c.ConsumeMessages(nil, false, func(msg amqp.Delivery) { consumeMessage(msg) })
// }
package consume
// UseCase is an interface describing the socket usecases
type UseCase interface {
Start()
ConsumeMessages()
}
package produce
package produce
package aql
package queryconversion
import (
"encoding/json"
......@@ -78,7 +78,6 @@ type Constraint = {
]
}
&{{[0] [0]} [] [{Flight 0 -1 {1 3} []}]}
*/
// Constraint datatypes
......@@ -123,8 +122,8 @@ type constraintStruct struct {
MatchType string
}
// ConvertJSONToAQL converts a json string to an AQL query
func ConvertJSONToAQL(jsonMsg *[]byte) (*string, error) {
// ConvertQuery converts a json string to an AQL query
func ConvertQuery(jsonMsg *[]byte) (*string, error) {
jsonStruct, err := convertJSONToStruct(jsonMsg)
if err != nil {
......
package aql
package queryconversion
import (
"fmt"
......@@ -82,7 +82,7 @@ func TestHugeQuery(t *testing.T) {
}`
s3 := []byte(s)
j, _ := ConvertJSONToAQL(&s3)
j, _ := ConvertQuery(&s3)
fmt.Print(j)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment