const WebSocket = require('ws');
var grpc = require('@grpc/grpc-js');
var protoLoader = require('@grpc/proto-loader');
// NOTE(review): not referenced in the visible part of this file; kept in case
// code outside this view relies on it.
const { callErrorFromStatus } = require('@grpc/grpc-js/build/src/call');

/*
 * WebSocket <-> gRPC bridge.
 *
 * A browser connects over WebSocket and sends JSON messages (see parseMessage).
 * Depending on the message, this process opens gRPC clients for the
 * "Projector" and "KNN" services and forwards data between both sides.
 */

// Setup webserver
const webSocketHost = "0.0.0.0";
const webSocketPort = 9898;

// gRPC targets; both can be overridden through the environment
// (the original notes mention "proveeprojectorservice:50051" and
// "proveeknnservice:50052" as alternative targets).
const projectorTarget = process.env.PROJECTOR_TARGET || '127.0.0.1:50051';
const knnTarget = process.env.KNN_TARGET || '127.0.0.1:50052';

// Delay before a failed projector stream is re-opened.
const PROJECTOR_RETRY_DELAY_MS = 5000;

const wsServer = new WebSocket.Server({
    host: webSocketHost,
    port: webSocketPort
});

console.log("Projector target " + projectorTarget);
console.log("KNN target: " + knnTarget);

const projectorPackage = getGRPCPackage(__dirname + '/../protos/v3/projector.proto');
const KNNPackage = getGRPCPackage(__dirname + '/../protos/v3/knn.proto');

// Number of ProjectionRequest objects created so far.
let count = 0;
// Next id handed out by generatePoint().
let pointID = 0;

/**
 * Per-browser state: the WebSocket plus the gRPC connections opened for it.
 */
class ProjectionRequest {
    /**
     * @param {WebSocket} ws - socket of the connected browser.
     */
    constructor(ws) {
        this.index = 0;
        count += 1;
        this.ws = ws;
        this.projectorConn = null; // GrpcConnection to the Projector service, once set up
        this.knnConn = null;       // GrpcConnection to the KNN service, once set up
        this.lineAmount = 0;       // value of the last "setLineCount" message
    }
}

/**
 * A gRPC client together with the calls opened on it.
 */
class GrpcConnection {
    /**
     * @param {object} client - client instance created by getGRPCClient.
     */
    constructor(client) {
        this.client = client;
        this.calls = null;   // array of open calls, or null
        this.id = null;      // id returned by the KNN service (KNN connections only)
        this.closed = false; // set by closeGrpcConnection; stops retries and late callbacks
    }
}

wsServer.on('connection', function connection(ws) {
    console.log(`Client connected with websocket`);
    const currConnection = new ProjectionRequest(ws);

    ws.on('message', function incoming(message) {
        parseMessage(message, currConnection);
    });

    // An 'error' event without a listener would be thrown by the emitter.
    ws.on('error', (err) => {
        console.log("Websocket error: " + err.message);
    });

    ws.on('close', (code, reason) => {
        console.log("Closed");
        // Release every gRPC resource that was opened for this browser.
        closeGrpcConnection(currConnection.projectorConn);
        closeGrpcConnection(currConnection.knnConn);
        currConnection.projectorConn = null;
        currConnection.knnConn = null;
    });
});

/**
 * Cancels all calls of a GrpcConnection and closes its client.
 * Safe to call with null/undefined or on an already closed connection.
 *
 * @param {GrpcConnection|null} grpcConnection
 */
function closeGrpcConnection(grpcConnection) {
    if (!grpcConnection || grpcConnection.closed) {
        return;
    }
    // Mark first so handlers triggered by the cancellation see the flag.
    grpcConnection.closed = true;
    for (const call of grpcConnection.calls || []) {
        call.cancel();
    }
    grpcConnection.calls = null;
    grpc.closeClient(grpcConnection.client);
}

/**
 * Serializes payload as JSON and sends it if the socket is open.
 *
 * @param {WebSocket} ws
 * @param {object} payload
 * @returns {boolean} true when the payload was handed to the socket.
 */
function safeSend(ws, payload) {
    if (ws.readyState !== WebSocket.OPEN) {
        return false;
    }
    ws.send(JSON.stringify(payload));
    return true;
}

/**
 * Loads a .proto file with the option set used throughout this file.
 *
 * @param {string} PROTO_PATH - path to the .proto file.
 * @returns {object} package definition as returned by protoLoader.loadSync.
 */
function getGRPCPackage(PROTO_PATH) {
    return protoLoader.loadSync(PROTO_PATH, {
        keepCase: true,
        longs: String,
        enums: String,
        defaults: true,
        oneofs: true
    });
}

/**
 * Creates an insecure gRPC client for service `name` of the `provee` package.
 *
 * @param {string} _target - host:port of the service.
 * @param {object} packageDefinition - result of getGRPCPackage.
 * @param {string} name - service name inside the `provee` package.
 * @returns {GrpcConnection} wrapper around the new client.
 */
