// webSocketGateway.js
const WebSocket = require('ws');
var grpc = require('@grpc/grpc-js');
var protoLoader = require('@grpc/proto-loader');
const { callErrorFromStatus } = require('@grpc/grpc-js/build/src/call');

// ---- Gateway configuration and module-level state ----

// Interface and port the browser-facing WebSocket server listens on.
const webSocketHost = "0.0.0.0";
const webSocketPort = 9898;

// gRPC endpoints. Each one can be overridden through the environment
// (alternative targets noted previously: "proveeprojectorservice:50051"
// and "proveeknnservice:50052").
const projectorTarget = process.env.PROJECTOR_TARGET || '127.0.0.1:50051';
const knnTarget = process.env.KNN_TARGET || '127.0.0.1:50052';

const wsServer = new WebSocket.Server({ host: webSocketHost, port: webSocketPort });

console.log(`Projector target ${projectorTarget}`);
console.log(`KNN target: ${knnTarget}`);

// Package definitions are loaded once at startup and reused for every client.
const projectorPackage = getGRPCPackage(`${__dirname}/../protos/v3/projector.proto`);
const KNNPackage = getGRPCPackage(`${__dirname}/../protos/v3/knn.proto`);

// Incremented by the ProjectionRequest constructor.
let count = 0;
// Shared counter used by generatePoint and sendRowToServer.
let pointID = 0;

/**
 * State kept for one connected browser: its WebSocket plus the gRPC
 * connections that are opened on its behalf later on.
 */
class ProjectionRequest {
  /**
   * @param {object} ws - WebSocket of the connected browser client.
   */
  constructor(ws) {
    // Module-wide tally of requests created so far.
    count += 1;
    Object.assign(this, {
      index: 0,
      ws,
      projectorConn: null, // assigned by SetupKProjectors
      knnConn: null,       // assigned when a "getKNN" message arrives
      lineAmount: 0,       // assigned when a "setLineCount" message arrives
    });
  }
}

/**
 * Bundles a gRPC client with the calls opened on it and an id slot.
 */
class GrpcConnection {
  /**
   * @param {object} client - gRPC client instance to wrap.
   */
  constructor(client) {
    // `calls` and `id` start empty and are filled in by the code that
    // opens streams / obtains an id on this client.
    Object.assign(this, { client, calls: null, id: null });
  }
}


/**
 * Releases what one GrpcConnection holds: half-closes every call stored on
 * it and closes its client. Does nothing for a connection that was never
 * created.
 *
 * @param {?GrpcConnection} grpcConnection - Connection to release, or null.
 */
function releaseGrpcConnection(grpcConnection) {
  if (!grpcConnection) {
    return;
  }

  // `calls` stays null until the streams have actually been opened.
  const openCalls = grpcConnection.calls || [];
  for (const openCall of openCalls) {
    try {
      // Graceful half-close rather than cancel, so the stream's "error"
      // handler is not triggered by our own shutdown.
      // NOTE(review): assumes the services finish the stream once the
      // write side is closed - confirm against the service implementations.
      openCall.end();
    } catch (err) {
      console.log(err);
    }
  }

  if (grpcConnection.client) {
    grpc.closeClient(grpcConnection.client);
  }
}

// One ProjectionRequest is created per browser connection; every incoming
// message is dispatched through parseMessage with that request as context.
wsServer.on('connection', function connection(ws) {
  console.log(`Client connected with websocket`);
  const currConnection = new ProjectionRequest(ws);
  ws.on('message', function incoming(message) {
    parseMessage(message, currConnection);
  });

  ws.on('close', (code, reason) => {
    console.log("Closed");
    // The gRPC clients live on projectorConn / knnConn (ProjectionRequest
    // has no `client` property), so release both of them here.
    releaseGrpcConnection(currConnection.projectorConn);
    releaseGrpcConnection(currConnection.knnConn);
  });
});

/**
 * Loads a .proto file synchronously with @grpc/proto-loader.
 *
 * @param {string} PROTO_PATH - Path of the .proto file to load.
 * @returns {object} Whatever protoLoader.loadSync returns for that file.
 */
function getGRPCPackage(PROTO_PATH) {
  // Loader options: original field casing is kept, longs and enums are
  // represented as strings, and defaults / oneofs are enabled.
  const loaderOptions = {
    keepCase: true,
    longs: String,
    enums: String,
    defaults: true,
    oneofs: true,
  };

  return protoLoader.loadSync(PROTO_PATH, loaderOptions);
}

