Skip to content
Snippets Groups Projects
Select Git revision
  • 5479b2dccbdc75022b1d28363f30597f92ebccb7
  • main default protected
  • feat/statistics protected
  • feat/schemaStats protected
  • feat/password protected
  • refactor/remove-graphcounts protected
  • feat/cypher2query-negations2 protected
  • feat/airecommender protected
  • feat/cypher-visual protected
  • feat/mcp protected
  • feature-flag protected
  • feat/optionalDefault protected
  • feat/refactorarray protected
  • feat/attributes-db protected
  • fix/mlservice protected
  • feat/median protected
  • feat/grammar2sql protected
  • feat/fetchSchemaNew protected
  • feat/monolith protected
  • feat/aggComparison protected
  • feat/maxOutput protected
  • v1.62.0
  • v1.61.1
  • v1.61.0
  • v1.60.1
  • v1.60.0
  • v1.59.0
  • v1.58.1
  • v1.58.0
  • v1.57.1
  • v1.57.0
  • v1.56.0
  • v1.55.2
  • v1.55.1
  • v1.55.0
  • v1.54.0
  • v1.53.1
  • v1.53.0
  • v1.52.1
  • v1.52.0
  • v1.51.0
41 results

consume.go

Blame
  • broker.go 2.23 KiB
    package brokerdriver
    
    import (
    	"os"
    	"query-service/internal/adapters/brokeradapter"
    	"query-service/pkg/errorhandler"
    
    	"strconv"
    	"time"
    
    	"github.com/thijsheijden/alice"
    )
    
    // Driver models an Alice RabbitMQ broker. It bundles the broker connection
    // with the gateway that was supplied when the driver was created.
    type Driver struct {
    	// broker is the Alice broker on which CreateConsumer and CreateProducer
    	// create their consumers and producers.
    	broker  alice.Broker
    	// gateway is the gateway passed to CreateAliceBroker.
    	gateway brokeradapter.GatewayInterface
    }
    
    // CreateAliceBroker builds a Driver around a freshly created Alice broker.
    // Connection settings are read from the RABBIT_USER, RABBIT_PASSWORD,
    // RABBIT_HOST and RABBIT_PORT environment variables, and the given gateway is
    // stored on the returned Driver.
    func CreateAliceBroker(gateway brokeradapter.GatewayInterface) *Driver {
    	// The port is the only setting that needs parsing; the conversion result
    	// is always handed to the error handler, exactly as before.
    	port, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
    	errorhandler.FailWithError(err, "invalid rabbitmq port given")
    
    	// NOTE(review): the boolean and duration are presumably the auto-reconnect
    	// flag and reconnect delay — confirm against alice.CreateConfig.
    	cfg := alice.CreateConfig(
    		os.Getenv("RABBIT_USER"),
    		os.Getenv("RABBIT_PASSWORD"),
    		os.Getenv("RABBIT_HOST"),
    		port,
    		true,
    		time.Minute,
    		alice.DefaultErrorHandler,
    	)
    
    	driver := new(Driver)
    	driver.broker = alice.CreateBroker(cfg)
    	driver.gateway = gateway
    	return driver
    }
    
    // CreateConsumer returns an AliceConsumer that reads from the
    // "aql-query-queue" queue, bound to the "query-requests" direct exchange with
    // the "aql-user-request" routing key. Any error from declaring the exchange or
    // creating the consumer is passed to errorhandler.FailWithError.
    func (d *Driver) CreateConsumer() Consumer {
    	const (
    		exchangeName = "query-requests"
    		queueName    = "aql-query-queue"
    		bindingKey   = "aql-user-request"
    	)
    
    	// Exchange the queue is attached to.
    	ex, exErr := alice.CreateDefaultExchange(exchangeName, alice.Direct)
    	if exErr != nil {
    		errorhandler.FailWithError(exErr, "failed to create exchange")
    	}
    
    	// NOTE(review): the four booleans are presumably the queue's
    	// durable/exclusive/auto-delete/no-wait flags — confirm against
    	// alice.CreateQueue.
    	q := alice.CreateQueue(ex, queueName, true, false, true, false, nil)
    
    	aliceConsumer, consErr := d.broker.CreateConsumer(q, bindingKey, alice.DefaultConsumerErrorHandler)
    	if consErr != nil {
    		errorhandler.FailWithError(consErr, "failed to create consumer")
    	}
    
    	return &AliceConsumer{broker: d, consumer: aliceConsumer}
    }
    
    // CreateProducer creates an AliceProducer on the "ui-direct-exchange" direct
    // exchange. Any error from declaring the exchange or creating the producer is
    // passed to errorhandler.FailWithError.
    func (d *Driver) CreateProducer() Producer {
    	// Declare the exchange the producer is created on.
    	exchange, err := alice.CreateDefaultExchange("ui-direct-exchange", alice.Direct)
    	if err != nil {
    		errorhandler.FailWithError(err, "failed to create exchange for producer")
    	}
    
    	// Create the producer itself.
    	p, err := d.broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler)
    	if err != nil {
    		// Message corrected from "failed to created producer" so it reads
    		// like the other failure messages in this file.
    		errorhandler.FailWithError(err, "failed to create producer")
    	}
    
    	return &AliceProducer{
    		broker:   d,
    		producer: p,
    	}
    }