/**
 * WebSocket <-> gRPC bridge.
 *
 * Browsers connect over WebSocket and send JSON commands; this script opens
 * one or more `getProjectionPoints` gRPC streams per browser connection,
 * forwards data rows from the browser to those streams, and relays every
 * gRPC response back to the browser as a JSON string.
 */
const WebSocket = require('ws');
// NOTE(review): `http` and `callErrorFromStatus` are not used by the code in
// this file; they are kept in case something outside this view relies on them.
const http = require("http");
const grpc = require('@grpc/grpc-js');
const protoLoader = require('@grpc/proto-loader');
const { callErrorFromStatus } = require('@grpc/grpc-js/build/src/call');

const webSocketHost = "0.0.0.0";
const webSocketPort = 9898;
const wsServer = new WebSocket.Server({
  host: webSocketHost,
  port: webSocketPort,
});

const PROTO_PATH = __dirname + '/../protos/v3/projector.proto';
const packageDefinition = protoLoader.loadSync(PROTO_PATH, {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
  oneofs: true,
});

// Address of the gRPC server every stream connects to.
const target = '127.0.0.1:50051';

// Delay before a failed gRPC stream is re-opened (matches the logged "5 seconds").
const RETRY_DELAY_MS = 5000;

// Running counter used to give each Connection a unique index.
let count = 0;

/**
 * Per-browser state: the WebSocket, the gRPC client, the open gRPC streams
 * and the number of lines announced by the browser.
 */
class Connection {
  /**
   * @param {object} ws - WebSocket of the connected browser.
   * @param {object|null} client - gRPC client, or null until one is created.
   * @param {Array|null} calls - Open gRPC streams, or null until created.
   * @param {number} lineAmount - Line count announced via "setLineCount".
   */
  constructor(ws, client, calls, lineAmount) {
    this.index = count;
    count += 1;
    this.ws = ws;
    this.client = client;
    this.calls = calls;
    this.lineAmount = lineAmount;
  }
}

wsServer.on('connection', function connection(ws) {
  console.log(`Client connected with websocket`);
  const currConnection = new Connection(ws, null, null, 0);

  ws.on('message', function incoming(message) {
    parseMessage(message, currConnection);
  });

  // Release the gRPC streams once the browser is gone; otherwise they stay
  // open (and keep being retried) for the lifetime of the process.
  ws.on('close', function closed() {
    const openCalls = currConnection.calls || [];
    for (const openCall of openCalls) {
      openCall.cancel();
    }
  });
});

/**
 * Builds a new insecure gRPC client for the `provee.Projector` service.
 *
 * @returns {object} Client connected to `target`.
 */
function getGRPCCLient() {
  const pkg = grpc.loadPackageDefinition(packageDefinition);
  const Projector = pkg.provee["Projector"];
  // Declared locally: the original assigned to an implicit global.
  const client = new Projector(target, grpc.credentials.createInsecure());
  return client;
}

/**
 * Opens a `getProjectionPoints` stream and relays its responses to `ws`.
 * On a stream error a new stream is opened after RETRY_DELAY_MS, as long as
 * the WebSocket is still open.
 *
 * @param {object} ws - WebSocket that receives the stream's responses.
 * @param {object} client - gRPC client used to open the stream.
 * @param {function} [onReconnect] - Optional; called with the replacement
 *   stream after a retry so the caller can swap out the stale one.
 * @returns {object} The opened gRPC stream.
 */
function getConnection(ws, client, onReconnect) {
  // Declared locally: the original assigned to an implicit global, so
  // concurrent connections overwrote each other's stream reference.
  const call = client.getProjectionPoints();
  console.log(`Connecting to grpc Server at: ${target}`);

  call.on("data", (response) => {
    sendDataStreamToClient(JSON.stringify(response), ws);
  });

  call.on("error", (err) => {
    // The browser left (this includes the cancel issued on close): nothing
    // to report and nothing to reconnect for.
    if (ws.readyState !== WebSocket.OPEN) {
      return;
    }
    console.log(
      `Unexpected stream error: code = ${err.code}` +
        `, message = "${err.message}"`
    );
    console.log("Retrying in 5 seconds");
    setTimeout(() => {
      // Re-check: the browser may have disconnected while we were waiting.
      if (ws.readyState !== WebSocket.OPEN) {
        return;
      }
      // Pass `client` along; the original dropped it and crashed on retry.
      const replacement = getConnection(ws, client, onReconnect);
      if (typeof onReconnect === "function") {
        onReconnect(replacement);
      }
    }, RETRY_DELAY_MS);
  });

  call.on("end", () => {
    console.log("GRPC connection closed.");
  });

  return call;
}

