diff --git a/internal/drivers/brokerdriver/interface.go b/internal/drivers/brokerdriver/interface.go index 3aacf64fea45b25f2eb1dd9c89792ceecbf8192f..3dc54da2fc47fbae03fcf60d59c5dedcf8f296f6 100644 --- a/internal/drivers/brokerdriver/interface.go +++ b/internal/drivers/brokerdriver/interface.go @@ -20,5 +20,5 @@ type Consumer interface { // A Producer belongs to a broker and publishes messages to a queue type Producer interface { - PublishMessage(body *[]byte, queueID *string, headers *amqp.Table) + PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) } diff --git a/internal/drivers/brokerdriver/mock/broker.go b/internal/drivers/brokerdriver/mock/broker.go index 8483d8fdc1f2b4d2d25061596854daf041d79393..80f1c1c89637212a46c18cd1fd89fc32a2a6c8f7 100644 --- a/internal/drivers/brokerdriver/mock/broker.go +++ b/internal/drivers/brokerdriver/mock/broker.go @@ -8,18 +8,32 @@ import ( // Driver is mock gateway type Driver struct { gateway brokeradapter.GatewayInterface + + // Mock messages that are published by producers on this broker + // Key is the routing key + // Value is a slice of messages, in order of being sent 'first -> last' + Messages map[string][]brokeradapter.Message } -// CreateBroker is a creates a mock driver +// CreateBroker creates a broker driver (mock) func CreateBroker(gateway brokeradapter.GatewayInterface) *Driver { return &Driver{ - gateway: gateway, + gateway: gateway, + Messages: make(map[string][]brokeradapter.Message), } } -// CreateConsumer creates a mock consumer +// CreateConsumer creates a consumer (mock) func (d *Driver) CreateConsumer() brokerdriver.Consumer { return &Consumer{ broker: d, } } + +// CreateProducer creates a producer (mock) +func (d *Driver) CreateProducer() brokerdriver.Producer { + return &Producer{ + broker: d, + exchange: "ui-direct-exchange", // This is the only exchange this service produces to + } +} diff --git a/internal/drivers/brokerdriver/mock/producer.go b/internal/drivers/brokerdriver/mock/producer.go new file mode 100644 index 0000000000000000000000000000000000000000..d92435b8dc5d3b5eb4d4857ffb4326fd2677e174 --- /dev/null +++ b/internal/drivers/brokerdriver/mock/producer.go @@ -0,0 +1,26 @@ +package mockbrokerdriver + +import ( + "query-service/internal/adapters/brokeradapter" + + "github.com/streadway/amqp" +) + +type Producer struct { + broker *Driver + + // The exchange this producer is connected to + exchange string +} + +// PublishMessage publishes a message to the given queue +func (p *Producer) PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) { + // Create the message + msg := brokeradapter.Message{ + Headers: *headers, + Body: *body, + } + + // Append the message to the list + p.broker.Messages[*routingKey] = append(p.broker.Messages[*routingKey], msg) +} diff --git a/internal/drivers/brokerdriver/producer.go b/internal/drivers/brokerdriver/producer.go index e037aac06b3c6e2911851f722dd17017b5c21653..513e43d41b55110590092c0c9dc7479b7dd83402 100644 --- a/internal/drivers/brokerdriver/producer.go +++ b/internal/drivers/brokerdriver/producer.go @@ -14,10 +14,10 @@ type AliceProducer struct { producer alice.Producer } -// PublishMessage will publish a message to the specified queue id -func (ap *AliceProducer) PublishMessage(body *[]byte, queueID *string, headers *amqp.Table) { +// PublishMessage will publish a message to the specified queue id (mock) +func (ap *AliceProducer) PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) { sessionID := (*headers)["sessionID"] - logger.Log(fmt.Sprintf("Publishing message to queue %v, for session %v", *queueID, sessionID)) + logger.Log(fmt.Sprintf("Publishing message to queue %v, for session %v", *routingKey, sessionID)) - ap.producer.PublishMessage(*body, queueID, headers) + ap.producer.PublishMessage(*body, routingKey, headers) }