// webSocketGateway.js (7.57 KiB), authored by IsolatedSushi
const WebSocket = require('ws');
var grpc = require('@grpc/grpc-js');
var protoLoader = require('@grpc/proto-loader');
const { callErrorFromStatus } = require('@grpc/grpc-js/build/src/call');
// Module-level setup: WebSocket server, gRPC targets, proto definitions and shared counters.
// Host and port the WebSocket server is created with.
const webSocketHost = "0.0.0.0"
const webSocketPort = 9898
// gRPC endpoints; each can be overridden through an environment variable and otherwise falls back to localhost.
var projectorTarget = process.env.PROJECTOR_TARGET ||'127.0.0.1:50051'; // e.g. "proveeprojectorservice:50051" (presumably the deployed service name; confirm)
var knnTarget = process.env.KNN_TARGET || '127.0.0.1:50052'; // e.g. "proveeknnservice:50052" (presumably the deployed service name; confirm)
// Created at module load; browser clients connect to this server.
const wsServer = new WebSocket.Server({ host: webSocketHost, port: webSocketPort });
console.log("Projector target " +projectorTarget);
console.log("KNN target: " + knnTarget)
// Proto definitions are loaded once, from paths relative to this file's directory.
var projectorPackage = getGRPCPackage( __dirname + '/../protos/v3/projector.proto');
var KNNPackage = getGRPCPackage(__dirname + '/../protos/v3/knn.proto');
// Incremented by every ProjectionRequest constructor call; no reader is visible in this file.
var count = 0;
// Shared counter: generatePoint() takes the next point id from it and sendRowToServer() bumps it per processed row.
var pointID = 0;
/**
 * State kept for one browser WebSocket session.
 *
 * Fields:
 *  - index:         starts at 0
 *  - ws:            the WebSocket this session talks to
 *  - projectorConn: GrpcConnection for the projector service, null until set up
 *  - knnConn:       GrpcConnection for the KNN service, null until set up
 *  - lineAmount:    line count announced by the client, 0 until announced
 */
class ProjectionRequest {
  /**
   * @param {object} ws - WebSocket of the connected browser client.
   */
  constructor(ws) {
    // Module-wide tally of sessions created so far.
    count += 1;
    Object.assign(this, {
      index: 0,
      ws,
      projectorConn: null,
      knnConn: null,
      lineAmount: 0,
    });
  }
}
/**
 * Bundles a gRPC client with the streams opened on it.
 *
 * Fields:
 *  - client: the gRPC client object
 *  - calls:  array of open calls, null until they are created
 *  - id:     identifier assigned later by the caller, null until known
 */
class GrpcConnection {
  /**
   * @param {object} client - gRPC client instance to wrap.
   */
  constructor(client) {
    this.client = client;
    this.calls = null;
    this.id = null;
  }
}
/**
 * Per-client wiring: every browser that connects gets its own ProjectionRequest,
 * its messages are routed through parseMessage, and its gRPC resources are
 * released when the socket closes.
 */
wsServer.on('connection', function connection(ws) {
  console.log(`Client connected with websocket`);
  const currConnection = new ProjectionRequest(ws);

  ws.on('message', function incoming(message) {
    parseMessage(message, currConnection);
  });

  ws.on('close', () => {
    console.log("Closed");
    // The session stores its gRPC state in projectorConn / knnConn (there is no
    // `client` field on ProjectionRequest), so both have to be torn down here.
    const grpcConnections = [currConnection.projectorConn, currConnection.knnConn];
    for (const grpcConnection of grpcConnections) {
      if (!grpcConnection) {
        continue;
      }
      // Half-close every open stream so the remote side can finish it.
      for (const call of grpcConnection.calls || []) {
        call.end();
      }
      grpc.closeClient(grpcConnection.client);
    }
  });
});
/**
 * Loads a .proto file synchronously with the loader options used throughout
 * this gateway.
 *
 * @param {string} PROTO_PATH - Path of the .proto file to load.
 * @returns {object} The package definition returned by protoLoader.loadSync.
 */
