package brokerdriver

import (
	"log"
	"os"
	"query-service/internal/adapters/brokeradapter"
	"query-service/pkg/errorhandler"
	"query-service/pkg/logger"
	"strconv"
	"time"

	"github.com/thijsheijden/alice"
)

// Driver models an Alice RabbitMQ broker. It bundles the broker connection
// with the gateway that was handed to CreateAliceBroker.
type Driver struct {
	broker  alice.Broker
	gateway brokeradapter.GatewayInterface
}

// CreateAliceBroker creates an Alice broker and wraps it in a Driver.
//
// The connection settings are read from the environment variables
// RABBIT_USER, RABBIT_PASSWORD, RABBIT_HOST and RABBIT_PORT. A RABBIT_PORT
// that is not a valid integer is reported through errorhandler.FailWithError.
//
// If the first connection attempt fails, the error is logged once and the
// connection is retried every 10 seconds until it succeeds, so this function
// blocks until a broker is available.
func CreateAliceBroker(gateway brokeradapter.GatewayInterface) *Driver {
	// Create connection config using environment variables
	rabbitUser := os.Getenv("RABBIT_USER")
	rabbitPassword := os.Getenv("RABBIT_PASSWORD")
	rabbitHost := os.Getenv("RABBIT_HOST")
	rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
	// NOTE(review): FailWithError is called unconditionally here, so it is
	// presumably a no-op for a nil error and aborts otherwise - confirm.
	errorhandler.FailWithError(err, "invalid rabbitmq port given")

	config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)

	// Attempt to create a broker, if an error is returned retry the connection every 10 seconds
	broker, err := alice.CreateBroker(config)
	if err != nil {
		// Only log here. Calling FailWithError at this point would abort
		// before the retry loop below ever gets a chance to run.
		errorhandler.LogError(err, "Failed to connect to RabbitMQ")

		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop()

		// Exactly one connection attempt per tick. The loop ends as soon as
		// an attempt succeeds, so a tick that fired during a slow attempt can
		// never trigger a second connection or overwrite a working broker.
		for err != nil {
			<-ticker.C
			logger.Log("Retrying RabbitMQ connection")
			broker, err = alice.CreateBroker(config)
		}

		log.Println("Succesfully connected to broker")
	}

	// Return the created driver
	return &Driver{
		broker:  broker,
		gateway: gateway,
	}
}

// CreateConsumer creates an AliceConsumer bound to the "requests-exchange"
// direct exchange. It consumes from the "aql-query-queue" queue using the
// routing key "aql-query-request".
//
// Errors while creating the exchange or the consumer are passed to
// errorhandler.FailWithError.
func (d *Driver) CreateConsumer() Consumer {
	exchangeID := "requests-exchange"
	routingKey := "aql-query-request"

	// Declare the exchange we want to bind to
	exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct)
	if err != nil {
		errorhandler.FailWithError(err, "failed to create exchange")
	}

	// Declare the queue we will consume from
	queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil)

	// Create the consumer
	c, err := d.broker.CreateConsumer(queue, routingKey, "", alice.DefaultConsumerErrorHandler)
	if err != nil {
		errorhandler.FailWithError(err, "failed to create consumer")
	}

	consumer := &AliceConsumer{
		broker:   d,
		consumer: c,
	}

	return consumer
}

// CreateProducer creates an AliceProducer that publishes to the
// "ui-direct-exchange" direct exchange.
//
// Errors while creating the exchange or the producer are passed to
// errorhandler.FailWithError.
func (d *Driver) CreateProducer() Producer {
	exchangeID := "ui-direct-exchange"

	// Declare the exchange the producer publishes to
	exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct)
	if err != nil {
		errorhandler.FailWithError(err, "failed to create exchange for producer")
	}

	// Create the producer
	p, err := d.broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler)
	if err != nil {
		errorhandler.FailWithError(err, "failed to created producer")
	}

	producer := &AliceProducer{
		broker:   d,
		producer: p,
	}

	return producer
}