Skip to content
Snippets Groups Projects
Commit 34a51766 authored by thijsheijden's avatar thijsheijden
Browse files

Improved mock broker

Mock producer can now be made. Producer can publish messages. Messages
can be retrieved from the mock broker.
parent 139d9f2f
No related branches found
No related tags found
No related merge requests found
......@@ -20,5 +20,5 @@ type Consumer interface {
// A Producer belongs to a broker and publishes messages to a queue
type Producer interface {
PublishMessage(body *[]byte, queueID *string, headers *amqp.Table)
PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table)
}
......@@ -8,18 +8,32 @@ import (
// Driver is mock gateway
type Driver struct {
gateway brokeradapter.GatewayInterface
// Mock messages that are published by producers on this broker
// Key is the routing key
// Value is a slice of messages, in order of being sent 'first -> last'
Messages map[string][]brokeradapter.Message
}
// CreateBroker is a creates a mock driver
// CreateBroker creates a broker driver (mock)
func CreateBroker(gateway brokeradapter.GatewayInterface) *Driver {
return &Driver{
gateway: gateway,
gateway: gateway,
Messages: make(map[string][]brokeradapter.Message),
}
}
// CreateConsumer creates a mock consumer
// CreateConsumer creates a consumer (mock)
func (d *Driver) CreateConsumer() brokerdriver.Consumer {
return &Consumer{
broker: d,
}
}
// CreateProducer creates a producer (mock)
func (d *Driver) CreateProducer() brokerdriver.Producer {
return &Producer{
broker: d,
exchange: "ui-direct-exchange", // This is the only exchange this service produces to
}
}
package mockbrokerdriver
import (
"query-service/internal/adapters/brokeradapter"
"github.com/streadway/amqp"
)
type Producer struct {
broker *Driver
// The exchange this producer is connected to
exchange string
}
// PublishMessage publishes a message to the given queue
func (p *Producer) PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) {
// Create the message
msg := brokeradapter.Message{
Headers: *headers,
Body: *body,
}
// Append the message to the list
p.broker.Messages[*routingKey] = append(p.broker.Messages[*routingKey], msg)
}
......@@ -14,10 +14,10 @@ type AliceProducer struct {
producer alice.Producer
}
// PublishMessage will publish a message to the specified queue id
func (ap *AliceProducer) PublishMessage(body *[]byte, queueID *string, headers *amqp.Table) {
// PublishMessage will publish a message to the specified queue id (mock)
func (ap *AliceProducer) PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) {
sessionID := (*headers)["sessionID"]
logger.Log(fmt.Sprintf("Publishing message to queue %v, for session %v", *queueID, sessionID))
logger.Log(fmt.Sprintf("Publishing message to queue %v, for session %v", *routingKey, sessionID))
ap.producer.PublishMessage(*body, queueID, headers)
ap.producer.PublishMessage(*body, routingKey, headers)
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment