"""Flask backend for time-series window similarity search via LSH.

Reads ground-station weather data (``NW_Ground_Stations_2016.csv``), caches
intermediate results on disk (``data.pkl``, ``processed-data.npy``), and
exposes endpoints to:

* serve the raw (multivariate) series for one station,
* slice it into normalized sliding windows,
* build locality-sensitive-hashing (LSH) tables over those windows,
* answer similarity queries and progressive averages, and
* refine the tables from user-provided relevance labels.

NOTE(review): the disk caches are keyed only by filename, not by the
parameters used to build them — changing ``windowsize`` (or switching
between the univariate and multivariate endpoints, which share
``data.pkl``/``processed-data.npy``) silently reuses stale data.
Delete the cache files when parameters change; verify against the caller.
"""
from flask import Flask, jsonify, request
import pandas as pd
import numpy as np
from flask_cors import CORS
from collections import defaultdict, Counter
from time import time
import os.path
import json
from sklearn import preprocessing
import orjson
import dask.dataframe as dd

app = Flask(__name__)
CORS(app)  # front-end runs on a different origin


@app.route('/', methods=['GET'])
def index():
    """Health check."""
    return "hi"


@app.route('/read-data', methods=['GET'])
def read_data():
    """Return the temperature series ('t') for station 14066001 as JSON.

    On first call the CSV is filtered with dask and the per-station frame
    is cached to ``data.pkl``; subsequent calls read the pickle directly.
    Response: ``{"index": <json list of dates>, "values": <json list of
    stringified temperatures>}`` (both values are JSON-encoded strings,
    so the payload is double-encoded — the client must parse them again).
    """
    filename = 'data.pkl'
    if not os.path.isfile(filename):
        print("start")
        df = dd.read_csv("NW_Ground_Stations_2016.csv",
                         usecols=['number_sta', 'date', 't'])
        print("read file")
        df = df.loc[df['number_sta'] == 14066001]
        print("split rows")
        df = df.compute()  # materialize the dask frame as pandas
        df.to_pickle(filename)
        print("to_pandas")
    df = pd.read_pickle(filename)
    df.dropna(subset=['t'], inplace=True)
    response = {
        "index": json.dumps(df.loc[:, 'date'].values.astype(str).tolist()),
        "values": json.dumps(df.loc[:, 't'].values.astype(str).tolist()),
    }
    print("response ready")
    return jsonify(response)


@app.route('/read-mts-data', methods=['GET'])
def read_mts_data():
    """Return three channels (t, hu, td) for station 14066001.

    Same caching scheme as ``read_data``; note both endpoints share
    ``data.pkl``, so whichever runs first determines which columns the
    cache contains. Rows are dropped only when 't' is NaN — 'hu'/'td'
    may still contain NaN (presumably acceptable downstream; verify).
    Response: a list of three ``{"index": [...], "values": [...]}``
    objects, serialized with orjson.
    """
    filename = 'data.pkl'
    if not os.path.isfile(filename):
        print("start")
        df = dd.read_csv("NW_Ground_Stations_2016.csv",
                         usecols=['number_sta', 'date', 't', 'hu', 'td'])
        print("read file")
        df = df.loc[df['number_sta'] == 14066001]
        print("split rows")
        df = df.compute()
        df.to_pickle(filename)
        print("to_pandas")
    df = pd.read_pickle(filename)
    df.dropna(subset=['t'], inplace=True)
    dates = df.loc[:, 'date'].values.astype(str).tolist()
    response = [
        {"index": dates, "values": df.loc[:, 't'].values.tolist()},
        {"index": dates, "values": df.loc[:, 'hu'].values.tolist()},
        {"index": dates, "values": df.loc[:, 'td'].values.tolist()},
    ]
    print("response ready")
    return orjson.dumps(response)


