// consume_test.go

package consume

import (
	"context"
	"encoding/json"
	"query-service/internal/entity"
	"query-service/internal/usecases/databaseinfo/mockdatabaseinfo"
	"query-service/internal/usecases/produce"

	"testing"

	"git.science.uu.nl/datastrophe/broker"
	"git.science.uu.nl/datastrophe/keyvaluestore"
	"git.science.uu.nl/datastrophe/objectstore"
	"git.science.uu.nl/datastrophe/query-conversion"
	"git.science.uu.nl/datastrophe/query-conversion/aql"
	queryexecution "git.science.uu.nl/datastrophe/query-execution"
	"github.com/stretchr/testify/assert"
)

type testSuite struct {
	mockObjectStore     *objectstore.MockDriver
	mockBroker          *broker.MockDriver
	mockKeyValueStore   keyvaluestore.Interface
	produceService      *produce.Service
	queryConverter      query.Converter
	requestSender       *queryexecution.MockService
	databaseInfoService *mockdatabaseinfo.Service
	service             *Service
}

// ts is the package-level suite shared by the tests in this file. It stays nil
// until createTestSuite builds it; later createTestSuite calls reset it instead.
var ts *testSuite

// createTestSuite makes the package-level suite ts ready for a test. The first
// call builds the mocks and services, registers the routing entry for
// "mock-session" and starts the producer; every later call only resets the
// already existing suite.
func createTestSuite() {
	// A suite left behind by an earlier test is reused after clearing its state
	if ts != nil {
		ts.reset()
		return
	}

	// Mock drivers; broker and object store are narrowed to their mock types
	brokerMock := broker.NewMockDriver().(*broker.MockDriver)
	kvStore := keyvaluestore.NewMockDriver()
	objStore := objectstore.NewMockDriver().(*objectstore.MockDriver)

	// Assemble the suite step by step
	suite := &testSuite{
		mockObjectStore:   objStore,
		mockBroker:        brokerMock,
		mockKeyValueStore: kvStore,
	}
	suite.produceService = produce.NewService(brokerMock, kvStore)
	suite.queryConverter = aql.NewService()
	suite.requestSender = queryexecution.NewMockService()
	suite.databaseInfoService = mockdatabaseinfo.NewService()
	ts = suite

	// Service under test, wired to the collaborators created above
	ts.service = NewService(
		objStore,
		brokerMock,
		ts.produceService,
		ts.queryConverter,
		ts.requestSender,
		ts.databaseInfoService,
	)

	// Routing entry in the mock keyvaluestore: session "mock-session" -> "mock-queue"
	ts.mockKeyValueStore.Set(context.Background(), "mock-session", "mock-queue")

	// The producer has to be started once
	ts.produceService.Start()
}

// reset clears the state that tests leave behind in the suite: the messages
// recorded by the mock broker are replaced with a fresh, empty map.
func (s *testSuite) reset() {
	s.mockBroker.Messages = map[string][]broker.Message{}
}

// TestCorrectMessageHandled passes a well-formed query message carrying
// session, client and query IDs to HandleMessage and inspects the messages the
// mock broker recorded under routing key "mock-queue".
func TestCorrectMessageHandled(t *testing.T) {
	createTestSuite()

	// Create a mock message
	headers := map[string]interface{}{
		"sessionID": "mock-session",
		"clientID":  "mock-client",
		"queryID":   "mock-query",
	}
	mockMessage := broker.Message{
		Headers: headers,
		Body: []byte(`{
			"databaseName": "test",
			"return": {
				"entities": [],
				"relations": []
			},
			"entities": [],
			"relations": [],
			"limit": 5000
		}`),
	}

	// Assert that there have not been any messages sent yet
	assert.Empty(t, ts.mockBroker.Messages)

	// Send the mock message
	ts.service.HandleMessage(&mockMessage)

	// Assert that exactly seven messages were sent with routing key mock-queue.
	// assert.Len does not abort the test, so stop here on a mismatch: the fixed
	// indices used below would otherwise panic on a shorter slice.
	published := ts.mockBroker.Messages["mock-queue"]
	if !assert.Len(t, published, 7) {
		return
	}

	// Assert that the message at index 2 is of type 'query_translation_result'
	var translationMessage entity.MessageStruct
	if assert.NoError(t, json.Unmarshal(published[2].Body, &translationMessage)) {
		assert.Equal(t, "query_translation_result", translationMessage.Type)
	}

	// Assert that the last message (index 6) is of type 'query_result' and
	// carries the value "test"
	var resultMessage entity.MessageStruct
	if assert.NoError(t, json.Unmarshal(published[6].Body, &resultMessage)) {
		assert.Equal(t, "query_result", resultMessage.Type)
		assert.Equal(t, "test", resultMessage.Value)
	}
}

