From 1b15ed4e08f873741e1f0c82b0018bfacc2846b9 Mon Sep 17 00:00:00 2001
From: IsolatedSushi <simen.vanherpt@gmail.com>
Date: Sat, 27 Feb 2021 23:21:37 +0100
Subject: [PATCH] Implemented Chunking (not yet for knn)

---
 backend/gRPCProjector/python/grpcServer.py    | 18 +++++-------
 backend/gRPCProjector/python/projector_pb2.py | 23 +++++----------
 .../python/projector_pb2_grpc.py              |  6 ++--
 .../protos/v3/projector.proto                 |  4 +--
 .../webSocketGateway/src/webSocketGateway.js  | 20 +++++--------
 .../src/components/webSocketStreaming.vue     | 29 +++++++++++++++++--
 protos/build.sh                               |  5 ++--
 protos/protos/projector.proto                 | 11 ++++---
 8 files changed, 63 insertions(+), 53 deletions(-)

diff --git a/backend/gRPCProjector/python/grpcServer.py b/backend/gRPCProjector/python/grpcServer.py
index 356aaf8..bcfd168 100644
--- a/backend/gRPCProjector/python/grpcServer.py
+++ b/backend/gRPCProjector/python/grpcServer.py
@@ -8,26 +8,22 @@ class ProjectorService(rpc.ProjectorServicer):
 
     def __init__(self):
         self.pointID = 0
-        self.pointChunkAmount = 100
-        self.pointChunkBuffer = []
+        
     
     def getProjectionPoints(self, request_iterator, context):
         print("Connected")
         for trainingChunk in request_iterator:
+            pointChunkBuffer = []
             for row in trainingChunk.rows:
-
-                self.pointChunkBuffer.append(self.rowToPoint(row))
-                if(self.pointChunkBuffer%self.pointChunkAmount==0):
-                    print(1)
-                    pointChunk = projector.PointChunk(points=self.pointChunkBuffer)
-                    print(pointChunkBuffer)
-                    yield pointChunk
-                    self.pointChunkBuffer = []
+                pointChunkBuffer.append(self.rowToPoint(row))
                 
+            # Reply with exactly one PointChunk per received TrainingChunk, holding every row of that chunk.
+            print("Sending back",len(pointChunkBuffer),"Points")
+            pointChunk = projector.PointChunk(points=pointChunkBuffer)
+            yield pointChunk            
                  
 
     def rowToPoint(self, row):
-        print(row)
         hdvector = row.hdvector
         returnPoint = projector.Point(id=self.pointID, x = float(hdvector[0]), y=float(hdvector[1]))
         self.pointID += 1
diff --git a/backend/gRPCProjector/python/projector_pb2.py b/backend/gRPCProjector/python/projector_pb2.py
index df44d8d..70e8055 100644
--- a/backend/gRPCProjector/python/projector_pb2.py
+++ b/backend/gRPCProjector/python/projector_pb2.py
@@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
   syntax='proto3',
   serialized_options=b'\n\017nl.uuvig.proveeB\022ProveProjectorGRCPP\001\242\002\006PROVEE',
   create_key=_descriptor._internal_create_key,
-  serialized_pb=b'\n\x0fprojector.proto\x12\x06provee\x1a\x1bgoogle/protobuf/empty.proto\")\n\x05Point\x12\n\n\x02id\x18\x01 \x01(\x05\x12\t\n\x01x\x18\x02 \x01(\x02\x12\t\n\x01y\x18\x03 \x01(\x02\"+\n\nPointChunk\x12\x1d\n\x06points\x18\x01 \x03(\x0b\x32\r.provee.Point\"F\n\rTrainingChunk\x12\x0f\n\x07modelid\x18\x01 \x01(\t\x12$\n\x04rows\x18\x02 \x03(\x0b\x32\x16.provee.TrainingSetRow\"2\n\x0eTrainingSetRow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x08hdvector\x18\x02 \x03(\x01\x42\x02\x10\x01\x32\xfd\x01\n\tProjector\x12\x37\n\x05start\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x36\n\x04stop\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x46\n\x13getProjectionPoints\x12\x15.provee.TrainingChunk\x1a\x12.provee.PointChunk\"\x00(\x01\x30\x01\x12\x37\n\ngetUpdates\x12\x16.google.protobuf.Empty\x1a\r.provee.Point\"\x00\x30\x01\x42\x30\n\x0fnl.uuvig.proveeB\x12ProveProjectorGRCPP\x01\xa2\x02\x06PROVEEb\x06proto3'
+  serialized_pb=b'\n\x0fprojector.proto\x12\x06provee\x1a\x1bgoogle/protobuf/empty.proto\")\n\x05Point\x12\n\n\x02id\x18\x01 \x01(\x05\x12\t\n\x01x\x18\x02 \x01(\x02\x12\t\n\x01y\x18\x03 \x01(\x02\"+\n\nPointChunk\x12\x1d\n\x06points\x18\x01 \x03(\x0b\x32\r.provee.Point\"5\n\rTrainingChunk\x12$\n\x04rows\x18\x02 \x03(\x0b\x32\x16.provee.TrainingSetRow\"2\n\x0eTrainingSetRow\x12\n\n\x02id\x18\x01 \x01(\t\x12\x14\n\x08hdvector\x18\x02 \x03(\x01\x42\x02\x10\x01\x32\x82\x02\n\tProjector\x12\x37\n\x05start\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x36\n\x04stop\x12\x16.google.protobuf.Empty\x1a\x16.google.protobuf.Empty\x12\x46\n\x13getProjectionPoints\x12\x15.provee.TrainingChunk\x1a\x12.provee.PointChunk\"\x00(\x01\x30\x01\x12<\n\ngetUpdates\x12\x16.google.protobuf.Empty\x1a\x12.provee.PointChunk\"\x00\x30\x01\x42\x30\n\x0fnl.uuvig.proveeB\x12ProveProjectorGRCPP\x01\xa2\x02\x06PROVEEb\x06proto3'
   ,
   dependencies=[google_dot_protobuf_dot_empty__pb2.DESCRIPTOR,])
 
@@ -114,14 +114,7 @@ _TRAININGCHUNK = _descriptor.Descriptor(
   create_key=_descriptor._internal_create_key,
   fields=[
     _descriptor.FieldDescriptor(
-      name='modelid', full_name='provee.TrainingChunk.modelid', index=0,
-      number=1, type=9, cpp_type=9, label=1,
-      has_default_value=False, default_value=b"".decode('utf-8'),
-      message_type=None, enum_type=None, containing_type=None,
-      is_extension=False, extension_scope=None,
-      serialized_options=None, file=DESCRIPTOR,  create_key=_descriptor._internal_create_key),
-    _descriptor.FieldDescriptor(
-      name='rows', full_name='provee.TrainingChunk.rows', index=1,
+      name='rows', full_name='provee.TrainingChunk.rows', index=0,
       number=2, type=11, cpp_type=10, label=3,
       has_default_value=False, default_value=[],
       message_type=None, enum_type=None, containing_type=None,
@@ -140,7 +133,7 @@ _TRAININGCHUNK = _descriptor.Descriptor(
   oneofs=[
   ],
   serialized_start=144,
-  serialized_end=214,
+  serialized_end=197,
 )
 
 
@@ -178,8 +171,8 @@ _TRAININGSETROW = _descriptor.Descriptor(
   extension_ranges=[],
   oneofs=[
   ],
-  serialized_start=216,
-  serialized_end=266,
+  serialized_start=199,
+  serialized_end=249,
 )
 
 _POINTCHUNK.fields_by_name['points'].message_type = _POINT
@@ -229,8 +222,8 @@ _PROJECTOR = _descriptor.ServiceDescriptor(
   index=0,
   serialized_options=None,
   create_key=_descriptor._internal_create_key,
-  serialized_start=269,
-  serialized_end=522,
+  serialized_start=252,
+  serialized_end=510,
   methods=[
   _descriptor.MethodDescriptor(
     name='start',
@@ -268,7 +261,7 @@ _PROJECTOR = _descriptor.ServiceDescriptor(
     index=3,
     containing_service=None,
     input_type=google_dot_protobuf_dot_empty__pb2._EMPTY,
-    output_type=_POINT,
+    output_type=_POINTCHUNK,
     serialized_options=None,
     create_key=_descriptor._internal_create_key,
   ),
diff --git a/backend/gRPCProjector/python/projector_pb2_grpc.py b/backend/gRPCProjector/python/projector_pb2_grpc.py
index 4b09c88..e4b7402 100644
--- a/backend/gRPCProjector/python/projector_pb2_grpc.py
+++ b/backend/gRPCProjector/python/projector_pb2_grpc.py
@@ -34,7 +34,7 @@ class ProjectorStub(object):
         self.getUpdates = channel.unary_stream(
                 '/provee.Projector/getUpdates',
                 request_serializer=google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-                response_deserializer=projector__pb2.Point.FromString,
+                response_deserializer=projector__pb2.PointChunk.FromString,
                 )
 
 
@@ -101,7 +101,7 @@ def add_ProjectorServicer_to_server(servicer, server):
             'getUpdates': grpc.unary_stream_rpc_method_handler(
                     servicer.getUpdates,
                     request_deserializer=google_dot_protobuf_dot_empty__pb2.Empty.FromString,
-                    response_serializer=projector__pb2.Point.SerializeToString,
+                    response_serializer=projector__pb2.PointChunk.SerializeToString,
             ),
     }
     generic_handler = grpc.method_handlers_generic_handler(
@@ -178,6 +178,6 @@ class Projector(object):
             metadata=None):
         return grpc.experimental.unary_stream(request, target, '/provee.Projector/getUpdates',
             google_dot_protobuf_dot_empty__pb2.Empty.SerializeToString,
-            projector__pb2.Point.FromString,
+            projector__pb2.PointChunk.FromString,
             options, channel_credentials,
             insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
diff --git a/backend/webSocketGateway/protos/v3/projector.proto b/backend/webSocketGateway/protos/v3/projector.proto
index 4ecc05d..f5ae159 100644
--- a/backend/webSocketGateway/protos/v3/projector.proto
+++ b/backend/webSocketGateway/protos/v3/projector.proto
@@ -32,7 +32,7 @@ service Projector {
   // Obtains the Projection Points in 2D given the trainings values stored in a row format.  Results are
   // streamed rather than returned at once (e.g. in a response message with a
   // repeated field).
-  rpc getUpdates(google.protobuf.Empty) returns (stream Point) {}
+  rpc getUpdates(google.protobuf.Empty) returns (stream PointChunk) {}
 
 //   // A client-to-server streaming RPC.
 //   //
@@ -57,9 +57,9 @@ message Point {
 message PointChunk{
   repeated Point points = 1;
 }
+
 // Needs description
 message TrainingChunk {
-  string modelid = 1;
   repeated TrainingSetRow rows = 2;
 }
 
diff --git a/backend/webSocketGateway/src/webSocketGateway.js b/backend/webSocketGateway/src/webSocketGateway.js
index d8aec6e..09c06ce 100644
--- a/backend/webSocketGateway/src/webSocketGateway.js
+++ b/backend/webSocketGateway/src/webSocketGateway.js
@@ -83,7 +83,7 @@ function getProjectorConnection(ws,client){
   console.log(`Connecting  to grpc Server`)
 
   call.on("data", (response) => {
-    sendDataStreamToClient(JSON.stringify(response), ws)
+    sendDataStreamToClient(response, ws)
   });
 
   call.on("error", (err) => {
@@ -224,23 +224,19 @@ function sendRowToServer(allRows, lineIndex, connection) {
     var trainingRow = { id: row[0], hdvector: hdvector };
     trainingRows.push(trainingRow);
   }
-  var trainingChunk = {id: 0, rows: trainingRows};
+  var trainingChunk = {rows: trainingRows};
   //Linearly distribute rows to the projectors
-  for (var i = 0; i < connection.calls.length; i++) {
-    if (lineIndex < (connection.lineAmount / (connection.calls.length - i))) {
-      connection.calls[i].write(trainingChunk)
-    }
-  }
+  connection.projectorConn.calls[0].write(trainingChunk)
 }
 
 //Send client to browser
 function sendDataStreamToClient(data, ws) {
-  console.log("TESDFGUSDYFGBSDFGODSIFGDUSYGFDS");
-  console.log(data);
-  var list = data.toString().trim().split("\n");
-  for (var i = 0; i < list.length; i++) {
+  var rows = data["points"]
+  ws.send(JSON.stringify({type: "point", point: rows})); // "point" now carries the whole chunk: an array of points
+  return; // NOTE(review): this early return makes the per-point loop below unreachable; remove that loop in a follow-up
+  for (var i = 0; i < rows.length; i++) {
     try {
-      var response = list[i];
+      var response = rows[i];
       ws.send(JSON.stringify({type: "point", point: response}));
     } catch (e) {
       console.log("error");
diff --git a/frontend/src/components/webSocketStreaming.vue b/frontend/src/components/webSocketStreaming.vue
index 7ae779b..4d04dc5 100644
--- a/frontend/src/components/webSocketStreaming.vue
+++ b/frontend/src/components/webSocketStreaming.vue
@@ -36,7 +36,25 @@
 
       <v-btn color="primary" text @click="submitFiles">SUBMIT</v-btn>
       <v-checkbox v-model="KNNcheckbox" :label="`use KNN`"></v-checkbox>
+      
       <v-spacer></v-spacer>
+
+    </v-layout>
+    <v-layout row>
+      <v-spacer></v-spacer>
+      <h3>Point chunk size:</h3>
+      <v-text-field
+        v-model="pointChunkSize"
+        hide-details
+        single-line
+        type="number"
+        max="10000"
+        min="1"
+        value="100"
+        onblur
+      />
+          <v-spacer></v-spacer>
+
     </v-layout>
     <v-layout row>
       <v-spacer></v-spacer>
@@ -68,6 +86,7 @@ export default {
   data() {
     return {
       projectorAmount: 1,
+      pointChunkSize:100,
       succesAlert: false,
       launched: false,
       file: null,
@@ -100,9 +119,12 @@ export default {
           console.log(error);
           break;
         case "point":
-          var point = JSON.parse(jsonMessage["point"]);
+          var point = jsonMessage["point"];
+          console.log(point ? point.length : 0) // null-safe: a "point" message without a payload must not throw before the guard below
           if (point) {
-            self.$emit("newPoint", point);
+            for(var i = 0; i < point.length; i++){
+                self.$emit("newPoint", point[i]);
+            }
           }
           break;
         case "neighbour":
@@ -181,13 +203,14 @@ export default {
       console.log(lineCount + " lines");
       var rowBuffer = [];
       var currLineIndex = 0;
+      let self = this;
       Papa.parse(this.file, {
         worker: true,
         delimiter: " ",
         fastMode: false,
         step: function (row) {
           rowBuffer.push(row.data);
-          if (rowBuffer.length % 100 == 0) {
+          if (rowBuffer.length % (Number(self.pointChunkSize) || 100) == 0) { // text field yields a string; empty/0 would give NaN and never flush, so fall back to 100
             self.connection.send(
               JSON.stringify({
                 type: "sendDataRow",
diff --git a/protos/build.sh b/protos/build.sh
index 40fedf7..d777c0b 100755
--- a/protos/build.sh
+++ b/protos/build.sh
@@ -1,5 +1,4 @@
 #!/bin/bash
-echo "test"
 DIR="$(dirname "$0")"
 PROTOS_DIRECTORY="$PWD/protos"
 PROJECTOR_PROTO_FILE="$PROTOS_DIRECTORY/projector.proto"
@@ -23,6 +22,6 @@ python -m grpc_tools.protoc \
         "$KNN_PROTO_FILE"
 
 #Copy for the gateway
-echo "$BACKEND_GATEWAY_PROTO_DIR"
 cp -fr "$PROJECTOR_PROTO_FILE" "$BACKEND_GATEWAY_PROTO_DIR"
-cp -fr "$KNN_PROTO_FILE" "$BACKEND_GATEWAY_PROTO_DIR"
\ No newline at end of file
+cp -fr "$KNN_PROTO_FILE" "$BACKEND_GATEWAY_PROTO_DIR"
+echo "Done"
\ No newline at end of file
diff --git a/protos/protos/projector.proto b/protos/protos/projector.proto
index de425c0..f5ae159 100644
--- a/protos/protos/projector.proto
+++ b/protos/protos/projector.proto
@@ -25,14 +25,14 @@ service Projector {
   // Obtains the Projection Points in 2D given the trainings values stored in a row format.  Results are
   // streamed rather than returned at once (e.g. in a response message with a
   // repeated field).
-  rpc getProjectionPoints(stream TrainingSetRow) returns (stream Point) {}
+  rpc getProjectionPoints(stream TrainingChunk) returns (stream PointChunk) {}
 
   // A server-to-client streaming RPC.
   //
   // Obtains the Projection Points in 2D given the trainings values stored in a row format.  Results are
   // streamed rather than returned at once (e.g. in a response message with a
   // repeated field).
-  rpc getUpdates(google.protobuf.Empty) returns (stream Point) {}
+  rpc getUpdates(google.protobuf.Empty) returns (stream PointChunk) {}
 
 //   // A client-to-server streaming RPC.
 //   //
@@ -54,9 +54,12 @@ message Point {
   float y = 3;
 }
 
+message PointChunk{
+  repeated Point points = 1;
+}
+
 // Needs description
-message TrainingSet {
-  string modelid = 1;
+message TrainingChunk {
   repeated TrainingSetRow rows = 2;
 }
 
-- 
GitLab