function getGRPCPackage(PROTO_PATH) {
  const loaderOptions = {
    keepCase: true, // keep field names as written in the .proto file
    longs: String, // represent 64-bit integers as strings
    enums: String, // represent enum values by name
    defaults: true, // populate default values on output objects
    oneofs: true, // expose virtual oneof properties
  };
  return protoLoader.loadSync(PROTO_PATH, loaderOptions);
}
/**
 * Creates an insecure gRPC client for one service of the `provee` package and
 * wraps it in a GrpcConnection.
 *
 * @param {string} _target - Address of the gRPC server ("host:port").
 * @param {object} packageDefinition - Definition produced by getGRPCPackage.
 *   (Named `package` before; that identifier is reserved in strict mode.)
 * @param {string} name - Service name inside the `provee` package, e.g. "KNN".
 * @returns {GrpcConnection} Wrapper holding the freshly created client.
 * @throws {Error} When the package has no service called `name`.
 */
function getGRPCClient(_target, packageDefinition, name) {
  const loadedPackage = grpc.loadPackageDefinition(packageDefinition);
  const ClientClass = loadedPackage.provee[name];
  if (typeof ClientClass !== 'function') {
    throw new Error(`Unknown gRPC service "${name}" in package "provee"`);
  }
  // Keep the client local: an undeclared assignment here would create one
  // global that every session overwrites.
  const client = new ClientClass(_target, grpc.credentials.createInsecure());
  return new GrpcConnection(client);
}
/**
 * Opens one projection stream on the projector client and forwards every
 * response, JSON-encoded, to the browser.
 *
 * On a stream error a new stream is opened after 5 seconds, as long as the
 * browser socket is still open. Note that the replacement stream is not
 * written back into any call list held by the caller.
 *
 * @param {object} ws - WebSocket of the browser that receives the points.
 * @param {object} client - Projector gRPC client exposing getProjectionPoints().
 * @returns {object} The stream returned by client.getProjectionPoints().
 */
function getProjectorConnection(ws, client) {
  // Declared locally; an undeclared `call` would be shared by all sessions.
  const call = client.getProjectionPoints();
  console.log(`Connecting to grpc Server`);

  call.on("data", (response) => {
    sendDataStreamToClient(JSON.stringify(response), ws);
  });

  call.on("error", (err) => {
    console.log(
      `Unexpected stream error: code = ${err.code}` +
      `, message = "${err.message}"`
    );
    console.log("Retrying in 5 seconds");
    setTimeout(() => {
      // Nobody is left to receive points once the browser has disconnected.
      if (ws.readyState !== WebSocket.OPEN) {
        return;
      }
      getProjectorConnection(ws, client);
    }, 5000);
  });

  call.on("end", () => {
    console.log("GRPC connection closed.");
  });

  return call;
}
/**
 * Finishes setting up the KNN connection stored on `connection.knnConn`:
 * asks the KNN service for an id, then opens the sendProjectionPoints stream
 * tagged with that id and stores it as the connection's only call.
 *
 * If the id request fails, the browser is told, the client is closed and
 * `connection.knnConn` is reset to null so later requests see that no KNN
 * service is available.
 *
 * @param {ProjectionRequest} connection - Session whose knnConn is being initialised.
 */
function getKNNConnection(connection) {
  // Captured up front so the callback updates the connection that asked,
  // even if the session has replaced knnConn in the meantime.
  const knnConn = connection.knnConn;

  knnConn.client.getIDfromServer({}, (idError, idResponse) => {
    if (idError || !idResponse) {
      console.log(idError);
      sendError(connection.ws, "No KNN microservice");
      if (connection.knnConn === knnConn) {
        connection.knnConn = null;
      }
      grpc.closeClient(knnConn.client);
      return;
    }

    knnConn.id = idResponse["id"];
    const meta = new grpc.Metadata();
    meta.add('id', knnConn.id);
    const call = knnConn.client.sendProjectionPoints(meta, (streamError) => {
      console.log("finished points");
      console.log(streamError);
    });
    knnConn.calls = [call];
  });
}
/**
 * Reports a problem to the browser as a JSON frame of type "error".
 *
 * @param {object} ws - WebSocket to send on.
 * @param {string} message - Human-readable description of the problem.
 */
function sendError(ws, message) {
  const payload = { type: "error", message };
  ws.send(JSON.stringify(payload));
}
/**
 * Asks the KNN service for the `k` nearest neighbours of `words` and relays
 * the answer to the browser.
 *
 * @param {ProjectionRequest} connection - Session with an initialised knnConn.
 * @param {*} words - Word(s) to look up, passed through to the service as `words`.
 * @param {number} k - Number of neighbours requested.
 */
function KNNNeighbourRequest(connection, words, k) {
  console.log("Requesting " + words);
  const meta = new grpc.Metadata();
  meta.add('id', connection.knnConn.id);

  connection.knnConn.client.getKNNRequest({ k: k, words: words }, meta, (err, response) => {
    if (err || !response) {
      // Log the cause before bailing out; otherwise a failed request
      // leaves no trace on the server side.
      if (err) {
        console.log(err);
      }
      sendError(connection.ws, "No valid request: " + words);
      return;
    }
    console.log("got response");
    sendNeighbour(response, connection.ws);
  });
}
/**
 * Decodes one WebSocket message from the browser and dispatches it on its
 * "type" field.
 *
 * Handled types: requestPointStream, sendDataRow, setLineCount,
 * getKNNNeighbours, getKNN and setProjectorAmount. Anything else is logged.
 * A message that is not a JSON object is answered with an error frame instead
 * of throwing out of the socket's message handler.
 *
 * @param {string|Buffer} message - Raw frame received from the browser.
 * @param {ProjectionRequest} connection - Session the message belongs to.
 */
function parseMessage(message, connection) {
  let jsonMessage = null;
  try {
    jsonMessage = JSON.parse(message);
  } catch (err) {
    console.log("Error! Malformed JSON: " + err.message);
  }
  if (jsonMessage === null || typeof jsonMessage !== "object") {
    sendError(connection.ws, "Invalid JSON message");
    return;
  }

  switch (jsonMessage["type"]) {
    case "requestPointStream":
      console.log("Requested stream");
      sendRandomPointStream(connection);
      break;
    case "sendDataRow":
      sendRowToServer(jsonMessage["row"], jsonMessage["lineIndex"], connection);
      break;
    case "setLineCount":
      // Amounts are decimal; the explicit radix avoids "0x.." being read as hex.
      connection.lineAmount = parseInt(jsonMessage["amount"], 10);
      break;
    case "getKNNNeighbours": {
      const k = parseInt(jsonMessage["amount"], 10);
      const words = jsonMessage["word"];
      if (!connection.knnConn) {
        console.log("No KNN microservice");
        sendError(connection.ws, "No KNN microservice");
        break;
      }
      KNNNeighbourRequest(connection, words, k);
      break;
    }
    case "getKNN":
      console.log("setup KNN");
      connection.knnConn = getGRPCClient(knnTarget, KNNPackage, "KNN");
      getKNNConnection(connection);
      break;
    case "setProjectorAmount": {
      const amount = parseInt(jsonMessage["amount"], 10);
      SetupKProjectors(amount, connection);
      break;
    }
    default:
      console.log("Error! Unknown request:" + jsonMessage["type"]);
  }
}
/**
 * Creates a Projector gRPC client for the session and opens `amount`
 * projection streams on it.
 *
 * The client wrapper is stored on `connection.projectorConn`; once all
 * streams exist they are stored on its `calls` field.
 *
 * @param {number} amount - Number of projector streams to open.
 * @param {ProjectionRequest} connection - Session receiving the connection.
 */
function SetupKProjectors(amount, connection) {
  connection.projectorConn = getGRPCClient(projectorTarget, projectorPackage, "Projector");

  // Open streams one by one until the requested number is reached.
  const openedStreams = [];
  while (openedStreams.length < amount) {
    const stream = getProjectorConnection(connection.ws, connection.projectorConn.client);
    openedStreams.push(stream);
  }

  connection.projectorConn.calls = openedStreams;
}
/**
 * Builds a random test point.
 *
 * The id comes from the module-wide pointID counter, which is advanced by one;
 * x and y are random integers in the range 0..99.
 *
 * @returns {{id: number, x: number, y: number}} The generated point.
 */
function generatePoint() {
  const randomCoordinate = () => Math.floor(Math.random() * 100);
  return {
    id: pointID++,
    x: randomCoordinate(),
    y: randomCoordinate(),
  };
}
/**
 * Pushes 100000 randomly generated points to the session's browser, one
 * JSON-encoded point per call to sendDataStreamToClient.
 *
 * @param {ProjectionRequest} connection - Session whose WebSocket receives the points.
 */