@app.route('/create-windows', methods=['POST'])
def create_windows():
    """Slice the cached univariate series into normalized sliding windows.

    Body: ``{"parameters": {"windowsize": int}}``. Each window of length
    ``windowsize`` is min-max scaled to [-1, 1] and the resulting
    (num_windows, windowsize) array is cached as ``processed-data.npy``.
    Returns '1' when the cache is ready.
    """
    t0 = time()
    if not os.path.isfile('processed-data.npy'):
        filename = 'data.pkl'
        df = pd.read_pickle(filename)
        # NOTE(review): values are stringified and converted back to float
        # inside minmax_scale — the round trip is exact but wasteful.
        values = df.loc[:, 't'].values.astype(str).tolist()
        print("Data read: " + str(time() - t0))
        raw_data = request.json
        window_size = int(raw_data['parameters']["windowsize"])
        print("Processing: " + str(time() - t0))
        # NOTE(review): exclusive bound drops the final window
        # (range stops at len - window_size, not len - window_size + 1);
        # kept as-is because cached indices elsewhere depend on it.
        data = [values[i:i + window_size]
                for i in range(len(values) - window_size)]
        data = preprocessing.minmax_scale(data, (-1, 1), axis=1)
        print("Preprocessed: " + str(time() - t0))
        np.save('processed-data', data)
    print("Sending response: " + str(time() - t0))
    return '1'


@app.route('/create-mts-windows', methods=['POST'])
def create_mts_windows():
    """Multivariate counterpart of ``create_windows``.

    Builds windows of shape (3, windowsize) over channels (t, hu, td),
    scales each window's channels to [-1, 1] independently, and caches
    the stack as ``processed-data.npy`` (same file as the univariate
    endpoint — see the module note on cache staleness). Returns '1'.
    """
    t0 = time()
    if not os.path.isfile('processed-data.npy'):
        filename = 'data.pkl'
        df = pd.read_pickle(filename)
        channels = [
            df.loc[:, 't'].values.tolist(),
            df.loc[:, 'hu'].values.tolist(),
            df.loc[:, 'td'].values.tolist(),
        ]
        print("Data read: " + str(time() - t0))
        raw_data = request.json
        window_size = int(raw_data['parameters']["windowsize"])
        print("Processing: " + str(time() - t0))
        data = [[values[i:i + window_size] for values in channels]
                for i in range(len(channels[0]) - window_size)]
        print("Raw windows: " + str(time() - t0))
        windows = []
        for i in range(len(data)):
            if i % 5000 == 0:
                print(i)  # coarse progress indicator
            windows.append(preprocessing.minmax_scale(data[i], (-1, 1), axis=1))
        print("Preprocessed: " + str(time() - t0))
        np.save('processed-data', windows)
    print("Sending response: " + str(time() - t0))
    return '1'


@app.route('/create-tables', methods=['POST'])
def create_tables():
    """Build LSH tables over the cached univariate windows.

    Body: ``{"parameters": {"windowsize", "hashsize", "tablesize"}}``.
    Each table projects every window onto ``hashsize`` random hyperplanes
    (uniform weights in [-1, 1]) and buckets window indices by the sign
    bit-string of the projection. Response maps table index (str) to
    ``{"hash": <windowsize x hashsize matrix>, "entries": {signature:
    [window indices]}}``, serialized with orjson.
    """
    t0 = time()
    print("loading")
    data = np.load('processed-data.npy')  # already an ndarray; no copy needed
    print(time() - t0)
    raw_data = orjson.loads(request.data)
    print(time() - t0)
    window_size = int(raw_data['parameters']["windowsize"])
    hash_size = int(raw_data['parameters']["hashsize"])
    table_size = int(raw_data['parameters']["tablesize"])
    print('Starting: ' + str(time() - t0))
    tables_hash_function = [np.random.uniform(-1, 1, size=(window_size, hash_size))
                            for _ in range(table_size)]
    print('Init time: ' + str(time() - t0))
    tables = []
    for index in range(table_size):
        t1 = time()
        table = defaultdict(list)
        signatures_bool = np.dot(data, tables_hash_function[index]) > 0
        signatures = [''.join(['1' if x else '0' for x in lst])
                      for lst in signatures_bool]
        for i in range(len(signatures)):
            table[signatures[i]].append(i)
        print(time() - t1)
        tables.append(table)
    print('Creation time: ' + str(time() - t0))
    hash_functions = np.array(tables_hash_function).tolist()
    response = {}
    for table_index in range(table_size):
        response[str(table_index)] = {
            "hash": hash_functions[table_index],
            "entries": tables[table_index],
        }
    return orjson.dumps(response)


