const WebSocket = require('ws');
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const { callErrorFromStatus } = require('@grpc/grpc-js/build/src/call');

// Set up the websocket server.
const webSocketHost = "0.0.0.0";
const webSocketPort = 9898;

// gRPC targets; overridable through the environment.
const projectorTarget = process.env.PROJECTOR_TARGET || '127.0.0.1:50051'; // e.g. "proveeprojectorservice:50051"
const knnTarget = process.env.KNN_TARGET || '127.0.0.1:50052'; // e.g. "proveeknnservice:50052"

const wsServer = new WebSocket.Server({
  host: webSocketHost,
  port: webSocketPort
});

const projectorPackage = getGRPCPackage(__dirname + '/../protos/v3/projector.proto');
const KNNPackage = getGRPCPackage(__dirname + '/../protos/v3/knn.proto');

// Number of ProjectionRequest objects created so far.
let count = 0;
// Next id handed out by generatePoint().
let pointID = 0;

/**
 * Per-websocket state: the browser socket plus the gRPC connections
 * (projector and KNN) that were opened on its behalf.
 */
class ProjectionRequest {
  constructor(ws) {
    this.index = 0;
    count += 1;
    this.ws = ws;
    this.projectorConn = null;
    this.knnConn = null;
    this.lineAmount = 0;
  }
}

/**
 * A gRPC client together with the streaming calls opened on it and the
 * id assigned by the server (only set for the KNN connection).
 */
class GrpcConnection {
  constructor(client) {
    this.client = client;
    this.calls = null;
    this.id = null;
  }
}

wsServer.on('connection', function connection(ws) {
  console.log(`Client connected with websocket`);
  const currConnection = new ProjectionRequest(ws);

  ws.on('message', function incoming(message) {
    parseMessage(message, currConnection);
  });

  ws.on('close', (code, reason) => {
    console.log("Closed");
    // Release both gRPC clients that belong to this websocket.
    closeGrpcConnection(currConnection.projectorConn);
    closeGrpcConnection(currConnection.knnConn);
    currConnection.projectorConn = null;
    currConnection.knnConn = null;
  });
});

/**
 * Close the gRPC client held by a GrpcConnection, if there is one.
 * @param {GrpcConnection|null} grpcConnection
 */
function closeGrpcConnection(grpcConnection) {
  if (grpcConnection && grpcConnection.client) {
    grpc.closeClient(grpcConnection.client);
  }
}

/**
 * Load a .proto file into a package definition.
 * @param {string} PROTO_PATH - path of the .proto file
 * @returns {object} the package definition produced by protoLoader.loadSync
 */
function getGRPCPackage(PROTO_PATH) {
  return protoLoader.loadSync(PROTO_PATH, {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true
  });
}

/**
 * Create an insecure gRPC client for service `name` of the `provee` package.
 * @param {string} _target - "host:port" of the gRPC server
 * @param {object} packageDefinition - result of getGRPCPackage()
 * @param {string} name - service name inside the `provee` package
 * @returns {GrpcConnection} wrapper around the new client
 */
function getGRPCClient(_target, packageDefinition, name) {
  const pkg = grpc.loadPackageDefinition(packageDefinition);
  const ClientClass = pkg.provee[name];
  const client = new ClientClass(_target, grpc.credentials.createInsecure());
  return new GrpcConnection(client);
}

/**
 * Open a getProjectionPoints stream on `client` and forward every response
 * to the browser. On a stream error a new stream is opened after 5 seconds,
 * unless the websocket is no longer open.
 * @param {WebSocket} ws - browser socket that receives the points
 * @param {object} client - projector gRPC client
 * @param {function} [onReconnect] - called as onReconnect(newCall, oldCall)
 *   when a replacement stream has been opened after an error
 * @returns {object} the streaming call
 */
function getProjectorConnection(ws, client, onReconnect) {
  const call = client.getProjectionPoints();
  console.log(`Connecting to grpc Server`);

  call.on("data", (response) => {
    sendDataStreamToClient(JSON.stringify(response), ws);
  });

  call.on("error", (err) => {
    console.log(
      `Unexpected stream error: code = ${err.code}` +
      `, message = "${err.message}"`
    );
    // No point in reconnecting for a browser that is gone.
    if (ws.readyState !== WebSocket.OPEN) {
      return;
    }
    console.log("Retrying in 5 seconds");
    setTimeout(function () {
      // The websocket may have closed while we were waiting.
      if (ws.readyState !== WebSocket.OPEN) {
        return;
      }
      const replacement = getProjectorConnection(ws, client, onReconnect);
      if (typeof onReconnect === 'function') {
        onReconnect(replacement, call);
      }
    }, 5000);
  });

  call.on("end", () => {
    console.log("GRPC connection closed.");
  });

  return call;
}