function sendRandomPointStream(connection) {
  let remaining = 100000;
  while (remaining > 0) {
    const encodedPoint = JSON.stringify(generatePoint());
    sendDataStreamToClient(encodedPoint, connection.ws);
    remaining -= 1;
  }
}
/**
 * Converts one data row from the browser into a training-set row and hands it
 * to the projector and KNN services.
 *
 * The first element of `row` becomes the id; every following element is parsed
 * as a float into `hdvector`. A row that is not a non-empty array is rejected
 * with an error frame instead of throwing.
 *
 * @param {Array} row - [id, value1, value2, ...] as received from the browser.
 * @param {number} lineIndex - Index of the row, passed on to the projector distribution.
 * @param {ProjectionRequest} connection - Session the row belongs to.
 */
function sendRowToServer(row, lineIndex, connection) {
  if (!Array.isArray(row) || row.length === 0) {
    sendError(connection.ws, "Invalid data row");
    return;
  }

  const [id, ...rawValues] = row;
  // Wrap parseFloat so map's extra index argument is never passed along.
  const hdvector = rawValues.map((value) => parseFloat(value));
  const trainingSetRow = { id: id, hdvector: hdvector };

  distributeRowToProjectors(trainingSetRow, lineIndex, connection);
  sendRowToKNN(trainingSetRow, connection);

  pointID += 1;
  if (pointID % 1000 === 0) {
    console.log("Processed " + pointID + " points");
  }
}
/**
 * Writes one training-set row to the session's KNN stream.
 *
 * Does nothing when the session has no KNN connection, or when the connection
 * exists but its stream has not been opened yet (the stream is only created
 * after the KNN service has answered the id request).
 *
 * @param {object} trainingSetRow - Row of the form { id, hdvector }.
 * @param {ProjectionRequest} connection - Session the row belongs to.
 */
function sendRowToKNN(trainingSetRow, connection) {
  const knnConn = connection.knnConn;
  if (!knnConn || !knnConn.calls || knnConn.calls.length === 0) {
    return;
  }
  const meta = new grpc.Metadata();
  meta.add('id', knnConn.id);
  knnConn.calls[0].write(trainingSetRow, meta);
}
/**
 * Writes one training-set row to the projector service.
 *
 * Currently every row goes to the first projector stream. Nothing is written
 * when the session has no projector connection yet or when no stream is open.
 *
 * @param {object} trainingSetRow - Row of the form { id, hdvector }.
 * @param {number} lineIndex - Row index; only used by the disabled distribution below.
 * @param {ProjectionRequest} connection - Session the row belongs to.
 */
function distributeRowToProjectors(trainingSetRow, lineIndex, connection) {
  const projectorConn = connection.projectorConn;
  // projectorConn stays null until the client sends "setProjectorAmount".
  if (!projectorConn || !projectorConn.calls || projectorConn.calls.length === 0) {
    return;
  }
  const calls = projectorConn.calls;
  calls[0].write(trainingSetRow);
  // Disabled: linearly distribute rows to the projectors
  // for(var i =0; i<calls.length;i++){
  //   if( lineIndex < (connection.lineAmount / (calls.length-i))){
  //     calls[i].write(trainingSetRow)
  //   }
  // }
}
/**
 * Relays a KNN response to the browser, one "neighbour" frame per returned row.
 *
 * When the response is missing or contains no rows, an error frame is sent
 * instead and nothing else happens.
 *
 * @param {object} response - KNN answer; its "rows" entries carry "id" and "distance".
 * @param {object} ws - WebSocket of the browser that asked.
 */
function sendNeighbour(response, ws) {
  console.log(response);

  const hasRows = response && response["rows"].length != 0;
  if (!hasRows) {
    const errMSG = "The word (or one of the words) was not in the set (or hasnt been received yet)";
    console.log(errMSG);
    sendError(ws, errMSG);
    return;
  }

  for (const row of response["rows"]) {
    const neighbour = row["id"];
    const distance = row["distance"];
    console.log(neighbour + " " + distance);
    ws.send(JSON.stringify({ type: "neighbour", neighbour: neighbour, distance: distance.toString() }));
  }
}
/**
 * Sends point data to the browser.
 *
 * `data` is converted to a string, trimmed and split on newlines; every line
 * is wrapped in a frame of type "point". A failing send is logged and does not
 * stop the remaining lines from being sent.
 *
 * @param {*} data - Payload to forward (anything with a toString()).
 * @param {object} ws - WebSocket of the receiving browser.
 */
function sendDataStreamToClient(data, ws) {
  const lines = data.toString().trim().split("\n");
  for (const line of lines) {
    try {
      ws.send(JSON.stringify({ type: "point", point: line }));
    } catch (e) {
      console.log(e);
    }
  }
}