@app.route('/create-mts-tables', methods=['POST'])
def create_mts_tables():
    """Multivariate counterpart of ``create_tables``.

    Windows have shape (3, windowsize); the per-channel projections are
    summed with unit weights (``np.dot([1, 1, 1], ...)``) before taking
    the sign, so all three channels contribute equally to the signature.
    """
    t0 = time()
    print("loading")
    data = np.load('processed-data.npy')  # shape (N, 3, windowsize)
    print(time() - t0)
    raw_data = orjson.loads(request.data)
    print(time() - t0)
    window_size = int(raw_data['parameters']["windowsize"])
    hash_size = int(raw_data['parameters']["hashsize"])
    table_size = int(raw_data['parameters']["tablesize"])
    print(data.shape)
    print('Starting: ' + str(time() - t0))
    tables_hash_function = [np.random.uniform(-1, 1, size=(window_size, hash_size))
                            for _ in range(table_size)]
    print('Init time: ' + str(time() - t0))
    tables = []
    for index in range(table_size):
        t1 = time()
        table = defaultdict(list)
        # (N, 3, hash) projections summed over the channel axis -> (N, hash)
        signatures_bool = np.dot([1, 1, 1],
                                 np.dot(data, tables_hash_function[index])) > 0
        signatures = [''.join(['1' if x else '0' for x in lst])
                      for lst in signatures_bool]
        for i in range(len(signatures)):
            table[signatures[i]].append(i)
        print(time() - t1)
        tables.append(table)
    print('Creation time: ' + str(time() - t0))
    hash_functions = np.array(tables_hash_function).tolist()
    response = {}
    for table_index in range(table_size):
        response[str(table_index)] = {
            "hash": hash_functions[table_index],
            "entries": tables[table_index],
        }
    return orjson.dumps(response)


@app.route('/query', methods=['POST'])
def query():
    """Normalize a raw query window to [-1, 1] per row and echo it back."""
    t0 = time()
    raw_data = orjson.loads(request.data)
    window = raw_data['window']
    output = preprocessing.minmax_scale(window, (-1, 1), axis=1)
    response = orjson.dumps(output.tolist())
    print("Query done: " + str(time() - t0))
    return response


@app.route('/similarity', methods=['POST'])
def similarity():
    """Look up the query window in every table and rank hits by frequency.

    Body: ``{"query": <window>, "tables": {...}}`` (tables as produced by
    the create-tables endpoints). The response maps collision count
    (as a string) to the list of window indices seen that many times.
    Uses the three-channel signature, so it assumes multivariate windows.
    """
    t0 = time()
    raw_data = orjson.loads(request.data)
    window = raw_data['query']
    tables = raw_data["tables"]
    neighbours = []
    output = defaultdict(list)
    for t in tables.values():
        signature_bool = np.dot([1, 1, 1], np.dot(window, t["hash"])) > 0
        signature = ''.join(['1' if x else '0' for x in signature_bool])
        # NOTE(review): entries come back from JSON as a plain dict — a
        # signature absent from this table raises KeyError; confirm the
        # client always sends complete tables.
        neighbours.extend(t["entries"][signature])
    neighbours_with_frequency = dict(Counter(neighbours))
    for index, frequency in neighbours_with_frequency.items():
        output[str(frequency)].append(index)
    response = orjson.dumps(output)
    print("Similarity done: " + str(time() - t0))
    return response


@app.route('/average-progress', methods=['POST'])
def average_progress():
    """Cumulative average/max/min over successive batches of windows.

    ``actual_windows`` deliberately accumulates across iterations, so the
    i-th output summarizes ALL windows seen up to and including batch i
    (a progressive refinement for the front-end).
    """
    t0 = time()
    raw_data = orjson.loads(request.data)
    all_windows = raw_data['windows']
    data = np.load('processed-data.npy')
    output = []
    actual_windows = []
    print("Initialized: " + str(time() - t0))
    for windows in all_windows:
        t1 = time()
        actual_windows.extend(data[windows])
        if len(actual_windows) == 0:
            output.append([])
            continue
        max_values = np.maximum.reduce(actual_windows).tolist()
        min_values = np.minimum.reduce(actual_windows).tolist()
        average_values = (np.sum(actual_windows, 0) / len(actual_windows)).tolist()
        output.append({
            'average': average_values,
            'max': max_values,
            'min': min_values,
        })
        print("Average calculated: " + str(time() - t1))
    response = orjson.dumps(output)
    print("Averages calculated: " + str(time() - t0))
    return response