/**
 * Parse a JSON message from the browser and dispatch on its "type" field.
 * Malformed messages are logged and ignored.
 * @param {string|Buffer} message - raw websocket message
 * @param {ProjectionRequest} connection - state of the sending websocket
 */
function parseMessage(message, connection) {
  let jsonMessage;
  try {
    jsonMessage = JSON.parse(message);
  } catch (err) {
    console.log("Error! Could not parse message: " + err.message);
    return;
  }
  if (jsonMessage === null || typeof jsonMessage !== 'object') {
    console.log("Error! Message is not a JSON object");
    return;
  }

  switch (jsonMessage["type"]) {
    case "requestPointStream":
      console.log("Requested stream");
      sendRandomPointStream(connection);
      break;
    case "sendDataRow":
      sendRowToServer(jsonMessage["row"], jsonMessage["lineIndex"], connection);
      break;
    case "setLineCount":
      connection.lineAmount = parseInt(jsonMessage["amount"], 10);
      break;
    case "getKNNNeighbours":
      requestKNNNeighbours(jsonMessage, connection);
      break;
    case "getKNN":
      setupKNN(connection);
      break;
    case "setProjectorAmount":
      setupProjectors(parseInt(jsonMessage["amount"], 10), connection);
      break;
    default:
      console.log("Error! Unknown request:" + jsonMessage["type"]);
  }
}

/**
 * Handle a "getKNNNeighbours" message: one word triggers getKNNSingle,
 * three words trigger getKNNSemantic; the result goes to sendNeighbour().
 */
function requestKNNNeighbours(jsonMessage, connection) {
  const k = parseInt(jsonMessage["amount"], 10);
  console.log(jsonMessage);
  if (typeof jsonMessage["word"] !== 'string') {
    console.log("No valid request: " + jsonMessage["word"]);
    return;
  }
  const words = jsonMessage["word"].split(' ');
  if (!connection.knnConn) {
    console.log("No KNN microservice");
    return;
  }
  // The id arrives asynchronously after "getKNN"; without it no metadata can be built.
  if (connection.knnConn.id == null) {
    console.log("KNN microservice has not assigned an id yet");
    return;
  }
  console.log("Request for: " + words);
  console.log(k);

  const meta = new grpc.Metadata();
  meta.add('id', connection.knnConn.id);

  const onResponse = function (err, response) {
    if (err) {
      console.log("KNN request failed: " + err.message);
      return;
    }
    console.log("got response");
    sendNeighbour(response, connection.ws);
  };

  if (words.length == 1) {
    connection.knnConn.client.getKNNSingle({ k: k, word: words[0] }, meta, onResponse);
  } else if (words.length == 3) {
    connection.knnConn.client.getKNNSemantic({ k: k, words: words }, meta, onResponse);
  } else {
    console.log("No valid request: " + words);
  }
}

/**
 * Handle a "getKNN" message: create the KNN client, fetch its id from the
 * server and open the sendProjectionPoints stream used by sendRowToKNN().
 */
function setupKNN(connection) {
  console.log("setup KNN");
  const knnConn = getGRPCClient(knnTarget, KNNPackage, "KNN");
  connection.knnConn = knnConn;

  knnConn.client.getIDfromServer({}, function (error, response) {
    if (error || !response) {
      console.log(error);
      console.log("Could not get KNN id");
      return;
    }
    console.log(response);
    console.log("KNN id is " + response["id"]);
    knnConn.id = response["id"];

    const meta = new grpc.Metadata();
    meta.add('id', knnConn.id);
    const call = knnConn.client.sendProjectionPoints(meta, function (error, response) {
      if (error) {
        console.log("sendProjectionPoints failed: " + error.message);
        return;
      }
      console.log("finished points");
    });
    knnConn.calls = [call];
  });
}

/**
 * Handle a "setProjectorAmount" message: create the projector client and
 * open `amount` projection streams on it.
 */