/**
 * Creates an insecure gRPC client for one service of the `provee` package
 * and wraps it in a GrpcConnection.
 *
 * @param {string} _target - Address of the gRPC server ("host:port").
 * @param {object} packageDefinition - Package definition as returned by
 *   getGRPCPackage.
 * @param {string} name - Service name to look up under `provee`.
 * @returns {GrpcConnection} Connection wrapping the newly created client.
 * @throws {TypeError} If the package has no `provee` service called `name`.
 */
function getGRPCClient(_target, packageDefinition, name) {
  const pkg = grpc.loadPackageDefinition(packageDefinition);
  const clientClass = pkg.provee ? pkg.provee[name] : undefined;
  if (typeof clientClass !== "function") {
    throw new TypeError(`gRPC service "provee.${name}" not found in package definition`);
  }

  // Keep the client local: every caller must get its own instance instead
  // of sharing (and overwriting) a single global one.
  const client = new clientClass(_target, grpc.credentials.createInsecure());
  return new GrpcConnection(client);
}

/**
 * Opens one projection stream on the given projector client and relays
 * everything it emits to the browser.
 *
 * - "data": the response is JSON-encoded and passed to sendDataStreamToClient.
 * - "error": the error is logged and reported to the browser via sendError.
 * - "end": a log line is written.
 *
 * @param {object} ws - WebSocket of the browser that receives the points.
 * @param {object} client - Projector gRPC client exposing getProjectionPoints().
 * @returns {object} The call object returned by client.getProjectionPoints().
 */
function getProjectorConnection(ws, client) {
  // Local binding: several streams may be open at the same time, so the
  // call must not live in a shared global.
  const call = client.getProjectionPoints();
  console.log(`Connecting  to grpc Server`);

  call.on("data", (response) => {
    sendDataStreamToClient(JSON.stringify(response), ws);
  });

  call.on("error", (err) => {
    console.log(
      `Unexpected stream error: code = ${err.code}` +
      `, message = "${err.message}"`
    );
    // No automatic retry: a stream re-opened here could not replace the
    // failed one in the caller's list of calls, so no rows would ever be
    // written to it. Report the failure to the browser instead.
    try {
      sendError(ws, `Projector stream failed: ${err.message}`);
    } catch (sendErr) {
      // The browser may already be gone; never throw from an event handler.
      console.log(sendErr);
    }
  });

  call.on("end", () => {
    console.log("GRPC connection closed.");
  });

  return call;
}


/**
 * Registers with the KNN service and opens the stream used to upload rows.
 *
 * First asks the service for an id (getIDfromServer); on success the id is
 * stored on the connection's knnConn and a sendProjectionPoints call is
 * opened with that id as "id" metadata and stored as knnConn.calls[0].
 * On failure the error is logged and reported to the browser, and knnConn
 * is left without id or calls.
 *
 * @param {ProjectionRequest} connection - Request whose knnConn is already set.
 */
function getKNNConnection(connection) {
  // Capture the connection this handshake belongs to, so a late response
  // is always applied to the client that actually asked for the id.
  const knnConnection = connection.knnConn;

  knnConnection.client.getIDfromServer({}, function (error, response) {
    if (error || !response) {
      console.log(error);
      sendError(connection.ws, "Could not register with KNN microservice");
      return;
    }

    knnConnection.id = response["id"];
    const meta = new grpc.Metadata();
    meta.add('id', knnConnection.id);

    const call = knnConnection.client.sendProjectionPoints(meta, function (uploadError, uploadResponse) {
      console.log("finished points");
      if (uploadError) {
        console.log(uploadError);
      }
    });

    knnConnection.calls = [call];
  });
}
/**
 * Sends an error notification to the browser as a JSON text frame of the
 * form {"type":"error","message":...}.
 *
 * @param {object} ws - WebSocket to send on.
 * @param {string} message - Human-readable error description.
 */
function sendError(ws, message) {
  const payload = { type: "error", message };
  ws.send(JSON.stringify(payload));
}


/**
 * Asks the KNN service for the k nearest neighbours of `words` and relays
 * the outcome to the browser.
 *
 * The request carries the connection's KNN id as "id" metadata. A response
 * is forwarded through sendNeighbour; when there is none, the gRPC error is
 * logged and an error message is sent to the browser.
 *
 * @param {ProjectionRequest} connection - Request with an initialised knnConn.
 * @param {*} words - Word(s) to look up, passed through as the `words` field.
 * @param {number} k - Number of neighbours to request.
 */
