diff --git a/internal/usecases/produce/produce.go b/internal/usecases/produce/produce.go index d846a0b795c5718a901d241e1c6372aa14470f6d..85bff2a1c15774a55a05982025aaeb4bc907f508 100644 --- a/internal/usecases/produce/produce.go +++ b/internal/usecases/produce/produce.go @@ -1,11 +1,8 @@ package produce import ( - "log" "query-service/internal/drivers/brokerdriver" "query-service/internal/drivers/keyvaluedriver" - - "github.com/streadway/amqp" ) // Service wraps consumer methods @@ -29,19 +26,4 @@ func (s *Service) Start() { p := s.brokerDriver.CreateProducer() s.producerDriver = p - - // // Start consuming messages - // p.ConsumeMessages() -} - -// PublishMessage will publish the message to the queue retrieved from the key value store, with the given sessionID -func (s *Service) PublishMessage(data *[]byte, sessionID *string) { - clientUpdaterID := s.keyValueStore.Get(sessionID) - - log.Println(string(*data)) - - headers := amqp.Table{} - headers["sessionID"] = sessionID - headers["type"] = "queryResult" - s.producerDriver.PublishMessage(data, &clientUpdaterID, &headers) } diff --git a/internal/usecases/produce/produce_test.go b/internal/usecases/produce/produce_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ff3f5fefde093ed8b0ba1c39be4c28444ac3b1ae --- /dev/null +++ b/internal/usecases/produce/produce_test.go @@ -0,0 +1,77 @@ +package produce + +import ( + "query-service/internal/adapters/brokeradapter" + mockbrokerdriver "query-service/internal/drivers/brokerdriver/mock" + mockkeyvaluedriver "query-service/internal/drivers/keyvaluedriver/mock" + "testing" + + "github.com/stretchr/testify/assert" +) + +// Make sure a correct message gets published +func TestPublishCorrectMessage(t *testing.T) { + // Create broker adapter + brokerAdapter := brokeradapter.CreateGateway() + // Create a mock broker + mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter) + // Create mock key value store + keyValueStore := mockkeyvaluedriver.CreateKeyValueStore() + // Create new service and start it + service := NewService(mockBroker, keyValueStore) + service.Start() + + // Create mock session and mock queue + mockSession := "mock-session" + mockQueue := "mock-queue" + + // Set the test-session sessionID queue to mock-queue in key value store + keyValueStore.Set(&mockSession, &mockQueue) + + // Create headers containing a sessionID + headers := make(map[string]interface{}) + headers["sessionID"] = mockSession + + // Assert that there have not been any messages sent yet + assert.Empty(t, mockBroker.Messages) + + // Publish the message + mockMessage := []byte("Test message") + service.PublishMessage(&mockMessage, &mockSession) + + // Assert that there is now 1 message + assert.Len(t, mockBroker.Messages[mockQueue], 1) + + // Assert that this message contains the mockMessage + assert.Equal(t, mockBroker.Messages[mockQueue][0].Body, mockMessage) +} + +// Test publishing message without setting routing in key value store +func TestPublishMessageNoRouting(t *testing.T) { + // Create broker adapter + brokerAdapter := brokeradapter.CreateGateway() + // Create a mock broker + mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter) + // Create mock key value store + keyValueStore := mockkeyvaluedriver.CreateKeyValueStore() + // Create new service and start it + service := NewService(mockBroker, keyValueStore) + service.Start() + + // Create mock session and mock queue + mockSession := "mock-session" + + // Create headers containing a sessionID + headers := make(map[string]interface{}) + headers["sessionID"] = mockSession + + // Assert that there have not been any messages sent yet + assert.Empty(t, mockBroker.Messages) + + // Publish the message + mockMessage := []byte("Test message") + service.PublishMessage(&mockMessage, &mockSession) + + // Assert that there are still 0 messages + assert.Empty(t, mockBroker.Messages) +} diff --git a/internal/usecases/produce/publishmessage.go b/internal/usecases/produce/publishmessage.go new file mode 100644 index 0000000000000000000000000000000000000000..1b46dd4239d7822db3b8413dfb8f764913baa44c --- /dev/null +++ b/internal/usecases/produce/publishmessage.go @@ -0,0 +1,22 @@ +package produce + +import ( + "github.com/streadway/amqp" +) + +// PublishMessage will publish the message to the queue retrieved from the key value store, with the given sessionID +func (s *Service) PublishMessage(data *[]byte, sessionID *string) { + // Use the sessionID to query the key value store to get the queue we need to send this message to + clientQueueID := s.keyValueStore.Get(sessionID) + + // If this client has now disconnected + if clientQueueID == "" { + // TODO: Decide whether to throw away the message or perhaps cache it, for now throw it away + return + } + + headers := amqp.Table{} + headers["sessionID"] = sessionID + headers["type"] = "queryResult" + s.producerDriver.PublishMessage(data, &clientQueueID, &headers) +}