// TestNoSessionIDHandled verifies that handling a message whose headers carry
// no sessionID leaves the mock broker without any published message.
func TestNoSessionIDHandled(t *testing.T) {
	createTestSuite()

	// Incoming message with a client and query ID but without a sessionID
	payload := []byte(`{
		"databaseName": "test",
		"return": {
			"entities": [],
			"relations": []
		},
		"entities": [],
		"relations": [],
		"limit": 5000
	}`)
	incoming := &broker.Message{
		Headers: map[string]interface{}{
			"clientID": "mock-client",
			"queryID":  "mock-query",
		},
		Body: payload,
	}

	// Nothing may have been published before the message is handled
	assert.Empty(t, ts.mockBroker.Messages)

	// Hand the message to the service
	ts.service.HandleMessage(incoming)

	// Nothing may have been published afterwards either
	assert.Empty(t, ts.mockBroker.Messages)
}

// TestBadIncomingQueryHandled sends a query whose "limit" is a string instead
// of a number and expects the message at index 1 on "mock-queue" to be of type
// 'query_translation_error'.
func TestBadIncomingQueryHandled(t *testing.T) {
	createTestSuite()

	// Create a mock message
	headers := map[string]interface{}{
		"clientID":  "mock-client",
		"sessionID": "mock-session",
		"queryID":   "mock-query",
	}
	mockMessage := broker.Message{
		Headers: headers,
		Body: []byte(`{
		"databaseName": "test",
		"return": {
			"entities": [],
			"relations": []
		},
		"entities": [],
		"relations": [],
		"limit": "test"
	}`),
	}

	// Assert that there have not been any messages sent yet
	assert.Empty(t, ts.mockBroker.Messages)

	// Send the mock message
	ts.service.HandleMessage(&mockMessage)

	// Fail with a readable message instead of an index-out-of-range panic when
	// fewer messages than expected were published
	published := ts.mockBroker.Messages["mock-queue"]
	if len(published) < 2 {
		t.Fatalf("expected at least 2 messages on mock-queue, got %d", len(published))
	}

	// Assert that there was an error message published
	var errorMsg entity.MessageStruct
	if assert.NoError(t, json.Unmarshal(published[1].Body, &errorMsg)) {
		assert.Equal(t, "query_translation_error", errorMsg.Type)
	}
}

// TestDatabaseErrorHandled switches the mock request sender into its error
// mode, handles an otherwise valid query and expects the message at index 5 on
// "mock-queue" to be of type 'query_database_error'.
func TestDatabaseErrorHandled(t *testing.T) {
	createTestSuite()

	// Create a mock message
	headers := map[string]interface{}{
		"clientID":  "mock-client",
		"sessionID": "mock-session",
		"queryID":   "mock-query",
	}
	mockMessage := broker.Message{
		Headers: headers,
		Body: []byte(`{
		"databaseName": "test",
		"return": {
			"entities": [],
			"relations": []
		},
		"entities": [],
		"relations": [],
		"limit": 5000
	}`),
	}

	// Make it so that the request sender service throws an error. Turning the
	// error throwing off again is deferred: the suite is shared between tests,
	// so it must be restored even when this test aborts early.
	ts.requestSender.ToggleError()
	defer ts.requestSender.ToggleError()

	// Assert that there have not been any messages sent yet
	assert.Empty(t, ts.mockBroker.Messages)

	// Send the mock message
	ts.service.HandleMessage(&mockMessage)

	// Fail with a readable message instead of an index-out-of-range panic when
	// fewer messages than expected were published
	published := ts.mockBroker.Messages["mock-queue"]
	if len(published) < 6 {
		t.Fatalf("expected at least 6 messages on mock-queue, got %d", len(published))
	}

	// Assert that there was an error message published
	var errorMsg entity.MessageStruct
	if assert.NoError(t, json.Unmarshal(published[5].Body, &errorMsg)) {
		assert.Equal(t, "query_database_error", errorMsg.Type)
	}
}

