Skip to content
Snippets Groups Projects
broker.go 3.11 KiB
Newer Older
package brokerdriver

import (
	"log"
	"os"
	"query-service/internal/adapters/brokeradapter"
	"query-service/pkg/errorhandler"
	"query-service/pkg/logger"
	"strconv"
sivan's avatar
sivan committed

	"time"

	"github.com/thijsheijden/alice"
)

// Driver models an Alice RabbitMQ broker
type Driver struct {
	broker  alice.Broker
	gateway brokeradapter.GatewayInterface
}

// CreateAliceBroker creates an Alice broker
func CreateAliceBroker(gateway brokeradapter.GatewayInterface) *Driver {

	// Create connection config using environment variables
	rabbitUser := os.Getenv("RABBIT_USER")
	rabbitPassword := os.Getenv("RABBIT_PASSWORD")
	rabbitHost := os.Getenv("RABBIT_HOST")
	rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
	errorhandler.FailWithError(err, "invalid rabbitmq port given")

	config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)

	// Attempt to create a broker, if an error is returned retry the connection every 10 seconds
	broker, err := alice.CreateBroker(config)
	if err != nil {
		errorhandler.FailWithError(err, err.Error())
		errorhandler.LogError(err, "Failed to connect to RabbitMQ")

		// Create 10 second ticker
		ticker := time.NewTicker(time.Second * 10)
		done := make(chan bool, 1)

		for {
			select {
			case <-ticker.C:
				logger.Log("Retrying RabbitMQ connection")
				broker, err = alice.CreateBroker(config)
				if err == nil {
					done <- true
					ticker.Stop()
				}
			case <-done:
				log.Println("Succesfully connected to broker")
				return &Driver{
					broker:  broker,
					gateway: gateway,
				}
			}
		}
	}

	// Return the created driver
	// This code only gets called if the broker creation works on the first try, which it more often than not does
	return &Driver{
		broker:  broker,
		gateway: gateway,
	}
}

// CreateConsumer creates an AliceConsumer on a certain exchange and queue
func (d *Driver) CreateConsumer() Consumer {
	exchangeID := "requests-exchange"
	routingKey := "aql-query-request"

	// Declare the exchange we want to bind to
	exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct)
	if err != nil {
		errorhandler.FailWithError(err, "failed to create exchange")
	}

	// Declare the queue we will consume from
	queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil)

	// Create the consumer
thijsheijden's avatar
thijsheijden committed
	c, err := d.broker.CreateConsumer(queue, routingKey, "", alice.DefaultConsumerErrorHandler)
	if err != nil {
		errorhandler.FailWithError(err, "failed to create consumer")
	}

	consumer := &AliceConsumer{
		broker:   d,
		consumer: c,
	}

	return consumer
}

// CreateProducer creates an AliceProducer on a certain exchange
func (d *Driver) CreateProducer() Producer {
sivan's avatar
sivan committed
	exchangeID := "ui-direct-exchange"

	exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct)
	if err != nil {
		errorhandler.FailWithError(err, "failed to create exchange for producer")
	}

	p, err := d.broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler)
	if err != nil {
		errorhandler.FailWithError(err, "failed to created producer")
	}

	producer := &AliceProducer{
		broker:   d,
		producer: p,
	}

	return producer
}