function getGRPCClient(_target, packageDefinition, name) {
    const pkg = grpc.loadPackageDefinition(packageDefinition);
    const ClientClass = pkg.provee[name];
    const client = new ClientClass(_target, grpc.credentials.createInsecure());
    return new GrpcConnection(client);
}

/**
 * Opens a getProjectionPoints call and forwards every response to the browser.
 *
 * On a stream error a retry is scheduled after PROJECTOR_RETRY_DELAY_MS,
 * provided the websocket is still open.
 *
 * @param {WebSocket} ws - browser socket that receives the points.
 * @param {object} client - Projector client.
 * @param {Function} [reconnect] - optional; invoked instead of the built-in
 *   retry so the owner can decide whether to reconnect and can store the
 *   replacement call. Without it the call is simply re-opened with the same
 *   arguments and the replacement is not tracked anywhere.
 * @returns {object} the opened call.
 */
function getProjectorConnection(ws, client, reconnect) {
    const call = client.getProjectionPoints();
    console.log(`Connecting to grpc Server`);

    call.on("data", (response) => {
        sendDataStreamToClient(response, ws);
    });

    call.on("error", (err) => {
        console.log(
            `Unexpected stream error: code = ${err.code}` +
            `, message = "${err.message}"`
        );
        if (ws.readyState !== WebSocket.OPEN) {
            // Nobody is left to receive points.
            return;
        }
        console.log("Retrying in 5 seconds");
        setTimeout(function () {
            // The browser may have disconnected while the timer was pending.
            if (ws.readyState !== WebSocket.OPEN) {
                return;
            }
            if (typeof reconnect === "function") {
                reconnect();
            } else {
                getProjectorConnection(ws, client);
            }
        }, PROJECTOR_RETRY_DELAY_MS);
    });

    call.on("end", () => {
        console.log("GRPC connection closed.");
    });

    return call;
}

/**
 * Requests an id from the KNN service and then opens the
 * sendProjectionPoints call, tagged with that id in the metadata.
 *
 * @param {ProjectionRequest} connection - must already have knnConn set.
 */
function getKNNConnection(connection) {
    // Capture the connection this request belongs to: connection.knnConn may
    // have been replaced or closed by the time the callback runs.
    const knn = connection.knnConn;
    knn.client.getIDfromServer({}, function (error, response) {
        if (error) {
            console.log(error);
        }
        if (!response || knn.closed) {
            return;
        }
        knn.id = response["id"];
        const meta = new grpc.Metadata();
        meta.add('id', knn.id);
        const call = knn.client.sendProjectionPoints(meta, function (error, response) {
            console.log("finished points");
            if (error) {
                console.log(error);
            }
        });
        knn.calls = [call];
    });
}

/**
 * Sends an error message to the browser.
 *
 * @param {WebSocket} ws
 * @param {string} message
 */
function sendError(ws, message) {
    safeSend(ws, {type: "error", message: message});
}

/**
 * Asks the KNN service for the k nearest neighbours of `words` and forwards
 * the response to the browser.
 *
 * @param {ProjectionRequest} connection - must have knnConn set.
 * @param {*} words - value of the "word" field of the browser message.
 * @param {number} k - number of neighbours requested.
 */
function KNNNeighbourRequest(connection, words, k) {
    console.log("Requesting " + words);
    if (connection.knnConn.id === null || connection.knnConn.id === undefined) {
        // getIDfromServer has not answered yet, so there is no session id
        // to put into the metadata.
        sendError(connection.ws, "KNN microservice not ready");
        return;
    }
    const meta = new grpc.Metadata();
    meta.add('id', connection.knnConn.id);
    connection.knnConn.client.getKNNRequest({k: k, words: words}, meta, function (err, response) {
        if (err) {
            console.log(err);
        }
        if (!response) {
            sendError(connection.ws, "No valid request: " + words);
            return;
        }
        console.log("got response");
        // NOTE(review): sendNeighbour is not defined in the visible part of
        // this file; confirm it exists elsewhere.
        sendNeighbour(response, connection.ws);
    });
}

/**
 * Parses one JSON message from the browser and dispatches on its "type".
 *
 * Handled types: requestPointStream, sendDataRow, setLineCount,
 * getKNNNeighbours, getKNN, setProjectorAmount. Malformed JSON is reported
 * to the browser as an error instead of throwing.
 *
 * @param {string|Buffer} message - raw websocket payload.
 * @param {ProjectionRequest} connection - state of the sending browser.
 */
