Newer
Older
package brokerdriver
import (
	"log"

	"github.com/streadway/amqp"
	"github.com/thijsheijden/alice"

	"query-service/internal/adapters/brokeradapter"
)
// AliceConsumer models a RabbitMQ consumer in Alice. It wraps an alice
// consumer and forwards every delivery it receives, after transformation by
// the broker's gateway, to a caller-supplied message handler.
type AliceConsumer struct {
// broker is the driver this consumer belongs to; its gateway is used to
// transform incoming deliveries before they reach the message handler.
broker *Driver
// consumer is the underlying alice consumer that ConsumeMessages starts.
consumer alice.Consumer
// messageHandler is the callback invoked with each transformed message.
// It is assigned through SetMessageHandler.
// NOTE(review): handleMessage calls it without a nil check — confirm it is
// always set before consumption starts.
messageHandler func(msg *brokeradapter.Message)
}
// ConsumeMessages kicks off consumption on the wrapped alice consumer in a
// new goroutine and returns immediately. Every delivery is routed to
// handleMessage.
func (ac *AliceConsumer) ConsumeMessages() {
	onDelivery := ac.handleMessage

	// No extra consume arguments are passed. The false flag is presumably
	// alice's auto-acknowledge switch (handleMessage acknowledges manually) —
	// confirm against the alice API.
	go ac.consumer.ConsumeMessages(nil, false, onDelivery)
}
// handleMessage is the delivery callback passed to the alice consumer. It
// converts the raw AMQP delivery through the broker's gateway, hands the
// result to the registered message handler, and then acknowledges the
// delivery.
func (ac *AliceConsumer) handleMessage(msg amqp.Delivery) {
	// Convert the delivery using the gateway and pass it to the message handler.
	ac.messageHandler(ac.broker.gateway.TransformMessage(msg))

	// Acknowledge only this delivery. Ack(true) would also acknowledge every
	// earlier unacknowledged delivery on the channel, which is unsafe if other
	// deliveries are still being handled concurrently.
	if err := msg.Ack(false); err != nil {
		// There is no caller to return the error to, so surface it in the log
		// instead of dropping it silently.
		log.Printf("brokerdriver: acknowledging delivery %d: %v", msg.DeliveryTag, err)
	}
}
// SetMessageHandler registers handler as the callback that receives each
// transformed message, replacing any handler that was set before.
func (ac *AliceConsumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) {
	ac.messageHandler = handler
}