function setupProjectors(amount, connection) {
  const projectorConn = getGRPCClient(projectorTarget, projectorPackage, "Projector");
  const allCalls = [];
  // Create that amount of grpc streams.
  for (let i = 0; i < amount; i += 1) {
    const call = getProjectorConnection(
      connection.ws,
      projectorConn.client,
      // Keep the slot pointing at the live stream after a reconnect.
      (replacement) => { allCalls[i] = replacement; }
    );
    allCalls.push(call);
  }
  projectorConn.calls = allCalls;
  connection.projectorConn = projectorConn;
}

/**
 * Build a point with the next sequential id and random integer
 * coordinates in [0, 100).
 * @returns {{id: number, x: number, y: number}}
 */
function generatePoint() {
  const id = pointID++;
  const x = Math.floor(Math.random() * 100);
  const y = Math.floor(Math.random() * 100);
  return { id, x, y };
}

/**
 * Send 100000 random points to the browser.
 * @param {ProjectionRequest} connection
 */
function sendRandomPointStream(connection) {
  for (let i = 0; i < 100000; i++) {
    sendDataStreamToClient(JSON.stringify(generatePoint()), connection.ws);
  }
}

/**
 * Convert a row (first element = id, remaining elements = vector values)
 * and send it to the projectors and to the KNN service.
 * @param {Array} row
 * @param {number} lineIndex - index of the row in the data set
 * @param {ProjectionRequest} connection
 */
function sendRowToServer(row, lineIndex, connection) {
  if (!row || typeof row.length !== 'number') {
    console.log("Error! sendDataRow without a valid row");
    return;
  }
  const hdvector = [];
  for (let i = 1; i < row.length; i++) {
    hdvector.push(parseFloat(row[i]));
  }
  const trainingSetRow = { id: row[0], hdvector: hdvector };
  distributeRowToProjectors(trainingSetRow, lineIndex, connection);
  sendRowToKNN(trainingSetRow, connection);
}

/**
 * Write a row to the KNN stream. Does nothing while the KNN connection or
 * its stream has not been set up yet.
 */
function sendRowToKNN(trainingSetRow, connection) {
  const knnConn = connection.knnConn;
  // calls stays null until the id callback of "getKNN" has opened the stream.
  if (!knnConn || !knnConn.calls || knnConn.calls.length === 0) {
    return;
  }
  const meta = new grpc.Metadata();
  meta.add('id', knnConn.id);
  knnConn.calls[0].write(trainingSetRow, meta);
}

/**
 * Write a row to every projector stream i for which
 * lineIndex < lineAmount / (calls.length - i). Does nothing while no
 * projector streams exist.
 */
function distributeRowToProjectors(trainingSetRow, lineIndex, connection) {
  if (!connection.projectorConn) {
    return;
  }
  const calls = connection.projectorConn.calls;
  if (!calls) {
    return;
  }
  // Linearly distribute rows to the projectors.
  // NOTE(review): a row can match several projectors (there is no break), so
  // later projectors receive more rows than earlier ones; confirm this is intended.
  for (let i = 0; i < calls.length; i++) {
    if (lineIndex < (connection.lineAmount / (calls.length - i))) {
      calls[i].write(trainingSetRow);
    }
  }
}

/**
 * Send each entry of response.rows to the browser as a "neighbour" message.
 * @param {object} response - KNN response with a `rows` array
 * @param {WebSocket} ws
 */
function sendNeighbour(response, ws) {
  console.log(response);
  if (!response || !Array.isArray(response["rows"])) {
    console.log("KNN response contains no rows");
    return;
  }
  for (let i = 0; i < response["rows"].length; i++) {
    const row = response["rows"][i];
    console.log(row["id"] + " " + row["distance"]);
    ws.send(JSON.stringify({
      type: "neighbour",
      neighbour: row["id"],
      distance: row["distance"].toString()
    }));
  }
}

/**
 * Send data to the browser, one "point" message per line of `data`.
 * Send failures are logged and do not stop the remaining lines.
 * @param {string|Buffer} data
 * @param {WebSocket} ws
 */
function sendDataStreamToClient(data, ws) {
  const list = data.toString().trim().split("\n");
  for (let i = 0; i < list.length; i++) {
    try {
      const response = list[i];
      ws.send(JSON.stringify({ type: "point", point: response }));
    } catch (e) {
      console.log(e);
    }
  }
}