Skip to content
Snippets Groups Projects
Commit b6d981e2 authored by Bouma,C.J. (Chris)'s avatar Bouma,C.J. (Chris)
Browse files

Listen to query queue (unclean)

Not finally implemented, still unclean, but functional!
parent 944b394a
No related branches found
No related tags found
No related merge requests found
package main
import "fmt"
import (
"errors"
"fmt"
"os"
"query-service/internal/errorhandler"
"strconv"
"time"
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
)
func main() {
fmt.Println("Helloooo world")
start()
}
// Start starts consuming
func start() {
// Get the unique queue id that we will be listening on
queueID := "aql-query-requests"
exchangeID := "query-requests"
// Create connection config using environment variables
rabbitUser := os.Getenv("RABBIT_USER")
rabbitPassword := os.Getenv("RABBIT_PASSWORD")
rabbitHost := os.Getenv("RABBIT_HOST")
rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)
// Open connection to broker
conn := alice.Connect(*config)
// Declare the exchange we want to bind to
exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Topic)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange")
}
// Declare the queue we will consume from
queue := alice.CreateQueue(exchange, queueID, true, false, true, false, nil)
// Create the consumer
c, err := conn.CreateConsumer(queue, "aql.user-request", alice.DefaultConsumerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to create consumer")
}
// Start consuming messages
c.ConsumeMessages(nil, false, consumeMessage)
select {}
}
// Handle incoming messages, pass them to the correct place
func consumeMessage(msg amqp.Delivery) {
// Look at the result type in the headers
switch msg.Headers["type"] {
case "user-request": // A schema result
consumeQueryRequest(&msg)
default:
errorhandler.LogError(errors.New("undefined message type sent"), "error while consuming")
}
}
func consumeQueryRequest(msg *amqp.Delivery) {
// Retrieve JSON formatted string payload from msg
//jsonString := string(msg.Body)
// Call convert to AQL function
// aqlQuery := "result" // TODO: this should be the result from the json to aql conversion
// execute and retrieve result
// convert result to general (node-link (?)) format
// publish converted result
}
module query-service
go 1.15
require (
github.com/rs/xid v1.3.0
github.com/streadway/amqp v1.0.0
github.com/thijsheijden/alice v0.1.5
)
package errorhandler
import (
"fmt"
)
// LogError logs an error that is not nil
func LogError(err error, msg string) {
if err != nil {
fmt.Printf("%s: %v", msg, err)
}
}
// FailWithError panics if the error is not nil
func FailWithError(err error, msg string) {
if err != nil {
fmt.Printf("%s: %v", msg, err)
panic(err)
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment