"""Flask backend for windowed similarity search over a weather-station series.

Routes defined in this module:

* ``GET  /read-data``      -- date/temperature (``t``) series of station
  14066001 from ``NW_Ground_Stations_2016.csv`` (cached as a pickle).
* ``POST /create-windows`` -- sliding windows over ``values``, each min-max
  scaled to [-1, 1].
* ``POST /create-tables``  -- random-hyperplane hash tables over the windows.
* ``POST /query``          -- windows sharing a bucket with ``window``,
  grouped by the number of tables they collided in.
* ``POST /update``         -- keep the tables that agree with the user's
  labels and regenerate the rest.
"""
import json
import os.path
from collections import Counter, defaultdict
from functools import partial
from itertools import groupby
from multiprocessing import Pool
from time import time

import dask.dataframe as dd
import matplotlib.pyplot as plt
import numpy as np
import orjson
import pandas as pd
import rapidjson
from flask import Flask, jsonify, request
from flask_cors import CORS
from sklearn import preprocessing

app = Flask(__name__)
CORS(app)


def _signature(vector, hash_function):
    """Return the bit-string signature of one window.

    Bit j is '1' when the dot product of *vector* with column j of
    *hash_function* is strictly positive, and '0' otherwise.
    """
    bits = np.dot(vector, hash_function) > 0
    return ''.join('1' if bit else '0' for bit in bits)


def _bucket_windows(data, hash_function):
    """Group the row indices of *data* by their signature under *hash_function*.

    Returns a ``defaultdict(list)`` that maps each signature string to the
    (ascending) indices of the windows that hash to it.
    """
    table = defaultdict(list)
    data = np.asarray(data)
    if data.shape[0] == 0:
        return table
    # One matrix product instead of one np.dot per window.
    bits = np.dot(data, hash_function) > 0
    for window_index, row in enumerate(np.where(bits, '1', '0').tolist()):
        table[''.join(row)].append(window_index)
    return table


@app.route('/', methods=['GET'])
def index():
    """Trivial liveness endpoint."""
    return "hi"


@app.route('/read-data', methods=['GET'])
def read_data():
    """Return the date / ``t`` series of station 14066001.

    On the first call the CSV is filtered with dask and the result is pickled
    to ``processed-data.pkl``. Later calls read that cache. Rows with a
    missing ``t`` are dropped. Each field is a JSON string of a list of
    strings nested inside the JSON response, so the client presumably
    decodes it a second time (confirm against the frontend).
    """
    filename = 'processed-data.pkl'
    if not os.path.isfile(filename):
        print("start")
        df = dd.read_csv("NW_Ground_Stations_2016.csv", usecols=['number_sta', 'date', 't'])
        print("read file")
        df = df.loc[df['number_sta'] == 14066001]
        print("split rows")
        df = df.compute()
        df.to_pickle(filename)
        print("to_pandas")
    # The cache is written by this function itself, not taken from user input.
    df = pd.read_pickle(filename)
    df.dropna(subset=['t'], inplace=True)
    response = {
        "index": json.dumps(df.loc[:, 'date'].values.astype(str).tolist()),
        "values": json.dumps(df.loc[:, 't'].values.astype(str).tolist())
    }
    print("response ready")
    return jsonify(response)


@app.route('/create-windows', methods=['POST'])
def create_windows():
    """Cut ``values`` into overlapping windows and min-max scale each to [-1, 1].

    Expects JSON ``{"values": [...], "parameters": {"windowsize": w}}``.
    Returns the orjson-encoded list of scaled windows. This is an empty list
    when the series is shorter than one window.
    """
    t0 = time()
    raw_data = request.json
    values = raw_data["values"]
    window_size = int(raw_data['parameters']["windowsize"])
    if window_size < 1:
        return jsonify({"error": "windowsize must be a positive integer"}), 400
    # A series of n values contains n - window_size + 1 windows (the last one
    # starts at n - window_size).
    windows = [values[i:i + window_size] for i in range(len(values) - window_size + 1)]
    if not windows:
        return orjson.dumps([])
    data = preprocessing.minmax_scale(windows, (-1, 1), axis=1)
    print("Created windows: " + str(time() - t0))
    data = data.tolist()
    print("data converted: " + str(time() - t0))
    response = orjson.dumps(data)
    print("Sending response: " + str(time() - t0))
    return response


def fill_table(data, tables_hash_function, index):
    """Bucket every window of *data* under ``tables_hash_function[index]``.

    Not called by the routes in this module. Its signature suits
    ``pool.map(partial(fill_table, data, tables_hash_function), range(n))``.
    """
    print(index)
    return _bucket_windows(data, tables_hash_function[index])


@app.route('/create-tables', methods=['POST'])
def create_tables():
    """Build ``tablesize`` hash tables over the posted windows.

    Expects JSON ``{"windows": [[...], ...], "parameters": {"windowsize",
    "hashsize", "tablesize"}}``. Each table uses a random
    ``windowsize x hashsize`` matrix. The response maps ``"0"``,
    ``"1"``, ... to ``{"hash": matrix, "entries": {signature: [indices]}}``.
    The windows and hash matrices are also stored in the module globals
    ``data`` and ``tables_hash_function``.
    """
    global data, tables_hash_function
    t0 = time()
    raw_data = orjson.loads(request.data)
    print(time() - t0)
    parameters = raw_data['parameters']
    window_size = int(parameters["windowsize"])
    hash_size = int(parameters["hashsize"])
    table_size = int(parameters["tablesize"])
    data = np.array(raw_data["windows"])
    print('Starting: ' + str(time() - t0))
    # Gaussian hyperplanes are rotation-invariant, which sign-of-projection
    # hashing relies on. This matches create_valid_table.
    tables_hash_function = [np.random.standard_normal((window_size, hash_size)) for _ in range(table_size)]
    print('Init time: ' + str(time() - t0))
    tables = [_bucket_windows(data, hash_function) for hash_function in tables_hash_function]
    print('Creation time: ' + str(time() - t0))
    response = {
        str(table_index): {"hash": hash_function.tolist(), "entries": table}
        for table_index, (hash_function, table) in enumerate(zip(tables_hash_function, tables))
    }
    return orjson.dumps(response)


@app.route('/query', methods=['POST'])
def query():
    """Find the windows that collide with ``window`` in the posted tables.

    Expects JSON ``{"window": [...], "tables": {key: {"hash", "entries"}}}``.
    Returns an orjson-encoded object that maps a collision count (as a
    string key) to the window indices that collided that many times.
    """
    raw_data = request.json
    window = raw_data["window"]
    frequencies = Counter()
    for t in raw_data["tables"].values():
        # A signature with no bucket in this table contributes no neighbours.
        frequencies.update(t["entries"].get(_signature(window, t["hash"]), []))
    output = defaultdict(list)
    for index, frequency in frequencies.items():
        # orjson only accepts str dict keys unless OPT_NON_STR_KEYS is given.
        output[str(frequency)].append(index)
    return orjson.dumps(output)


def create_valid_table(data, window_size, hash_size, correct_indices, incorrect_indices, index):
    """Draw hash functions until one separates the labelled windows, then bucket *data*.

    A draw is accepted when every window in *correct_indices* gets the same
    signature and no window in *incorrect_indices* gets that signature.
    *index* is unused; it only absorbs the task number supplied by
    ``Pool.map``. Returns ``{"hash": nested list, "entries": buckets}``.

    NOTE(review): the loop has no attempt limit. If the labels cannot be
    separated (e.g. a correct and an incorrect window are identical), the
    loop never terminates.

    Raises:
        ValueError: if *correct_indices* is empty.
    """
    if not correct_indices:
        raise ValueError("create_valid_table needs at least one correct index")
    # A fresh OS-seeded generator per call. Forked Pool workers inherit a copy
    # of the parent's global np.random state and would otherwise all draw the
    # same matrices.
    rng = np.random.default_rng()
    while True:
        hash_function = rng.standard_normal((window_size, hash_size))
        target = _signature(data[correct_indices[0]], hash_function)
        correct_ok = all(_signature(data[i], hash_function) == target for i in correct_indices)
        if correct_ok and all(_signature(data[i], hash_function) != target for i in incorrect_indices):
            break
    return {
        "hash": hash_function.tolist(),
        "entries": _bucket_windows(data, hash_function)
    }


@app.route('/update', methods=['POST'])
def update():
    """Keep the tables consistent with the user's labels and regenerate the rest.

    Expects JSON with ``windows``, ``labelData`` ({window index: true/false}),
    ``tables`` and ``parameters``. A table is kept when the bucket of the
    first correctly-labelled window contains every correct index and no
    incorrect index. Missing tables (up to ``tablesize``) are generated in a
    process pool. Returns 400 when no window is labelled correct.
    """
    t0 = time()
    raw_data = request.json
    data = np.array(raw_data["windows"])
    label_data = raw_data["labelData"]
    tables = raw_data["tables"]
    parameters = raw_data['parameters']
    window_size = int(parameters["windowsize"])
    hash_size = int(parameters["hashsize"])
    table_size = int(parameters["tablesize"])

    correct_indices = [int(index) for index, value in label_data.items() if value is True]
    incorrect_indices = [int(index) for index, value in label_data.items() if value is False]
    if not correct_indices:
        return jsonify({"error": "labelData must mark at least one window as correct"}), 400

    window = data[correct_indices[0]]
    correct_set = set(correct_indices)
    incorrect_set = set(incorrect_indices)
    new_tables = []
    for t in tables.values():
        neighbours = set(t["entries"].get(_signature(window, t["hash"]), []))
        if correct_set <= neighbours and not (incorrect_set & neighbours):
            new_tables.append(t)

    missing = table_size - len(new_tables)
    if missing > 0:
        func = partial(create_valid_table, data, window_size, hash_size, correct_indices, incorrect_indices)
        print('Starting pool: ' + str(time() - t0))
        # map() blocks until all results are collected, so the terminate()
        # that runs on leaving the with-block is safe.
        with Pool() as pool:
            new_tables.extend(pool.map(func, range(missing)))
    print('Update time: ' + str(time() - t0))

    response = {
        table_index: {"hash": table["hash"], "entries": table["entries"]}
        for table_index, table in enumerate(new_tables)
    }
    return jsonify(response)