function KNNNeighbourRequest(connection, words, k) {
  console.log("Requesting " + words);
  const meta = new grpc.Metadata();
  meta.add('id', connection.knnConn.id);

  connection.knnConn.client.getKNNRequest({ k: k, words: words }, meta, function (err, response) {
    if (!response) {
      // Log the cause before reporting; this is the only place the
      // underlying gRPC error is visible.
      console.log(err);
      sendError(connection.ws, "No valid request: " + words);
      return;
    }

    console.log("got response");
    sendNeighbour(response, connection.ws);
  });
}

/**
 * Parses one message received from the browser and dispatches it on its
 * "type" field.
 *
 * Supported types: "requestPointStream", "sendDataRow", "setLineCount",
 * "getKNNNeighbours", "getKNN" and "setProjectorAmount". Unknown types are
 * only logged. A message that is not valid JSON, or is not a JSON object,
 * is reported to the browser through sendError instead of throwing.
 *
 * @param {string|Buffer} message - Raw message as delivered by the WebSocket.
 * @param {ProjectionRequest} connection - State of the sending browser.
 */
function parseMessage(message, connection) {
  let jsonMessage;
  try {
    jsonMessage = JSON.parse(message);
  } catch (err) {
    // The payload comes straight from the browser; a malformed frame must
    // not take the whole gateway down.
    console.log("Error! Could not parse message: " + err.message);
    sendError(connection.ws, "Invalid message: not valid JSON");
    return;
  }

  if (jsonMessage === null || typeof jsonMessage !== "object") {
    console.log("Error! Message is not a JSON object");
    sendError(connection.ws, "Invalid message: expected a JSON object");
    return;
  }

  switch (jsonMessage["type"]) {
    case "requestPointStream":
      console.log("Requested stream");
      sendRandomPointStream(connection);
      break;
    case "sendDataRow":
      sendRowToServer(jsonMessage["row"], jsonMessage["lineIndex"], connection);
      break;
    case "setLineCount":
      connection.lineAmount = parseInt(jsonMessage["amount"], 10);
      break;
    case "getKNNNeighbours": {
      const k = parseInt(jsonMessage["amount"], 10);
      const words = jsonMessage["word"];

      if (!connection.knnConn) {
        console.log("No KNN microservice");
        sendError(connection.ws, "No KNN microservice");
        break;
      }
      KNNNeighbourRequest(connection, words, k);
      break;
    }
    case "getKNN":
      console.log("setup KNN");
      connection.knnConn = getGRPCClient(knnTarget, KNNPackage, "KNN");
      getKNNConnection(connection);
      break;
    case "setProjectorAmount": {
      const amount = parseInt(jsonMessage["amount"], 10);
      SetupKProjectors(amount, connection);
      break;
    }
    default:
      console.log("Error! Unknown request:" + jsonMessage["type"]);
  }
}

/**
 * Creates the projector client for a request and opens `amount` projection
 * streams on it.
 *
 * The new GrpcConnection is stored on connection.projectorConn right away;
 * its `calls` list is filled in once all streams have been opened.
 *
 * @param {number} amount - Number of streams to open.
 * @param {ProjectionRequest} connection - Request that owns the streams.
 */
function SetupKProjectors(amount, connection) {
  const projector = getGRPCClient(projectorTarget, projectorPackage, "Projector");
  connection.projectorConn = projector;

  // Open the streams one after another, all on the same client.
  const streams = [];
  let opened = 0;
  while (opened < amount) {
    streams.push(getProjectorConnection(connection.ws, projector.client));
    opened += 1;
  }

  projector.calls = streams;
}

/**
 * Produces a random point with the next id from the shared pointID counter.
 *
 * @returns {{id: number, x: number, y: number}} Point whose x and y are
 *   integers in the range 0..99.
 */
function generatePoint() {
  const randomCoordinate = () => Math.floor(Math.random() * 100);

  // Take the current counter value as id, then advance the counter.
  const point = { id: pointID++ };
  point.x = randomCoordinate();
  point.y = randomCoordinate();
  return point;
}

/**
 * Sends 100000 randomly generated points to the browser, one after another.
 *
 * @param {ProjectionRequest} connection - Request whose WebSocket receives
 *   the points.
 */
function sendRandomPointStream(connection) {
  let remaining = 100000;
  while (remaining > 0) {
    const serializedPoint = JSON.stringify(generatePoint());
    sendDataStreamToClient(serializedPoint, connection.ws);
    remaining -= 1;
  }
}