@app.route('/average-table', methods=['POST'])
def average_table():
    """Per-batch mean with a one-standard-deviation band.

    Unlike ``average_progress`` this does NOT accumulate: each batch of
    window indices is summarized independently, and 'max'/'min' are
    mean +/- std rather than true extrema.
    """
    t0 = time()
    raw_data = orjson.loads(request.data)
    all_windows = raw_data['windows']
    data = np.load('processed-data.npy')
    output = []
    print("Initialized: " + str(time() - t0))
    for windows in all_windows:
        t1 = time()
        actual_windows = data[windows]
        print(len(actual_windows))
        average_values = np.average(actual_windows, 0)
        std_values = np.std(actual_windows, 0)
        max_values = average_values + std_values
        min_values = average_values - std_values
        output.append({
            'average': average_values.tolist(),
            'max': max_values.tolist(),
            'min': min_values.tolist(),
        })
        print("Average calculated: " + str(time() - t1))
    response = orjson.dumps(output)
    print("Averages calculated: " + str(time() - t0))
    return response


@app.route('/update', methods=['POST'])
def update():
    """Refine the LSH tables from user relevance labels.

    Body: ``{"labelData": {index: bool}, "tables": {...}, "query": window,
    "parameters": {...}}``. Tables whose query bucket already contains
    every positively-labelled index and no negatively-labelled index are
    kept; the rest are regenerated by rejection-sampling Gaussian hash
    functions until the labels are satisfied. Response has the same shape
    as the create-tables endpoints.

    NOTE(review): the signature here uses ``np.dot(window, hash)`` without
    the [1, 1, 1] channel sum, so this path assumes univariate windows —
    confirm against the multivariate client flow.
    """
    t0 = time()
    print("Start")
    raw_data = orjson.loads(request.data)
    print("Data loaded: " + str(time() - t0))
    data = np.load('processed-data.npy')
    label_data = raw_data["labelData"]
    tables = raw_data["tables"]
    window = raw_data["query"]
    window_size = int(raw_data['parameters']["windowsize"])
    hash_size = int(raw_data['parameters']["hashsize"])
    table_size = int(raw_data['parameters']["tablesize"])
    new_tables = []
    correct_indices = [int(index) for index, value in label_data.items()
                       if value is True]
    incorrect_indices = [int(index) for index, value in label_data.items()
                         if value is False]
    print("Initialized: " + str(time() - t0))
    # Phase 1: keep existing tables that already agree with the labels.
    for t in tables.values():
        valid = True
        signature = ''.join(
            (np.dot(window, t["hash"]) > 0).astype('int').astype('str'))
        neighbours = t["entries"][signature]
        for index in correct_indices:
            if index not in neighbours:
                valid = False
                break
        if valid:
            for index in incorrect_indices:
                if index in neighbours:
                    valid = False
                    break
        if valid:
            new_tables.append(t)
    print("Filtered good tables: " + str(time() - t0))
    # Phase 2: rejection-sample replacement hash functions until the
    # labelled windows hash consistently (all positives together, no
    # negatives with them).
    for index in range(table_size - len(new_tables)):
        entries = defaultdict(list)
        t1 = time()
        while True:
            hash_function = np.random.randn(window_size, hash_size)
            if not correct_indices:
                # No positive labels: any hash function satisfies the
                # constraints (the original crashed on correct_signatures[0]).
                break
            correct_signatures = [
                ''.join((np.dot(data[i], hash_function) > 0)
                        .astype('int').astype('str'))
                for i in correct_indices]
            incorrect_signatures = [
                ''.join((np.dot(data[i], hash_function) > 0)
                        .astype('int').astype('str'))
                for i in incorrect_indices]
            if (correct_signatures.count(correct_signatures[0])
                    == len(correct_signatures)
                    and incorrect_signatures.count(correct_signatures[0]) == 0):
                break
        print("first: " + str(time() - t1))
        t2 = time()
        signatures_bool = np.dot(data, hash_function) > 0
        signatures = [''.join(['1' if x else '0' for x in lst])
                      for lst in signatures_bool]
        for i in range(len(signatures)):
            entries[signatures[i]].append(i)
        print("second: " + str(time() - t2))
        new_tables.append({
            "hash": hash_function.tolist(),
            "entries": entries,
        })
    print('Update time: ' + str(time() - t0))
    response = {}
    for table_index in range(len(new_tables)):
        # str keys for consistency with create_tables; jsonify serializes
        # int keys as strings anyway, so the wire format is unchanged.
        response[str(table_index)] = {
            "hash": new_tables[table_index]["hash"],
            "entries": new_tables[table_index]["entries"],
        }
    return jsonify(response)