Skip to content
Snippets Groups Projects
Commit c17d5769 authored by sivan's avatar sivan
Browse files

co-authored by: Chris, cleaned up

added handleMessageFunc function type
parent fc3febc2
No related tags found
No related merge requests found
......@@ -2,74 +2,24 @@ package main
import (
"fmt"
"os"
"query-service/internal/aql"
"query-service/internal/consumer"
"query-service/internal/errorhandler"
"strconv"
"time"
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
)
func main() {
start()
}
// Start starts consuming
func start() {
// Get the unique queue id that we will be listening on
exchangeID := "query-requests"
// Create connection config using environment variables
rabbitUser := os.Getenv("RABBIT_USER")
rabbitPassword := os.Getenv("RABBIT_PASSWORD")
rabbitHost := os.Getenv("RABBIT_HOST")
rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)
// Open connection to broker
conn := alice.Connect(*config)
// Declare the exchange we want to bind to
exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Topic)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange")
}
// Declare the queue we will consume from
queue := alice.CreateQueue(exchange, "", true, false, true, false, nil)
// Create the consumer
c, err := conn.CreateConsumer(queue, "aql.user-request", alice.DefaultConsumerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to create consumer")
}
// Start consuming messages
c.ConsumeMessages(nil, false, consumeMessage)
}
// Handle incoming messages, pass them to the correct place
func consumeMessage(msg amqp.Delivery) {
fmt.Println(" [x] " + string(msg.Body))
for key, element := range msg.Headers {
fmt.Println("Key:", key, "=>", "Element:", element)
}
executeQueryRequest(&msg.Body)
routingKey := "aql-user-request"
consumer.StartConsuming(onMessageReceived, exchangeID, routingKey)
}
func executeQueryRequest(jsonMsg *[]byte) {
func onMessageReceived(jsonMsg *[]byte) {
// Retrieve JSON formatted string payload from msg
// Convert the json byte msg to an aql query string
aqlQuery, err := aql.ConvertJSONToAQL(jsonMsg)
errorhandler.FailWithError(err, "failed to parse incoming msg to AQL")
errorhandler.FailWithError(err, "failed to parse incoming msg to AQL") // TODO: don't panic on error, send error message to client instead
fmt.Println(*aqlQuery)
......
package consumer
import (
"query-service/internal/errorhandler"
"time"
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
)
// ConsumeMessageFunc is a function type to be called when a message is consumed
type ConsumeMessageFunc func(*[]byte)
// StartConsuming will start consuming messages
// When a message is received the consumeMessage function will be called
func StartConsuming(consumeMessage ConsumeMessageFunc, exchangeID string, routingKey string) {
// Create connection config using environment variables
// rabbitUser := os.Getenv("RABBIT_USER")
// rabbitPassword := os.Getenv("RABBIT_PASSWORD")
// rabbitHost := os.Getenv("RABBIT_HOST")
// rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
rabbitUser := "haha-test"
rabbitPassword := "dikkedraak"
rabbitHost := "192.168.178.158"
rabbitPort := 5672
config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)
// Open connection to broker
conn := alice.Connect(*config)
// Declare the exchange we want to bind to
exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange")
}
// Declare the queue we will consume from
queue := alice.CreateQueue(exchange, "", true, false, true, false, nil)
// Create the consumer
c, err := conn.CreateConsumer(queue, routingKey, alice.DefaultConsumerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to create consumer")
}
// Start consuming messages
c.ConsumeMessages(nil, false, func(msg amqp.Delivery) { consumeMessage(&msg.Body) })
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment