From 0de3f43f4165e732a13ffe24dd0ea511dc7013e8 Mon Sep 17 00:00:00 2001 From: Leonardo Christino <leomilho@gmail.com> Date: Wed, 11 Oct 2023 14:06:00 +0200 Subject: [PATCH] feat: move proto and common structs to go-common --- Makefile | 2 +- cmd/schema-orchestrator/main.go | 24 +- .../databaseTypeService/databaseType.pb.go | 297 ------------------ .../databaseType_grpc.pb.go | 106 ------- internal/drivers/rpcdriver/getDatabaseType.go | 55 ---- internal/drivers/rpcdriver/grpcdriver.go | 19 -- internal/drivers/rpcdriver/interface.go | 13 - internal/drivers/webdriver/web.go | 42 --- internal/usecases/produce/interface.go | 40 ++- internal/usecases/produce/produce.go | 50 --- .../usecases/produce/produceCachedSchema.go | 29 -- .../produce/produceSchemaRetrievalRequest.go | 40 --- internal/usecases/schema/interface.go | 8 - internal/usecases/schema/retrieve.go | 81 ----- .../usecases/schema/retrieveCachedSchema.go | 36 --- internal/usecases/schema/schema.go | 27 -- .../schemaService/cacheHitCallback.go | 17 + .../schemaService/cacheMissCallback.go | 1 + .../schemaService/handler.go} | 151 ++++----- internal/usecases/schemaService/interface.go | 97 ++++++ .../usecases/schemaStatsService/handler.go | 78 +++++ .../usecases/schemaStatsService/interface.go | 98 ++++++ 22 files changed, 418 insertions(+), 893 deletions(-) delete mode 100644 internal/drivers/rpcdriver/databaseTypeService/databaseType.pb.go delete mode 100644 internal/drivers/rpcdriver/databaseTypeService/databaseType_grpc.pb.go delete mode 100644 internal/drivers/rpcdriver/getDatabaseType.go delete mode 100644 internal/drivers/rpcdriver/grpcdriver.go delete mode 100644 internal/drivers/rpcdriver/interface.go delete mode 100755 internal/drivers/webdriver/web.go delete mode 100755 internal/usecases/produce/produce.go delete mode 100755 internal/usecases/produce/produceCachedSchema.go delete mode 100755 internal/usecases/produce/produceSchemaRetrievalRequest.go delete mode 100644 internal/usecases/schema/interface.go delete mode 100644 internal/usecases/schema/retrieve.go delete mode 100644 internal/usecases/schema/retrieveCachedSchema.go delete mode 100644 internal/usecases/schema/schema.go create mode 100755 internal/usecases/schemaService/cacheHitCallback.go create mode 100755 internal/usecases/schemaService/cacheMissCallback.go rename internal/{drivers/webdriver/schemaRequest.go => usecases/schemaService/handler.go} (73%) create mode 100644 internal/usecases/schemaService/interface.go create mode 100755 internal/usecases/schemaStatsService/handler.go create mode 100644 internal/usecases/schemaStatsService/interface.go diff --git a/Makefile b/Makefile index 4028b7d..781c895 100644 --- a/Makefile +++ b/Makefile @@ -31,7 +31,7 @@ linux: login # Build for linux run: ./builds/main -develop: tidy +develop: # Usernames and Password only usable in locally in dev environment! $(eval export RABBIT_USER := rabbitmq) $(eval export RABBIT_PASSWORD := DevOnlyPass) diff --git a/cmd/schema-orchestrator/main.go b/cmd/schema-orchestrator/main.go index f081f99..f6dac9f 100755 --- a/cmd/schema-orchestrator/main.go +++ b/cmd/schema-orchestrator/main.go @@ -6,10 +6,9 @@ This program has been developed by students from the bachelor Computer Science a package main import ( - "schema-orchestrator/internal/drivers/rpcdriver" - "schema-orchestrator/internal/drivers/webdriver" "schema-orchestrator/internal/usecases/produce" - "schema-orchestrator/internal/usecases/schema" + "schema-orchestrator/internal/usecases/schemaService" + "schema-orchestrator/internal/usecases/schemaStatsService" "git.science.uu.nl/graphpolaris/go-common/microservice" ) @@ -21,9 +20,6 @@ func main() { // Set up logging microservice.SetupLogging() - // Create rpcDriver - rpcDriver := rpcdriver.New() - // Create broker driver brokerDriver := microservice.ConnectRabbit() @@ -31,15 +27,19 @@ func main() { redisService := microservice.ConnectRedis() // Create producer service - produceService := produce.NewService(brokerDriver, redisService, rpcDriver) + produceService := produce.New(brokerDriver, redisService) produceService.Start() - // Create a schema usecase - schemaService := schema.New(redisService, produceService) + // services + schemaService := schemaService.New(produceService, redisService) + schemaStatsService := schemaStatsService.New(produceService, redisService) - // Create webdriver - webDriver := webdriver.CreateListener(schemaService) - webDriver.Start() + // Setup web driver + api := microservice.New() + r := api.Routes() + r.Post("/", schemaService.Handler) + r.Post("/stats/", schemaStatsService.Handler) + api.Start(3002, r) select {} } diff --git a/internal/drivers/rpcdriver/databaseTypeService/databaseType.pb.go b/internal/drivers/rpcdriver/databaseTypeService/databaseType.pb.go deleted file mode 100644 index 00b3853..0000000 --- a/internal/drivers/rpcdriver/databaseTypeService/databaseType.pb.go +++ /dev/null @@ -1,297 +0,0 @@ -/* -This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. -© Copyright Utrecht University (Department of Information and Computing Sciences) -*/ - -// Code generated by protoc-gen-go. DO NOT EDIT. -// versions: -// protoc-gen-go v1.26.0 -// protoc v3.14.0 -// source: databaseType.proto - -package databaseTypeService - -import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" - reflect "reflect" - sync "sync" -) - -const ( - // Verify that this generated code is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) - // Verify that runtime/protoimpl is sufficiently up-to-date. - _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) -) - -type DatabaseTypeResponse_DatabaseType int32 - -const ( - DatabaseTypeResponse_ARANGODB DatabaseTypeResponse_DatabaseType = 0 - DatabaseTypeResponse_NEO4J DatabaseTypeResponse_DatabaseType = 1 -) - -// Enum value maps for DatabaseTypeResponse_DatabaseType. -var ( - DatabaseTypeResponse_DatabaseType_name = map[int32]string{ - 0: "ARANGODB", - 1: "NEO4J", - } - DatabaseTypeResponse_DatabaseType_value = map[string]int32{ - "ARANGODB": 0, - "NEO4J": 1, - } -) - -func (x DatabaseTypeResponse_DatabaseType) Enum() *DatabaseTypeResponse_DatabaseType { - p := new(DatabaseTypeResponse_DatabaseType) - *p = x - return p -} - -func (x DatabaseTypeResponse_DatabaseType) String() string { - return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) -} - -func (DatabaseTypeResponse_DatabaseType) Descriptor() protoreflect.EnumDescriptor { - return file_databaseType_proto_enumTypes[0].Descriptor() -} - -func (DatabaseTypeResponse_DatabaseType) Type() protoreflect.EnumType { - return &file_databaseType_proto_enumTypes[0] -} - -func (x DatabaseTypeResponse_DatabaseType) Number() protoreflect.EnumNumber { - return protoreflect.EnumNumber(x) -} - -// Deprecated: Use DatabaseTypeResponse_DatabaseType.Descriptor instead. -func (DatabaseTypeResponse_DatabaseType) EnumDescriptor() ([]byte, []int) { - return file_databaseType_proto_rawDescGZIP(), []int{1, 0} -} - -type DatabaseTypeRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - ClientID string `protobuf:"bytes,1,opt,name=clientID,proto3" json:"clientID,omitempty"` - DatabaseName string `protobuf:"bytes,2,opt,name=databaseName,proto3" json:"databaseName,omitempty"` -} - -func (x *DatabaseTypeRequest) Reset() { - *x = DatabaseTypeRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_databaseType_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *DatabaseTypeRequest) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*DatabaseTypeRequest) ProtoMessage() {} - -func (x *DatabaseTypeRequest) ProtoReflect() protoreflect.Message { - mi := &file_databaseType_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use DatabaseTypeRequest.ProtoReflect.Descriptor instead. -func (*DatabaseTypeRequest) Descriptor() ([]byte, []int) { - return file_databaseType_proto_rawDescGZIP(), []int{0} -} - -func (x *DatabaseTypeRequest) GetClientID() string { - if x != nil { - return x.ClientID - } - return "" -} - -func (x *DatabaseTypeRequest) GetDatabaseName() string { - if x != nil { - return x.DatabaseName - } - return "" -} - -type DatabaseTypeResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - - Error string `protobuf:"bytes,1,opt,name=error,proto3" json:"error,omitempty"` - DatabaseType DatabaseTypeResponse_DatabaseType `protobuf:"varint,2,opt,name=databaseType,proto3,enum=DatabaseTypeResponse_DatabaseType" json:"databaseType,omitempty"` -} - -func (x *DatabaseTypeResponse) Reset() { - *x = DatabaseTypeResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_databaseType_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } -} - -func (x *DatabaseTypeResponse) String() string { - return protoimpl.X.MessageStringOf(x) -} - -func (*DatabaseTypeResponse) ProtoMessage() {} - -func (x *DatabaseTypeResponse) ProtoReflect() protoreflect.Message { - mi := &file_databaseType_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - if ms.LoadMessageInfo() == nil { - ms.StoreMessageInfo(mi) - } - return ms - } - return mi.MessageOf(x) -} - -// Deprecated: Use DatabaseTypeResponse.ProtoReflect.Descriptor instead. -func (*DatabaseTypeResponse) Descriptor() ([]byte, []int) { - return file_databaseType_proto_rawDescGZIP(), []int{1} -} - -func (x *DatabaseTypeResponse) GetError() string { - if x != nil { - return x.Error - } - return "" -} - -func (x *DatabaseTypeResponse) GetDatabaseType() DatabaseTypeResponse_DatabaseType { - if x != nil { - return x.DatabaseType - } - return DatabaseTypeResponse_ARANGODB -} - -var File_databaseType_proto protoreflect.FileDescriptor - -var file_databaseType_proto_rawDesc = []byte{ - 0x0a, 0x12, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x54, 0x79, 0x70, 0x65, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x55, 0x0a, 0x13, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, - 0x54, 0x79, 0x70, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1a, 0x0a, 0x08, 0x63, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, - 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x49, 0x44, 0x12, 0x22, 0x0a, 0x0c, 0x64, 0x61, 0x74, 0x61, 0x62, - 0x61, 0x73, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x64, - 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x22, 0x9d, 0x01, 0x0a, 0x14, - 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x46, 0x0a, 0x0c, 0x64, 0x61, - 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x54, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, - 0x32, 0x22, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, - 0x54, 0x79, 0x70, 0x65, 0x52, 0x0c, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x54, 0x79, - 0x70, 0x65, 0x22, 0x27, 0x0a, 0x0c, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x0c, 0x0a, 0x08, 0x41, 0x52, 0x41, 0x4e, 0x47, 0x4f, 0x44, 0x42, 0x10, 0x00, - 0x12, 0x09, 0x0a, 0x05, 0x4e, 0x45, 0x4f, 0x34, 0x4a, 0x10, 0x01, 0x32, 0x57, 0x0a, 0x13, 0x44, - 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x54, 0x79, 0x70, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, - 0x63, 0x65, 0x12, 0x40, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, - 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x14, 0x2e, 0x44, 0x61, 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, - 0x54, 0x79, 0x70, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x15, 0x2e, 0x44, 0x61, - 0x74, 0x61, 0x62, 0x61, 0x73, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x22, 0x00, 0x42, 0x47, 0x5a, 0x45, 0x75, 0x73, 0x65, 0x72, 0x2d, 0x6d, 0x61, 0x6e, - 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2d, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2f, - 0x69, 0x6e, 0x74, 0x65, 0x72, 0x6e, 0x61, 0x6c, 0x2f, 0x64, 0x72, 0x69, 0x76, 0x65, 0x72, 0x2f, - 0x72, 0x70, 0x63, 0x64, 0x72, 0x69, 0x76, 0x65, 0x72, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x62, 0x61, - 0x73, 0x65, 0x54, 0x79, 0x70, 0x65, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x62, 0x06, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x33, -} - -var ( - file_databaseType_proto_rawDescOnce sync.Once - file_databaseType_proto_rawDescData = file_databaseType_proto_rawDesc -) - -func file_databaseType_proto_rawDescGZIP() []byte { - file_databaseType_proto_rawDescOnce.Do(func() { - file_databaseType_proto_rawDescData = protoimpl.X.CompressGZIP(file_databaseType_proto_rawDescData) - }) - return file_databaseType_proto_rawDescData -} - -var file_databaseType_proto_enumTypes = make([]protoimpl.EnumInfo, 1) -var file_databaseType_proto_msgTypes = make([]protoimpl.MessageInfo, 2) -var file_databaseType_proto_goTypes = []interface{}{ - (DatabaseTypeResponse_DatabaseType)(0), // 0: DatabaseTypeResponse.DatabaseType - (*DatabaseTypeRequest)(nil), // 1: DatabaseTypeRequest - (*DatabaseTypeResponse)(nil), // 2: DatabaseTypeResponse -} -var file_databaseType_proto_depIdxs = []int32{ - 0, // 0: DatabaseTypeResponse.databaseType:type_name -> DatabaseTypeResponse.DatabaseType - 1, // 1: DatabaseTypeService.GetDatabaseType:input_type -> DatabaseTypeRequest - 2, // 2: DatabaseTypeService.GetDatabaseType:output_type -> DatabaseTypeResponse - 2, // [2:3] is the sub-list for method output_type - 1, // [1:2] is the sub-list for method input_type - 1, // [1:1] is the sub-list for extension type_name - 1, // [1:1] is the sub-list for extension extendee - 0, // [0:1] is the sub-list for field type_name -} - -func init() { file_databaseType_proto_init() } -func file_databaseType_proto_init() { - if File_databaseType_proto != nil { - return - } - if !protoimpl.UnsafeEnabled { - file_databaseType_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DatabaseTypeRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_databaseType_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { - switch v := v.(*DatabaseTypeResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } - type x struct{} - out := protoimpl.TypeBuilder{ - File: protoimpl.DescBuilder{ - GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_databaseType_proto_rawDesc, - NumEnums: 1, - NumMessages: 2, - NumExtensions: 0, - NumServices: 1, - }, - GoTypes: file_databaseType_proto_goTypes, - DependencyIndexes: file_databaseType_proto_depIdxs, - EnumInfos: file_databaseType_proto_enumTypes, - MessageInfos: file_databaseType_proto_msgTypes, - }.Build() - File_databaseType_proto = out.File - file_databaseType_proto_rawDesc = nil - file_databaseType_proto_goTypes = nil - file_databaseType_proto_depIdxs = nil -} diff --git a/internal/drivers/rpcdriver/databaseTypeService/databaseType_grpc.pb.go b/internal/drivers/rpcdriver/databaseTypeService/databaseType_grpc.pb.go deleted file mode 100644 index be95959..0000000 --- a/internal/drivers/rpcdriver/databaseTypeService/databaseType_grpc.pb.go +++ /dev/null @@ -1,106 +0,0 @@ -/* -This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. -© Copyright Utrecht University (Department of Information and Computing Sciences) -*/ - -// Code generated by protoc-gen-go-grpc. DO NOT EDIT. - -package databaseTypeService - -import ( - context "context" - grpc "google.golang.org/grpc" - codes "google.golang.org/grpc/codes" - status "google.golang.org/grpc/status" -) - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -// Requires gRPC-Go v1.32.0 or later. -const _ = grpc.SupportPackageIsVersion7 - -// DatabaseTypeServiceClient is the client API for DatabaseTypeService service. -// -// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. -type DatabaseTypeServiceClient interface { - GetDatabaseType(ctx context.Context, in *DatabaseTypeRequest, opts ...grpc.CallOption) (*DatabaseTypeResponse, error) -} - -type databaseTypeServiceClient struct { - cc grpc.ClientConnInterface -} - -func NewDatabaseTypeServiceClient(cc grpc.ClientConnInterface) DatabaseTypeServiceClient { - return &databaseTypeServiceClient{cc} -} - -func (c *databaseTypeServiceClient) GetDatabaseType(ctx context.Context, in *DatabaseTypeRequest, opts ...grpc.CallOption) (*DatabaseTypeResponse, error) { - out := new(DatabaseTypeResponse) - err := c.cc.Invoke(ctx, "/DatabaseTypeService/GetDatabaseType", in, out, opts...) - if err != nil { - return nil, err - } - return out, nil -} - -// DatabaseTypeServiceServer is the server API for DatabaseTypeService service. -// All implementations must embed UnimplementedDatabaseTypeServiceServer -// for forward compatibility -type DatabaseTypeServiceServer interface { - GetDatabaseType(context.Context, *DatabaseTypeRequest) (*DatabaseTypeResponse, error) - mustEmbedUnimplementedDatabaseTypeServiceServer() -} - -// UnimplementedDatabaseTypeServiceServer must be embedded to have forward compatible implementations. -type UnimplementedDatabaseTypeServiceServer struct { -} - -func (UnimplementedDatabaseTypeServiceServer) GetDatabaseType(context.Context, *DatabaseTypeRequest) (*DatabaseTypeResponse, error) { - return nil, status.Errorf(codes.Unimplemented, "method GetDatabaseType not implemented") -} -func (UnimplementedDatabaseTypeServiceServer) mustEmbedUnimplementedDatabaseTypeServiceServer() {} - -// UnsafeDatabaseTypeServiceServer may be embedded to opt out of forward compatibility for this service. -// Use of this interface is not recommended, as added methods to DatabaseTypeServiceServer will -// result in compilation errors. -type UnsafeDatabaseTypeServiceServer interface { - mustEmbedUnimplementedDatabaseTypeServiceServer() -} - -func RegisterDatabaseTypeServiceServer(s grpc.ServiceRegistrar, srv DatabaseTypeServiceServer) { - s.RegisterService(&DatabaseTypeService_ServiceDesc, srv) -} - -func _DatabaseTypeService_GetDatabaseType_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { - in := new(DatabaseTypeRequest) - if err := dec(in); err != nil { - return nil, err - } - if interceptor == nil { - return srv.(DatabaseTypeServiceServer).GetDatabaseType(ctx, in) - } - info := &grpc.UnaryServerInfo{ - Server: srv, - FullMethod: "/DatabaseTypeService/GetDatabaseType", - } - handler := func(ctx context.Context, req interface{}) (interface{}, error) { - return srv.(DatabaseTypeServiceServer).GetDatabaseType(ctx, req.(*DatabaseTypeRequest)) - } - return interceptor(ctx, in, info, handler) -} - -// DatabaseTypeService_ServiceDesc is the grpc.ServiceDesc for DatabaseTypeService service. -// It's only intended for direct use with grpc.RegisterService, -// and not to be introspected or modified (even as a copy) -var DatabaseTypeService_ServiceDesc = grpc.ServiceDesc{ - ServiceName: "DatabaseTypeService", - HandlerType: (*DatabaseTypeServiceServer)(nil), - Methods: []grpc.MethodDesc{ - { - MethodName: "GetDatabaseType", - Handler: _DatabaseTypeService_GetDatabaseType_Handler, - }, - }, - Streams: []grpc.StreamDesc{}, - Metadata: "databaseType.proto", -} diff --git a/internal/drivers/rpcdriver/getDatabaseType.go b/internal/drivers/rpcdriver/getDatabaseType.go deleted file mode 100644 index 73b5d6a..0000000 --- a/internal/drivers/rpcdriver/getDatabaseType.go +++ /dev/null @@ -1,55 +0,0 @@ -/* -This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. -© Copyright Utrecht University (Department of Information and Computing Sciences) -*/ - -package rpcdriver - -import ( - "context" - "errors" - "os" - "schema-orchestrator/internal/drivers/rpcdriver/databaseTypeService" - - "github.com/rs/zerolog/log" - "google.golang.org/grpc" -) - -/* -GetDatabaseType opens a gRPC connection to the user management service and retrieves the database info for the given client and database name - - clientID: *string, the ID of the client - databaseName: *string, the name of the database - Return: (*string, error), returns the type of the database and a potential error -*/ -func (driver *Driver) GetDatabaseType(clientID *string, databaseName *string) (*string, error) { - userManagementServiceAddress := os.Getenv("USER_MANAGEMENT_SERVICE_RPC") - conn, err := grpc.Dial(userManagementServiceAddress, grpc.WithInsecure()) - if err != nil { - log.Error().AnErr("ERROR", err).Str("url", userManagementServiceAddress).Msg("ERROR connecting to user-management-service:9000") - return nil, err - } - defer conn.Close() - - grpcClient := databaseTypeService.NewDatabaseTypeServiceClient(conn) - - response, err := grpcClient.GetDatabaseType(context.Background(), &databaseTypeService.DatabaseTypeRequest{ClientID: *clientID, DatabaseName: *databaseName}) - if err != nil { - log.Error().AnErr("ERROR", err).Str("url", userManagementServiceAddress).Msg("ERROR querying to user-management-service:9000") - return nil, err - } else { - log.Info().Msg("Successfully retrieved database type") - } - - var databaseType string - switch response.DatabaseType { - case databaseTypeService.DatabaseTypeResponse_ARANGODB: - databaseType = "arangodb" - case databaseTypeService.DatabaseTypeResponse_NEO4J: - databaseType = "neo4j" - default: - return nil, errors.New("Unknown database type returned") - } - - return &databaseType, nil -} diff --git a/internal/drivers/rpcdriver/grpcdriver.go b/internal/drivers/rpcdriver/grpcdriver.go deleted file mode 100644 index 3ebdef0..0000000 --- a/internal/drivers/rpcdriver/grpcdriver.go +++ /dev/null @@ -1,19 +0,0 @@ -/* -This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. -© Copyright Utrecht University (Department of Information and Computing Sciences) -*/ - -package rpcdriver - -/* -A Driver implements the rpc driver interface -*/ -type Driver struct { -} - -/* -New creates a new rpc driver -*/ -func New() Interface { - return &Driver{} -} diff --git a/internal/drivers/rpcdriver/interface.go b/internal/drivers/rpcdriver/interface.go deleted file mode 100644 index 0f6fe97..0000000 --- a/internal/drivers/rpcdriver/interface.go +++ /dev/null @@ -1,13 +0,0 @@ -/* -This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. -© Copyright Utrecht University (Department of Information and Computing Sciences) -*/ - -package rpcdriver - -/* -Interface specifies the methods a rpc driver should implement -*/ -type Interface interface { - GetDatabaseType(clientID *string, databaseName *string) (*string, error) -} diff --git a/internal/drivers/webdriver/web.go b/internal/drivers/webdriver/web.go deleted file mode 100755 index 13172a6..0000000 --- a/internal/drivers/webdriver/web.go +++ /dev/null @@ -1,42 +0,0 @@ -/* -This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. -© Copyright Utrecht University (Department of Information and Computing Sciences) -*/ - -package webdriver - -import ( - "schema-orchestrator/internal/usecases/schema" - - "git.science.uu.nl/graphpolaris/go-common/microservice" -) - -/* -A Listener is a concrete implementation for a web listener -*/ -type Listener struct { - api microservice.API - schemaService schema.UseCase -} - -/* -CreateListener creates a web listener - - schemaService: schema.UseCase, the schema usecase - Return: *Listener, returns a web listener -*/ -func CreateListener(schemaService schema.UseCase) *Listener { - return &Listener{ - api: microservice.New(), - schemaService: schemaService, - } -} - -/* -Start starts the web listener on port 3002 -*/ -func (l *Listener) Start() { - r := l.api.Routes() - r.Post("/", l.schemaRequestHandler) - l.api.Start(3002, r) -} diff --git a/internal/usecases/produce/interface.go b/internal/usecases/produce/interface.go index 8b674b8..07b97f4 100644 --- a/internal/usecases/produce/interface.go +++ b/internal/usecases/produce/interface.go @@ -5,10 +5,42 @@ This program has been developed by students from the bachelor Computer Science a package produce +import ( + "git.science.uu.nl/graphpolaris/broker" + "git.science.uu.nl/graphpolaris/broker/producer" + "git.science.uu.nl/graphpolaris/keyvaluestore" +) + +/* +Service wraps consumer methods +*/ +type ProduceService struct { + brokerDriver broker.Interface + keyValueStore keyvaluestore.Interface + RequestProducer producer.BrokerProducerI + ResultProducer producer.BrokerProducerI +} + +/* +New creates a new service + + broker: broker.Interface, the broker for the new service + keyValueStore: keyvaluestore.Interface, the key value store for the new service + rpcDriver: rpcdriver.Interface, the rpc driver of the new interface + Return: *Service, the new service +*/ +func New(broker broker.Interface, keyValueStore keyvaluestore.Interface) *ProduceService { + return &ProduceService{ + brokerDriver: broker, + keyValueStore: keyValueStore, + } +} + /* -UseCase is an interface describing the produce usecases +Start starts the producer */ -type UseCase interface { - ProduceSchemaRetrievalRequest(request []byte, sessionID string, clientID string, databaseName string) - ProduceCachedSchema(schema *[]byte, sessionID *string) +func (s *ProduceService) Start() { + // Create producer + s.RequestProducer = s.brokerDriver.CreateProducer("requests-exchange", "schema-orchestrator") + s.ResultProducer = s.brokerDriver.CreateProducer("ui-direct-exchange", "schema-orchestrator") } diff --git a/internal/usecases/produce/produce.go b/internal/usecases/produce/produce.go deleted file mode 100755 index f7c5a91..0000000 --- a/internal/usecases/produce/produce.go +++ /dev/null @@ -1,50 +0,0 @@ -/* -This program has been developed by students from the bachelor Computer Science at Utrecht University within the Software Project course. -© Copyright Utrecht University (Department of Information and Computing Sciences) -*/ - -package produce - -import ( - "schema-orchestrator/internal/drivers/rpcdriver" - - "git.science.uu.nl/graphpolaris/broker" - "git.science.uu.nl/graphpolaris/broker/producer" - "git.science.uu.nl/graphpolaris/keyvaluestore" -) - -/* -Service wraps consumer methods -*/ -type Service struct { - brokerDriver broker.Interface - requestProducer producer.BrokerProducerI - resultProducer producer.BrokerProducerI - keyValueStore keyvaluestore.Interface - rpcDriver rpcdriver.Interface -} - -/* -NewService creates a new service - - broker: broker.Interface, the broker for the new service - keyValueStore: keyvaluestore.Interface, the key value store for the new service - rpcDriver: rpcdriver.Interface, the rpc driver of the new interface - Return: *Service, the new service -*/ -func NewService(broker broker.Interface, keyValueStore keyvaluestore.Interface, rpcDriver rpcdriver.Interface) *Service { - return &Service{ - brokerDriver: broker, - keyValueStore: keyValueStore, - rpcDriver: rpcDriver, - } -} - -/* -Start starts the producer -*/ -func (s *Service) Start() { - // Create producer - s.requestProducer = s.brokerDriver.CreateProducer("requests-exchange", "schema-orchestrator") - s.resultProducer = s.brokerDriver.CreateProducer("ui-direct-exchange", "schema-orchestrator") -} diff --git a/internal/usecases/produce/produceCachedSchema.go b/internal/usecases/produce/produceCachedSchema.go deleted file mode 100755 index 988e16b..0000000 --- a/internal/usecases/produce/produceCachedSchema.go +++ /dev/null @@ -1,29 +0,0 @@ -package produce - -import ( - "context" - "fmt" - "log" - - "git.science.uu.nl/graphpolaris/keyvaluestore" -) - -/* -ProduceCachedSchema produces the cached schema - - schema: *[]byte, the schema result - sessionID: *string, the ID of the session -*/ -func (s *Service) ProduceCachedSchema(schema *[]byte, sessionID *string) { - // Use the sessionID to query the key value store to get the queue we need to send this message to - clientQueueID, err := s.keyValueStore.Get(context.Background(), fmt.Sprintf("routing %s", *sessionID), keyvaluestore.String) - if err != nil || clientQueueID == nil { - return - } - - log.Println(fmt.Sprintf("Found client queue %s for session %s", clientQueueID, *sessionID)) - - headers := make(map[string]interface{}) - headers["sessionID"] = *sessionID - s.resultProducer.PublishMessageJsonHeaders(schema, clientQueueID.(string), &headers) -} diff --git a/internal/usecases/produce/produceSchemaRetrievalRequest.go b/internal/usecases/produce/produceSchemaRetrievalRequest.go deleted file mode 100755 index 5114736..0000000 --- a/internal/usecases/produce/produceSchemaRetrievalRequest.go +++ /dev/null @@ -1,40 +0,0 @@ -package produce - -import "log" - -/* -ProduceSchemaRetrievalRequest publishes a schema retrieval request - - request: []byte, the schema retrieval request - sessionID: string, the ID of the session - clientID: string, the ID of the client - databaseName: string, the name of the database -*/ -func (s *Service) ProduceSchemaRetrievalRequest(request []byte, sessionID string, clientID string, databaseName string) { - headers := make(map[string]interface{}) - headers["sessionID"] = sessionID - headers["clientID"] = clientID - - // Send the message to the correct queue - // Request the database type from the user management service - databaseType, err := s.rpcDriver.GetDatabaseType(&clientID, &databaseName) - if err != nil { - log.Println("Error producing schema retrieval! No database type found") - // log.Println("No valid database type found, defaulting to neo4j") // TODO - // s.requestProducer.PublishMessage(&request, "neo4j-schema-request", &headers) - return // TODO - } - - switch *databaseType { - case "arangodb": - log.Println("Publishing to arangodb queue") - s.requestProducer.PublishMessageJsonHeaders(&request, "arangodb-schema-request", &headers) - case "neo4j": - log.Println("Publishing to neo4j queue") - s.requestProducer.PublishMessageJsonHeaders(&request, "neo4j-schema-request", &headers) - default: - log.Println("No valid database type found") - return - } - -} diff --git a/internal/usecases/schema/interface.go b/internal/usecases/schema/interface.go deleted file mode 100644 index 67850eb..0000000 --- a/internal/usecases/schema/interface.go +++ /dev/null @@ -1,8 +0,0 @@ -package schema - -/* -UseCase describes the methods a schema usecase should implement -*/ -type UseCase interface { - Retrieve(request *[]byte, sessionID string, clientID string, databaseName string, cached bool) (err error) -} diff --git a/internal/usecases/schema/retrieve.go b/internal/usecases/schema/retrieve.go deleted file mode 100644 index 44f5ec3..0000000 --- a/internal/usecases/schema/retrieve.go +++ /dev/null @@ -1,81 +0,0 @@ -package schema - -import ( - "context" - "encoding/json" - "errors" - "fmt" - "time" - - "github.com/rs/zerolog/log" - - "git.science.uu.nl/graphpolaris/keyvaluestore" -) - -/* -Retrieve retrieves the schema. It will check if the schema is currently being made, if not it will check if it has been cached, if not it will produce a request to create it. - - request: *[]byte, the raw JSON request (contains cached boolean and database name) - sessionID: string, the session ID - clientID: string, the client ID - databaseName: string, name of the database we are requesting the schema for - cached: bool, whether a cached schema should be returned -*/ -func (s *Service) Retrieve(request *[]byte, sessionID string, clientID string, databaseName string, cached bool) (err error) { - // Check if the schema is currently being retrieved for this database - schemaBeingRetrievedInterface, err := s.keyValueStore.Get(context.Background(), fmt.Sprintf("cached-schema retrieval %s-%s", databaseName, clientID), keyvaluestore.Bool) - if err != nil && err != keyvaluestore.ErrKeyNotFound { - log.Error().AnErr("Err", err).Msg("error accessing keyvaluestore in schema retrieve method") - return err - } - - // Cast the interface{} to bool - schemaBeingRetrieved, ok := schemaBeingRetrievedInterface.(bool) - if !ok { - log.Error().Msg("error casting schemaBeingRetrieved to bool") - return errors.New("internal error (kvs)") - } - - // Schema is already being retrieved - if schemaBeingRetrieved { - log.Warn().Msg("schema is already being retrieved") - return nil - } - - log.Trace().Msg("schema is not being retrieved yet") - - // Check if the schema should be force refreshed - if !cached { - log.Trace().Msg("Not cached! Producing a schema retrieval request") - - // Produce a schema retrieval request - s.producer.ProduceSchemaRetrievalRequest(*request, sessionID, clientID, databaseName) - return nil - } - - // Check if the schema has been cached - schema, err := s.retrieveCachedSchema(clientID, databaseName) - if err != nil { - log.Trace().Msg("schema not present in object store") - // Schema was not in object store, or there was another issue with the object store - // Produce a schema retrieval request - s.producer.ProduceSchemaRetrievalRequest(*request, sessionID, clientID, databaseName) - - // Update KVS - s.keyValueStore.SetWithEnvDuration(context.Background(), fmt.Sprintf("cached-schema retrieval %s-%s", databaseName, clientID), true, "REDIS_SCHEMA_RETRIEVAL_DURATION", time.Minute) - - // Return no error - return nil - } - - log.Trace().Msg("schema present in object store") - - // Produce the cached schema - jsonResult := make(map[string]interface{}) - jsonResult["type"] = "schema_result" - jsonResult["value"] = *schema - b, _ := json.Marshal(jsonResult) - s.producer.ProduceCachedSchema(&b, &sessionID) - - return -} diff --git a/internal/usecases/schema/retrieveCachedSchema.go b/internal/usecases/schema/retrieveCachedSchema.go deleted file mode 100644 index b73dcd9..0000000 --- a/internal/usecases/schema/retrieveCachedSchema.go +++ /dev/null @@ -1,36 +0,0 @@ -package schema - -import ( - "context" - "encoding/json" - "fmt" - "log" - "schema-orchestrator/internal/entity" - "time" - - "git.science.uu.nl/graphpolaris/keyvaluestore" -) - -/* -retrieveCachedSchema retrieves a cached schema. This method is called if the cached key in the message is set to 'true' - - clientID: string, the clientID this database belongs to - databaseName: string, the name of the database we want to fetch the cached schema for - Returns: schema: *entity.JSONReturnFormat, possible error -*/ -func (s *Service) retrieveCachedSchema(clientID string, databaseName string) (*entity.JSONReturnFormat, error) { - // Retrieve the schema from the object store - // Create context with 30 second timeout - getBucketContext, cancelGetBucket := context.WithTimeout(context.Background(), time.Second*30) - defer cancelGetBucket() - byteArray, err := s.keyValueStore.Get(getBucketContext, fmt.Sprintf("cached-schemas %s-%s", databaseName, clientID), keyvaluestore.Bytes) - if err != nil { - log.Println(err, "Failed to retrieve schema from object store") - return nil, err - } - - var schema entity.JSONReturnFormat - err = json.Unmarshal(byteArray.([]byte), &schema) - - return &schema, err -} diff --git a/internal/usecases/schema/schema.go b/internal/usecases/schema/schema.go deleted file mode 100644 index 604f240..0000000 --- a/internal/usecases/schema/schema.go +++ /dev/null @@ -1,27 +0,0 @@ -package schema - -import ( - "schema-orchestrator/internal/usecases/produce" - - "git.science.uu.nl/graphpolaris/keyvaluestore" -) - -/* -Service implements the schema UseCase -*/ -type Service struct { - keyValueStore keyvaluestore.Interface - producer produce.UseCase -} - -/* -New creates a new schema UseCase - - keyValueStore: keyvaluestore.Interface, the keyvaluestore driver to use to check schema status -*/ -func New(keyValueStore keyvaluestore.Interface, producer produce.UseCase) UseCase { - return &Service{ - keyValueStore: keyValueStore, - producer: producer, - } -} diff --git a/internal/usecases/schemaService/cacheHitCallback.go b/internal/usecases/schemaService/cacheHitCallback.go new file mode 100755 index 0000000..04a31b5 --- /dev/null +++ b/internal/usecases/schemaService/cacheHitCallback.go @@ -0,0 +1,17 @@ +package schemaService + +// func (s *SchemaService) PublishSchema(data *[]byte, sessionData structs.SessionData) error { +// var result map[string]interface{} +// err := json.Unmarshal(*data, &result) +// if err != nil { +// return err +// } + +// // Produce the cached schema +// jsonResult := make(map[string]interface{}) +// jsonResult["type"] = "schema_result" +// jsonResult["value"] = result +// b, _ := json.Marshal(jsonResult) +// s.ProduceCachedSchema(&b, sessionData) +// return nil +// } diff --git a/internal/usecases/schemaService/cacheMissCallback.go b/internal/usecases/schemaService/cacheMissCallback.go new file mode 100755 index 0000000..5142bbf --- /dev/null +++ b/internal/usecases/schemaService/cacheMissCallback.go @@ -0,0 +1 @@ +package schemaService diff --git a/internal/drivers/webdriver/schemaRequest.go b/internal/usecases/schemaService/handler.go similarity index 73% rename from internal/drivers/webdriver/schemaRequest.go rename to internal/usecases/schemaService/handler.go index 970da6a..c3dd1e2 100755 --- a/internal/drivers/webdriver/schemaRequest.go +++ b/internal/usecases/schemaService/handler.go @@ -1,73 +1,78 @@ -package webdriver - -import ( - "encoding/json" - "net/http" - "os" - - "github.com/rs/zerolog/log" -) - -/* -Handles incoming schema requests - - authService: auth.UseCase, the usecase for the authentication service - produceService: produce.UseCase, the usecase for the producer service - Return: http.Handler, returns an http handler -*/ -func (l *Listener) schemaRequestHandler(w http.ResponseWriter, r *http.Request) { - log. - Trace(). - Str("schemaRequestHandler", "loading headers"). - Msg("processing schema request") - - // Publish a message into the query queue - - // Grab the databaseName and cached bool from the request - var temp map[string]interface{} - err := json.NewDecoder(r.Body).Decode(&temp) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - log.Trace().Str("schemaRequestHandler", "processing body").Any("body", temp).Msg("processing schema request") - - databaseName, ok := temp["databaseName"].(string) - if !ok { - w.WriteHeader(http.StatusBadRequest) - return - } - - log. - Trace(). - Str("schemaRequestHandler - Databasename", databaseName). - Msg("processing schema request") - - cached, ok := temp["cached"].(bool) - if !ok { - w.WriteHeader(http.StatusBadRequest) - return - } - - // Pass request to schema usecase - UserID := "UserID" - SessionID := "SessionID" - if os.Getenv("DEV") != "true" { - UserID = r.Header["Userid"][0] - SessionID = r.Header["Sessionid"][0] - } - - body, err := json.Marshal(&temp) // json -> bytes - if err != nil { - w.WriteHeader(http.StatusBadRequest) - return - } - - err = l.schemaService.Retrieve(&body, SessionID, UserID, databaseName, cached) - if err != nil { - w.WriteHeader(http.StatusInternalServerError) - return - } - - w.WriteHeader(http.StatusOK) -} +package schemaService + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + + "git.science.uu.nl/graphpolaris/go-common/structs" + "github.com/rs/zerolog/log" +) + +/* +Handles incoming schema requests + + authService: auth.UseCase, the usecase for the authentication service + produceService: produce.UseCase, the usecase for the producer service + Return: http.Handler, returns an http handler +*/ +func (s *SchemaService) Handler(w http.ResponseWriter, r *http.Request) { + log. + Trace(). + Str("schemaRequestHandler", "loading headers"). + Msg("processing schema request") + + // Publish a message into the query queue + + // Grab the databaseName and cached bool from the request + var temp map[string]interface{} + err := json.NewDecoder(r.Body).Decode(&temp) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + log.Trace().Str("schemaRequestHandler", "processing body").Any("body", temp).Msg("processing schema request") + + databaseName, ok := temp["databaseName"].(string) + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + + log. + Trace(). + Str("schemaRequestHandler - Databasename", databaseName). + Msg("processing schema request") + + cached, ok := temp["cached"].(bool) + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + + // Pass request to schema usecase + sessionData := structs.SessionData{ + UserID: "UserID", + SessionID: "SessionID", + DatabaseName: databaseName, + } + if os.Getenv("DEV") != "true" { + sessionData.UserID = r.Header["Userid"][0] + sessionData.SessionID = r.Header["Sessionid"][0] + } + + body, err := json.Marshal(&temp) // json -> bytes + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + s.caching.RetrieveAsync(&body, fmt.Sprintf("%s-%s", sessionData.DatabaseName, sessionData.UserID), cached, sessionData, true) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/internal/usecases/schemaService/interface.go b/internal/usecases/schemaService/interface.go new file mode 100644 index 0000000..050d9f2 --- /dev/null +++ b/internal/usecases/schemaService/interface.go @@ -0,0 +1,97 @@ +package schemaService + +import ( + "context" + "fmt" + "schema-orchestrator/internal/usecases/produce" + + "git.science.uu.nl/graphpolaris/go-common/caching" + "git.science.uu.nl/graphpolaris/go-common/drivers/databaserpcdriver" + "git.science.uu.nl/graphpolaris/go-common/structs" + "git.science.uu.nl/graphpolaris/keyvaluestore" + "github.com/goccy/go-json" + "github.com/rs/zerolog/log" +) + +type SchemaService struct { + producer *produce.ProduceService + caching *caching.Caching +} + +func New(producer *produce.ProduceService, redisService keyvaluestore.Interface) *SchemaService { + s := &SchemaService{ + producer: producer, + caching: caching.New("cached-schemas", redisService, "SCHEMA_RETRIEVAL_DURATION"), + } + + s.caching.CacheMissCallback = s.CacheMissCallback + s.caching.CacheHitCallback = s.CacheHitCallback + return s +} + +/* +CacheMissCallback publishes a schema retrieval request + + request: []byte, the schema retrieval request + sessionID: string, the ID of the session + userID: string, the ID of the client + databaseName: string, the name of the database +*/ +func (s *SchemaService) CacheMissCallback(request []byte, sessionData structs.SessionData) (data interface{}, err error) { + headers := make(map[string]interface{}) + headers["sessionID"] = sessionData.SessionID + headers["userID"] = sessionData.UserID + + // Send the message to the correct queue + // Request the database type from the user management service + databaseType, err := databaserpcdriver.GetDatabaseType(&sessionData.UserID, &sessionData.DatabaseName) + if err != nil { + log.Error().AnErr("Err", err).Msg("error retrieving database type No database type found") + // log.Println("No valid database type found, defaulting to neo4j") // TODO + // s.requestProducer.PublishMessage(&request, "neo4j-schema-request", &headers) + return nil, err // TODO + } + + switch *databaseType { + case "arangodb": + log.Trace().Msg("Publishing to arangodb stats queue") + s.producer.RequestProducer.PublishMessageJsonHeaders(&request, "arangodb-schema-request", &headers) + case "neo4j": + log.Trace().Msg("Publishing to neo4j stats queue") + s.producer.RequestProducer.PublishMessageJsonHeaders(&request, "neo4j-schema-request", &headers) + default: + log.Error().Msg("No valid database type found") + return nil, err + } + + return nil, nil + +} + +/* +CacheHitCallback produces the cached schema + + data: *interface{}, the schema result + sessionID: *string, the ID of the session +*/ +func (s *SchemaService) CacheHitCallback(data interface{}, sessionData structs.SessionData) error { + // Use the sessionID to query the key value store to get the queue we need to send this message to + clientQueueID, err := s.caching.KeyValueStore.Get(context.Background(), fmt.Sprintf("routing %s", sessionData.SessionID), keyvaluestore.String) + if err != nil || clientQueueID == nil { + return err + } + + log.Trace().Str("sessionID", sessionData.SessionID).Str("userID", sessionData.UserID).Str("clientQueueID", clientQueueID.(string)).Msg("Found client queue for session") + + headers := make(map[string]interface{}) + headers["sessionID"] = sessionData.SessionID + headers["userID"] = sessionData.UserID + + dataBytes, err := json.Marshal(data) + if err != nil { + return err + } + + s.producer.ResultProducer.PublishMessageJsonHeaders(&dataBytes, clientQueueID.(string), &headers) + return nil +} diff --git a/internal/usecases/schemaStatsService/handler.go b/internal/usecases/schemaStatsService/handler.go new file mode 100755 index 0000000..a042b9d --- /dev/null +++ b/internal/usecases/schemaStatsService/handler.go @@ -0,0 +1,78 @@ +package schemaStatsService + +import ( + "encoding/json" + "fmt" + "net/http" + "os" + + "git.science.uu.nl/graphpolaris/go-common/structs" + "github.com/rs/zerolog/log" +) + +/* +Handles incoming schema stat requests + + authService: auth.UseCase, the usecase for the authentication service + produceService: produce.UseCase, the usecase for the producer service + Return: http.Handler, returns an http handler +*/ +func (s *SchemaService) Handler(w http.ResponseWriter, r *http.Request) { + log. + Trace(). + Str("schemaRequestHandler", "loading headers"). + Msg("processing schema request") + + // Publish a message into the query queue + + // Grab the databaseName and cached bool from the request + var temp map[string]interface{} + err := json.NewDecoder(r.Body).Decode(&temp) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + log.Trace().Str("schemaStatsRequestHandler", "processing body").Any("body", temp).Msg("processing schema stats request") + + databaseName, ok := temp["databaseName"].(string) + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + + log. + Trace(). + Str("schemaStatsRequestHandler - Databasename", databaseName). + Msg("processing schema stats request") + + cached, ok := temp["cached"].(bool) + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + + // Pass request to schema usecase + sessionData := structs.SessionData{ + UserID: "UserID", + SessionID: "SessionID", + DatabaseName: databaseName, + } + if os.Getenv("DEV") != "true" { + sessionData.UserID = r.Header["Userid"][0] + sessionData.SessionID = r.Header["Sessionid"][0] + } + + body, err := json.Marshal(&temp) // json -> bytes + if err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + s.caching.RetrieveAsync(&body, fmt.Sprintf("%s-%s", sessionData.DatabaseName, sessionData.UserID), cached, sessionData, true) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} diff --git a/internal/usecases/schemaStatsService/interface.go b/internal/usecases/schemaStatsService/interface.go new file mode 100644 index 0000000..a1e6a15 --- /dev/null +++ b/internal/usecases/schemaStatsService/interface.go @@ -0,0 +1,98 @@ +package schemaStatsService + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "schema-orchestrator/internal/usecases/produce" + + "git.science.uu.nl/graphpolaris/go-common/caching" + "git.science.uu.nl/graphpolaris/go-common/drivers/databaserpcdriver" + "git.science.uu.nl/graphpolaris/go-common/structs" + "git.science.uu.nl/graphpolaris/keyvaluestore" + "github.com/rs/zerolog/log" +) + +type SchemaService struct { + producer *produce.ProduceService + caching *caching.Caching +} + +func New(producer *produce.ProduceService, redisService keyvaluestore.Interface) *SchemaService { + s := &SchemaService{ + producer: producer, + caching: caching.New("cached-schemas-stats", redisService, "SCHEMA_RETRIEVAL_DURATION"), + } + + s.caching.CacheMissCallback = s.CacheMissCallback + s.caching.CacheHitCallback = s.CacheHitCallback + return s +} + +/* +CacheMissCallback publishes a schema retrieval request + + request: []byte, the schema retrieval request + sessionID: string, the ID of the session + userID: string, the ID of the client + databaseName: string, the name of the database +*/ +func (s *SchemaService) CacheMissCallback(request []byte, sessionData structs.SessionData) (data interface{}, err error) { + headers := make(map[string]interface{}) + headers["sessionID"] = sessionData.SessionID + headers["userID"] = sessionData.UserID + + // Send the message to the correct queue + // Request the database type from the user management service + databaseType, err := databaserpcdriver.GetDatabaseType(&sessionData.UserID, &sessionData.DatabaseName) + if err != nil { + log.Error().AnErr("Err", err).Msg("error retrieving database type No database type found") + // log.Println("No valid database type found, defaulting to neo4j") // TODO + // s.requestProducer.PublishMessage(&request, "neo4j-schema-request", &headers) + return nil, err // TODO + } + + switch *databaseType { + case "arangodb": + log.Trace().Msg("Publishing to arangodb stats queue") + s.producer.RequestProducer.PublishMessageJsonHeaders(&request, "arangodb-schema-stats-request", &headers) + case "neo4j": + log.Trace().Msg("Publishing to neo4j stats queue") + s.producer.RequestProducer.PublishMessageJsonHeaders(&request, "neo4j-schema-stats-request", &headers) + default: + log.Error().Msg("No valid database type found") + return nil, errors.New("No valid database type found") + } + + return nil, nil + +} + +/* +CacheHitCallback produces the cached schema + + schema: *[]byte, the schema result + sessionID: *string, the ID of the session +*/ +func (s *SchemaService) CacheHitCallback(data interface{}, sessionData structs.SessionData) error { + // Use the sessionID to query the key value store to get the queue we need to send this message to + clientQueueID, err := s.caching.KeyValueStore.Get(context.Background(), fmt.Sprintf("routing %s", sessionData.SessionID), keyvaluestore.String) + if err != nil || clientQueueID == nil { + return err + } + + log.Trace().Str("sessionID", sessionData.SessionID).Str("userID", sessionData.UserID).Str("clientQueueID", clientQueueID.(string)).Msg("Found client queue for session") + + headers := make(map[string]interface{}) + headers["sessionID"] = sessionData.SessionID + headers["userID"] = sessionData.UserID + + dataBytes, err := json.Marshal(data) + if err != nil { + return err + } + + s.producer.ResultProducer.PublishMessageJsonHeaders(&dataBytes, clientQueueID.(string), &headers) + return nil +} -- GitLab