// TestQueryConversionErrorHandled sends a query whose "return.entities" lists
// index 1 while the "entities" array holds a single element, and expects the
// message at index 2 on "mock-queue" to be of type 'query_translation_error'.
// NOTE(review): presumably the out-of-range entity index is what makes the
// conversion fail; confirm against the query converter.
func TestQueryConversionErrorHandled(t *testing.T) {
	createTestSuite()

	// Create a mock message
	headers := map[string]interface{}{
		"clientID":  "mock-client",
		"sessionID": "mock-session",
		"queryID":   "mock-query",
	}
	mockMessage := broker.Message{
		Headers: headers,
		Body: []byte(`{
		"databaseName": "test",
		"return": {
			"entities": [
				1
			],
			"relations": []
		},
		"entities": [
			{
				"type": "airports",
				"constraints": [
					{
						"attribute": "state",
						"value": "HI",
						"dataType": "text",
						"matchType": "exact"
					}
				]
			}
		],
		"relations": [],
		"limit": 5000
	}`),
	}

	// Assert that there have not been any messages sent yet
	assert.Empty(t, ts.mockBroker.Messages)

	// Send the mock message
	ts.service.HandleMessage(&mockMessage)

	// Fail with a readable message instead of an index-out-of-range panic when
	// fewer messages than expected were published
	published := ts.mockBroker.Messages["mock-queue"]
	if len(published) < 3 {
		t.Fatalf("expected at least 3 messages on mock-queue, got %d", len(published))
	}

	// Assert that there was an error message published
	var errorMsg entity.MessageStruct
	if assert.NoError(t, json.Unmarshal(published[2].Body, &errorMsg)) {
		assert.Equal(t, "query_translation_error", errorMsg.Type)
	}
}

// TestNoDatabaseNameHandled sends a query without a "databaseName" field and
// expects the message at index 1 on "mock-queue" to be of type
// 'query_malformed_request_error' with value "no database name supplied".
func TestNoDatabaseNameHandled(t *testing.T) {
	createTestSuite()

	// Create a mock message
	headers := map[string]interface{}{
		"clientID":  "mock-client",
		"sessionID": "mock-session",
		"queryID":   "mock-query",
	}
	mockMessage := broker.Message{
		Headers: headers,
		Body: []byte(`{
			"return": {
				"entities": [],
				"relations": []
			},
			"entities": [],
			"relations": [],
			"limit": 5000
		}`),
	}

	// Assert that there have not been any messages sent yet
	assert.Empty(t, ts.mockBroker.Messages)

	// Send the mock message
	ts.service.HandleMessage(&mockMessage)

	// Fail with a readable message instead of an index-out-of-range panic when
	// fewer messages than expected were published
	published := ts.mockBroker.Messages["mock-queue"]
	if len(published) < 2 {
		t.Fatalf("expected at least 2 messages on mock-queue, got %d", len(published))
	}

	// Assert that there was an error message published
	var errorMsg entity.MessageStruct
	if assert.NoError(t, json.Unmarshal(published[1].Body, &errorMsg)) {
		assert.Equal(t, "query_malformed_request_error", errorMsg.Type)
		assert.Equal(t, "no database name supplied", errorMsg.Value)
	}
}