From c17d57694e9bed9acf7775e5d0f78d29dfbce2fe Mon Sep 17 00:00:00 2001 From: sivan <sivanduijn@gmail.com> Date: Fri, 2 Apr 2021 12:22:07 +0200 Subject: [PATCH] co-authored by: Chris, cleaned up added handleMessageFunc function type --- cmd/query-service/main.go | 60 +++-------------------------------- internal/consumer/consumer.go | 50 +++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 55 deletions(-) create mode 100644 internal/consumer/consumer.go diff --git a/cmd/query-service/main.go b/cmd/query-service/main.go index 84edec1..370e4e1 100644 --- a/cmd/query-service/main.go +++ b/cmd/query-service/main.go @@ -2,74 +2,24 @@ package main import ( "fmt" - "os" "query-service/internal/aql" + "query-service/internal/consumer" "query-service/internal/errorhandler" - "strconv" - "time" - - "github.com/streadway/amqp" - "github.com/thijsheijden/alice" ) func main() { - start() -} - -// Start starts consuming -func start() { - // Get the unique queue id that we will be listening on exchangeID := "query-requests" - - // Create connection config using environment variables - rabbitUser := os.Getenv("RABBIT_USER") - rabbitPassword := os.Getenv("RABBIT_PASSWORD") - rabbitHost := os.Getenv("RABBIT_HOST") - rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT")) - - config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler) - - // Open connection to broker - conn := alice.Connect(*config) - - // Declare the exchange we want to bind to - exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Topic) - if err != nil { - errorhandler.FailWithError(err, "failed to create exchange") - } - - // Declare the queue we will consume from - queue := alice.CreateQueue(exchange, "", true, false, true, false, nil) - - // Create the consumer - c, err := conn.CreateConsumer(queue, "aql.user-request", alice.DefaultConsumerErrorHandler) - if err != nil { - errorhandler.FailWithError(err, "failed to create consumer") - } - - // Start consuming messages - c.ConsumeMessages(nil, false, consumeMessage) -} - -// Handle incoming messages, pass them to the correct place -func consumeMessage(msg amqp.Delivery) { - - fmt.Println(" [x] " + string(msg.Body)) - - for key, element := range msg.Headers { - fmt.Println("Key:", key, "=>", "Element:", element) - } - - executeQueryRequest(&msg.Body) + routingKey := "aql-user-request" + consumer.StartConsuming(onMessageReceived, exchangeID, routingKey) } -func executeQueryRequest(jsonMsg *[]byte) { +func onMessageReceived(jsonMsg *[]byte) { // Retrieve JSON formatted string payload from msg // Convert the json byte msg to an aql query string aqlQuery, err := aql.ConvertJSONToAQL(jsonMsg) - errorhandler.FailWithError(err, "failed to parse incoming msg to AQL") + errorhandler.FailWithError(err, "failed to parse incoming msg to AQL") // TODO: don't panic on error, send error message to client instead fmt.Println(*aqlQuery) diff --git a/internal/consumer/consumer.go b/internal/consumer/consumer.go new file mode 100644 index 0000000..e742681 --- /dev/null +++ b/internal/consumer/consumer.go @@ -0,0 +1,50 @@ +package consumer + +import ( + "query-service/internal/errorhandler" + "time" + + "github.com/streadway/amqp" + "github.com/thijsheijden/alice" +) + +// ConsumeMessageFunc is a function type to be called when a message is consumed +type ConsumeMessageFunc func(*[]byte) + +// StartConsuming will start consuming messages +// When a message is received the consumeMessage function will be called +func StartConsuming(consumeMessage ConsumeMessageFunc, exchangeID string, routingKey string) { + + // Create connection config using environment variables + // rabbitUser := os.Getenv("RABBIT_USER") + // rabbitPassword := os.Getenv("RABBIT_PASSWORD") + // rabbitHost := os.Getenv("RABBIT_HOST") + // rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT")) + rabbitUser := "haha-test" + rabbitPassword := "dikkedraak" + rabbitHost := "192.168.178.158" + rabbitPort := 5672 + + config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler) + + // Open connection to broker + conn := alice.Connect(*config) + + // Declare the exchange we want to bind to + exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct) + if err != nil { + errorhandler.FailWithError(err, "failed to create exchange") + } + + // Declare the queue we will consume from + queue := alice.CreateQueue(exchange, "", true, false, true, false, nil) + + // Create the consumer + c, err := conn.CreateConsumer(queue, routingKey, alice.DefaultConsumerErrorHandler) + if err != nil { + errorhandler.FailWithError(err, "failed to create consumer") + } + + // Start consuming messages + c.ConsumeMessages(nil, false, func(msg amqp.Delivery) { consumeMessage(&msg.Body) }) +} -- GitLab