Skip to content
Snippets Groups Projects
Commit 9ab50a17 authored by Heijden,T.A.J. van der (Thijs)'s avatar Heijden,T.A.J. van der (Thijs)
Browse files

Merge branch 'externalize-packages' into 'develop'

Externalize packages

See merge request datastrophe/microservices-backbone/query-service!28
parents 4a9f3e31 c367316d
No related branches found
No related tags found
No related merge requests found
Showing
with 66 additions and 541 deletions
......@@ -3,10 +3,10 @@
lint: dep ## Lint the files
@golint -set_exit_status ./...
test: ## Run unittests
test: dep ## Run unittests
@go test -cover -coverprofile=coverage.txt -covermode count ./...
dep: ## Get the dependencies
dep: login ## Get the dependencies
@go get -a -v ./...
@go get google.golang.org/grpc/internal/transport@v1.37.0
@go get google.golang.org/grpc@v1.37.0
......@@ -26,7 +26,7 @@ macos:
$(eval export GOOS := darwin)
@go build -o builds/main ./cmd/query-service/
linux: # Build for linux
linux: login # Build for linux
$(eval export GOOS := linux)
CGO_ENABLED=0 go build -o builds/main ./cmd/query-service/
......@@ -34,7 +34,6 @@ run:
./builds/main
develop-sivan:
$(eval export RABBIT_USER := haha-test)
$(eval export RABBIT_PASSWORD := dikkedraak)
$(eval export RABBIT_HOST := 192.168.178.158)
......@@ -48,7 +47,6 @@ develop-sivan:
develop:
$(eval export RABBIT_USER := guest)
$(eval export RABBIT_PASSWORD := guest)
$(eval export RABBIT_HOST := localhost)
......@@ -62,14 +60,17 @@ develop:
@go run cmd/query-service/main.go
docker:
docker: login
make linux
@docker build -t query-service:latest .
@docker tag query-service:latest datastropheregistry.azurecr.io/query-service:latest
@docker push datastropheregistry.azurecr.io/query-service\:latest
staging:
staging: login
make linux
@docker build -t query-service-staging:latest .
@docker tag query-service-staging:latest datastropheregistry.azurecr.io/query-service-staging:latest
@docker push datastropheregistry.azurecr.io/query-service-staging:latest
\ No newline at end of file
@docker push datastropheregistry.azurecr.io/query-service-staging:latest
login:
echo -e "machine git.science.uu.nl\nlogin gitlab-ci-token\npassword ${CI_JOB_TOKEN}" > ~/.netrc
\ No newline at end of file
package main
import (
"query-service/internal/adapters/brokeradapter"
"query-service/internal/drivers/brokerdriver"
"query-service/internal/drivers/keyvaluedriver"
"query-service/internal/drivers/rpcdriver"
"query-service/internal/usecases/consume"
"query-service/internal/usecases/convertquery"
"query-service/internal/usecases/databaseinfo"
"query-service/internal/usecases/produce"
"query-service/internal/usecases/request"
"query-service/pkg/logger"
"git.science.uu.nl/datastrophe/broker"
"git.science.uu.nl/datastrophe/keyvaluestore"
"git.science.uu.nl/datastrophe/query-conversion/aql"
"github.com/thijsheijden/alice"
)
......@@ -20,29 +19,28 @@ func main() {
logger.Start()
// MARK: Create relevant services
redisService := keyvaluedriver.NewRedisDriver()
keyValueStore := keyvaluestore.NewDriver()
// Create new rpc driver
rpcDriver := rpcdriver.New()
// MARK: Create alice RabbitMQ services
brokerGateway := brokeradapter.CreateGateway()
aliceBroker := brokerdriver.CreateAliceBroker(brokerGateway)
broker := broker.NewDriver()
// Instantiate an implementation of the produce UseCase
produceService := produce.NewService(aliceBroker, redisService)
produceService := produce.NewService(broker, keyValueStore)
// MARK: Create relevant services for consuming a message
convertQueryService := convertquery.NewService()
convertQueryService := aql.NewService()
requestSenderService := request.NewService()
databaseInfoService := databaseinfo.NewService(rpcDriver)
consumeService := consume.NewService(aliceBroker, produceService, convertQueryService, requestSenderService, databaseInfoService)
consumeService := consume.NewService(broker, produceService, convertQueryService, requestSenderService, databaseInfoService)
// MARK: Start services
redisService.Start()
keyValueStore.Connect()
produceService.Start()
go consumeService.Start()
......
......@@ -3,16 +3,14 @@ module query-service
go 1.15
require (
git.science.uu.nl/datastrophe/broker v0.0.0-20210516094125-abbeaf96fd58
git.science.uu.nl/datastrophe/keyvaluestore v0.0.0-20210517170603-34902cd5c90d
git.science.uu.nl/datastrophe/query-conversion v0.0.0-20210517164802-5852eee71ec0
github.com/arangodb/go-driver v0.0.0-20210506071742-64f314d85db7
github.com/boumenot/gocover-cobertura v1.1.0 // indirect
github.com/go-redis/redis/v8 v8.8.2
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.7.0
github.com/thijsheijden/alice v0.1.18
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744 // indirect
golang.org/x/tools v0.1.1 // indirect
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect
google.golang.org/grpc v1.37.0
google.golang.org/protobuf v1.26.0
)
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
git.science.uu.nl/datastrophe/broker v0.0.0-20210516094125-abbeaf96fd58 h1:hnPHRwbH3EwMO+5XSPR1JFusrmxx/dq84mt5wKml1jw=
git.science.uu.nl/datastrophe/broker v0.0.0-20210516094125-abbeaf96fd58/go.mod h1:+ua8t+K6R+rF4zllcXH3QYPzpB+8bsAsw1/h6kflwfM=
git.science.uu.nl/datastrophe/keyvaluestore v0.0.0-20210517170603-34902cd5c90d h1:0F1IcemW3WmwZ0yPsFYIwo+MVlBOLHIcyMM7qKNH3hc=
git.science.uu.nl/datastrophe/keyvaluestore v0.0.0-20210517170603-34902cd5c90d/go.mod h1:8fw3mDyMATpBNUhd3T1Me0FvYRGIYtyRgP6Q4EyBUgE=
git.science.uu.nl/datastrophe/query-conversion v0.0.0-20210517164802-5852eee71ec0 h1:UrYqOFjIFxaHtmzqsoId48g/jPEdGX4MCX5sDjbzBSI=
git.science.uu.nl/datastrophe/query-conversion v0.0.0-20210517164802-5852eee71ec0/go.mod h1:6rvalwekoukmVu3SbWmZkj8wBZEm34wDbA4Ilxcb+jw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/arangodb/go-driver v0.0.0-20210506071742-64f314d85db7 h1:xRFEg4kM17h2fIOt3o4+mws3uRpw5rqWtemPYKfTURg=
github.com/arangodb/go-driver v0.0.0-20210506071742-64f314d85db7/go.mod h1:3NUekcRLpgheFIGEwcOvxilEW73MV1queNKW58k7sdc=
github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e h1:Xg+hGrY2LcQBbxd0ZFdbGSyRKTYMZCfBbw/pMJFOk1g=
github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e/go.mod h1:mq7Shfa/CaixoDxiyAAc5jZ6CVBAyPaNQCGS7mkj4Ho=
github.com/boumenot/gocover-cobertura v1.1.0 h1:nqMsp1zONyd3Bz98gscF2CpMupSCY0BQA4nmLhkcoYQ=
github.com/boumenot/gocover-cobertura v1.1.0/go.mod h1:fz7ly8dslE42VRR5ZWLt2OHGDHjkTiA2oNvKgJEjLT0=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
......@@ -74,15 +78,12 @@ github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/thijsheijden/alice v0.1.18 h1:lHmhzruprpH0FH6XlCg8zDqTZI15Iuuj9VCNvH6a8Wc=
github.com/thijsheijden/alice v0.1.18/go.mod h1:lYOP30HKhw/7xJa3lLhs+Xsdc5T7MRo7DOb/npzfg9I=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opentelemetry.io/otel v0.19.0 h1:Lenfy7QHRXPZVsw/12CWpxX6d/JkrX8wrx2vO8G80Ng=
go.opentelemetry.io/otel v0.19.0/go.mod h1:j9bF567N9EfomkSidSfmMwIwIBuP37AMAIzVW85OxSg=
go.opentelemetry.io/otel/metric v0.19.0 h1:dtZ1Ju44gkJkYvo+3qGqVXmf88tc+a42edOywypengg=
......@@ -98,13 +99,7 @@ golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
......@@ -112,11 +107,9 @@ golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73r
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb h1:eBmm0M9fYhWpKZLjQUUKka/LtIxf46G4fxeEz5KJr9U=
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
......@@ -127,7 +120,6 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
......@@ -140,9 +132,8 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744 h1:yhBbb4IRs2HS9PPlAg6DMC6mUOKexJBNsLf4Z+6En1Q=
golang.org/x/sys v0.0.0-20210511113859-b0526f3d8744/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 h1:hZR0X1kPW+nwyJ9xRxqZk1vx5RUObAPBdKVvXPDUH/E=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k=
......@@ -154,12 +145,8 @@ golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190828213141-aed303cbaa74/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200526224456-8b020aee10d2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200818005847-188abfa75333/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.1 h1:wGiQel/hW0NnEkJUk8lbzkX2gFJU6PFxf1v5OlCfuOs=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
......
package brokeradapter
import "github.com/streadway/amqp"
// A Message describes a standard message queue message within the service
type Message struct {
Headers map[string]interface{}
Body []byte
}
// An Gateway converts AMQP formats to universal formats
type Gateway struct {
}
// CreateGateway creates a gateway
func CreateGateway() *Gateway {
return &Gateway{}
}
// TransformMessage transforms an AMQP delivery into a general format
func (a *Gateway) TransformMessage(msg amqp.Delivery) *Message {
return &Message{
Headers: (map[string]interface{})(msg.Headers),
Body: msg.Body,
}
}
package brokeradapter
import "github.com/streadway/amqp"
// GatewayInterface is an interface describing a GateWay
type GatewayInterface interface {
TransformMessage(msg amqp.Delivery) *Message
}
package brokerdriver
import (
"log"
"os"
"query-service/internal/adapters/brokeradapter"
"query-service/pkg/errorhandler"
"query-service/pkg/logger"
"strconv"
"time"
"github.com/thijsheijden/alice"
)
// Driver models an Alice RabbitMQ broker
type Driver struct {
broker alice.Broker
gateway brokeradapter.GatewayInterface
}
// CreateAliceBroker creates an Alice broker
func CreateAliceBroker(gateway brokeradapter.GatewayInterface) *Driver {
// Create connection config using environment variables
rabbitUser := os.Getenv("RABBIT_USER")
rabbitPassword := os.Getenv("RABBIT_PASSWORD")
rabbitHost := os.Getenv("RABBIT_HOST")
rabbitPort, err := strconv.Atoi(os.Getenv("RABBIT_PORT"))
errorhandler.FailWithError(err, "invalid rabbitmq port given")
config := alice.CreateConfig(rabbitUser, rabbitPassword, rabbitHost, rabbitPort, true, time.Minute*1, alice.DefaultErrorHandler)
// Attempt to create a broker, if an error is returned retry the connection every 10 seconds
broker, err := alice.CreateBroker(config)
if err != nil {
errorhandler.FailWithError(err, err.Error())
errorhandler.LogError(err, "Failed to connect to RabbitMQ")
// Create 10 second ticker
ticker := time.NewTicker(time.Second * 10)
done := make(chan bool, 1)
for {
select {
case <-ticker.C:
logger.Log("Retrying RabbitMQ connection")
broker, err = alice.CreateBroker(config)
if err == nil {
done <- true
ticker.Stop()
}
case <-done:
log.Println("Succesfully connected to broker")
return &Driver{
broker: broker,
gateway: gateway,
}
}
}
}
// Return the created driver
// This code only gets called if the broker creation works on the first try, which it more often than not does
return &Driver{
broker: broker,
gateway: gateway,
}
}
// CreateConsumer creates an AliceConsumer on a certain exchange and queue
func (d *Driver) CreateConsumer() Consumer {
exchangeID := "requests-exchange"
routingKey := "aql-query-request"
// Declare the exchange we want to bind to
exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange")
}
// Declare the queue we will consume from
queue := alice.CreateQueue(exchange, "aql-query-queue", true, false, true, false, nil)
// Create the consumer
c, err := d.broker.CreateConsumer(queue, routingKey, "", alice.DefaultConsumerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to create consumer")
}
consumer := &AliceConsumer{
broker: d,
consumer: c,
}
return consumer
}
// CreateProducer creates an AliceProducer on a certain exchange
func (d *Driver) CreateProducer() Producer {
exchangeID := "ui-direct-exchange"
exchange, err := alice.CreateDefaultExchange(exchangeID, alice.Direct)
if err != nil {
errorhandler.FailWithError(err, "failed to create exchange for producer")
}
p, err := d.broker.CreateProducer(exchange, alice.DefaultProducerErrorHandler)
if err != nil {
errorhandler.FailWithError(err, "failed to created producer")
}
producer := &AliceProducer{
broker: d,
producer: p,
}
return producer
}
package brokerdriver
import (
"query-service/internal/adapters/brokeradapter"
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
)
// AliceConsumer models a RabbitMQ consumer in Alice
type AliceConsumer struct {
broker *Driver
consumer alice.Consumer
messageHandler func(msg *brokeradapter.Message)
}
// ConsumeMessages starts the consumer using an alice consumer
func (ac *AliceConsumer) ConsumeMessages() {
go ac.consumer.ConsumeMessages(nil, false, ac.handleMessage)
}
func (ac *AliceConsumer) handleMessage(msg amqp.Delivery) {
// Convert message using the gateway
// Pass message to the message handler
ac.messageHandler(ac.broker.gateway.TransformMessage(msg))
// Acknowledge the message was received
msg.Ack(true)
}
// SetMessageHandler sets the message handler to the supplied function
func (ac *AliceConsumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) {
ac.messageHandler = handler
}
package brokerdriver
import (
"query-service/internal/adapters/brokeradapter"
"github.com/streadway/amqp"
)
// Broker models a message broker
type Broker interface {
CreateConsumer() Consumer
CreateProducer() Producer
}
// A Consumer belongs to a broker and consumes messages from a queue
type Consumer interface {
ConsumeMessages()
SetMessageHandler(handler func(msg *brokeradapter.Message))
}
// A Producer belongs to a broker and publishes messages to a queue
type Producer interface {
PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table)
}
package mockbrokerdriver
import (
"query-service/internal/adapters/brokeradapter"
"query-service/internal/drivers/brokerdriver"
)
// Driver is mock gateway
type Driver struct {
gateway brokeradapter.GatewayInterface
// Mock messages that are published by producers on this broker
// Key is the routing key
// Value is a slice of messages, in order of being sent 'first -> last'
Messages map[string][]brokeradapter.Message
}
// CreateBroker creates a broker driver (mock)
func CreateBroker(gateway brokeradapter.GatewayInterface) *Driver {
return &Driver{
gateway: gateway,
Messages: make(map[string][]brokeradapter.Message),
}
}
// CreateConsumer creates a consumer (mock)
func (d *Driver) CreateConsumer() brokerdriver.Consumer {
return &Consumer{
broker: d,
}
}
// CreateProducer creates a producer (mock)
func (d *Driver) CreateProducer() brokerdriver.Producer {
return &Producer{
broker: d,
exchange: "ui-direct-exchange", // This is the only exchange this service produces to
}
}
package mockbrokerdriver
import "query-service/internal/adapters/brokeradapter"
// A Consumer implements the consumer interface (mock)
type Consumer struct {
broker *Driver
}
// ConsumeMessages consumes messages from the broker (mock)
func (c *Consumer) ConsumeMessages() {
}
// SetMessageHandler mocks the setting of a message handler (mock)
func (c *Consumer) SetMessageHandler(handler func(msg *brokeradapter.Message)) {
}
package mockbrokerdriver
import (
"query-service/internal/adapters/brokeradapter"
"github.com/streadway/amqp"
)
// A Producer implements the producer interface (mock)
type Producer struct {
broker *Driver
// The exchange this producer is connected to
exchange string
}
// PublishMessage publishes a message to the given queue (mock)
func (p *Producer) PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) {
// Create the message
msg := brokeradapter.Message{
Headers: *headers,
Body: *body,
}
// Append the message to the list
p.broker.Messages[*routingKey] = append(p.broker.Messages[*routingKey], msg)
}
package brokerdriver
import (
"fmt"
"query-service/pkg/logger"
"github.com/streadway/amqp"
"github.com/thijsheijden/alice"
)
// AliceProducer models a RabbitMQ producer in Alice
type AliceProducer struct {
broker *Driver
producer alice.Producer
}
// PublishMessage will publish a message to the specified queue id (mock)
func (ap *AliceProducer) PublishMessage(body *[]byte, routingKey *string, headers *amqp.Table) {
sessionID := (*headers)["sessionID"]
logger.Log(fmt.Sprintf("Publishing message to queue %v, for session %v", *routingKey, sessionID))
ap.producer.PublishMessage(*body, routingKey, headers)
}
package keyvaluedriver
// KeyValueStoreInterface is an interface for a key value storage
type KeyValueStoreInterface interface {
Get(key *string) string
Set(key *string, value *string) error
}
package keyvaluedriver
import (
"context"
"fmt"
"os"
"query-service/pkg/logger"
"github.com/go-redis/redis/v8"
)
// KeyValueStore models the redis driver
type KeyValueStore struct {
client *redis.Client
}
// NewRedisDriver creates and returns a redis driver
func NewRedisDriver() *KeyValueStore {
return &KeyValueStore{}
}
// Start starts the redis driver
func (d *KeyValueStore) Start() {
// Grab the redis host and port from environment vars
redisAddress := os.Getenv("REDIS_ADDRESS")
// Create redis client
d.client = redis.NewClient(&redis.Options{
Addr: redisAddress,
})
pong := d.client.Ping(context.Background())
logger.Log(fmt.Sprintf("%v", pong))
}
// Get retrieves the value from the redis store that belongs to the given key
func (d *KeyValueStore) Get(key *string) string {
return d.client.Get(context.Background(), *key).Val()
}
// Set sets the key value pair in the redis store
func (d *KeyValueStore) Set(key *string, value *string) error {
status := d.client.Set(context.Background(), *key, *value, 0)
return status.Err()
}
package mockkeyvaluedriver
// A KeyValueStore implements methods to set key-value data (mock)
type KeyValueStore struct {
data map[string]string
}
// CreateKeyValueStore creates a key value store driver (mock)
func CreateKeyValueStore() *KeyValueStore {
return &KeyValueStore{
data: make(map[string]string),
}
}
// Set sets a key to a value in the key value store. Expects a non-pointer as value. (mock)
func (kvs *KeyValueStore) Set(key *string, value *string) error {
kvs.data[*key] = *value
return nil
}
// Get gets the value for the supplied key from the key value store (mock)
func (kvs *KeyValueStore) Get(key *string) string {
return kvs.data[*key]
}
package entity
// QueryParsedJSON is used for JSON conversion of the incoming byte array
type QueryParsedJSON struct {
DatabaseName string
Return QueryReturnStruct
Entities []QueryEntityStruct
Relations []QueryRelationStruct
// Limit is for limiting the amount of paths AQL will return in a relation let statement
Limit int
Modifiers []QueryModifierStruct
}
// QueryReturnStruct holds the indices of the entities and relations that need to be returned
type QueryReturnStruct struct {
Entities []int
Relations []int
//Modifiers []int
}
// QueryEntityStruct encapsulates a single entity with its corresponding constraints
type QueryEntityStruct struct {
Type string
Constraints []QueryConstraintStruct
}
// QueryRelationStruct encapsulates a single relation with its corresponding constraints
type QueryRelationStruct struct {
Type string
EntityFrom int
EntityTo int
Depth QuerySearchDepthStruct
Constraints []QueryConstraintStruct
}
// QueryModifierStruct encapsulates a single modifier with its corresponding constraints
type QueryModifierStruct struct {
Type string // SUM COUNT AVG
SelectedType string // node relation
ID int // ID of the enitity or relation
AttributeIndex int // = -1 if its the node or relation, = > -1 if an attribute is selected
}
// QuerySearchDepthStruct holds the range of traversals for the relation
type QuerySearchDepthStruct struct {
Min int
Max int
}
// QueryConstraintStruct holds the information of the constraint
// Constraint datatypes
// text MatchTypes: exact/contains/startswith/endswith
// number MatchTypes: GT/LT/EQ
// bool MatchTypes: EQ/NEQ
type QueryConstraintStruct struct {
Attribute string
Value string
DataType string
MatchType string
}
package consume
import (
"query-service/internal/drivers/brokerdriver"
"query-service/internal/usecases/convertquery"
"query-service/internal/usecases/databaseinfo"
"query-service/internal/usecases/produce"
"query-service/internal/usecases/request"
"git.science.uu.nl/datastrophe/broker"
"git.science.uu.nl/datastrophe/query-conversion"
)
// Service wraps consumer methods
// broker is Alice broker created in brockerdriver driver
type Service struct {
broker brokerdriver.Broker
brokerDriver broker.Interface
producer produce.UseCase
queryConverter convertquery.UseCase
queryConverter query.Converter
requestSender request.UseCase
databaseInfoService databaseinfo.UseCase
}
// NewService creates a new service
func NewService(broker brokerdriver.Broker, produceService produce.UseCase, convertQueryService convertquery.UseCase, requestSenderService request.UseCase, databaseInfoService databaseinfo.UseCase) *Service {
func NewService(broker broker.Interface, produceService produce.UseCase, queryConverter query.Converter, requestSenderService request.UseCase, databaseInfoService databaseinfo.UseCase) *Service {
return &Service{
broker: broker,
brokerDriver: broker,
producer: produceService,
queryConverter: convertQueryService,
queryConverter: queryConverter,
requestSender: requestSenderService,
databaseInfoService: databaseInfoService,
}
......@@ -33,7 +34,7 @@ func NewService(broker brokerdriver.Broker, produceService produce.UseCase, conv
func (s *Service) Start() {
// Create consumer
consumer := s.broker.CreateConsumer()
consumer := s.brokerDriver.CreateConsumer("requests-exchange", "aql-query-queue", "aql-query-request")
consumer.SetMessageHandler(s.HandleMessage)
......
package consume
import (
"context"
"encoding/json"
"query-service/internal/adapters/brokeradapter"
mockbrokerdriver "query-service/internal/drivers/brokerdriver/mock"
mockkeyvaluedriver "query-service/internal/drivers/keyvaluedriver/mock"
mockconvertquery "query-service/internal/usecases/convertquery/mock"
mockdatabaseinfo "query-service/internal/usecases/databaseinfo/mock"
"query-service/internal/usecases/produce"
mockrequest "query-service/internal/usecases/request/mock"
"testing"
"git.science.uu.nl/datastrophe/broker"
"git.science.uu.nl/datastrophe/keyvaluestore"
mockconvertquery "git.science.uu.nl/datastrophe/query-conversion/aql"
"github.com/stretchr/testify/assert"
)
func TestHandleCorrectMessage(t *testing.T) {
// Create broker adapter
brokerAdapter := brokeradapter.CreateGateway()
// Create a mock broker
mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter)
mockBroker := broker.NewMockDriver().(*broker.MockDriver)
// Create mock key value store
keyValueStore := mockkeyvaluedriver.CreateKeyValueStore()
keyValueStore := keyvaluestore.NewMockDriver()
// Create new producer service
producerService := produce.NewService(mockBroker, keyValueStore)
producerService.Start()
// Create new convert query service
convertQueryService := mockconvertquery.NewService()
convertQueryService := mockconvertquery.NewMockService()
// Create new request sender service
requestSenderService := mockrequest.NewService()
// Create mock databaseinfo service
......@@ -39,13 +38,13 @@ func TestHandleCorrectMessage(t *testing.T) {
mockQueue := "mock-queue"
// Set the test-session sessionID queue to mock-queue in key value store
keyValueStore.Set(&mockSession, &mockQueue)
keyValueStore.Set(context.Background(), mockSession, mockQueue)
// Create headers containing a sessionID
headers := make(map[string]interface{})
headers["sessionID"] = mockSession
headers["clientID"] = mockClient
mockMessage := brokeradapter.Message{
mockMessage := broker.Message{
Headers: headers,
Body: []byte("test message"),
}
......@@ -74,16 +73,14 @@ func TestHandleCorrectMessage(t *testing.T) {
// Unit test message received with no session ID
func TestHandleMessageNoSessionID(t *testing.T) {
// Create broker adapter
brokerAdapter := brokeradapter.CreateGateway()
// Create a mock broker
mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter)
mockBroker := broker.NewMockDriver().(*broker.MockDriver)
// Create mock key value store
keyValueStore := mockkeyvaluedriver.CreateKeyValueStore()
keyValueStore := keyvaluestore.NewMockDriver()
// Create new producer service
producerService := produce.NewService(mockBroker, keyValueStore)
// Create new convert query service
convertQueryService := mockconvertquery.NewService()
convertQueryService := mockconvertquery.NewMockService()
// Create new request sender service
requestSenderService := mockrequest.NewService()
// Create mock databaseinfo service
......@@ -93,7 +90,7 @@ func TestHandleMessageNoSessionID(t *testing.T) {
// Create headers containing a sessionID
headers := make(map[string]interface{})
mockMessage := brokeradapter.Message{
mockMessage := broker.Message{
Headers: headers,
Body: []byte("test message"),
}
......@@ -110,17 +107,15 @@ func TestHandleMessageNoSessionID(t *testing.T) {
// Unit test receival of message and not being able to parse it
func TestFailToConvertQuery(t *testing.T) {
// Create broker adapter
brokerAdapter := brokeradapter.CreateGateway()
// Create a mock broker
mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter)
mockBroker := broker.NewMockDriver().(*broker.MockDriver)
// Create mock key value store
keyValueStore := mockkeyvaluedriver.CreateKeyValueStore()
keyValueStore := keyvaluestore.NewMockDriver()
// Create new producer service
producerService := produce.NewService(mockBroker, keyValueStore)
producerService.Start()
// Create new convert query service
convertQueryService := mockconvertquery.NewService()
convertQueryService := mockconvertquery.NewMockService()
// Create new request sender service
requestSenderService := mockrequest.NewService()
// Create mock databaseinfo service
......@@ -134,13 +129,13 @@ func TestFailToConvertQuery(t *testing.T) {
mockQueue := "mock-queue"
// Set the test-session sessionID queue to mock-queue in key value store
keyValueStore.Set(&mockSession, &mockQueue)
keyValueStore.Set(context.Background(), mockSession, mockQueue)
// Create headers containing a sessionID
headers := make(map[string]interface{})
headers["sessionID"] = mockSession
headers["clientID"] = mockClient
mockMessage := brokeradapter.Message{
mockMessage := broker.Message{
Headers: headers,
Body: []byte("test message"),
}
......@@ -162,17 +157,15 @@ func TestFailToConvertQuery(t *testing.T) {
// Test AQL querying error handling
func TestArangoError(t *testing.T) {
// Create broker adapter
brokerAdapter := brokeradapter.CreateGateway()
// Create a mock broker
mockBroker := mockbrokerdriver.CreateBroker(brokerAdapter)
mockBroker := broker.NewMockDriver().(*broker.MockDriver)
// Create mock key value store
keyValueStore := mockkeyvaluedriver.CreateKeyValueStore()
keyValueStore := keyvaluestore.NewMockDriver()
// Create new producer service
producerService := produce.NewService(mockBroker, keyValueStore)
producerService.Start()
// Create new convert query service
convertQueryService := mockconvertquery.NewService()
convertQueryService := mockconvertquery.NewMockService()
// Create new request sender service
requestSenderService := mockrequest.NewService()
// Create mock databaseinfo service
......@@ -186,13 +179,13 @@ func TestArangoError(t *testing.T) {
mockQueue := "mock-queue"
// Set the test-session sessionID queue to mock-queue in key value store
keyValueStore.Set(&mockSession, &mockQueue)
keyValueStore.Set(context.Background(), mockSession, mockQueue)
// Create headers containing a sessionID
headers := make(map[string]interface{})
headers["sessionID"] = mockSession
headers["clientID"] = mockClient
mockMessage := brokeradapter.Message{
mockMessage := broker.Message{
Headers: headers,
Body: []byte("test message"),
}
......
......@@ -2,13 +2,14 @@ package consume
import (
"encoding/json"
"query-service/internal/adapters/brokeradapter"
"query-service/pkg/errorhandler"
"strings"
"git.science.uu.nl/datastrophe/broker"
)
// HandleMessage gets called when a message is received
func (s *Service) HandleMessage(msg *brokeradapter.Message) {
func (s *Service) HandleMessage(msg *broker.Message) {
// Grab sessionID and clientID from the headers
sessionID, ok := msg.Headers["sessionID"].(string)
if !ok {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment