From 3913847287625a6b51a5901d3900a97ca427646e Mon Sep 17 00:00:00 2001 From: IsolatedSushi <simen.vanherpt@gmail.com> Date: Sun, 28 Feb 2021 01:27:41 +0100 Subject: [PATCH] Small additional refactoring --- .../webSocketGateway/src/grpcConnection.js | 40 ++++++++-- .../webSocketGateway/src/webSocketGateway.js | 74 +++++++------------ 2 files changed, 61 insertions(+), 53 deletions(-) diff --git a/backend/webSocketGateway/src/grpcConnection.js b/backend/webSocketGateway/src/grpcConnection.js index 7a56299..47e4b5b 100644 --- a/backend/webSocketGateway/src/grpcConnection.js +++ b/backend/webSocketGateway/src/grpcConnection.js @@ -10,10 +10,25 @@ class GrpcConnection{ } } + +function rowToTrainingRow(row){ + if (row.length == 0) { + return null; + } + + var hdvector = []; + for (var j = 1; j < row.length; j++) { + hdvector.push(parseFloat(row[j])); + } + return { id: row[0], hdvector: hdvector }; +} + + module.exports = { closeClient : function(client){ grpc.closeClient(client); }, + getGRPCPackage : function (PROTO_PATH){ var packageDefinition = protoLoader.loadSync( PROTO_PATH, @@ -52,26 +67,41 @@ module.exports = { setTimeout(function () { getGRPCBidirect(ws,clientCall,callback) }, 5000); }); - call.on("end", () => { console.log("GRPC connection closed.") }) - + call.on("end", () => { console.log("GRPC connection closed.") }) return call; }, + //Send row to each microservice + sendRowToServer : function (allRows,servers) { + var trainingRows = [] + for (var i = 0; i < allRows.length; i++) { + var trainingRow = rowToTrainingRow(allRows[i]); + if(trainingRow){ + trainingRows.push(trainingRow); + } + } + + var trainingChunk = {rows: trainingRows}; + servers.forEach(server => { + server.calls[0].write(trainingChunk) + }); + }, + //TODO, extract general setup, and move back to webSocketGateway.js getKNNConnection: function (connection){ connection.knnConn.client.getIDfromServer({},function(error,response){ if(error){ console.log("error") console.log(error) } - if(!response){ - + if(!response){ return; } - console.log(error); + connection.knnConn.id = response["id"]; const meta = new grpc.Metadata(); meta.add('id',connection.knnConn.id); + let call = connection.knnConn.client.sendProjectionPoints(meta,function(error,response){ console.log("finished points"); console.log(error); diff --git a/backend/webSocketGateway/src/webSocketGateway.js b/backend/webSocketGateway/src/webSocketGateway.js index 640d9a0..cbf43bf 100644 --- a/backend/webSocketGateway/src/webSocketGateway.js +++ b/backend/webSocketGateway/src/webSocketGateway.js @@ -1,5 +1,6 @@ const WebSocket = require('ws'); var grpc = require("./grpcConnection") +var grpcJS = require('@grpc/grpc-js'); //Setup webserver const webSocketHost = "0.0.0.0" @@ -17,7 +18,6 @@ var KNNPackage = grpc.getGRPCPackage(__dirname + '/../protos/v3/knn.proto'); var pointID = 0; - class ProjectionRequest{ constructor(ws){ this.index = 0; @@ -32,12 +32,6 @@ wsServer.on('connection', function connection(ws) { console.log(`Client connected with websocket`); var currConnection = new ProjectionRequest(ws); - - console.log("setup KNN"); - currConnection.knnConn = grpc.getGRPCClient(knnTarget,KNNPackage,"KNN"); - grpc.getKNNConnection(currConnection) - - ws.on('message', function incoming(message) { parseMessage(message, currConnection); }); @@ -50,9 +44,7 @@ wsServer.on('connection', function connection(ws) { }) }); - - - +//Notify client of error function sendError(ws,message){ ws.send(JSON.stringify({type: "error", message: message})); } @@ -60,20 +52,19 @@ function sendError(ws,message){ function KNNNeighbourRequest(connection,words,k){ console.log("Requesting " + words); - const meta = new grpc.Metadata(); + const meta = new grpcJS.Metadata(); meta.add('id',connection.knnConn.id); + connection.knnConn.client.getKNNRequest({k:k,words: words},meta,function(err,response){ - if(response){ - console.log("got response"); - sendNeighbour(response,connection.ws); - } - else{ - sendError(connection.ws,"No valid request: " + words) - return; - } - - console.log(err); - }); + if(response){ + console.log("got response"); + sendNeighbour(response,connection.ws); + return; + } + + sendError(connection.ws,"No valid request: " + words) + console.log(err); + }); } //Parse the message from browser @@ -84,12 +75,15 @@ function parseMessage(message, connection) { console.log("Requested stream"); sendRandomPointStream(connection); break; + case "sendDataRow": - sendRowToServer(jsonMessage["row"], jsonMessage["lineIndex"], connection); + grpc.sendRowToServer(jsonMessage["row"], [connection.projectorConn,connection.knnConn]); break; + case "setLineCount": connection.lineAmount = parseInt(jsonMessage["amount"]); break; + case "getKNNNeighbours": var k = parseInt(jsonMessage["amount"]); var words = jsonMessage["word"] @@ -101,20 +95,25 @@ function parseMessage(message, connection) { } KNNNeighbourRequest(connection,words,k); break; + case "getKNN": - + console.log("setup KNN"); + connection.knnConn = grpc.getGRPCClient(knnTarget,KNNPackage,"KNN"); + grpc.getKNNConnection(connection) break; + case "setProjectorAmount": var amount = parseInt(jsonMessage["amount"]); connection.projectorConn =grpc.getGRPCClient(projectorTarget,projectorPackage,"Projector"); - SetupKProjectors(amount,connection,sendDataStreamToClient) + SetupKProjectors(amount,connection) break; + default: console.log("Error! Unknown request:" + jsonMessage["type"]); } } -function SetupKProjectors (amount,connection,callback){ +function SetupKProjectors (amount,connection){ var allCalls = [] //Create that amount of grpc connections @@ -145,28 +144,7 @@ function sendRandomPointStream(connection) { } } -//Send row to each microservice -function sendRowToServer(allRows, lineIndex, connection) { - var trainingRows = [] - for (var i = 0; i < allRows.length; i++) { - var row = allRows[i]; - if (row.length == 0) { - continue; - } - - var hdvector = []; - for (var j = 1; j < row.length; j++) { - hdvector.push(parseFloat(row[j])); - } - var trainingRow = { id: row[0], hdvector: hdvector }; - trainingRows.push(trainingRow); - } - var trainingChunk = {rows: trainingRows}; - //Linearly distribute rows to the projectors - connection.projectorConn.calls[0].write(trainingChunk); - connection.knnConn.calls[0].write(trainingChunk); -} - +//Result from KNN request function sendNeighbour(response,ws){ console.log(response); if(!response || response["rows"].length == 0){ -- GitLab