// consumer.go

package brokerdriver

import (
	"query-service/internal/adapters/brokeradapter"

	"github.com/streadway/amqp"
	"github.com/thijsheijden/alice"
)

// AliceConsumer models a RabbitMQ consumer in Alice. It wraps an
// alice.Consumer and forwards every delivery it receives, after conversion
// through the broker's gateway, to a caller-supplied message handler.
type AliceConsumer struct {
	// broker is the owning driver; its gateway converts raw AMQP deliveries
	// into brokeradapter messages.
	broker         *Driver
	// consumer is the underlying alice consumer that deliveries are read from.
	consumer       alice.Consumer
	// messageHandler is the callback invoked with each converted message.
	// It is assigned through SetMessageHandler.
	messageHandler func(msg *brokeradapter.Message)
}

sivan's avatar
sivan committed
// ConsumeMessages starts the consumer using an alice consumer
func (ac *AliceConsumer) ConsumeMessages() {
thijsheijden's avatar
thijsheijden committed
	go ac.consumer.ConsumeMessages(nil, false, ac.handleMessage)
}

// handleMessage processes a single AMQP delivery: it converts the delivery
// with the broker's gateway, passes the result to the registered message
// handler, and then acknowledges the delivery.
func (ac *AliceConsumer) handleMessage(msg amqp.Delivery) {
	// Convert the raw delivery using the gateway and hand the result to the
	// registered message handler.
	ac.messageHandler(ac.broker.gateway.TransformMessage(msg))

	// Acknowledge only this delivery. With multiple=true the broker would
	// also acknowledge every earlier unacknowledged delivery on the channel,
	// including ones whose handling has not finished yet.
	//
	// The error is deliberately discarded: this callback has no return value
	// and no logger is in scope here. An unacknowledged delivery is
	// redelivered by the broker once the channel closes.
	_ = msg.Ack(false)
}

// SetMessageHandler sets the message handler to the supplied function,
// replacing any handler set previously. The handler is called once per
// delivery with the message produced by the broker's gateway.
//
// The assignment is not synchronized, and handler should be non-nil.
// NOTE(review): presumably this is meant to be called before
// ConsumeMessages — confirm against callers.
func (ac *AliceConsumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) {
	ac.messageHandler = handler
}