diff --git a/backend/deploymentNew.yaml b/backend/deploymentNew.yaml new file mode 100644 index 0000000000000000000000000000000000000000..41a12ba2b194fd66d495e584cef6e3e20101e41b --- /dev/null +++ b/backend/deploymentNew.yaml @@ -0,0 +1,132 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: proveespace +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: proveegatewaydeployment + namespace: proveespace +spec: + replicas: 1 + selector: + matchLabels: + app: gateway + strategy: {} + template: + metadata: + labels: + app: gateway + spec: + containers: + - image: isolatedsushi/gateway + name: gateway + env: + - name: KNN_TARGET + value: "proveeknnservice:50052" + - name: PROJECTOR_TARGET + value: "proveeprojectorservice:50051" + ports: + - containerPort: 9898 + name: gateway + resources: {} +status: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: proveegatewayservice + namespace: proveespace +spec: + selector: + app: gateway + ports: + - name: grpc + protocol: TCP + port: 9898 + targetPort: 9898 + type: LoadBalancer + + + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: proveeprojectordeployment + namespace: proveespace +spec: + replicas: 1 + selector: + matchLabels: + app: grpcprojector + strategy: {} + template: + metadata: + labels: + app: grpcprojector + spec: + containers: + - name: grpcprojector + image: isolatedsushi/grpcprojector + ports: + - containerPort: 50051 + resources: {} +status: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: proveeprojectorservice + namespace: proveespace +spec: + selector: + app: grpcprojector + ports: + - name: grpcprojector + protocol: TCP + port: 50051 + targetPort: 50051 + type: LoadBalancer + + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: proveeknndeployment + namespace: proveespace +spec: + replicas: 1 + selector: + matchLabels: + app: grpcknn + strategy: {} + template: + metadata: + labels: + app: grpcknn + spec: + containers: + - name: grpcknn + image: 
isolatedsushi/grpcknn + ports: + - containerPort: 50052 + resources: {} +status: {} +--- +apiVersion: v1 +kind: Service +metadata: + name: proveeknnservice + namespace: proveespace +spec: + selector: + app: grpcknn + ports: + - name: grpcknn + protocol: TCP + port: 50052 + targetPort: 50052 + type: LoadBalancer \ No newline at end of file diff --git a/backend/docker-compose.yml b/backend/docker-compose.yml index a4435b9c5bb3b1bf162de10ce08df68f93b24fef..b919d8e7a9e151cf6d87babc7bf8120cb8fbbbf8 100644 --- a/backend/docker-compose.yml +++ b/backend/docker-compose.yml @@ -5,7 +5,8 @@ services: context: ./webSocketGateway dockerfile: Dockerfile environment: - - PROJECTOR_ADDRESS=grpcprojector:50051 + - PROJECTOR_TARGET=grpcprojector:50051 + - KNN_TARGET=grpcknn:50052 ports: - "9898:9898" grpcprojector: @@ -13,4 +14,10 @@ services: context: ./gRPCProjector/python dockerfile: Dockerfile ports: - - "50051:50051" + - "50051:50051" + grpcknn: + build: + context: ./grpcKNN + dockerfile: Dockerfile + ports: + - "50052:50052" diff --git a/backend/gRPCProjector/node/src/grpcServer.js b/backend/gRPCProjector/node/src/grpcServer.js index 01e333448f90ab7d30d8883ed8de408e47863494..ee5c2de8f8440d859696802195895fdb1f2631e8 100644 --- a/backend/gRPCProjector/node/src/grpcServer.js +++ b/backend/gRPCProjector/node/src/grpcServer.js @@ -60,6 +60,10 @@ function echoPoint(call, hdvector){ var response = {id : pointID, x: hdvector[0], y: hdvector[1]}; call.write(response); pointID += 1; + + if(pointID%1000==0){ + console.log("Processed " + pointID + " points") + } } //In case of no points diff --git a/backend/gRPCProjector/python/__pycache__/projector_pb2.cpython-38.pyc b/backend/gRPCProjector/python/__pycache__/projector_pb2.cpython-38.pyc index 8fa591fb0883eb803e4c84d572c90d11ef556633..f75695b668d0271b578b691d105712252c1b70c1 100644 Binary files a/backend/gRPCProjector/python/__pycache__/projector_pb2.cpython-38.pyc and 
b/backend/gRPCProjector/python/__pycache__/projector_pb2.cpython-38.pyc differ diff --git a/backend/gRPCProjector/python/__pycache__/projector_pb2_grpc.cpython-38.pyc b/backend/gRPCProjector/python/__pycache__/projector_pb2_grpc.cpython-38.pyc index eb5e0ff7a087431c739e9872648ba2a26cdf6080..14a650c93a143af4323e14e96108e0f198342bd1 100644 Binary files a/backend/gRPCProjector/python/__pycache__/projector_pb2_grpc.cpython-38.pyc and b/backend/gRPCProjector/python/__pycache__/projector_pb2_grpc.cpython-38.pyc differ diff --git a/backend/gRPCProjector/python/grpcServer.py b/backend/gRPCProjector/python/grpcServer.py index af6d30646d461aa42d41687eba4dceed599b6b3e..5a7f768ba55ed5ddb5dd1a90dd73c0143ae576e7 100644 --- a/backend/gRPCProjector/python/grpcServer.py +++ b/backend/gRPCProjector/python/grpcServer.py @@ -8,11 +8,19 @@ class ProjectorService(rpc.ProjectorServicer): def __init__(self): self.pointID = 0 + def getProjectionPoints(self, request_iterator, context): print("Connected") - for message in request_iterator: - yield self.rowToPoint(message) + for trainingChunk in request_iterator: + pointChunkBuffer = [] + for row in trainingChunk.rows: + pointChunkBuffer.append(self.rowToPoint(row)) + + + pointChunk = projector.PointChunk(points=pointChunkBuffer) + yield pointChunk + def rowToPoint(self, row): hdvector = row.hdvector diff --git a/backend/gRPCProjector/python/oldGenerated/projector_pb2.py b/backend/gRPCProjector/python/oldGenerated/projector_pb2.py new file mode 100644 index 0000000000000000000000000000000000000000..3f34f7013e3463ae66d6eccbf6a8ba4069907ad1 --- /dev/null +++ b/backend/gRPCProjector/python/oldGenerated/projector_pb2.py @@ -0,0 +1,239 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! 
+# source: projector.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='projector.proto', + package='provee', + syntax='proto3', + serialized_options=b'\n\017nl.uuvig.proveeB\022ProveProjectorGRCPP\001\242\002\006PROVEE', + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n\x0fprojector.proto\x12\x06provee\x1a\x1bgoogle/protobuf/empty.proto\")\n\x05Point\x12\n\n\x02id\x18\x01 \x01(\x05\x12\t\n\x01x\x18\x02 \x01(\x02\x12\t\n\x01y\x18\x03 \x01(\x02\"D\n\x0bTrainingSet\x12\x0f\n\x07modelid\x18\x01 \x01(\t\x12$\n\x04rows\x18\x02 \x03(\x0b\x32\x16.provee.TrainingSetRow\"2\n\x0eTrainingSetRow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x08hdvector\x18\x02 \x03(\x01\x42\x02\x10\x01\x32\xf9\x01\n\tProjector\x12\x37\n\x05start\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x36\n\x04stop\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x42\n\x13getProjectionPoints\x12\x16.provee.TrainingSetRow\x1a\r.provee.Point\"\x00(\x01\x30\x01\x12\x37\n\ngetUpdates\x12\x16.google.protobuf.Empty\x1a\r.provee.Point\"\x00\x30\x01\x42\x30\n\x0fnl.uuvig.proveeB\x12ProveProjectorGRCPP\x01\xa2\x02\x06PROVEEb\x06proto3' + , + dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,]) + + + + +_POINT = _descriptor.Descriptor( + name='Point', + full_name='provee.Point', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='id', full_name='provee.Point.id', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + 
message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='x', full_name='provee.Point.x', index=1, + number=2, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='y', full_name='provee.Point.y', index=2, + number=3, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=56, + serialized_end=97, +) + + +_TRAININGSET = _descriptor.Descriptor( + name='TrainingSet', + full_name='provee.TrainingSet', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='modelid', full_name='provee.TrainingSet.modelid', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='rows', full_name='provee.TrainingSet.rows', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + 
is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=99, + serialized_end=167, +) + + +_TRAININGSETROW = _descriptor.Descriptor( + name='TrainingSetRow', + full_name='provee.TrainingSetRow', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='id', full_name='provee.TrainingSetRow.id', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='hdvector', full_name='provee.TrainingSetRow.hdvector', index=1, + number=2, type=1, cpp_type=5, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=b'\020\001', file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=169, + serialized_end=219, +) + +_TRAININGSET.fields_by_name['rows'].message_type = _TRAININGSETROW +DESCRIPTOR.message_types_by_name['Point'] = _POINT +DESCRIPTOR.message_types_by_name['TrainingSet'] = _TRAININGSET +DESCRIPTOR.message_types_by_name['TrainingSetRow'] = _TRAININGSETROW +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +Point = _reflection.GeneratedProtocolMessageType('Point', (_message.Message,), { + 'DESCRIPTOR' : _POINT, + '__module__' 
: 'projector_pb2' + # @@protoc_insertion_point(class_scope:provee.Point) + }) +_sym_db.RegisterMessage(Point) + +TrainingSet = _reflection.GeneratedProtocolMessageType('TrainingSet', (_message.Message,), { + 'DESCRIPTOR' : _TRAININGSET, + '__module__' : 'projector_pb2' + # @@protoc_insertion_point(class_scope:provee.TrainingSet) + }) +_sym_db.RegisterMessage(TrainingSet) + +TrainingSetRow = _reflection.GeneratedProtocolMessageType('TrainingSetRow', (_message.Message,), { + 'DESCRIPTOR' : _TRAININGSETROW, + '__module__' : 'projector_pb2' + # @@protoc_insertion_point(class_scope:provee.TrainingSetRow) + }) +_sym_db.RegisterMessage(TrainingSetRow) + + +DESCRIPTOR._options = None +_TRAININGSETROW.fields_by_name['hdvector']._options = None + +_PROJECTOR = _descriptor.ServiceDescriptor( + name='Projector', + full_name='provee.Projector', + file=DESCRIPTOR, + index=0, + serialized_options=None, + create_key=_descriptor._internal_create_key, + serialized_start=222, + serialized_end=471, + methods=[ + _descriptor.MethodDescriptor( + name='start', + full_name='provee.Projector.start', + index=0, + containing_service=None, + input_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + output_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='stop', + full_name='provee.Projector.stop', + index=1, + containing_service=None, + input_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + output_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='getProjectionPoints', + full_name='provee.Projector.getProjectionPoints', + index=2, + containing_service=None, + input_type=_TRAININGSETROW, + output_type=_POINT, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='getUpdates', + 
full_name='provee.Projector.getUpdates', + index=3, + containing_service=None, + input_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + output_type=_POINT, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), +]) +_sym_db.RegisterServiceDescriptor(_PROJECTOR) + +DESCRIPTOR.services_by_name['Projector'] = _PROJECTOR + +# @@protoc_insertion_point(module_scope) diff --git a/backend/gRPCProjector/python/oldGenerated/projector_pb2_grpc.py b/backend/gRPCProjector/python/oldGenerated/projector_pb2_grpc.py new file mode 100644 index 0000000000000000000000000000000000000000..f36b3e05eee43d386a43cd7478846f73688d6141 --- /dev/null +++ b/backend/gRPCProjector/python/oldGenerated/projector_pb2_grpc.py @@ -0,0 +1,183 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 +import projector_pb2 as projector__pb2 + + +class ProjectorStub(object): + """Interface exported by the server. + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. 
+ """ + self.start = channel.unary_unary( + '/provee.Projector/start', + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + ) + self.stop = channel.unary_unary( + '/provee.Projector/stop', + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + ) + self.getProjectionPoints = channel.stream_stream( + '/provee.Projector/getProjectionPoints', + request_serializer=projector__pb2.TrainingSetRow.SerializeToString, + response_deserializer=projector__pb2.Point.FromString, + ) + self.getUpdates = channel.unary_stream( + '/provee.Projector/getUpdates', + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=projector__pb2.Point.FromString, + ) + + +class ProjectorServicer(object): + """Interface exported by the server. + """ + + def start(self, request, context): + """A simple RPC. + + Start the Projector calculation + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def stop(self, request, context): + """Stop the Projector calculation + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def getProjectionPoints(self, request_iterator, context): + """A server-to-client streaming RPC. + + Obtains the Projection Points in 2D given the trainings values stored in a row format. Results are + streamed rather than returned at once (e.g. in a response message with a + repeated field). 
+ """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def getUpdates(self, request, context): + """A server-to-client streaming RPC. + + Obtains the Projection Points in 2D given the trainings values stored in a row format. Results are + streamed rather than returned at once (e.g. in a response message with a + repeated field). + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_ProjectorServicer_to_server(servicer, server): + rpc_method_handlers = { + 'start': grpc.unary_unary_rpc_method_handler( + servicer.start, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'stop': grpc.unary_unary_rpc_method_handler( + servicer.stop, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'getProjectionPoints': grpc.stream_stream_rpc_method_handler( + servicer.getProjectionPoints, + request_deserializer=projector__pb2.TrainingSetRow.FromString, + response_serializer=projector__pb2.Point.SerializeToString, + ), + 'getUpdates': grpc.unary_stream_rpc_method_handler( + servicer.getUpdates, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=projector__pb2.Point.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'provee.Projector', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class Projector(object): + """Interface exported by the server. 
+ """ + + @staticmethod + def start(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/provee.Projector/start', + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def stop(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/provee.Projector/stop', + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def getProjectionPoints(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_stream(request_iterator, target, '/provee.Projector/getProjectionPoints', + projector__pb2.TrainingSetRow.SerializeToString, + projector__pb2.Point.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def getUpdates(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/provee.Projector/getUpdates', + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + projector__pb2.Point.FromString, + options, 
channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/backend/gRPCProjector/python/projector_pb2.py b/backend/gRPCProjector/python/projector_pb2.py index 3f34f7013e3463ae66d6eccbf6a8ba4069907ad1..70e8055dbe58e5f050dc286dc2a08567d3b6410c 100644 --- a/backend/gRPCProjector/python/projector_pb2.py +++ b/backend/gRPCProjector/python/projector_pb2.py @@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor( syntax='proto3', serialized_options=b'\n\017nl.uuvig.proveeB\022ProveProjectorGRCPP\001\242\002\006PROVEE', create_key=_descriptor._internal_create_key, - serialized_pb=b'\n\x0fprojector.proto\x12\x06provee\x1a\x1bgoogle/protobuf/empty.proto\")\n\x05Point\x12\n\n\x02id\x18\x01 \x01(\x05\x12\t\n\x01x\x18\x02 \x01(\x02\x12\t\n\x01y\x18\x03 \x01(\x02\"D\n\x0bTrainingSet\x12\x0f\n\x07modelid\x18\x01 \x01(\t\x12$\n\x04rows\x18\x02 \x03(\x0b\x32\x16.provee.TrainingSetRow\"2\n\x0eTrainingSetRow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x08hdvector\x18\x02 \x03(\x01\x42\x02\x10\x01\x32\xf9\x01\n\tProjector\x12\x37\n\x05start\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x36\n\x04stop\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x42\n\x13getProjectionPoints\x12\x16.provee.TrainingSetRow\x1a\r.provee.Point\"\x00(\x01\x30\x01\x12\x37\n\ngetUpdates\x12\x16.google.protobuf.Empty\x1a\r.provee.Point\"\x00\x30\x01\x42\x30\n\x0fnl.uuvig.proveeB\x12ProveProjectorGRCPP\x01\xa2\x02\x06PROVEEb\x06proto3' + serialized_pb=b'\n\x0fprojector.proto\x12\x06provee\x1a\x1bgoogle/protobuf/empty.proto\")\n\x05Point\x12\n\n\x02id\x18\x01 \x01(\x05\x12\t\n\x01x\x18\x02 \x01(\x02\x12\t\n\x01y\x18\x03 \x01(\x02\"+\n\nPointChunk\x12\x1d\n\x06points\x18\x01 \x03(\x0b\x32\r.provee.Point\"5\n\rTrainingChunk\x12$\n\x04rows\x18\x02 \x03(\x0b\x32\x16.provee.TrainingSetRow\"2\n\x0eTrainingSetRow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x08hdvector\x18\x02 
\x03(\x01\x42\x02\x10\x01\x32\x82\x02\n\tProjector\x12\x37\n\x05start\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x36\n\x04stop\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x46\n\x13getProjectionPoints\x12\x15.provee.TrainingChunk\x1a\x12.provee.PointChunk\"\x00(\x01\x30\x01\x12<\n\ngetUpdates\x12\x16.google.protobuf.Empty\x1a\x12.provee.PointChunk\"\x00\x30\x01\x42\x30\n\x0fnl.uuvig.proveeB\x12ProveProjectorGRCPP\x01\xa2\x02\x06PROVEEb\x06proto3' , dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,]) @@ -73,23 +73,48 @@ _POINT = _descriptor.Descriptor( ) -_TRAININGSET = _descriptor.Descriptor( - name='TrainingSet', - full_name='provee.TrainingSet', +_POINTCHUNK = _descriptor.Descriptor( + name='PointChunk', + full_name='provee.PointChunk', filename=None, file=DESCRIPTOR, containing_type=None, create_key=_descriptor._internal_create_key, fields=[ _descriptor.FieldDescriptor( - name='modelid', full_name='provee.TrainingSet.modelid', index=0, - number=1, type=9, cpp_type=9, label=1, - has_default_value=False, default_value=b"".decode('utf-8'), + name='points', full_name='provee.PointChunk.points', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, is_extension=False, extension_scope=None, serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=99, + serialized_end=142, +) + + +_TRAININGCHUNK = _descriptor.Descriptor( + name='TrainingChunk', + full_name='provee.TrainingChunk', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ _descriptor.FieldDescriptor( - name='rows', full_name='provee.TrainingSet.rows', index=1, + name='rows', 
full_name='provee.TrainingChunk.rows', index=0, number=2, type=11, cpp_type=10, label=3, has_default_value=False, default_value=[], message_type=None, enum_type=None, containing_type=None, @@ -107,8 +132,8 @@ _TRAININGSET = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=99, - serialized_end=167, + serialized_start=144, + serialized_end=197, ) @@ -146,13 +171,15 @@ _TRAININGSETROW = _descriptor.Descriptor( extension_ranges=[], oneofs=[ ], - serialized_start=169, - serialized_end=219, + serialized_start=199, + serialized_end=249, ) -_TRAININGSET.fields_by_name['rows'].message_type = _TRAININGSETROW +_POINTCHUNK.fields_by_name['points'].message_type = _POINT +_TRAININGCHUNK.fields_by_name['rows'].message_type = _TRAININGSETROW DESCRIPTOR.message_types_by_name['Point'] = _POINT -DESCRIPTOR.message_types_by_name['TrainingSet'] = _TRAININGSET +DESCRIPTOR.message_types_by_name['PointChunk'] = _POINTCHUNK +DESCRIPTOR.message_types_by_name['TrainingChunk'] = _TRAININGCHUNK DESCRIPTOR.message_types_by_name['TrainingSetRow'] = _TRAININGSETROW _sym_db.RegisterFileDescriptor(DESCRIPTOR) @@ -163,12 +190,19 @@ Point = _reflection.GeneratedProtocolMessageType('Point', (_message.Message,), { }) _sym_db.RegisterMessage(Point) -TrainingSet = _reflection.GeneratedProtocolMessageType('TrainingSet', (_message.Message,), { - 'DESCRIPTOR' : _TRAININGSET, +PointChunk = _reflection.GeneratedProtocolMessageType('PointChunk', (_message.Message,), { + 'DESCRIPTOR' : _POINTCHUNK, + '__module__' : 'projector_pb2' + # @@protoc_insertion_point(class_scope:provee.PointChunk) + }) +_sym_db.RegisterMessage(PointChunk) + +TrainingChunk = _reflection.GeneratedProtocolMessageType('TrainingChunk', (_message.Message,), { + 'DESCRIPTOR' : _TRAININGCHUNK, '__module__' : 'projector_pb2' - # @@protoc_insertion_point(class_scope:provee.TrainingSet) + # @@protoc_insertion_point(class_scope:provee.TrainingChunk) }) -_sym_db.RegisterMessage(TrainingSet) 
+_sym_db.RegisterMessage(TrainingChunk) TrainingSetRow = _reflection.GeneratedProtocolMessageType('TrainingSetRow', (_message.Message,), { 'DESCRIPTOR' : _TRAININGSETROW, @@ -188,8 +222,8 @@ _PROJECTOR = _descriptor.ServiceDescriptor( index=0, serialized_options=None, create_key=_descriptor._internal_create_key, - serialized_start=222, - serialized_end=471, + serialized_start=252, + serialized_end=510, methods=[ _descriptor.MethodDescriptor( name='start', @@ -216,8 +250,8 @@ _PROJECTOR = _descriptor.ServiceDescriptor( full_name='provee.Projector.getProjectionPoints', index=2, containing_service=None, - input_type=_TRAININGSETROW, - output_type=_POINT, + input_type=_TRAININGCHUNK, + output_type=_POINTCHUNK, serialized_options=None, create_key=_descriptor._internal_create_key, ), @@ -227,7 +261,7 @@ _PROJECTOR = _descriptor.ServiceDescriptor( index=3, containing_service=None, input_type=google_dot_protobuf_dot_empty__pb2._EMPTY, - output_type=_POINT, + output_type=_POINTCHUNK, serialized_options=None, create_key=_descriptor._internal_create_key, ), diff --git a/backend/gRPCProjector/python/projector_pb2_grpc.py b/backend/gRPCProjector/python/projector_pb2_grpc.py index f36b3e05eee43d386a43cd7478846f73688d6141..e4b7402def98fb6e2d7d7dfaa7ace3bfa3d7928c 100644 --- a/backend/gRPCProjector/python/projector_pb2_grpc.py +++ b/backend/gRPCProjector/python/projector_pb2_grpc.py @@ -28,13 +28,13 @@ class ProjectorStub(object): ) self.getProjectionPoints = channel.stream_stream( '/provee.Projector/getProjectionPoints', - request_serializer=projector__pb2.TrainingSetRow.SerializeToString, - response_deserializer=projector__pb2.Point.FromString, + request_serializer=projector__pb2.TrainingChunk.SerializeToString, + response_deserializer=projector__pb2.PointChunk.FromString, ) self.getUpdates = channel.unary_stream( '/provee.Projector/getUpdates', request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, - response_deserializer=projector__pb2.Point.FromString, 
+ response_deserializer=projector__pb2.PointChunk.FromString, ) @@ -95,13 +95,13 @@ def add_ProjectorServicer_to_server(servicer, server): ), 'getProjectionPoints': grpc.stream_stream_rpc_method_handler( servicer.getProjectionPoints, - request_deserializer=projector__pb2.TrainingSetRow.FromString, - response_serializer=projector__pb2.Point.SerializeToString, + request_deserializer=projector__pb2.TrainingChunk.FromString, + response_serializer=projector__pb2.PointChunk.SerializeToString, ), 'getUpdates': grpc.unary_stream_rpc_method_handler( servicer.getUpdates, request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, - response_serializer=projector__pb2.Point.SerializeToString, + response_serializer=projector__pb2.PointChunk.SerializeToString, ), } generic_handler = grpc.method_handlers_generic_handler( @@ -160,8 +160,8 @@ class Projector(object): timeout=None, metadata=None): return grpc.experimental.stream_stream(request_iterator, target, '/provee.Projector/getProjectionPoints', - projector__pb2.TrainingSetRow.SerializeToString, - projector__pb2.Point.FromString, + projector__pb2.TrainingChunk.SerializeToString, + projector__pb2.PointChunk.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) @@ -178,6 +178,6 @@ class Projector(object): metadata=None): return grpc.experimental.unary_stream(request, target, '/provee.Projector/getUpdates', google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, - projector__pb2.Point.FromString, + projector__pb2.PointChunk.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/backend/gRPCProjector/python/protos/README.md b/backend/gRPCProjector/python/protos/README.md deleted file mode 100644 index 128e4b2fa1fce1c43af79e2956e931e79b266d90..0000000000000000000000000000000000000000 --- a/backend/gRPCProjector/python/protos/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# protos - 
-Contains all Protos (gRCP) files that are used to communicate between services in Provee \ No newline at end of file diff --git a/backend/grpcKNN/Dockerfile b/backend/grpcKNN/Dockerfile new file mode 100644 index 0000000000000000000000000000000000000000..f0ce763951403bcc161003eccfd66ede16a19de6 --- /dev/null +++ b/backend/grpcKNN/Dockerfile @@ -0,0 +1,7 @@ +FROM python:3.8 +WORKDIR /code +COPY requirements.txt . +RUN pip install -r requirements.txt +COPY /. . +ENV PYTHONUNBUFFERED=1 +CMD ["python3","-u", "./knnServer.py"] \ No newline at end of file diff --git a/backend/grpcKNN/faissTest.py b/backend/grpcKNN/faissTest.py new file mode 100644 index 0000000000000000000000000000000000000000..91b5a4e4048e501d432b4243e61e49bc0783d594 --- /dev/null +++ b/backend/grpcKNN/faissTest.py @@ -0,0 +1,58 @@ +import numpy as np +import faiss +from tabulate import tabulate +fileName = "word2vec_reddit_300_10000.txt" + + +#Read in data +with open(fileName) as f: + content = f.readlines() + content = [x.split(' ') for x in content] + ids = [x[0] for x in content] + wordToID = {k: v for v,k in enumerate(ids)} + db = [x[1:] for x in content] + db = [[np.float32(y) for y in x] for x in db] + allData = np.array(db) + +index = faiss.IndexFlatL2(len(allData[0])) # build the index +index.add(allData) # add vectors to the index + + +def printClosest(indices, distances): + words = [ids[x] for x in indices[0]] + tableForm = tabulate(list(zip(words,distances[0])),headers=['Word','Distance']) + print(tableForm) + +def knn(word,k): + if word not in wordToID: + print(word, "not in the set") + return + + wordIndex = wordToID[word] + vector = allData[wordIndex] + knnVector(vector,k) + + + +def knnVector(vector,k): + D, I = index.search(np.asarray([vector]), k) # sanity check + printClosest(I, D) + +#knn("woman",5) + + + + +def knnSemantic(word1,word2,word3,k): + if word1 not in wordToID or word2 not in wordToID or word3 not in wordToID: + print("a word is not in the set") + return + vector1 = 
"""gRPC k-nearest-neighbour service that keeps one FAISS index per client."""
from concurrent import futures
import grpc
import knn_pb2_grpc as rpc
import faiss
import knn_pb2 as knn
import numpy as np
import re
import operator
import threading
from tabulate import tabulate

# Number of neighbours returned when the request carries no positive k.
DEFAULT_K = 10

# Operators accepted in a word expression such as "king - man + woman".
_OPERATORS = {"+": operator.add, "-": operator.sub}


class Data():
    """Per-client storage: the stored vectors, their ids and the FAISS index."""

    def __init__(self):
        # allData[i], ids[i] and row i of the index all describe the same point.
        self.allData = []
        self.wordToID = {}
        self.ids = []
        # Created lazily on the first stored point, once the dimension is known.
        self.index = None


class KNNService(rpc.KNNServicer):
    """Implementation of the provee.KNN gRPC service."""

    def __init__(self):
        self.clientList = []
        self.pointID = 0
        # The server runs handlers on a thread pool, so id allocation is guarded.
        self._clientLock = threading.Lock()

    # Get client ID
    def metaToID(self, context):
        """Return the integer client id from the call metadata, or None."""
        try:
            metadict = dict(context.invocation_metadata())
            return int(metadict['id'])
        except (KeyError, TypeError, ValueError) as e:
            print("Error in metaToID (forgot to add user id?)")
            print(e)
            return None

    def _clientData(self, context):
        """Return the Data object of the calling client, or None if unknown."""
        clientID = self.metaToID(context)
        # Reject negative ids too: they would index another client's data.
        if clientID is None or not 0 <= clientID < len(self.clientList):
            return None
        return self.clientList[clientID]

    def _storeRow(self, dataObject, message):
        """Add one TrainingSetRow to dataObject, keeping ids and index aligned."""
        try:
            hdvector = np.array(list(message.hdvector), dtype=np.float32)
            if hdvector.size == 0:
                print("Skipping point without a vector:", message.id)
                return

            if dataObject.index is None:
                dataObject.index = faiss.IndexFlatL2(hdvector.size)
            elif hdvector.size != dataObject.index.d:
                print("Skipping point with wrong dimension:", message.id)
                return

            # Add to the index first: if this fails nothing else was mutated,
            # so ids/allData can never get out of step with the index rows.
            dataObject.index.add(hdvector.reshape(1, -1))
            dataObject.ids.append(message.id)
            dataObject.allData.append(hdvector)
            dataObject.wordToID[message.id] = len(dataObject.allData) - 1
        except Exception as e:
            # Best effort: one bad row must not abort the whole upload stream.
            print(e)

    # Store all the data from a new point (can be improved later)
    def storePoint(self, message, context):
        """Store one point for the client identified by the call metadata."""
        dataObject = self._clientData(context)
        if dataObject is None:
            print("Cannot store point: missing or unknown client id")
            return
        self._storeRow(dataObject, message)

    # Supply client with ID so that it can give it later on with metadata to identify
    def getIDfromServer(self, request, context):
        """Allocate a new client slot and return its id as a string."""
        with self._clientLock:
            clientID = len(self.clientList)
            self.clientList.append(Data())
        print("New id: ", clientID)
        return knn.ID(id=str(clientID))

    def calculateFinalVector(self, dataObject, request):
        """Evaluate a "+"/"-" word expression into a single query vector.

        Returns the resulting numpy vector, or an empty knn.Neighbours message
        (kept for backward compatibility) when any word is not stored yet.
        """
        expression = "".join(request.words.split())
        # NOTE(review): query words are lower-cased here but ids are stored
        # verbatim, so ids with upper-case letters presumably never match -
        # confirm whether ids should be normalised on insert.
        wordList = [word.lower() for word in re.split(r'[-+]', expression)]
        print("Wordlist", wordList)

        for word in wordList:
            if word not in dataObject.wordToID:
                print(word, "not in set yet?")
                return knn.Neighbours()

        # Splitting and scanning use the same characters, so there is exactly
        # one operator between each pair of consecutive words.
        orderedOperators = [c for c in expression if c in _OPERATORS]
        firstID = dataObject.wordToID[wordList[0]]
        resultedVector = np.array(dataObject.allData[firstID])

        for symbol, word in zip(orderedOperators, wordList[1:]):
            nextID = dataObject.wordToID[word]
            resultedVector = _OPERATORS[symbol](
                resultedVector, np.array(dataObject.allData[nextID]))

        return resultedVector

    def getKNNRequest(self, request, context):
        """Answer a knnRequest with the neighbours of the evaluated expression."""
        dataObject = self._clientData(context)
        if dataObject is None:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT,
                          "Missing or unknown client id in call metadata")

        resultedVector = self.calculateFinalVector(dataObject, request)
        if isinstance(resultedVector, knn.Neighbours):
            # Unknown word: hand the empty result straight back to the caller.
            return resultedVector

        k = request.k if request.k > 0 else DEFAULT_K
        return self.knnVector(resultedVector, k, dataObject)

    # Perform actual KNN
    def knnVector(self, vector, k, dataObject):
        """Return the k stored points closest to vector as knn.Neighbours."""
        index = dataObject.index
        if index is None or index.ntotal == 0 or k < 1:
            return knn.Neighbours()

        # Never ask for more neighbours than there are stored points.
        k = min(k, index.ntotal)
        D, I = index.search(np.asarray([vector], dtype=np.float32), k)
        # FAISS marks missing results with label -1; skip them instead of
        # letting ids[-1] silently return the last stored word.
        returnRows = [knn.Row(id=dataObject.ids[int(i)], distance=float(d))
                      for i, d in zip(I[0], D[0]) if i >= 0]
        return knn.Neighbours(rows=returnRows)

    def sendProjectionPoints(self, request_iterator, context):
        """Consume a stream of TrainingChunk messages and store every row."""
        print("Connected")
        # Resolve the client once per stream instead of once per row.
        dataObject = self._clientData(context)
        if dataObject is None:
            context.abort(grpc.StatusCode.INVALID_ARGUMENT,
                          "Missing or unknown client id in call metadata")

        try:
            for trainingChunk in request_iterator:
                for row in trainingChunk.rows:
                    self._storeRow(dataObject, row)
                    self.pointID += 1
                    if self.pointID % 1000 == 0:
                        print("Received {} points!".format(self.pointID))
        except Exception as e:
            print("Error", e)

        # The RPC is declared to return google.protobuf.Empty; returning None
        # makes response serialization fail at the end of the stream.
        return knn.google_dot_protobuf_dot_empty__pb2.Empty()


def serveServer():
    """Start the gRPC server on port 50052 and block until it terminates."""
    port = '[::]:50052'
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    rpc.add_KNNServicer_to_server(KNNService(), server)
    server.add_insecure_port(port)
    server.start()
    print("Listening on port: " + port)
    server.wait_for_termination()


if __name__ == '__main__':
    print("Starting the KNN server")
    serveServer()
diff --git a/backend/grpcKNN/knn_pb2.py b/backend/grpcKNN/knn_pb2.py new file mode 100644 index 0000000000000000000000000000000000000000..55189de882ddd335c1260c1bcbc20b43d0d4239a --- /dev/null +++ b/backend/grpcKNN/knn_pb2.py @@ -0,0 +1,343 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: knn.proto + +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='knn.proto', + package='provee', + syntax='proto3', + serialized_options=b'\n\017nl.uuvig.proveeB\022ProveProjectorGRCPP\001\242\002\006PROVEE', + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n\tknn.proto\x12\x06provee\x1a\x1bgoogle/protobuf/empty.proto\"\x10\n\x02ID\x12\n\n\x02id\x18\x01 \x01(\t\"&\n\nknnRequest\x12\t\n\x01k\x18\x01 \x01(\x05\x12\r\n\x05words\x18\x02 \x01(\t\"5\n\rTrainingChunk\x12$\n\x04rows\x18\x02 \x03(\x0b\x32\x16.provee.TrainingSetRow\"\'\n\nNeighbours\x12\x19\n\x04rows\x18\x01 \x03(\x0b\x32\x0b.provee.Row\"#\n\x03Row\x12\n\n\x02id\x18\x01 \x01(\t\x12\x10\n\x08\x64istance\x18\x02 \x01(\x02\"2\n\x0eTrainingSetRow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x08hdvector\x18\x02 \x03(\x01\x42\x02\x10\x01\x32\xc4\x01\n\x03KNN\x12I\n\x14sendProjectionPoints\x12\x15.provee.TrainingChunk\x1a\x16.google.protobuf.Empty\"\x00(\x01\x12\x39\n\rgetKNNRequest\x12\x12.provee.knnRequest\x1a\x12.provee.Neighbours\"\x00\x12\x37\n\x0fgetIDfromServer\x12\x16.google.protobuf.Empty\x1a\n.provee.ID\"\x00\x42\x30\n\x0fnl.uuvig.proveeB\x12ProveProjectorGRCPP\x01\xa2\x02\x06PROVEEb\x06proto3' + , + dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,]) + + + + +_ID = 
_descriptor.Descriptor( + name='ID', + full_name='provee.ID', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='id', full_name='provee.ID.id', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=50, + serialized_end=66, +) + + +_KNNREQUEST = _descriptor.Descriptor( + name='knnRequest', + full_name='provee.knnRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='k', full_name='provee.knnRequest.k', index=0, + number=1, type=5, cpp_type=1, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='words', full_name='provee.knnRequest.words', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=68, + serialized_end=106, +) + + +_TRAININGCHUNK = 
_descriptor.Descriptor( + name='TrainingChunk', + full_name='provee.TrainingChunk', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='rows', full_name='provee.TrainingChunk.rows', index=0, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=108, + serialized_end=161, +) + + +_NEIGHBOURS = _descriptor.Descriptor( + name='Neighbours', + full_name='provee.Neighbours', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='rows', full_name='provee.Neighbours.rows', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=163, + serialized_end=202, +) + + +_ROW = _descriptor.Descriptor( + name='Row', + full_name='provee.Row', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='id', full_name='provee.Row.id', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + 
message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='distance', full_name='provee.Row.distance', index=1, + number=2, type=2, cpp_type=6, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=204, + serialized_end=239, +) + + +_TRAININGSETROW = _descriptor.Descriptor( + name='TrainingSetRow', + full_name='provee.TrainingSetRow', + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name='id', full_name='provee.TrainingSetRow.id', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=b"".decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + _descriptor.FieldDescriptor( + name='hdvector', full_name='provee.TrainingSetRow.hdvector', index=1, + number=2, type=1, cpp_type=5, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=b'\020\001', file=DESCRIPTOR, create_key=_descriptor._internal_create_key), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=241, + 
serialized_end=291, +) + +_TRAININGCHUNK.fields_by_name['rows'].message_type = _TRAININGSETROW +_NEIGHBOURS.fields_by_name['rows'].message_type = _ROW +DESCRIPTOR.message_types_by_name['ID'] = _ID +DESCRIPTOR.message_types_by_name['knnRequest'] = _KNNREQUEST +DESCRIPTOR.message_types_by_name['TrainingChunk'] = _TRAININGCHUNK +DESCRIPTOR.message_types_by_name['Neighbours'] = _NEIGHBOURS +DESCRIPTOR.message_types_by_name['Row'] = _ROW +DESCRIPTOR.message_types_by_name['TrainingSetRow'] = _TRAININGSETROW +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +ID = _reflection.GeneratedProtocolMessageType('ID', (_message.Message,), { + 'DESCRIPTOR' : _ID, + '__module__' : 'knn_pb2' + # @@protoc_insertion_point(class_scope:provee.ID) + }) +_sym_db.RegisterMessage(ID) + +knnRequest = _reflection.GeneratedProtocolMessageType('knnRequest', (_message.Message,), { + 'DESCRIPTOR' : _KNNREQUEST, + '__module__' : 'knn_pb2' + # @@protoc_insertion_point(class_scope:provee.knnRequest) + }) +_sym_db.RegisterMessage(knnRequest) + +TrainingChunk = _reflection.GeneratedProtocolMessageType('TrainingChunk', (_message.Message,), { + 'DESCRIPTOR' : _TRAININGCHUNK, + '__module__' : 'knn_pb2' + # @@protoc_insertion_point(class_scope:provee.TrainingChunk) + }) +_sym_db.RegisterMessage(TrainingChunk) + +Neighbours = _reflection.GeneratedProtocolMessageType('Neighbours', (_message.Message,), { + 'DESCRIPTOR' : _NEIGHBOURS, + '__module__' : 'knn_pb2' + # @@protoc_insertion_point(class_scope:provee.Neighbours) + }) +_sym_db.RegisterMessage(Neighbours) + +Row = _reflection.GeneratedProtocolMessageType('Row', (_message.Message,), { + 'DESCRIPTOR' : _ROW, + '__module__' : 'knn_pb2' + # @@protoc_insertion_point(class_scope:provee.Row) + }) +_sym_db.RegisterMessage(Row) + +TrainingSetRow = _reflection.GeneratedProtocolMessageType('TrainingSetRow', (_message.Message,), { + 'DESCRIPTOR' : _TRAININGSETROW, + '__module__' : 'knn_pb2' + # @@protoc_insertion_point(class_scope:provee.TrainingSetRow) + }) 
+_sym_db.RegisterMessage(TrainingSetRow) + + +DESCRIPTOR._options = None +_TRAININGSETROW.fields_by_name['hdvector']._options = None + +_KNN = _descriptor.ServiceDescriptor( + name='KNN', + full_name='provee.KNN', + file=DESCRIPTOR, + index=0, + serialized_options=None, + create_key=_descriptor._internal_create_key, + serialized_start=294, + serialized_end=490, + methods=[ + _descriptor.MethodDescriptor( + name='sendProjectionPoints', + full_name='provee.KNN.sendProjectionPoints', + index=0, + containing_service=None, + input_type=_TRAININGCHUNK, + output_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='getKNNRequest', + full_name='provee.KNN.getKNNRequest', + index=1, + containing_service=None, + input_type=_KNNREQUEST, + output_type=_NEIGHBOURS, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), + _descriptor.MethodDescriptor( + name='getIDfromServer', + full_name='provee.KNN.getIDfromServer', + index=2, + containing_service=None, + input_type=google_dot_protobuf_dot_empty__pb2._EMPTY, + output_type=_ID, + serialized_options=None, + create_key=_descriptor._internal_create_key, + ), +]) +_sym_db.RegisterServiceDescriptor(_KNN) + +DESCRIPTOR.services_by_name['KNN'] = _KNN + +# @@protoc_insertion_point(module_scope) diff --git a/backend/grpcKNN/knn_pb2_grpc.py b/backend/grpcKNN/knn_pb2_grpc.py new file mode 100644 index 0000000000000000000000000000000000000000..3ebf0049852ca511f1422ac1eba642061ed95f65 --- /dev/null +++ b/backend/grpcKNN/knn_pb2_grpc.py @@ -0,0 +1,136 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc + +from google.protobuf import empty_pb2 as google_dot_protobuf_dot_empty__pb2 +import knn_pb2 as knn__pb2 + + +class KNNStub(object): + """Interface exported by the server. 
+ """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.sendProjectionPoints = channel.stream_unary( + '/provee.KNN/sendProjectionPoints', + request_serializer=knn__pb2.TrainingChunk.SerializeToString, + response_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + ) + self.getKNNRequest = channel.unary_unary( + '/provee.KNN/getKNNRequest', + request_serializer=knn__pb2.knnRequest.SerializeToString, + response_deserializer=knn__pb2.Neighbours.FromString, + ) + self.getIDfromServer = channel.unary_unary( + '/provee.KNN/getIDfromServer', + request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + response_deserializer=knn__pb2.ID.FromString, + ) + + +class KNNServicer(object): + """Interface exported by the server. + """ + + def sendProjectionPoints(self, request_iterator, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def getKNNRequest(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def getIDfromServer(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_KNNServicer_to_server(servicer, server): + rpc_method_handlers = { + 'sendProjectionPoints': grpc.stream_unary_rpc_method_handler( + servicer.sendProjectionPoints, + request_deserializer=knn__pb2.TrainingChunk.FromString, + response_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + ), + 'getKNNRequest': 
grpc.unary_unary_rpc_method_handler( + servicer.getKNNRequest, + request_deserializer=knn__pb2.knnRequest.FromString, + response_serializer=knn__pb2.Neighbours.SerializeToString, + ), + 'getIDfromServer': grpc.unary_unary_rpc_method_handler( + servicer.getIDfromServer, + request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString, + response_serializer=knn__pb2.ID.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'provee.KNN', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + + + # This class is part of an EXPERIMENTAL API. +class KNN(object): + """Interface exported by the server. + """ + + @staticmethod + def sendProjectionPoints(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_unary(request_iterator, target, '/provee.KNN/sendProjectionPoints', + knn__pb2.TrainingChunk.SerializeToString, + google_dot_protobuf_dot_empty__pb2.Empty.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def getKNNRequest(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, '/provee.KNN/getKNNRequest', + knn__pb2.knnRequest.SerializeToString, + knn__pb2.Neighbours.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def getIDfromServer(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary(request, target, 
'/provee.KNN/getIDfromServer', + google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString, + knn__pb2.ID.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/backend/grpcKNN/requirements.txt b/backend/grpcKNN/requirements.txt new file mode 100644 index 0000000000000000000000000000000000000000..d5d065e06c399475d7d96565cee5fbb0dad3365a --- /dev/null +++ b/backend/grpcKNN/requirements.txt @@ -0,0 +1,8 @@ +grpcio==1.34.0 +grpcio-tools==1.34.0 +protobuf==3.14.0 +six==1.15.0 +faiss-cpu==1.7.0 +numpy==1.19.2 +tabulate==0.8.7 +regex==2020.7.14 \ No newline at end of file diff --git a/backend/webSocketGateway/protos/v3/knn.proto b/backend/webSocketGateway/protos/v3/knn.proto new file mode 100644 index 0000000000000000000000000000000000000000..9189ac2ed17668f13a8b2eaa5b5b6f53386fed68 --- /dev/null +++ b/backend/webSocketGateway/protos/v3/knn.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; + +package provee; + +import "google/protobuf/empty.proto"; + +option java_multiple_files = true; +option java_package = "nl.uuvig.provee"; +option java_outer_classname = "ProveProjectorGRCP"; +option objc_class_prefix = "PROVEE"; + + +// Interface exported by the server. +service KNN { + + + rpc sendProjectionPoints(stream TrainingChunk) returns (google.protobuf.Empty) {} + rpc getKNNRequest(knnRequest) returns (Neighbours) {} + rpc getIDfromServer(google.protobuf.Empty) returns (ID) {} +} + +message ID{ + string id = 1; +} + +message knnRequest{ + int32 k = 1; + string words = 2; +} + + +// Needs description +message TrainingChunk { + repeated TrainingSetRow rows = 2; +} + +message Neighbours { + repeated Row rows = 1; +} + +message Row { + string id = 1; + float distance = 2; +} + +// Needs documentation +message TrainingSetRow { + // The id of the row, e.g., row index. + string id = 1; + + // The hd vector of the item. 
+ repeated double hdvector = 2 [packed=true]; +} + diff --git a/backend/webSocketGateway/protos/v3/old/knn.proto b/backend/webSocketGateway/protos/v3/old/knn.proto new file mode 100644 index 0000000000000000000000000000000000000000..2bdc51b9b654be823d2db65e7ee50e7b4dacf903 --- /dev/null +++ b/backend/webSocketGateway/protos/v3/old/knn.proto @@ -0,0 +1,48 @@ +syntax = "proto3"; + +package provee; + +import "google/protobuf/empty.proto"; + +option java_multiple_files = true; +option java_package = "nl.uuvig.provee"; +option java_outer_classname = "ProveProjectorGRCP"; +option objc_class_prefix = "PROVEE"; + + +// Interface exported by the server. +service KNN { + + + rpc sendProjectionPoints(stream TrainingSetRow) returns (google.protobuf.Empty) {} + rpc getKNNRequest(knnRequest) returns (Neighbours) {} + rpc getIDfromServer(google.protobuf.Empty) returns (ID) {} +} + +message ID{ + string id = 1; +} + +message knnRequest{ + int32 k = 1; + string words = 2; +} + +message Neighbours { + repeated Row rows = 1; +} + +message Row { + string id = 1; + float distance = 2; +} + +// Needs documentation +message TrainingSetRow { + // The id of the row, e.g., row index. + string id = 1; + + // The hd vector of the item. + repeated double hdvector = 2 [packed=true]; +} + diff --git a/backend/gRPCProjector/python/protos/v3/projector.proto b/backend/webSocketGateway/protos/v3/old/projector.proto similarity index 92% rename from backend/gRPCProjector/python/protos/v3/projector.proto rename to backend/webSocketGateway/protos/v3/old/projector.proto index de425c04e965f933bfdeac12364f6048b8374035..4ecc05dbdde02ff36b1943c6a2945487442d28e6 100644 --- a/backend/gRPCProjector/python/protos/v3/projector.proto +++ b/backend/webSocketGateway/protos/v3/old/projector.proto @@ -25,7 +25,7 @@ service Projector { // Obtains the Projection Points in 2D given the trainings values stored in a row format. Results are // streamed rather than returned at once (e.g. 
in a response message with a // repeated field). - rpc getProjectionPoints(stream TrainingSetRow) returns (stream Point) {} + rpc getProjectionPoints(stream TrainingChunk) returns (stream PointChunk) {} // A server-to-client streaming RPC. // @@ -54,8 +54,11 @@ message Point { float y = 3; } +message PointChunk{ + repeated Point points = 1; +} // Needs description -message TrainingSet { +message TrainingChunk { string modelid = 1; repeated TrainingSetRow rows = 2; } diff --git a/backend/webSocketGateway/protos/v3/projector.proto b/backend/webSocketGateway/protos/v3/projector.proto index de425c04e965f933bfdeac12364f6048b8374035..f5ae159c988fdeec0b2ac138227fb2e67475edec 100644 --- a/backend/webSocketGateway/protos/v3/projector.proto +++ b/backend/webSocketGateway/protos/v3/projector.proto @@ -25,14 +25,14 @@ service Projector { // Obtains the Projection Points in 2D given the trainings values stored in a row format. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field). - rpc getProjectionPoints(stream TrainingSetRow) returns (stream Point) {} + rpc getProjectionPoints(stream TrainingChunk) returns (stream PointChunk) {} // A server-to-client streaming RPC. // // Obtains the Projection Points in 2D given the trainings values stored in a row format. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field). - rpc getUpdates(google.protobuf.Empty) returns (stream Point) {} + rpc getUpdates(google.protobuf.Empty) returns (stream PointChunk) {} // // A client-to-server streaming RPC. 
// // @@ -54,9 +54,12 @@ message Point { float y = 3; } +message PointChunk{ + repeated Point points = 1; +} + // Needs description -message TrainingSet { - string modelid = 1; +message TrainingChunk { repeated TrainingSetRow rows = 2; } diff --git a/backend/webSocketGateway/src/grpcConnection.js b/backend/webSocketGateway/src/grpcConnection.js new file mode 100644 index 0000000000000000000000000000000000000000..47e4b5bc03dff09c6206e2d0c653471b170cb7ee --- /dev/null +++ b/backend/webSocketGateway/src/grpcConnection.js @@ -0,0 +1,121 @@ + +var grpc = require('@grpc/grpc-js'); +var protoLoader = require('@grpc/proto-loader'); + +class GrpcConnection{ + constructor(client){ + this.client = client + this.calls = null + this.id = null + } +} + + +function rowToTrainingRow(row){ + if (row.length == 0) { + return null; + } + + var hdvector = []; + for (var j = 1; j < row.length; j++) { + hdvector.push(parseFloat(row[j])); + } + return { id: row[0], hdvector: hdvector }; +} + + +module.exports = { + closeClient : function(client){ + grpc.closeClient(client); + }, + + getGRPCPackage : function (PROTO_PATH){ + var packageDefinition = protoLoader.loadSync( + PROTO_PATH, + { + keepCase: true, + longs: String, + enums: String, + defaults: true, + oneofs: true + }); + + return packageDefinition; + }, + + getGRPCClient: function (_target,package,name){ + let pkg = grpc.loadPackageDefinition(package); + let clientClass = pkg.provee[name]; + client = new clientClass(_target, grpc.credentials.createInsecure()); + let connection = new GrpcConnection(client); + return connection; + }, + + getGRPCBidirect: function (ws,call,callback){ + console.log(`Connecting to grpc Server`) + + call.on("data", (response) => { + callback(response, ws) + }); + + call.on("error", (err) => { + console.log( + `Unexpected stream error: code = ${err.code}` + + `, message = "${err.message}"` + ); + console.log("Retrying in 5 seconds"); + setTimeout(function () { getGRPCBidirect(ws,clientCall,callback) }, 
5000); + }); + + call.on("end", () => { console.log("GRPC connection closed.") }) + return call; + }, + + //Send row to each microservice + sendRowToServer : function (allRows,servers) { + var trainingRows = [] + + for (var i = 0; i < allRows.length; i++) { + var trainingRow = rowToTrainingRow(allRows[i]); + if(trainingRow){ + trainingRows.push(trainingRow); + } + } + + var trainingChunk = {rows: trainingRows}; + servers.forEach(server => { + server.calls[0].write(trainingChunk) + }); + }, + //TODO, extract general setup, and move back to webSocketGateway.js + getKNNConnection: function (connection){ + connection.knnConn.client.getIDfromServer({},function(error,response){ + if(error){ + console.log("error") + console.log(error) + } + if(!response){ + return; + } + + connection.knnConn.id = response["id"]; + const meta = new grpc.Metadata(); + meta.add('id',connection.knnConn.id); + + let call = connection.knnConn.client.sendProjectionPoints(meta,function(error,response){ + console.log("finished points"); + console.log(error); + }); + + connection.knnConn.calls=[call] + console.log("Setup knn call") + }); + }, +} + + + + + + + diff --git a/backend/webSocketGateway/src/webSocketGateway.js b/backend/webSocketGateway/src/webSocketGateway.js index 0f94c641c673cb30a0e71d379881a973e39d54d7..f265732ff60fa33c61e698209cfbe83b6ddf3922 100644 --- a/backend/webSocketGateway/src/webSocketGateway.js +++ b/backend/webSocketGateway/src/webSocketGateway.js @@ -1,86 +1,72 @@ const WebSocket = require('ws'); +var grpc = require("./grpcConnection") +var grpcJS = require('@grpc/grpc-js'); + +//Setup webserver const webSocketHost = "0.0.0.0" const webSocketPort = 9898 +var projectorTarget = process.env.PROJECTOR_TARGET ||'127.0.0.1:50051'; //var target = "proveeprojectorservice:50051" +var knnTarget = process.env.KNN_TARGET || '127.0.0.1:50052'; //"proveeknnservice:50052" const wsServer = new WebSocket.Server({ host: webSocketHost, port: webSocketPort }); -const http = require("http"); 
// One ProjectionRequest per browser; it owns the gRPC connections of that client.
wsServer.on('connection', function connection(ws) {
    console.log(`Client connected with websocket`);
    var currConnection = new ProjectionRequest(ws);

    ws.on('message', function incoming(message) {
        // A malformed message must not take the whole gateway down.
        try {
            parseMessage(message, currConnection);
        } catch (e) {
            console.log(e);
            sendError(ws, "Could not process message");
        }
    });

    ws.on('close', (code, reason) => {
        console.log("Closed");
        // The connection object holds projectorConn/knnConn (there is no
        // `client` field any more), so release both of them here.
        [currConnection.projectorConn, currConnection.knnConn].forEach((conn) => {
            if (!conn) {
                return;
            }
            try {
                (conn.calls || []).forEach((call) => call.end());
                if (conn.client) {
                    grpc.closeClient(conn.client);
                }
            } catch (e) {
                console.log(e);
            }
        });
        currConnection.projectorConn = null;
        currConnection.knnConn = null;
    })
});
/**
 * Ask the KNN microservice for the k nearest neighbours of a word expression
 * and forward the result (or an error message) to the browser.
 */
function KNNNeighbourRequest(connection,words,k){
    console.log("Requesting " + words);

    // The id arrives asynchronously from getIDfromServer; without it the
    // service cannot tell which client's data to search.
    if(connection.knnConn.id === null || connection.knnConn.id === undefined){
        console.log("KNN microservice has not assigned an id yet");
        sendError(connection.ws,"KNN microservice is not ready yet");
        return;
    }

    const meta = new grpcJS.Metadata();
    meta.add('id',connection.knnConn.id);

    connection.knnConn.client.getKNNRequest({k:k,words: words},meta,function(err,response){
        if(response){
            console.log("got response");
            sendNeighbour(response,connection.ws);
            return;
        }

        sendError(connection.ws,"No valid request: " + words)
        console.log(err);
    });
}
jsonMessage["word"] + + if(!connection.knnConn){ + console.log("No KNN microservice"); + sendError(connection.ws,"No KNN microservice") + break; } + KNNNeighbourRequest(connection,words,k); + break; - connection.calls = allCalls; + case "getKNN": + console.log("setup KNN"); + connection.knnConn = grpc.getGRPCClient(knnTarget,KNNPackage,"KNN"); + grpc.getKNNConnection(connection) + break; + + case "setProjectorAmount": + var amount = parseInt(jsonMessage["amount"]); + connection.projectorConn =grpc.getGRPCClient(projectorTarget,projectorPackage,"Projector"); + SetupKProjectors(amount,connection) break; + default: console.log("Error! Unknown request:" + jsonMessage["type"]); } } -function generatePoint() { - const id = pointID++; - const x = Math.floor(Math.random() * 100); - const y = Math.floor(Math.random() * 100); - return { id, x, y }; +function SetupKProjectors (amount,connection){ + var allCalls = [] + + //Create that amount of grpc connections + for(var i = 0; i< amount; i++){ + var call = connection.projectorConn.client.getProjectionPoints() + var grpcConnection = grpc.getGRPCBidirect(connection.ws,call,sendDataStreamToClient); + allCalls.push(grpcConnection) + } + connection.projectorConn.calls = allCalls; +} + +function generatePoints() { + var pointChunk = [] + for(var i = 0; i < 100; i++){ + const id = pointID++; + const x = Math.floor(Math.random() * 100); + const y = Math.floor(Math.random() * 100); + pointChunk.push({ id, x, y }) + } + + return {points: pointChunk}; } //Get the stream function sendRandomPointStream(connection) { - for(var i = 0; i < 100000; i++){ - sendDataStreamToClient(JSON.stringify(generatePoint()),connection.ws); + for (var i = 0; i < 1000; i++) { + sendDataStreamToClient(generatePoints(), connection.ws); } } -//Send row to each microservice -function sendRowToServer(row,lineIndex,connection) { - var hdvector = []; - for (var i =1; i < row.length; i++){ - hdvector.push(parseFloat(row[i])); +//Result from KNN request +function 
sendNeighbour(response,ws){ + if(!response || response["rows"].length == 0){ + const errMSG = "The word (or one of the words) was not in the set (or hasnt been received yet)"; + console.log(errMSG); + sendError(ws,errMSG) + return; } - - var trainingSetRow = {id: row[0] ,hdvector: hdvector}; - //Linearly distribute rows to the projectors - for(var i =0; i< connection.calls.length;i++){ - if( lineIndex < (connection.lineAmount / (connection.calls.length-i))){ - connection.calls[i].write(trainingSetRow) - } + for(var i = 0; i < response["rows"].length; i++){ + console.log(response["rows"][i]["id"] + " " + response["rows"][i]["distance"]); + ws.send(JSON.stringify({type: "neighbour", neighbour: response["rows"][i]["id"],distance: response["rows"][i]["distance"].toString()})); } } //Send client to browser function sendDataStreamToClient(data, ws) { - var list = data.toString().trim().split("\n"); - for (var i = 0; i < list.length; i++) { - try { - var response = list[i]; - ws.send(response); - } catch (e) { - console.log(e); - } - } -} - - + var rows = data["points"] + ws.send(JSON.stringify({type: "points", points: rows})); +} \ No newline at end of file diff --git a/frontend/.idea/vcs.xml b/frontend/.idea/vcs.xml new file mode 100644 index 0000000000000000000000000000000000000000..6c0b8635858dc7ad44b93df54b762707ce49eefc --- /dev/null +++ b/frontend/.idea/vcs.xml @@ -0,0 +1,6 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="VcsDirectoryMappings"> + <mapping directory="$PROJECT_DIR$/.." 
vcs="Git" /> + </component> +</project> \ No newline at end of file diff --git a/frontend/.idea/workspace.xml b/frontend/.idea/workspace.xml new file mode 100644 index 0000000000000000000000000000000000000000..4d8361b75d6f8ca2970cf879805eebc7a470c7e6 --- /dev/null +++ b/frontend/.idea/workspace.xml @@ -0,0 +1,62 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="BranchesTreeState"> + <expand> + <path> + <item name="ROOT" type="e8cecc67:BranchNodeDescriptor" /> + <item name="LOCAL_ROOT" type="e8cecc67:BranchNodeDescriptor" /> + </path> + <path> + <item name="ROOT" type="e8cecc67:BranchNodeDescriptor" /> + <item name="REMOTE_ROOT" type="e8cecc67:BranchNodeDescriptor" /> + </path> + <path> + <item name="ROOT" type="e8cecc67:BranchNodeDescriptor" /> + <item name="REMOTE_ROOT" type="e8cecc67:BranchNodeDescriptor" /> + <item name="GROUP_NODE:origin" type="e8cecc67:BranchNodeDescriptor" /> + </path> + </expand> + <select /> + </component> + <component name="ChangeListManager"> + <list default="true" id="61321bbf-d1d1-412e-a2a1-23c0d3dab7e9" name="Default Changelist" comment="" /> + <option name="SHOW_DIALOG" value="false" /> + <option name="HIGHLIGHT_CONFLICTS" value="true" /> + <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" /> + <option name="LAST_RESOLUTION" value="IGNORE" /> + </component> + <component name="Git.Settings"> + <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." 
/> + </component> + <component name="ProjectId" id="1oDFUgY2aaYL8kkjAENi72U5TKf" /> + <component name="ProjectLevelVcsManager" settingsEditedManually="true" /> + <component name="ProjectViewState"> + <option name="hideEmptyMiddlePackages" value="true" /> + <option name="showLibraryContents" value="true" /> + </component> + <component name="PropertiesComponent"> + <property name="RunOnceActivity.OpenProjectViewOnStart" value="true" /> + </component> + <component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" /> + <component name="TaskManager"> + <task active="true" id="Default" summary="Default task"> + <changelist id="61321bbf-d1d1-412e-a2a1-23c0d3dab7e9" name="Default Changelist" comment="" /> + <created>1612813272491</created> + <option name="number" value="Default" /> + <option name="presentableId" value="Default" /> + <updated>1612813272491</updated> + </task> + <servers /> + </component> + <component name="Vcs.Log.Tabs.Properties"> + <option name="TAB_STATES"> + <map> + <entry key="MAIN"> + <value> + <State /> + </value> + </entry> + </map> + </option> + </component> +</project> \ No newline at end of file diff --git a/frontend/src/components/webSocketStreaming.vue b/frontend/src/components/webSocketStreaming.vue index fe284fdc100b3abdcdb844ee4ff674fda013523a..a9dfbeac7d6f724979799467c29451020cff325f 100644 --- a/frontend/src/components/webSocketStreaming.vue +++ b/frontend/src/components/webSocketStreaming.vue @@ -35,8 +35,26 @@ ></v-file-input> <v-btn color="primary" text @click="submitFiles">SUBMIT</v-btn> + <v-checkbox v-model="KNNcheckbox" :label="`use KNN`"></v-checkbox> + + <v-spacer></v-spacer> + </v-layout> + <v-layout row> <v-spacer></v-spacer> + <h3>Point chunk size:</h3> + <v-text-field + v-model="pointChunkSize" + hide-details + single-line + type="number" + max="10000" + min="1" + value="100" + onblur + /> + 
<v-spacer></v-spacer> + </v-layout> <v-layout row> <v-spacer></v-spacer> @@ -51,6 +69,9 @@ value="1" onblur /> + <h3>KNN word:</h3> + <v-text-field v-model="KNNword" clearable></v-text-field> + <v-btn color="primary" text @click="submitWord">SUBMIT</v-btn> <v-spacer></v-spacer> </v-layout> @@ -65,10 +86,13 @@ export default { data() { return { projectorAmount: 1, + pointChunkSize:100, succesAlert: false, launched: false, file: null, connection: null, + KNNcheckbox: true, + KNNword: "King", }; }, @@ -88,10 +112,8 @@ export default { }; this.connection.onmessage = function (event) { - const pointJSON = JSON.parse(event.data); - if (pointJSON) { - self.$emit("newPoint", pointJSON); - } + const jsonMessage = JSON.parse(event.data); + self.parseMessage(jsonMessage) }; this.connection.onclose = function (event) { @@ -99,9 +121,35 @@ export default { console.log(event); }; }, + + methods: { - //Estimate to quickly calculate size + parseMessage(jsonMessage){ + switch (jsonMessage["type"]) { + case "error": + var error = jsonMessage["message"]; + console.log(error); + break; + case "points": + var points = jsonMessage["points"]; + if (!points) { + console.log("empty message") + return + } + points.forEach(point => {this.$emit("newPoint", point);}); + break; + case "neighbour": + var neighbour = jsonMessage["neighbour"]; + var distance = jsonMessage["distance"]; + console.log("Word: " + neighbour + " Distance: " + distance); + break; + default: + console.log("No implementation for " + jsonMessage["type"]); + } + }, + + //Estimate to quickly calculate size getLineCountEstimate(file, callback) { var totalSize = 0; var lineCount = 0; @@ -109,25 +157,24 @@ export default { //Use papaparse with preview Papa.parse(file, { worker: true, - delimiter: " ", - preview: 1000, + delimiter: " ", + preview: 1000, step: function (row) { lineCount += 1; - row.data.forEach(string => { - totalSize+= Buffer.byteLength(string, 'utf8'); + row.data.forEach((string) => { + totalSize += 
Buffer.byteLength(string, "utf8"); }); }, complete: function () { - var estimate = Math.round((lineCount/totalSize)*file.size*0.92); - console.log("Estimated size: " + estimate) + var estimate = Math.round((lineCount / totalSize) * file.size * 0.92); + console.log("Estimated size: " + estimate); callback(estimate); }, }); }, - parseData(file, lineCount) { + parseData(lineCount) { let self = this; - //Send necessary information self.connection.send( JSON.stringify({ @@ -135,6 +182,7 @@ export default { amount: this.projectorAmount, }) ); + self.connection.send( JSON.stringify({ type: "setLineCount", @@ -142,27 +190,90 @@ export default { }) ); + if (this.KNNcheckbox) { + self.connection.send( + JSON.stringify({ + type: "getKNN", + }) + ); + } + + this.uploadFileStream(lineCount) + return; + }, + + uploadFileStream(lineCount) { + console.log("Filesize: ", this.file.size); console.log(lineCount + " lines"); + var rowBuffer = []; var currLineIndex = 0; + let self = this; + + Papa.parse(this.file, { worker: true, delimiter: " ", - fastMode: true, + fastMode: false, step: function (row) { + rowBuffer.push(row.data); + currLineIndex += 1; + + if (rowBuffer.length % self.pointChunkSize != 0) { + return; + } + self.connection.send( - JSON.stringify({ type: "sendDataRow", row: row.data, lineIndex: currLineIndex }) + JSON.stringify({ + type: "sendDataRow", + row: rowBuffer, + lineIndex: currLineIndex, + }) ); - currLineIndex += 1 + + rowBuffer = []; }, complete: function () { - console.log("All done!"); + //Flush the remaining buffer + if (rowBuffer.length == 0) { + return; + } + + self.connection.send( + JSON.stringify({ + type: "sendDataRow", + row: rowBuffer, + lineIndex: currLineIndex, + }) + ); + + console.log("All done!"); }, }); - return; }, + + submitWord() { + if (this.KNNword == "") { + console.log("No word entered"); + return; + } + if (this.KNNword.includes(" ")) { + console.log("Dont use spaces inbetween words, for example use King-man+woman"); + return; + } + + 
this.connection.send( + JSON.stringify({ + type: "getKNNNeighbours", + k: "10", + word: this.KNNword, + }) + ); + console.log("Send request for: " + this.KNNword); + }, + submitFiles() { //Start up normal connection if (!this.file) { @@ -172,7 +283,7 @@ export default { } //First get the lineCount, (we need that beforehand to know where to distribute the points) this.getLineCountEstimate(this.file, (lineCount) => { - this.parseData(this.file, lineCount); + this.parseData(lineCount); }); }, }, diff --git a/protos/build.sh b/protos/build.sh index cd978c8b4ab906fa3256e48639a28f0c90ad3959..d777c0bc070d72d3863dba10602f285b9ec99517 100755 --- a/protos/build.sh +++ b/protos/build.sh @@ -1,18 +1,27 @@ #!/bin/bash - DIR="$(dirname "$0")" +PROTOS_DIRECTORY="$PWD/protos" +PROJECTOR_PROTO_FILE="$PROTOS_DIRECTORY/projector.proto" +KNN_PROTO_FILE="$PROTOS_DIRECTORY/knn.proto" -PROJECTOR_PROTO_FILE="$DIR/projector.proto" - -FRONTEND_GENERATED_DIR="$DIR/../frontend/src/generated" -BACKEND_GATEWAY_GENERATED_DIR="$DIR/../backend/gateway/src/generated" +BACKEND_PROJECTOR_GENERATED_DIR="$PWD/../backend/gRPCProjector/python" +BACKEND_KNN_GENERATED_DIR="$PWD/../backend/grpcKNN" +BACKEND_GATEWAY_PROTO_DIR="$PWD/../backend/webSocketGateway/protos/v3" +#Projector +python -m grpc_tools.protoc \ + -I "$PROTOS_DIRECTORY" \ + --python_out="$BACKEND_PROJECTOR_GENERATED_DIR" \ + --grpc_python_out="$BACKEND_PROJECTOR_GENERATED_DIR" \ + "$PROJECTOR_PROTO_FILE" -protoc \ - "$PROJECTOR_PROTO_FILE" \ - --js_out=import_style=commonjs:"$FRONTEND_GENERATED_DIR" \ - --grpc-web_out=import_style=commonjs,mode=grpcwebtext:"$FRONTEND_GENERATED_DIR" +#KNN +python -m grpc_tools.protoc \ + -I "$PROTOS_DIRECTORY" \ + --python_out="$BACKEND_KNN_GENERATED_DIR" \ + --grpc_python_out="$BACKEND_KNN_GENERATED_DIR" \ + "$KNN_PROTO_FILE" -grpc_tools_node_protoc \ - "$PROJECTOR_PROTO_FILE" \ - --js_out=import_style=commonjs:"$BACKEND_GATEWAY_GENERATED_DIR" \ - --grpc_out=grpc_js:"$BACKEND_GATEWAY_GENERATED_DIR" \ No 
newline at end of file +#Copy for the gateway +cp -fr "$PROJECTOR_PROTO_FILE" "$BACKEND_GATEWAY_PROTO_DIR" +cp -fr "$KNN_PROTO_FILE" "$BACKEND_GATEWAY_PROTO_DIR" +echo "Done" \ No newline at end of file diff --git a/protos/protos/knn.proto b/protos/protos/knn.proto new file mode 100644 index 0000000000000000000000000000000000000000..9189ac2ed17668f13a8b2eaa5b5b6f53386fed68 --- /dev/null +++ b/protos/protos/knn.proto @@ -0,0 +1,54 @@ +syntax = "proto3"; + +package provee; + +import "google/protobuf/empty.proto"; + +option java_multiple_files = true; +option java_package = "nl.uuvig.provee"; +option java_outer_classname = "ProveProjectorGRCP"; +option objc_class_prefix = "PROVEE"; + + +// Interface exported by the server. +service KNN { + + + rpc sendProjectionPoints(stream TrainingChunk) returns (google.protobuf.Empty) {} + rpc getKNNRequest(knnRequest) returns (Neighbours) {} + rpc getIDfromServer(google.protobuf.Empty) returns (ID) {} +} + +message ID{ + string id = 1; +} + +message knnRequest{ + int32 k = 1; + string words = 2; +} + + +// Needs description +message TrainingChunk { + repeated TrainingSetRow rows = 2; +} + +message Neighbours { + repeated Row rows = 1; +} + +message Row { + string id = 1; + float distance = 2; +} + +// Needs documentation +message TrainingSetRow { + // The id of the row, e.g., row index. + string id = 1; + + // The hd vector of the item. + repeated double hdvector = 2 [packed=true]; +} + diff --git a/protos/projector.proto b/protos/protos/projector.proto similarity index 88% rename from protos/projector.proto rename to protos/protos/projector.proto index a2a1dab64955b20ba82e8482070f4131dc483881..f5ae159c988fdeec0b2ac138227fb2e67475edec 100644 --- a/protos/projector.proto +++ b/protos/protos/projector.proto @@ -25,14 +25,14 @@ service Projector { // Obtains the Projection Points in 2D given the trainings values stored in a row format. Results are // streamed rather than returned at once (e.g. 
in a response message with a // repeated field). - rpc getProjectionPoints(TrainingSet) returns (stream Point) {} + rpc getProjectionPoints(stream TrainingChunk) returns (stream PointChunk) {} // A server-to-client streaming RPC. // // Obtains the Projection Points in 2D given the trainings values stored in a row format. Results are // streamed rather than returned at once (e.g. in a response message with a // repeated field). - rpc getUpdates(google.protobuf.Empty) returns (stream Point) {} + rpc getUpdates(google.protobuf.Empty) returns (stream PointChunk) {} // // A client-to-server streaming RPC. // // @@ -50,13 +50,16 @@ service Projector { // Points are represented as x-y pairs message Point { int32 id = 1; - int32 x = 2; - int32 y = 3; + float x = 2; + float y = 3; +} + +message PointChunk{ + repeated Point points = 1; } // Needs description -message TrainingSet { - string modelid = 1; +message TrainingChunk { repeated TrainingSetRow rows = 2; }