package brokerdriver import ( "query-service/internal/adapters/brokeradapter" "github.com/streadway/amqp" "github.com/thijsheijden/alice" ) // AliceConsumer models a RabbitMQ consumer in Alice type AliceConsumer struct { broker *Driver consumer alice.Consumer messageHandler func(msg *brokeradapter.Message) } // ConsumeMessages starts the consumer using an alice consumer func (ac *AliceConsumer) ConsumeMessages() { go ac.consumer.ConsumeMessages(nil, false, ac.handleMessage) } func (ac *AliceConsumer) handleMessage(msg amqp.Delivery) { // Convert message using the gateway // Pass message to the message handler ac.messageHandler(ac.broker.gateway.TransformMessage(msg)) // Acknowledge the message was received msg.Ack(true) } // SetMessageHandler sets the message handler to the supplied function func (ac *AliceConsumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) { ac.messageHandler = handler }