# Flask backend: loads a ground-station temperature series, slices it into
# normalised sliding windows, and indexes the windows with random-hyperplane
# LSH tables so that similar windows can be queried and the tables refined
# from user labels.

from flask import Flask, jsonify, request
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from flask_cors import CORS
from collections import defaultdict, Counter
from time import time
import dask.dataframe as dd
import os.path
import json
from sklearn import preprocessing
from functools import partial
from itertools import groupby
from multiprocessing import Pool

app = Flask(__name__)
CORS(app)


@app.route('/', methods=['GET'])
def index():
    return "hi"


@app.route('/read-data', methods=['GET'])
def read_data():
    # Read the raw CSV with dask, keep a single station, and cache the result
    # as a pickle so subsequent requests skip the expensive filtering step.
    filename = 'processed-data.pkl'
    if not os.path.isfile(filename):
        print("start")
        df = dd.read_csv("NW_Ground_Stations_2016.csv", usecols=['number_sta', 'date', 't'])
        print("read file")
        df = df.loc[df['number_sta'] == 14066001]
        print("split rows")
        df = df.compute()
        df.to_pickle(filename)
        print("to_pandas")
    df = pd.read_pickle(filename)
    df.dropna(subset=['t'], inplace=True)
    response = {
        "index": json.dumps(df.loc[:, 'date'].values.astype(str).tolist()),
        "values": json.dumps(df.loc[:, 't'].values.astype(str).tolist())
    }
    print("response ready")
    response = jsonify(response)
    return response


# @app.route('/read-data', methods=['GET'])
# def read_data():
#     df = pd.read_csv("1.csv", index_col=3)
#     df.index = pd.to_datetime(df.index)
#     df.sort_index(inplace=True)
#     meantemp = df.loc[:, 7].copy()
#     response = {
#         "index": meantemp.index.values.astype(str).tolist(),
#         "values": meantemp.values.tolist()
#     }
#     response = jsonify(response)
#     return response


@app.route('/create-windows', methods=['POST'])
def create_windows():
    # Slice the series into overlapping windows of length `windowsize` and
    # min-max scale each window to [-1, 1] so the hashing is shape-based.
    raw_data = request.json
    values = raw_data["values"]
    window_size = int(raw_data['parameters']["windowsize"])
    data = [values[i:i + window_size] for i in range(len(values) - window_size)]
    data = preprocessing.minmax_scale(data, (-1, 1), axis=1)
    response = jsonify(data.tolist())
    return response


def fill_table(data, hash_functions, index):
    # Build one LSH table: project every window onto the random hyperplanes of
    # hash_functions[index], turn the signs into a bit-string signature, and
    # bucket the window indices by signature.
    table = defaultdict(list)
    signatures = [''.join((np.dot(data[window_index], hash_functions[index]) > 0).astype('int').astype('str'))
                  for window_index in range(data.shape[0])]
    for i, signature in enumerate(signatures):
        table[signature].append(i)
    return table


@app.route('/create-tables', methods=['POST'])
def create_tables():
    # Create `tablesize` independent LSH tables in parallel, each with its own
    # random (window_size x hash_size) hyperplane matrix.
    t0 = time()
    raw_data = request.json
    data = raw_data["windows"]
    window_size = int(raw_data['parameters']["windowsize"])
    hash_size = int(raw_data['parameters']["hashsize"])
    table_size = int(raw_data['parameters']["tablesize"])
    data = np.array(data)
    tables_hash_function = [np.random.uniform(-1, 1, size=(window_size, hash_size)) for _ in range(table_size)]
    print('Init time: ' + str(time() - t0))
    pool = Pool()
    try:
        func = partial(fill_table, data, tables_hash_function)
        print('Starting pool: ' + str(time() - t0))
        tables = pool.map(func, range(table_size))
    finally:
        pool.close()
        pool.join()
    print('Creation time: ' + str(time() - t0))
    hash_functions = np.array(tables_hash_function).tolist()
    response = {}
    for table_index in range(table_size):
        response[table_index] = {
            "hash": hash_functions[table_index],
            "entries": tables[table_index]
        }
    response = jsonify(response)
    return response


@app.route('/query', methods=['POST'])
def query():
    # Hash the query window into every table, collect the windows that share a
    # bucket with it, and group those candidates by how many tables they
    # collide in (the frequency acts as a similarity score).
    raw_data = request.json
    window = raw_data["window"]
    tables = raw_data["tables"]
    neighbours = []
    output = {}
    for t in tables.values():
        signature = ''.join((np.dot(window, t["hash"]) > 0).astype('int').astype('str'))
        neighbours.extend(t["entries"][signature])
    neighbours_with_frequency = dict(Counter(neighbours))
    for index, frequency in neighbours_with_frequency.items():
        if frequency not in output:
            output[frequency] = []
        output[frequency].append(index)
    response = jsonify(output)
    return response


def create_valid_table(data, window_size, hash_size, correct_indices, incorrect_indices, index):
    # Resample random hash functions until all positively labelled windows
    # share one bucket and no negatively labelled window falls into it, then
    # index the whole dataset with that hash function.
    entries = defaultdict(list)
    while True:
        hash_function = np.random.randn(window_size, hash_size)
        correct_signatures = [''.join((np.dot(data[i], hash_function) > 0).astype('int').astype('str'))
                              for i in correct_indices]
        incorrect_signatures = [''.join((np.dot(data[i], hash_function) > 0).astype('int').astype('str'))
                                for i in incorrect_indices]
        if correct_signatures.count(correct_signatures[0]) == len(correct_signatures) and \
                incorrect_signatures.count(correct_signatures[0]) == 0:
            break
    for window_index in range(data.shape[0]):
        signature = ''.join((np.dot(data[window_index], hash_function) > 0).astype('int').astype('str'))
        entries[signature].append(window_index)
    return {
        "hash": hash_function.tolist(),
        "entries": entries
    }


@app.route('/update', methods=['POST'])
def update():
    # Keep the existing tables that already agree with the user labels
    # (all labelled-correct windows in the query bucket, no labelled-incorrect
    # ones), then regenerate the remaining tables in parallel.
    t0 = time()
    raw_data = request.json
    data = raw_data["windows"]
    data = np.array(data)
    label_data = raw_data["labelData"]
    tables = raw_data["tables"]
    window_size = int(raw_data['parameters']["windowsize"])
    hash_size = int(raw_data['parameters']["hashsize"])
    table_size = int(raw_data['parameters']["tablesize"])
    new_tables = []
    correct_indices = [int(index) for index, value in label_data.items() if value is True]
    incorrect_indices = [int(index) for index, value in label_data.items() if value is False]
    window = data[correct_indices[0]]
    for t in tables.values():
        valid = True
        signature = ''.join((np.dot(window, t["hash"]) > 0).astype('int').astype('str'))
        neighbours = t["entries"][signature]
        for index in correct_indices:
            if index not in neighbours:
                valid = False
                break
        for index in incorrect_indices:
            if index in neighbours:
                valid = False
                break
        if valid:
            new_tables.append(t)
    pool = Pool()
    try:
        func = partial(create_valid_table, data, window_size, hash_size, correct_indices, incorrect_indices)
        print('Starting pool: ' + str(time() - t0))
        new_tables.extend(pool.map(func, range(table_size - len(new_tables))))
    finally:
        pool.close()
        pool.join()
    print('Update time: ' + str(time() - t0))
    response = {}
    for table_index in range(len(new_tables)):
        response[table_index] = {
            "hash": new_tables[table_index]["hash"],
            "entries": new_tables[table_index]["entries"]
        }
    response = jsonify(response)
    return response
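

# --- Usage sketch (not part of the service) ---------------------------------
# A minimal client walk-through of the endpoints above, assuming the app is
# served locally on Flask's default port 5000 and that the `requests` package
# is available. The parameter values (window/hash/table sizes) and the label
# indices are made-up examples, not values prescribed by this backend.
#
# import requests
#
# BASE = 'http://localhost:5000'
# params = {"windowsize": 120, "hashsize": 12, "tablesize": 10}
#
# # 1. Load the cached series and slice it into normalised windows.
# series = requests.get(BASE + '/read-data').json()
# values = [float(v) for v in json.loads(series["values"])]
# windows = requests.post(BASE + '/create-windows',
#                         json={"values": values, "parameters": params}).json()
#
# # 2. Build the LSH tables and query them with one of the windows; the reply
# #    maps collision frequency -> list of candidate window indices.
# tables = requests.post(BASE + '/create-tables',
#                        json={"windows": windows, "parameters": params}).json()
# candidates = requests.post(BASE + '/query',
#                            json={"window": windows[0], "tables": tables}).json()
#
# # 3. Feed labels back (window index -> True for similar, False for dissimilar)
# #    to keep consistent tables and regenerate the rest.
# labels = {"0": True, "500": False}
# tables = requests.post(BASE + '/update',
#                        json={"windows": windows, "labelData": labels,
#                              "tables": tables, "parameters": params}).json()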