From 75a36540e0075a27b0486c393014d6400f17fb79 Mon Sep 17 00:00:00 2001 From: thijsheijden <hi@thijsheijden.nl> Date: Mon, 17 May 2021 18:59:15 +0200 Subject: [PATCH] Replaced brokerdriver with external broker package --- cmd/query-service/main.go | 6 +- go.mod | 1 + go.sum | 2 + internal/drivers/brokerdriver/broker.go | 119 ------------------ internal/drivers/brokerdriver/consumer.go | 34 ----- internal/drivers/brokerdriver/interface.go | 24 ---- internal/drivers/brokerdriver/mock/broker.go | 39 ------ .../drivers/brokerdriver/mock/consumer.go | 18 --- .../drivers/brokerdriver/mock/producer.go | 27 ---- internal/drivers/brokerdriver/producer.go | 23 ---- internal/usecases/consume/consume.go | 10 +- internal/usecases/consume/consume_test.go | 27 ++-- internal/usecases/consume/handlemessage.go | 5 +- internal/usecases/produce/produce.go | 11 +- internal/usecases/produce/produce_test.go | 11 +- internal/usecases/produce/publishmessage.go | 6 +- 16 files changed, 33 insertions(+), 330 deletions(-) delete mode 100644 internal/drivers/brokerdriver/broker.go delete mode 100644 internal/drivers/brokerdriver/consumer.go delete mode 100644 internal/drivers/brokerdriver/interface.go delete mode 100644 internal/drivers/brokerdriver/mock/broker.go delete mode 100644 internal/drivers/brokerdriver/mock/consumer.go delete mode 100644 internal/drivers/brokerdriver/mock/producer.go delete mode 100644 internal/drivers/brokerdriver/producer.go diff --git a/cmd/query-service/main.go b/cmd/query-service/main.go index 202e20c..939cd58 100644 --- a/cmd/query-service/main.go +++ b/cmd/query-service/main.go @@ -1,8 +1,6 @@ package main import ( - "query-service/internal/adapters/brokeradapter" - "query-service/internal/drivers/brokerdriver" "query-service/internal/drivers/keyvaluedriver" "query-service/internal/drivers/rpcdriver" "query-service/internal/usecases/consume" @@ -11,6 +9,7 @@ import ( "query-service/internal/usecases/request" "query-service/pkg/logger" + "git.science.uu.nl/datastrophe/broker" "git.science.uu.nl/datastrophe/query-conversion/aql" "github.com/thijsheijden/alice" ) @@ -26,8 +25,7 @@ func main() { rpcDriver := rpcdriver.New() // MARK: Create alice RabbitMQ services - brokerGateway := brokeradapter.CreateGateway() - aliceBroker := brokerdriver.CreateAliceBroker(brokerGateway) + aliceBroker := broker.NewDriver() // Instantiate an implementation of the produce UseCase produceService := produce.NewService(aliceBroker, redisService) diff --git a/go.mod b/go.mod index 9a04e17..c146fcd 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.15 require ( git.science.uu.nl/datastrophe/aql-conversion v0.0.0-20210517163916-56e6fe347218 + git.science.uu.nl/datastrophe/broker v0.0.0-20210516094125-abbeaf96fd58 git.science.uu.nl/datastrophe/query-conversion v0.0.0-20210517164802-5852eee71ec0 github.com/arangodb/go-driver v0.0.0-20210506071742-64f314d85db7 github.com/boumenot/gocover-cobertura v1.1.0 // indirect diff --git a/go.sum b/go.sum index 8e65923..be623bb 100644 --- a/go.sum +++ b/go.sum @@ -1,6 +1,8 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= git.science.uu.nl/datastrophe/aql-conversion v0.0.0-20210517163916-56e6fe347218 h1:Ro/k5d3EBb6LN3w68xlRWpfm52gt3decB+nzXsPuJ7Q= git.science.uu.nl/datastrophe/aql-conversion v0.0.0-20210517163916-56e6fe347218/go.mod h1:T4TILAfcxyGbubv918rCr0DIa59JTh0PnjDaNK8CZh8= +git.science.uu.nl/datastrophe/broker v0.0.0-20210516094125-abbeaf96fd58 h1:hnPHRwbH3EwMO+5XSPR1JFusrmxx/dq84mt5wKml1jw= +git.science.uu.nl/datastrophe/broker v0.0.0-20210516094125-abbeaf96fd58/go.mod h1:+ua8t+K6R+rF4zllcXH3QYPzpB+8bsAsw1/h6kflwfM= git.science.uu.nl/datastrophe/query-conversion v0.0.0-20210517164802-5852eee71ec0 h1:UrYqOFjIFxaHtmzqsoId48g/jPEdGX4MCX5sDjbzBSI= git.science.uu.nl/datastrophe/query-conversion v0.0.0-20210517164802-5852eee71ec0/go.mod h1:6rvalwekoukmVu3SbWmZkj8wBZEm34wDbA4Ilxcb+jw= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= diff --git a/internal/drivers/brokerdriver/broker.go b/internal/drivers/brokerdriver/broker.go deleted file mode 100644 index a37b6fd..0000000 --- a/internal/drivers/brokerdriver/broker.go +++ /dev/null @@ -1,119 +0,0 @@ -package brokerdriver - -import ( - "log" - "os" - "query-service/internal/adapters/brokeradapter" - "query-service/pkg/errorhandler" - "query-service/pkg/logger" - "strconv" - - "time" - - "github.com/thijsheijden/alice" -) - -// Driver models an Alice RabbitMQ broker -type Driver struct { - broker alice.Broker - gateway brokeradapter.GatewayInterface -} - -// CreateAliceBroker creates an Alice broker -func CreateAliceBroker(gateway brokeradapter.GatewayInterface) *Driver { - - // Create connection config using environment variables - rabbitUser := os.Getenv("RABBIT_USER") - rabbitPassword := os.Getenv("RABBIT_PASSWORD") - rabbitHost := os.Getenv("RABBIT_HOST") - rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT")) - errorhandler.FailWithError(err, "invalid rabbitmq port given") - - config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler) - - // Attempt to create a broker, if an error is returned retry the connection every 10 seconds - broker, err := alice.CreateBroker(config) - if err != nil { - errorhandler.FailWithError(err, err.Error()) - errorhandler.LogError(err, "Failed to connect to RabbitMQ") - - // Create 10 second ticker - ticker := time.NewTicker(time.Second * 10) - done := make(chan bool, 1) - - for { - select { - case <-ticker.C: - logger.Log("Retrying RabbitMQ connection") - broker, err = alice.CreateBroker(config) - if err == nil { - done <- true - ticker.Stop() - } - case <-done: - log.Println("Succesfully connected to broker") - return &Driver{ - broker: broker, - gateway: gateway, - } - } - } - } - - // Return the created driver - // This code only gets called if the broker creation works on the first try, which it more often than not does - return &Driver{ - broker: broker, - gateway: gateway, - } -} - -// CreateConsumer creates an AliceConsumer on a certain exchange and queue -func (d *Driver) CreateConsumer() Consumer { - exchangeID := "requests-exchange" - routingKey := "aql-query-request" - - // Declare the exchange we want to bind to - exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct) - if err != nil { - errorhandler.FailWithError(err, "failed to create exchange") - } - - // Declare the queue we will consume from - queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil) - - // Create the consumer - c, err := d.broker.CreateConsumer(queue, routingKey, "", alice.DefaultConsumerErrorHandler) - if err != nil { - errorhandler.FailWithError(err, "failed to create consumer") - } - - consumer := &AliceConsumer{ - broker: d, - consumer: c, - } - - return consumer -} - -// CreateProducer creates an AliceProducer on a certain exchange -func (d *Driver) CreateProducer() Producer { - exchangeID := "ui-direct-exchange" - - exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct) - if err != nil { - errorhandler.FailWithError(err, "failed to create exchange for producer") - } - - p, err := d.broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler) - if err != nil { - errorhandler.FailWithError(err, "failed to created producer") - } - - producer := &AliceProducer{ - broker: d, - producer: p, - } - - return producer -} diff --git a/internal/drivers/brokerdriver/consumer.go b/internal/drivers/brokerdriver/consumer.go deleted file mode 100644 index 194b2ad..0000000 --- a/internal/drivers/brokerdriver/consumer.go +++ /dev/null @@ -1,34 +0,0 @@ -package brokerdriver - -import ( - "query-service/internal/adapters/brokeradapter" - - "github.com/streadway/amqp" - "github.com/thijsheijden/alice" -) - -// AliceConsumer models a RabbitMQ consumer in Alice -type AliceConsumer struct { - broker *Driver - consumer alice.Consumer - messageHandler func(msg *brokeradapter.Message) -} - -// ConsumeMessages starts the consumer using an alice consumer -func (ac *AliceConsumer) ConsumeMessages() { - go ac.consumer.ConsumeMessages(nil, false, ac.handleMessage) -} - -func (ac *AliceConsumer) handleMessage(msg amqp.Delivery) { - // Convert message using the gateway - // Pass message to the message handler - ac.messageHandler(ac.broker.gateway.TransformMessage(msg)) - - // Acknowledge the message was received - msg.Ack(true) -} - -// SetMessageHandler sets the message handler to the supplied function -func (ac *AliceConsumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) { - ac.messageHandler = handler -} diff --git a/internal/drivers/brokerdriver/interface.go b/internal/drivers/brokerdriver/interface.go deleted file mode 100644 index 3dc54da..0000000 --- a/internal/drivers/brokerdriver/interface.go +++ /dev/null @@ -1,24 +0,0 @@ -package brokerdriver - -import ( - "query-service/internal/adapters/brokeradapter" - - "github.com/streadway/amqp" -) - -// Broker models a message broker -type Broker interface { - CreateConsumer() Consumer - CreateProducer() Producer -} - -// A Consumer belongs to a broker and consumes messages from a queue -type Consumer interface { - ConsumeMessages() - SetMessageHandler(handler func(msg *brokeradapter.Message)) -} - -// A Producer belongs to a broker and publishes messages to a queue -type Producer interface { - PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) -} diff --git a/internal/drivers/brokerdriver/mock/broker.go b/internal/drivers/brokerdriver/mock/broker.go deleted file mode 100644 index 80f1c1c..0000000 --- a/internal/drivers/brokerdriver/mock/broker.go +++ /dev/null @@ -1,39 +0,0 @@ -package mockbrokerdriver - -import ( - "query-service/internal/adapters/brokeradapter" - "query-service/internal/drivers/brokerdriver" -) - -// Driver is mock gateway -type Driver struct { - gateway brokeradapter.GatewayInterface - - // Mock messages that are published by producers on this broker - // Key is the routing key - // Value is a slice of messages, in order of being sent 'first -> last' - Messages map[string][]brokeradapter.Message -} - -// CreateBroker creates a broker driver (mock) -func CreateBroker(gateway brokeradapter.GatewayInterface) *Driver { - return &Driver{ - gateway: gateway, - Messages: make(map[string][]brokeradapter.Message), - } -} - -// CreateConsumer creates a consumer (mock) -func (d *Driver) CreateConsumer() brokerdriver.Consumer { - return &Consumer{ - broker: d, - } -} - -// CreateProducer creates a producer (mock) -func (d *Driver) CreateProducer() brokerdriver.Producer { - return &Producer{ - broker: d, - exchange: "ui-direct-exchange", // This is the only exchange this service produces to - } -} diff --git a/internal/drivers/brokerdriver/mock/consumer.go b/internal/drivers/brokerdriver/mock/consumer.go deleted file mode 100644 index 9118043..0000000 --- a/internal/drivers/brokerdriver/mock/consumer.go +++ /dev/null @@ -1,18 +0,0 @@ -package mockbrokerdriver - -import "query-service/internal/adapters/brokeradapter" - -// A Consumer implements the consumer interface (mock) -type Consumer struct { - broker *Driver -} - -// ConsumeMessages consumes messages from the broker (mock) -func (c *Consumer) ConsumeMessages() { - -} - -// SetMessageHandler mocks the setting of a message handler (mock) -func (c *Consumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) { - -} diff --git a/internal/drivers/brokerdriver/mock/producer.go b/internal/drivers/brokerdriver/mock/producer.go deleted file mode 100644 index 1367971..0000000 --- a/internal/drivers/brokerdriver/mock/producer.go +++ /dev/null @@ -1,27 +0,0 @@ -package mockbrokerdriver - -import ( - "query-service/internal/adapters/brokeradapter" - - "github.com/streadway/amqp" -) - -// A Producer implements the producer interface (mock) -type Producer struct { - broker *Driver - - // The exchange this producer is connected to - exchange string -} - -// PublishMessage publishes a message to the given queue (mock) -func (p *Producer) PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) { - // Create the message - msg := brokeradapter.Message{ - Headers: *headers, - Body: *body, - } - - // Append the message to the list - p.broker.Messages[*routingKey] = append(p.broker.Messages[*routingKey], msg) -} diff --git a/internal/drivers/brokerdriver/producer.go b/internal/drivers/brokerdriver/producer.go deleted file mode 100644 index 513e43d..0000000 --- a/internal/drivers/brokerdriver/producer.go +++ /dev/null @@ -1,23 +0,0 @@ -package brokerdriver - -import ( - "fmt" - "query-service/pkg/logger" - - "github.com/streadway/amqp" - "github.com/thijsheijden/alice" -) - -// AliceProducer models a RabbitMQ producer in Alice -type AliceProducer struct { - broker *Driver - producer alice.Producer -} - -// PublishMessage will publish a message to the specified queue id (mock) -func (ap *AliceProducer) PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) { - sessionID := (*headers)["sessionID"] - logger.Log(fmt.Sprintf("Publishing message to queue %v, for session %v", *routingKey, sessionID)) - - ap.producer.PublishMessage(*body, routingKey, headers) -} diff --git a/internal/usecases/consume/consume.go b/internal/usecases/consume/consume.go index c0cba93..dc1974e 100644 --- a/internal/usecases/consume/consume.go +++ b/internal/usecases/consume/consume.go @@ -1,18 +1,18 @@ package consume import ( - "query-service/internal/drivers/brokerdriver" "query-service/internal/usecases/databaseinfo" "query-service/internal/usecases/produce" "query-service/internal/usecases/request" + "git.science.uu.nl/datastrophe/broker" "git.science.uu.nl/datastrophe/query-conversion" ) // Service wraps consumer methods // broker is Alice broker created in brockerdriver driver type Service struct { - broker brokerdriver.Broker + brokerDriver broker.Interface producer produce.UseCase queryConverter query.Converter requestSender request.UseCase @@ -20,9 +20,9 @@ type Service struct { } // NewService creates a new service -func NewService(broker brokerdriver.Broker, produceService produce.UseCase, queryConverter query.Converter, requestSenderService request.UseCase, databaseInfoService databaseinfo.UseCase) *Service { +func NewService(broker broker.Interface, produceService produce.UseCase, queryConverter query.Converter, requestSenderService request.UseCase, databaseInfoService databaseinfo.UseCase) *Service { return &Service{ - broker: broker, + brokerDriver: broker, producer: produceService, queryConverter: queryConverter, requestSender: requestSenderService, @@ -34,7 +34,7 @@ func NewService(broker brokerdriver.Broker, produceService produce.UseCase, quer func (s *Service) Start() { // Create consumer - consumer := s.broker.CreateConsumer() + consumer := s.brokerDriver.CreateConsumer("requests-exchange", "aql-query-queue", "aql-query-request") consumer.SetMessageHandler(s.HandleMessage) diff --git a/internal/usecases/consume/consume_test.go b/internal/usecases/consume/consume_test.go index e6e70dc..92b9cea 100644 --- a/internal/usecases/consume/consume_test.go +++ b/internal/usecases/consume/consume_test.go @@ -2,24 +2,21 @@ package consume import ( "encoding/json" - "query-service/internal/adapters/brokeradapter" - mockbrokerdriver "query-service/internal/drivers/brokerdriver/mock" mockkeyvaluedriver "query-service/internal/drivers/keyvaluedriver/mock" mockdatabaseinfo "query-service/internal/usecases/databaseinfo/mock" "query-service/internal/usecases/produce" mockrequest "query-service/internal/usecases/request/mock" "testing" + "git.science.uu.nl/datastrophe/broker" mockconvertquery "git.science.uu.nl/datastrophe/query-conversion/aql" "github.com/stretchr/testify/assert" ) func TestHandleCorrectMessage(t *testing.T) { - // Create broker adapter - brokerAdapter := brokeradapter.CreateGateway() // Create a mock broker - mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter) + mockBroker := broker.NewMockDriver().(*broker.MockDriver) // Create mock key value store keyValueStore := mockkeyvaluedriver.CreateKeyValueStore() // Create new producer service @@ -46,7 +43,7 @@ func TestHandleCorrectMessage(t *testing.T) { headers := make(map[string]interface{}) headers["sessionID"] = mockSession headers["clientID"] = mockClient - mockMessage := brokeradapter.Message{ + mockMessage := broker.Message{ Headers: headers, Body: []byte("test message"), } @@ -75,10 +72,8 @@ func TestHandleCorrectMessage(t *testing.T) { // Unit test message received with no session ID func TestHandleMessageNoSessionID(t *testing.T) { - // Create broker adapter - brokerAdapter := brokeradapter.CreateGateway() // Create a mock broker - mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter) + mockBroker := broker.NewMockDriver().(*broker.MockDriver) // Create mock key value store keyValueStore := mockkeyvaluedriver.CreateKeyValueStore() // Create new producer service @@ -94,7 +89,7 @@ func TestHandleMessageNoSessionID(t *testing.T) { // Create headers containing a sessionID headers := make(map[string]interface{}) - mockMessage := brokeradapter.Message{ + mockMessage := broker.Message{ Headers: headers, Body: []byte("test message"), } @@ -111,10 +106,8 @@ func TestHandleMessageNoSessionID(t *testing.T) { // Unit test receival of message and not being able to parse it func TestFailToConvertQuery(t *testing.T) { - // Create broker adapter - brokerAdapter := brokeradapter.CreateGateway() // Create a mock broker - mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter) + mockBroker := broker.NewMockDriver().(*broker.MockDriver) // Create mock key value store keyValueStore := mockkeyvaluedriver.CreateKeyValueStore() // Create new producer service @@ -141,7 +134,7 @@ func TestFailToConvertQuery(t *testing.T) { headers := make(map[string]interface{}) headers["sessionID"] = mockSession headers["clientID"] = mockClient - mockMessage := brokeradapter.Message{ + mockMessage := broker.Message{ Headers: headers, Body: []byte("test message"), } @@ -163,10 +156,8 @@ func TestFailToConvertQuery(t *testing.T) { // Test AQL querying error handling func TestArangoError(t *testing.T) { - // Create broker adapter - brokerAdapter := brokeradapter.CreateGateway() // Create a mock broker - mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter) + mockBroker := broker.NewMockDriver().(*broker.MockDriver) // Create mock key value store keyValueStore := mockkeyvaluedriver.CreateKeyValueStore() // Create new producer service @@ -193,7 +184,7 @@ func TestArangoError(t *testing.T) { headers := make(map[string]interface{}) headers["sessionID"] = mockSession headers["clientID"] = mockClient - mockMessage := brokeradapter.Message{ + mockMessage := broker.Message{ Headers: headers, Body: []byte("test message"), } diff --git a/internal/usecases/consume/handlemessage.go b/internal/usecases/consume/handlemessage.go index e816007..6351409 100644 --- a/internal/usecases/consume/handlemessage.go +++ b/internal/usecases/consume/handlemessage.go @@ -2,13 +2,14 @@ package consume import ( "encoding/json" - "query-service/internal/adapters/brokeradapter" "query-service/pkg/errorhandler" "strings" + + "git.science.uu.nl/datastrophe/broker" ) // HandleMessage gets called when a message is received -func (s *Service) HandleMessage(msg *brokeradapter.Message) { +func (s *Service) HandleMessage(msg *broker.Message) { // Grab sessionID and clientID from the headers sessionID, ok := msg.Headers["sessionID"].(string) if !ok { diff --git a/internal/usecases/produce/produce.go b/internal/usecases/produce/produce.go index 85bff2a..1059564 100644 --- a/internal/usecases/produce/produce.go +++ b/internal/usecases/produce/produce.go @@ -1,19 +1,20 @@ package produce import ( - "query-service/internal/drivers/brokerdriver" "query-service/internal/drivers/keyvaluedriver" + + "git.science.uu.nl/datastrophe/broker" ) // Service wraps consumer methods type Service struct { - brokerDriver brokerdriver.Broker + brokerDriver broker.Interface keyValueStore keyvaluedriver.KeyValueStoreInterface - producerDriver brokerdriver.Producer + producerDriver broker.Producer } // NewService creates a new service -func NewService(broker brokerdriver.Broker, keyValueStore keyvaluedriver.KeyValueStoreInterface) *Service { +func NewService(broker broker.Interface, keyValueStore keyvaluedriver.KeyValueStoreInterface) *Service { return &Service{ brokerDriver: broker, keyValueStore: keyValueStore, @@ -23,7 +24,7 @@ func NewService(broker brokerdriver.Broker, keyValueStore keyvaluedriver.KeyValu // Start starts the producer func (s *Service) Start() { // Create producer - p := s.brokerDriver.CreateProducer() + p := s.brokerDriver.CreateProducer("ui-direct-exchange") s.producerDriver = p } diff --git a/internal/usecases/produce/produce_test.go b/internal/usecases/produce/produce_test.go index ff3f5fe..4d025d1 100644 --- a/internal/usecases/produce/produce_test.go +++ b/internal/usecases/produce/produce_test.go @@ -1,20 +1,17 @@ package produce import ( - "query-service/internal/adapters/brokeradapter" - mockbrokerdriver "query-service/internal/drivers/brokerdriver/mock" mockkeyvaluedriver "query-service/internal/drivers/keyvaluedriver/mock" "testing" + "git.science.uu.nl/datastrophe/broker" "github.com/stretchr/testify/assert" ) // Make sure a correct message gets published func TestPublishCorrectMessage(t *testing.T) { - // Create broker adapter - brokerAdapter := brokeradapter.CreateGateway() // Create a mock broker - mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter) + mockBroker := broker.NewMockDriver().(*broker.MockDriver) // Create mock key value store keyValueStore := mockkeyvaluedriver.CreateKeyValueStore() // Create new service and start it @@ -48,10 +45,8 @@ func TestPublishCorrectMessage(t *testing.T) { // Test publishing message without setting routing in key value store func TestPublishMessageNoRouting(t *testing.T) { - // Create broker adapter - brokerAdapter := brokeradapter.CreateGateway() // Create a mock broker - mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter) + mockBroker := broker.NewMockDriver().(*broker.MockDriver) // Create mock key value store keyValueStore := mockkeyvaluedriver.CreateKeyValueStore() // Create new service and start it diff --git a/internal/usecases/produce/publishmessage.go b/internal/usecases/produce/publishmessage.go index dffd5a8..6e64247 100644 --- a/internal/usecases/produce/publishmessage.go +++ b/internal/usecases/produce/publishmessage.go @@ -3,8 +3,6 @@ package produce import ( "fmt" "query-service/pkg/logger" - - "github.com/streadway/amqp" ) // PublishMessage will publish the message to the queue retrieved from the key value store, with the given sessionID @@ -21,8 +19,8 @@ func (s *Service) PublishMessage(data *[]byte, sessionID *string) { return } - headers := amqp.Table{} + headers := make(map[string]interface{}) headers["sessionID"] = *sessionID headers["type"] = "queryResult" - s.producerDriver.PublishMessage(data, &clientQueueID, &headers) + s.producerDriver.PublishMessage(data, clientQueueID, &headers) } -- GitLab