/**
 * Converts one data row from the browser into a training-set row and hands
 * it to the projector and KNN services.
 *
 * The first element of `row` becomes the row id; every following element is
 * converted with parseFloat and collected into `hdvector`. The shared
 * pointID counter is advanced afterwards, and progress is logged every
 * 1000 rows.
 *
 * @param {Array} row - Row data: id followed by the vector components.
 * @param {*} lineIndex - Index of the row, passed on to
 *   distributeRowToProjectors.
 * @param {ProjectionRequest} connection - Request the row belongs to.
 */
function sendRowToServer(row, lineIndex, connection) {
  // Everything after the id column, element by element (generic slice so
  // any indexable value with a length is handled like the array case).
  const components = Array.prototype.slice.call(row, 1);
  const trainingSetRow = {
    id: row[0],
    hdvector: components.map((component) => parseFloat(component)),
  };

  distributeRowToProjectors(trainingSetRow, lineIndex, connection);
  sendRowToKNN(trainingSetRow, connection);

  pointID += 1;
  const reachedMilestone = pointID % 1000 == 0;
  if (reachedMilestone) {
    console.log(`Processed ${pointID} points`);
  }
}

/**
 * Writes one training-set row to the KNN upload stream of a request.
 *
 * Does nothing when the request has no KNN connection, or when that
 * connection's upload stream has not been opened yet (`calls` is still
 * null until the id handshake in getKNNConnection has completed).
 *
 * @param {{id: *, hdvector: number[]}} trainingSetRow - Row to upload.
 * @param {ProjectionRequest} connection - Request the row belongs to.
 */
function sendRowToKNN(trainingSetRow, connection) {
  const knnConnection = connection.knnConn;
  if (!knnConnection || !knnConnection.calls || knnConnection.calls.length === 0) {
    return;
  }

  // The "id" metadata is attached once, when the stream is opened with
  // sendProjectionPoints. write() takes (chunk, encoding, callback), so a
  // Metadata object must not be passed as its second argument.
  knnConnection.calls[0].write(trainingSetRow);
}

/**
 * Writes one training-set row to the projector service.
 *
 * Does nothing when the request has no projector connection yet or no
 * projection stream has been opened on it.
 *
 * @param {{id: *, hdvector: number[]}} trainingSetRow - Row to project.
 * @param {*} lineIndex - Index of the row; currently unused.
 * @param {ProjectionRequest} connection - Request the row belongs to.
 */
function distributeRowToProjectors(trainingSetRow, lineIndex, connection) {
  const projector = connection.projectorConn;
  if (!projector || !projector.calls || projector.calls.length === 0) {
    return;
  }

  // Every row goes to the first stream only.
  // TODO: spread rows over all streams, e.g. by comparing lineIndex with
  // connection.lineAmount divided across calls.length.
  projector.calls[0].write(trainingSetRow);
}

/**
 * Relays a KNN response to the browser, one "neighbour" frame per row.
 *
 * A missing response or an empty row list is reported to the browser as an
 * error instead. The response and each row are also logged.
 *
 * @param {?{rows: Array<{id: *, distance: *}>}} response - KNN response.
 * @param {object} ws - WebSocket of the browser.
 */
function sendNeighbour(response, ws) {
  console.log(response);

  const nothingFound = !response || response["rows"].length == 0;
  if (nothingFound) {
    const errMSG = "The word (or one of the words) was not in the set (or hasnt been received yet)";
    console.log(errMSG);
    sendError(ws, errMSG);
    return;
  }

  for (const row of response["rows"]) {
    const neighbour = row["id"];
    const distance = row["distance"];
    console.log(neighbour + " " + distance);
    // The distance is sent as a string.
    ws.send(JSON.stringify({ type: "neighbour", neighbour: neighbour, distance: distance.toString() }));
  }
}

/**
 * Sends data to the browser as "point" frames, one frame per line.
 *
 * The data is converted to a string, trimmed and split on "\n"; each line
 * is wrapped as {"type":"point","point":<line>}. A failing send is logged
 * and does not stop the remaining lines from being sent.
 *
 * @param {*} data - Value whose toString() yields newline-separated entries.
 * @param {object} ws - WebSocket of the browser.
 */
function sendDataStreamToClient(data, ws) {
  const lines = data.toString().trim().split("\n");

  lines.forEach((line) => {
    try {
      ws.send(JSON.stringify({ type: "point", point: line }));
    } catch (e) {
      console.log(e);
    }
  });
}