From 75771521c9edc74315a9805c0143bf27288f3d24 Mon Sep 17 00:00:00 2001 From: Leonardo Christino <leomilho@gmail.com> Date: Wed, 23 Aug 2023 14:06:43 +0200 Subject: [PATCH] remove minio dependency and refactor redis keys for better understanding solves #DEV-4 --- Makefile | 8 +----- cmd/schema-orchestrator/main.go | 26 +++++-------------- go.mod | 2 +- go.sum | 4 +-- .../usecases/produce/produceCachedSchema.go | 3 ++- internal/usecases/schema/retrieve.go | 5 ++-- .../usecases/schema/retrieveCachedSchema.go | 7 +++-- internal/usecases/schema/schema.go | 6 ++--- 8 files changed, 22 insertions(+), 39 deletions(-) diff --git a/Makefile b/Makefile index 0e1a32d..9db6217 100644 --- a/Makefile +++ b/Makefile @@ -33,18 +33,12 @@ run: develop: # Usernames and Password only usable in locally in dev environment! - - $(eval export MINIO_ADDRESS := localhost:9002) - $(eval export MINIO_ACCESSKEYID := minio) - $(eval export MINIO_ACCESSKEY := DevOnlyPass) - $(eval export RABBIT_USER := rabbitmq) $(eval export RABBIT_PASSWORD := DevOnlyPass) $(eval export RABBIT_HOST := localhost) $(eval export RABBIT_PORT := 5672) - $(eval export SCHEMA_REDIS_ADDRESS := localhost:6379) - $(eval export UI_REDIS_ADDRESS := localhost:6379) + $(eval export REDIS_ADDRESS := localhost:6379) $(eval export JWT_SECRET := 15D262E1FB339FFBD062FFB81C1831B2757FA3F1C02B7432A3E586A447FB7870) diff --git a/cmd/schema-orchestrator/main.go b/cmd/schema-orchestrator/main.go index 565ca77..3291e3e 100644 --- a/cmd/schema-orchestrator/main.go +++ b/cmd/schema-orchestrator/main.go @@ -19,7 +19,6 @@ import ( "git.science.uu.nl/graphpolaris/broker" "git.science.uu.nl/graphpolaris/keyvaluestore" - "git.science.uu.nl/graphpolaris/objectstore" ) /* @@ -46,34 +45,21 @@ func main() { } brokerDriver := broker.NewDriver(os.Getenv("RABBIT_USER"), os.Getenv("RABBIT_PASSWORD"), os.Getenv("RABBIT_HOST"), port) - // Create keyvaluestore to get the queue with which clients are connected (websockets) - clientUpdaterRedisService := keyvaluestore.NewDriver() - err = clientUpdaterRedisService.Connect(os.Getenv("UI_REDIS_ADDRESS")) - if err != nil { - log.Panic().Msg(fmt.Sprintf("keyValueStore error: %s", err)) - } - - // Create keyvaluestore to keep track of schema status - schemaStatusRedisService := keyvaluestore.NewDriver() - err = schemaStatusRedisService.Connect(os.Getenv("SCHEMA_REDIS_ADDRESS")) + redisService := keyvaluestore.NewDriver() + err = redisService.Connect(os.Getenv("REDIS_ADDRESS")) if err != nil { log.Panic().Msg(fmt.Sprintf("keyValueStore error: %s", err)) - } - - // Create object store driver - objectStore := objectstore.NewDriver() - err = objectStore.Connect(os.Getenv("MINIO_ADDRESS"), os.Getenv("MINIO_ACCESSKEYID"), os.Getenv("MINIO_ACCESSKEY")) - if err != nil { - log.Error().Err(err).Msg("failed to connect to minio") + } else { + log.Info().Str("address", os.Getenv("REDIS_ADDRESS")).Msg("successfully connected to redis") } // Create producer service - produceService := produce.NewService(brokerDriver, clientUpdaterRedisService, rpcDriver) + produceService := produce.NewService(brokerDriver, redisService, rpcDriver) produceService.Start() // Create a schema usecase - schemaService := schema.New(schemaStatusRedisService, objectStore, produceService) + schemaService := schema.New(redisService, produceService) // Create webdriver webDriver := webdriver.CreateListener(schemaService) diff --git a/go.mod b/go.mod index ddd2d5c..f38a258 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/pkg/errors v0.9.1 // indirect github.com/rs/xid v1.4.0 // indirect github.com/streadway/amqp v1.0.0 // indirect - github.com/thijsheijden/alice v0.1.21-0.20220115152033-a03f3f58c4c0 // indirect + github.com/thijsheijden/alice v0.1.21-0.20220206104614-72a8777edcc7 // indirect go.opentelemetry.io/otel v0.19.0 // indirect go.opentelemetry.io/otel/metric v0.19.0 // indirect go.opentelemetry.io/otel/trace v0.19.0 // indirect diff --git a/go.sum b/go.sum index ba595e5..041fd15 100644 --- a/go.sum +++ b/go.sum @@ -136,8 +136,8 @@ github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81P github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/thijsheijden/alice v0.1.21-0.20220115152033-a03f3f58c4c0 h1:DGtVn9DvfF2dS/qIfcFYdHNdDCueQw5JlUdw7UM+/vQ= -github.com/thijsheijden/alice v0.1.21-0.20220115152033-a03f3f58c4c0/go.mod h1:+NcNAdnGEJvRoLvK+PoemQMar6U3VZ553YZ6jstQylY= +github.com/thijsheijden/alice v0.1.21-0.20220206104614-72a8777edcc7 h1:BSFm3ubd+xdej9M2hwJ83nLOtQd8ew6gSIi9yK1vXL4= +github.com/thijsheijden/alice v0.1.21-0.20220206104614-72a8777edcc7/go.mod h1:+NcNAdnGEJvRoLvK+PoemQMar6U3VZ553YZ6jstQylY= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.0/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.opentelemetry.io/otel v0.19.0 h1:Lenfy7QHRXPZVsw/12CWpxX6d/JkrX8wrx2vO8G80Ng= diff --git a/internal/usecases/produce/produceCachedSchema.go b/internal/usecases/produce/produceCachedSchema.go index 8852db3..c667d16 100644 --- a/internal/usecases/produce/produceCachedSchema.go +++ b/internal/usecases/produce/produceCachedSchema.go @@ -10,12 +10,13 @@ import ( /* ProduceCachedSchema produces the cached schema + schema: *[]byte, the schema result sessionID: *string, the ID of the session */ func (s *Service) ProduceCachedSchema(schema *[]byte, sessionID *string) { // Use the sessionID to query the key value store to get the queue we need to send this message to - clientQueueID, err := s.keyValueStore.Get(context.Background(), *sessionID, keyvaluestore.String) + clientQueueID, err := s.keyValueStore.Get(context.Background(), fmt.Sprintf("routing %s", *sessionID), keyvaluestore.String) if err != nil || clientQueueID == nil { return } diff --git a/internal/usecases/schema/retrieve.go b/internal/usecases/schema/retrieve.go index 9243e53..44f5ec3 100644 --- a/internal/usecases/schema/retrieve.go +++ b/internal/usecases/schema/retrieve.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "time" "github.com/rs/zerolog/log" @@ -22,7 +23,7 @@ Retrieve retrieves the schema. It will check if the schema is currently being ma */ func (s *Service) Retrieve(request *[]byte, sessionID string, clientID string, databaseName string, cached bool) (err error) { // Check if the schema is currently being retrieved for this database - schemaBeingRetrievedInterface, err := s.keyValueStore.Get(context.Background(), fmt.Sprintf("%s-%s", databaseName, clientID), keyvaluestore.Bool) + schemaBeingRetrievedInterface, err := s.keyValueStore.Get(context.Background(), fmt.Sprintf("cached-schema retrieval %s-%s", databaseName, clientID), keyvaluestore.Bool) if err != nil && err != keyvaluestore.ErrKeyNotFound { log.Error().AnErr("Err", err).Msg("error accessing keyvaluestore in schema retrieve method") return err @@ -61,7 +62,7 @@ func (s *Service) Retrieve(request *[]byte, sessionID string, clientID string, d s.producer.ProduceSchemaRetrievalRequest(*request, sessionID, clientID, databaseName) // Update KVS - s.keyValueStore.Set(context.Background(), fmt.Sprintf("%s-%s", databaseName, clientID), true) + s.keyValueStore.SetWithEnvDuration(context.Background(), fmt.Sprintf("cached-schema retrieval %s-%s", databaseName, clientID), true, "REDIS_SCHEMA_RETRIEVAL_DURATION", time.Minute) // Return no error return nil diff --git a/internal/usecases/schema/retrieveCachedSchema.go b/internal/usecases/schema/retrieveCachedSchema.go index 456e8c7..b73dcd9 100644 --- a/internal/usecases/schema/retrieveCachedSchema.go +++ b/internal/usecases/schema/retrieveCachedSchema.go @@ -7,10 +7,13 @@ import ( "log" "schema-orchestrator/internal/entity" "time" + + "git.science.uu.nl/graphpolaris/keyvaluestore" ) /* retrieveCachedSchema retrieves a cached schema. This method is called if the cached key in the message is set to 'true' + clientID: string, the clientID this database belongs to databaseName: string, the name of the database we want to fetch the cached schema for Returns: schema: *entity.JSONReturnFormat, possible error @@ -20,14 +23,14 @@ func (s *Service) retrieveCachedSchema(clientID string, databaseName string) (*e // Create context with 30 second timeout getBucketContext, cancelGetBucket := context.WithTimeout(context.Background(), time.Second*30) defer cancelGetBucket() - objectReader, err := s.objectStore.Get(getBucketContext, "cached-schemas", fmt.Sprintf("%s-%s", databaseName, clientID)) + byteArray, err := s.keyValueStore.Get(getBucketContext, fmt.Sprintf("cached-schemas %s-%s", databaseName, clientID), keyvaluestore.Bytes) if err != nil { log.Println(err, "Failed to retrieve schema from object store") return nil, err } var schema entity.JSONReturnFormat - err = json.NewDecoder(objectReader).Decode(&schema) + err = json.Unmarshal(byteArray.([]byte), &schema) return &schema, err } diff --git a/internal/usecases/schema/schema.go b/internal/usecases/schema/schema.go index e503ce9..604f240 100644 --- a/internal/usecases/schema/schema.go +++ b/internal/usecases/schema/schema.go @@ -4,7 +4,6 @@ import ( "schema-orchestrator/internal/usecases/produce" "git.science.uu.nl/graphpolaris/keyvaluestore" - "git.science.uu.nl/graphpolaris/objectstore" ) /* @@ -12,18 +11,17 @@ Service implements the schema UseCase */ type Service struct { keyValueStore keyvaluestore.Interface - objectStore objectstore.Interface producer produce.UseCase } /* New creates a new schema UseCase + keyValueStore: keyvaluestore.Interface, the keyvaluestore driver to use to check schema status */ -func New(keyValueStore keyvaluestore.Interface, objectStore objectstore.Interface, producer produce.UseCase) UseCase { +func New(keyValueStore keyvaluestore.Interface, producer produce.UseCase) UseCase { return &Service{ keyValueStore: keyValueStore, - objectStore: objectStore, producer: producer, } } -- GitLab