function parseMessage(message, connection) {
    let jsonMessage;
    try {
        jsonMessage = JSON.parse(message);
    } catch (e) {
        console.log("Error! Malformed message: " + e.message);
        sendError(connection.ws, "Malformed JSON message");
        return;
    }
    // JSON.parse also accepts null and primitives, which carry no "type".
    if (jsonMessage === null || typeof jsonMessage !== "object") {
        sendError(connection.ws, "Malformed JSON message");
        return;
    }

    switch (jsonMessage["type"]) {
        case "requestPointStream":
            console.log("Requested stream");
            sendRandomPointStream(connection);
            break;
        case "sendDataRow":
            sendRowToServer(jsonMessage["row"], jsonMessage["lineIndex"], connection);
            break;
        case "setLineCount":
            connection.lineAmount = parseInt(jsonMessage["amount"], 10);
            break;
        case "getKNNNeighbours": {
            const k = parseInt(jsonMessage["amount"], 10);
            const words = jsonMessage["word"];
            if (!connection.knnConn) {
                console.log("No KNN microservice");
                sendError(connection.ws, "No KNN microservice");
                break;
            }
            KNNNeighbourRequest(connection, words, k);
            break;
        }
        case "getKNN":
            console.log("setup KNN");
            // Release a client left over from an earlier "getKNN".
            closeGrpcConnection(connection.knnConn);
            connection.knnConn = getGRPCClient(knnTarget, KNNPackage, "KNN");
            getKNNConnection(connection);
            break;
        case "setProjectorAmount": {
            const amount = parseInt(jsonMessage["amount"], 10);
            // Rejects NaN as well as zero and negative values.
            if (!(amount > 0)) {
                sendError(connection.ws, "Invalid projector amount");
                break;
            }
            SetupKProjectors(amount, connection);
            break;
        }
        default:
            console.log("Error! Unknown request:" + jsonMessage["type"]);
    }
}

/**
 * Creates a Projector client and opens `amount` projection calls on it.
 * A previously configured projector connection is closed first.
 *
 * @param {number} amount - number of calls to open.
 * @param {ProjectionRequest} connection
 */
function SetupKProjectors(amount, connection) {
    closeGrpcConnection(connection.projectorConn);

    const projector = getGRPCClient(projectorTarget, projectorPackage, "Projector");
    projector.calls = [];
    connection.projectorConn = projector;

    // Opens (or, after a stream error, re-opens) the call stored in `slot`.
    const openSlot = (slot) => {
        projector.calls[slot] = getProjectorConnection(connection.ws, projector.client, () => {
            if (!projector.closed) {
                openSlot(slot);
            }
        });
    };

    // Create that amount of grpc connections
    for (let i = 0; i < amount; i += 1) {
        openSlot(i);
        console.log("test");
    }
}

/**
 * Creates a point with a fresh id and integer coordinates in [0, 100).
 *
 * @returns {{id: number, x: number, y: number}}
 */
function generatePoint() {
    const id = pointID++;
    const x = Math.floor(Math.random() * 100);
    const y = Math.floor(Math.random() * 100);
    return { id, x, y };
}

/**
 * Sends 100000 random points to the browser, one message per point.
 *
 * @param {ProjectionRequest} connection
 */
function sendRandomPointStream(connection) {
    for (let i = 0; i < 100000; i++) {
        // sendDataStreamToClient reads data["points"], so wrap the point the
        // same way the forwarded gRPC responses are shaped.
        // NOTE(review): assumes the browser accepts a one-element array here,
        // as it does for projector responses; confirm against the client.
        const sent = sendDataStreamToClient({points: [generatePoint()]}, connection.ws);
        if (!sent) {
            // Socket is no longer open; the remaining points cannot be delivered.
            break;
        }
    }
}

/**
 * Converts rows received from the browser into a training chunk and writes
 * it to the first projector call.
 *
 * Each non-empty row becomes {id: row[0], hdvector: [parseFloat(row[1]), ...]}.
 *
 * @param {Array} allRows - rows from the "sendDataRow" message.
 * @param {*} lineIndex - "lineIndex" field of the message (currently unused).
 * @param {ProjectionRequest} connection
 */
function sendRowToServer(allRows, lineIndex, connection) {
    const projector = connection.projectorConn;
    if (!projector || !projector.calls || projector.calls.length === 0) {
        console.log("No projector microservice");
        sendError(connection.ws, "No projector microservice");
        return;
    }
    if (!Array.isArray(allRows)) {
        sendError(connection.ws, "No valid rows");
        return;
    }

    const trainingRows = [];
    for (const row of allRows) {
        if (!row || row.length === 0) {
            continue;
        }
        const hdvector = [];
        for (let j = 1; j < row.length; j++) {
            hdvector.push(parseFloat(row[j]));
        }
        trainingRows.push({
            id: row[0],
            hdvector: hdvector
        });
    }

    // NOTE(review): the original comment said rows are distributed linearly
    // over the projectors, but only the first call has ever been written to.
    projector.calls[0].write({rows: trainingRows});
}

/**
 * Forwards the "points" field of `data` to the browser as a "point" message.
 *
 * @param {object} data - object with a "points" field (e.g. a projector response).
 * @param {WebSocket} ws
 * @returns {boolean} true when the message was handed to the socket.
 */
function sendDataStreamToClient(data, ws) {
    return safeSend(ws, {type: "point", point: data["points"]});
}