/**
 * Parses one JSON message from the browser and dispatches on its "type".
 *
 * Handled types: "requestPointStream", "sendDataRow", "setLineCount" and
 * "setProjectorAmount". Malformed JSON and unknown types are logged.
 *
 * @param {string|Buffer} message - Raw WebSocket message.
 * @param {Connection} connection - State of the sending browser.
 */
function parseMessage(message, connection) {
  let jsonMessage;
  try {
    jsonMessage = JSON.parse(message);
  } catch (e) {
    // Browser input is untrusted; a bad frame must not crash the server.
    console.log(e);
    return;
  }
  if (jsonMessage === null || typeof jsonMessage !== "object") {
    console.log("Error! Unknown request:" + jsonMessage);
    return;
  }

  switch (jsonMessage["type"]) {
    case "requestPointStream":
      console.log("Requested stream");
      getPointStream(connection);
      break;
    case "sendDataRow":
      sendRowToServer(jsonMessage["row"], jsonMessage["lineIndex"], connection);
      break;
    case "setLineCount":
      connection.lineAmount = parseInt(jsonMessage["amount"], 10);
      break;
    case "setProjectorAmount": {
      const amount = parseInt(jsonMessage["amount"], 10);
      connection.client = getGRPCCLient();
      const allCalls = [];
      // Create that amount of grpc connections.
      for (let i = 0; i < amount; i += 1) {
        const grpcConnection = getConnection(
          connection.ws,
          connection.client,
          // After a retry, swap the dead stream for its replacement so
          // later writes reach a live stream.
          (replacement) => {
            allCalls[i] = replacement;
          }
        );
        allCalls.push(grpcConnection);
      }
      connection.calls = allCalls;
      break;
    }
    default:
      console.log("Error! Unknown request:" + jsonMessage["type"]);
  }
}

/**
 * Writes a row with id "0" and a null vector to the first gRPC stream.
 * NOTE(review): presumably this row signals the server to start streaming
 * points back - confirm against the Projector service.
 *
 * @param {Connection} connection - State of the requesting browser.
 */
function getPointStream(connection) {
  // The streams may not exist yet ("setProjectorAmount" not received).
  if (!connection.calls || connection.calls.length === 0) {
    console.log("Error! No grpc connection available.");
    return;
  }
  const trainingSetRow = { id: "0", hdvector: null };
  // Fixed: the original read `connection.call[0]`, a property that does not exist.
  connection.calls[0].write(trainingSetRow);
  console.log("stream");
}

/**
 * Converts a data row into `{id, hdvector}` and writes it to the gRPC
 * streams. `row[0]` is the id; the remaining entries are parsed as floats.
 *
 * Stream `i` (of N) receives the row when
 * `lineIndex < lineAmount / (N - i)`, so later streams accept a larger share
 * of the lines.
 *
 * @param {Array} row - Row values; the first entry is the id.
 * @param {number} lineIndex - Index of this row among the announced lines.
 * @param {Connection} connection - State of the sending browser.
 */
function sendRowToServer(row, lineIndex, connection) {
  if (!Array.isArray(row)) {
    console.log("Error! Invalid data row.");
    return;
  }
  // The streams may not exist yet ("setProjectorAmount" not received).
  if (!connection.calls || connection.calls.length === 0) {
    console.log("Error! No grpc connection available.");
    return;
  }

  const hdvector = [];
  for (let i = 1; i < row.length; i++) {
    hdvector.push(parseFloat(row[i]));
  }
  const trainingSetRow = { id: row[0], hdvector: hdvector };

  const callCount = connection.calls.length;
  for (let i = 0; i < callCount; i++) {
    if (lineIndex < connection.lineAmount / (callCount - i)) {
      connection.calls[i].write(trainingSetRow);
    }
  }
}

/**
 * Sends data to the browser, one WebSocket message per line of `data`.
 * A failed send is logged and does not stop the remaining lines.
 *
 * @param {*} data - Payload; converted with `toString()` and split on "\n".
 * @param {object} ws - WebSocket of the receiving browser.
 */
function sendDataStreamToClient(data, ws) {
  const list = data.toString().trim().split("\n");
  for (let i = 0; i < list.length; i++) {
    try {
      const response = list[i];
      ws.send(response);
    } catch (e) {
      console.log(e);
    }
  }
}