Newer
Older
"os"
"query-service/internal/adapters/brokeradapter"
"query-service/pkg/errorhandler"
"time"
"github.com/thijsheijden/alice"
)
// Driver models an Alice RabbitMQ broker.
// It pairs the broker connection with the gateway handed to CreateAliceBroker.
type Driver struct {
// broker is the Alice broker that CreateConsumer and CreateProducer call into.
broker alice.Broker
// gateway is the gateway supplied to CreateAliceBroker; it is only stored here.
gateway brokeradapter.GatewayInterface
}
// CreateAliceBroker creates an Alice broker
func CreateAliceBroker(gateway brokeradapter.GatewayInterface) *Driver {
// Create connection config using environment variables
rabbitUser := os.Getenv("RABBIT_USER")
rabbitPassword := os.Getenv("RABBIT_PASSWORD")
rabbitHost := os.Getenv("RABBIT_HOST")
rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
errorhandler.FailWithError(err, "invalid rabbitmq port given")
config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)
// Attempt to create a broker, if an error is returned retry the connection every 10 seconds
broker, err := alice.CreateBroker(config)
if err != nil {
errorhandler.FailWithError(err, err.Error())
errorhandler.LogError(err, "Failed to connect to RabbitMQ")
// Create 10 second ticker
ticker := time.NewTicker(time.Second * 10)
done := make(chan bool, 1)
for {
select {
case <-ticker.C:
logger.Log("Retrying RabbitMQ connection")
broker, err = alice.CreateBroker(config)
if err == nil {
done <- true
ticker.Stop()
}
case <-done:
log.Println("Succesfully connected to broker")
return &Driver{
broker: broker,
gateway: gateway,
}
}
}
}
// Return the created driver
// This code only gets called if the broker creation works on the first try, which it more often than not does
gateway: gateway,
}
}
// CreateConsumer creates an AliceConsumer for the "aql-query-queue" queue on
// the direct "requests-exchange" exchange, using the "aql-query-request"
// routing key. Failures while creating the exchange or the consumer are
// handed to errorhandler.FailWithError.
func (d *Driver) CreateConsumer() Consumer {
	const (
		requestsExchange = "requests-exchange"
		queryRoutingKey  = "aql-query-request"
		queryQueue       = "aql-query-queue"
	)

	// Exchange the queue is attached to.
	ex, err := alice.CreateDefaultExchange(requestsExchange, alice.Direct)
	if err != nil {
		errorhandler.FailWithError(err, "failed to create exchange")
	}

	// Queue the consumer reads from.
	q := alice.CreateQueue(ex, queryQueue, true, false, true, false, nil)

	aliceConsumer, err := d.broker.CreateConsumer(q, queryRoutingKey, "", alice.DefaultConsumerErrorHandler)
	if err != nil {
		errorhandler.FailWithError(err, "failed to create consumer")
	}

	return &AliceConsumer{
		broker:   d,
		consumer: aliceConsumer,
	}
}
// CreateProducer creates an AliceProducer that publishes on the direct
// "ui-direct-exchange" exchange. Failures while creating the exchange or the
// producer are handed to errorhandler.FailWithError.
func (d *Driver) CreateProducer() Producer {
	const uiExchange = "ui-direct-exchange"

	// Exchange the producer publishes to.
	ex, err := alice.CreateDefaultExchange(uiExchange, alice.Direct)
	if err != nil {
		errorhandler.FailWithError(err, "failed to create exchange for producer")
	}

	aliceProducer, err := d.broker.CreateProducer(ex, alice.DefaultProducerErrorHandler)
	if err != nil {
		errorhandler.FailWithError(err, "failed to created producer")
	}

	return &AliceProducer{
		broker:   d,
		producer: aliceProducer,
	}
}