main.py 6.81 KB
Newer Older
1 2 3 4 5
from flask import Flask, jsonify, request
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from flask_cors import CORS
6
from collections import defaultdict, Counter
7
from time import time
8 9 10 11 12 13 14
import dask.dataframe as dd
import os.path
import json
from sklearn import preprocessing
from functools import partial
from itertools import groupby
from multiprocessing import Pool
15 16 17 18 19 20 21 22 23 24

app = Flask(__name__)
CORS(app)

@app.route('/', methods=['GET'])
def index():
    """Answer ``GET /`` with a fixed plain-text body."""
    greeting = "hi"
    return greeting

@app.route('/read-data', methods=['GET'])
def read_data():
    """Serve the ``date`` and ``t`` columns for station 14066001.

    The filtered rows are cached in ``processed-data.pkl``; when that file is
    absent it is built from ``NW_Ground_Stations_2016.csv`` first.  Rows whose
    ``t`` is missing are dropped before the response is built.

    Returns:
        A JSON object with keys ``"index"`` and ``"values"``.  Each value is
        itself a JSON-encoded string holding a list of stringified cells, so
        the client has to decode it a second time.
    """
    cache_path = 'processed-data.pkl'
    cache_exists = os.path.isfile(cache_path)
    if not cache_exists:
        print("start")
        lazy_frame = dd.read_csv(
            "NW_Ground_Stations_2016.csv",
            usecols=['number_sta', 'date', 't'],
        )
        print("read file")
        station_mask = lazy_frame['number_sta'] == 14066001
        station_rows = lazy_frame.loc[station_mask]
        print("split rows")
        # compute() materialises the lazy dask frame so it can be pickled.
        station_rows.compute().to_pickle(cache_path)
        print("to_pandas")

    # The frame is always loaded back from the cache file, fresh or not.
    frame = pd.read_pickle(cache_path)
    frame = frame.dropna(subset=['t'])

    def column_as_json(column):
        # Stringify every cell, then serialise the resulting list.
        cells = frame.loc[:, column].values.astype(str).tolist()
        return json.dumps(cells)

    payload = {
        "index": column_as_json('date'),
        "values": column_as_json('t'),
    }
    print("response ready")
    return jsonify(payload)

45 46 47 48 49 50 51 52 53 54 55 56
# @app.route('/read-data', methods=['GET'])
# def read_data():
#     df = pd.read_csv("1.csv", index_col=3)
#     df.index = pd.to_datetime(df.index)
#     df.sort_index(inplace=True)
#     meantemp = df.loc[:, 7].copy()
#     response = {
#         "index": meantemp.index.values.astype(str).tolist(),
#         "values": meantemp.values.tolist()
#     }
#     response = jsonify(response)
#     return response
57 58 59 60 61

@app.route('/create-windows', methods=['POST'])
def create_windows():
    """Slice the posted series into sliding windows scaled to [-1, 1].

    Expects a JSON body with:
        ``values``: the sequence to slice.
        ``parameters.windowsize``: window length (converted with ``int``).

    Window ``i`` is ``values[i:i + windowsize]``.  Every full-length window is
    produced, including the last one that ends on the final element.  Each
    window is min-max scaled on its own (``axis=1``) to the range (-1, 1).

    Returns:
        A JSON list of scaled windows; an empty list when the series is
        shorter than one window.
    """
    payload = request.json
    values = payload["values"]
    window_size = int(payload['parameters']["windowsize"])

    # "+ 1" keeps the final full window values[len - window_size:]; without it
    # the last window of the series is silently dropped.
    window_count = len(values) - window_size + 1
    windows = [values[start:start + window_size] for start in range(window_count)]

    # Nothing to scale: answer with an empty list rather than handing the
    # scaler an empty input.
    if not windows:
        return jsonify([])

    scaled = preprocessing.minmax_scale(windows, (-1, 1), axis=1)
    return jsonify(scaled.tolist())

68 69 70 71 72 73 74 75 76
def fill_table(data, hash_functions, index):
    """Group the rows of ``data`` by their signature under one hash matrix.

    A row's signature is the string of ``1``/``0`` characters marking which
    components of ``np.dot(row, hash_functions[index])`` are strictly
    positive.

    Args:
        data: array exposing ``.shape``; rows are taken along axis 0.
        hash_functions: indexable collection of projection matrices.
        index: position in ``hash_functions`` of the matrix to use.

    Returns:
        A ``defaultdict(list)`` mapping each signature to the row numbers
        (in ascending order) that produced it.
    """
    projection = hash_functions[index]
    buckets = defaultdict(list)
    for row_number in range(data.shape[0]):
        positive = np.dot(data[row_number], projection) > 0
        key = ''.join(positive.astype('int').astype('str'))
        buckets[key].append(row_number)
    return buckets

77 78 79
@app.route('/create-tables', methods=['POST'])
def create_tables():
    """Build ``tablesize`` hash tables over the posted windows.

    Expects a JSON body with:
        ``windows``: list of windows (converted to a numpy array).
        ``parameters.windowsize`` / ``hashsize`` / ``tablesize``: integers.

    One ``(windowsize, hashsize)`` matrix is drawn uniformly from [-1, 1) per
    table, and ``fill_table`` is run for every table in a process pool.

    Returns:
        A JSON object keyed by table position, each entry holding the table's
        ``"hash"`` matrix (nested lists) and its ``"entries"`` mapping of
        signature to window indices.
    """
    t0 = time()
    raw_data = request.json
    data = np.array(raw_data["windows"])
    window_size = int(raw_data['parameters']["windowsize"])
    hash_size = int(raw_data['parameters']["hashsize"])
    table_size = int(raw_data['parameters']["tablesize"])

    tables_hash_function = [
        np.random.uniform(-1, 1, size=(window_size, hash_size))
        for _ in range(table_size)
    ]
    print('Init time: ' + str(time() - t0))

    # Create the pool *before* the try block: if Pool() itself fails there is
    # nothing to clean up, and the finally clause must not hit an unbound
    # ``pool`` name and mask the real error.
    pool = Pool()
    try:
        func = partial(fill_table, data, tables_hash_function)
        print('Starting pool: ' + str(time() - t0))
        tables = pool.map(func, range(table_size))
    finally:
        pool.close()
        pool.join()

    print('Creation time: ' + str(time() - t0))
    response = {
        table_index: {"hash": hash_function.tolist(), "entries": entries}
        for table_index, (hash_function, entries)
        in enumerate(zip(tables_hash_function, tables))
    }
    return jsonify(response)

@app.route('/query', methods=['POST'])
def query():
    """Look up the posted window in every table and group hits by frequency.

    Expects a JSON body with:
        ``window``: the query window.
        ``tables``: object whose values each carry a ``"hash"`` matrix and an
            ``"entries"`` mapping of signature to window indices.

    Returns:
        A JSON object mapping "number of tables that returned this index" to
        the list of window indices with that count.
    """
    raw_data = request.json
    window = raw_data["window"]
    tables = raw_data["tables"]

    neighbours = []
    for t in tables.values():
        signature = ''.join((np.dot(window, t["hash"]) > 0).astype('int').astype('str'))
        # The query window may land in a bucket that holds no stored window;
        # treat that as "no neighbours in this table" instead of a KeyError.
        neighbours.extend(t["entries"].get(signature, []))

    output = {}
    for index, frequency in Counter(neighbours).items():
        output.setdefault(frequency, []).append(index)
    return jsonify(output)

129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147
def create_valid_table(data, window_size, hash_size, correct_indices, incorrect_indices, index):
    """Draw random hash matrices until one agrees with the labels, then fill a table.

    A matrix of shape ``(window_size, hash_size)`` is sampled with
    ``np.random.randn`` repeatedly.  It is accepted once every row listed in
    ``correct_indices`` gets the same signature and no row listed in
    ``incorrect_indices`` gets that signature.  The loop has no iteration
    limit.

    Args:
        data: array exposing ``.shape``; rows are taken along axis 0.
        window_size: number of rows of the sampled matrix.
        hash_size: number of columns of the sampled matrix.
        correct_indices: row numbers that must share one bucket.
        incorrect_indices: row numbers that must stay out of that bucket.
        index: not read by the body; it is the item supplied by ``pool.map``.

    Returns:
        ``{"hash": <matrix as nested lists>, "entries": <defaultdict mapping
        signature -> row numbers>}``.
    """
    def signature_of(row_number, projection):
        # One character per hash column: '1' where the projection is > 0.
        positive = np.dot(data[row_number], projection) > 0
        return ''.join(positive.astype('int').astype('str'))

    accepted = False
    while not accepted:
        hash_function = np.random.randn(window_size, hash_size)
        wanted = [signature_of(row, hash_function) for row in correct_indices]
        unwanted = [signature_of(row, hash_function) for row in incorrect_indices]
        target = wanted[0]
        accepted = all(sig == target for sig in wanted) and target not in unwanted

    entries = defaultdict(list)
    for window_index in range(data.shape[0]):
        entries[signature_of(window_index, hash_function)].append(window_index)

    return {
        "hash": hash_function.tolist(),
        "entries": entries
    }

148 149 150 151
@app.route('/update', methods=['POST'])
def update():
    """Keep the tables that agree with the user's labels and replace the rest.

    Expects a JSON body with:
        ``windows``: list of windows (converted to a numpy array).
        ``labelData``: object mapping window index to ``true`` / ``false``.
        ``tables``: object whose values each carry ``"hash"`` and ``"entries"``.
        ``parameters.windowsize`` / ``hashsize`` / ``tablesize``: integers.

    A table is kept when the bucket of the first positively labelled window
    contains every positive index and no negative index.  Missing tables (up
    to ``tablesize``) are generated with ``create_valid_table`` in a process
    pool.

    Returns:
        A JSON object keyed by table position with each table's ``"hash"`` and
        ``"entries"``; or a 400 response with an ``"error"`` message when no
        window is labelled ``true``.
    """
    t0 = time()
    raw_data = request.json

    data = np.array(raw_data["windows"])
    label_data = raw_data["labelData"]
    tables = raw_data["tables"]

    window_size = int(raw_data['parameters']["windowsize"])
    hash_size = int(raw_data['parameters']["hashsize"])
    table_size = int(raw_data['parameters']["tablesize"])

    correct_indices = [int(index) for index, value in label_data.items() if value is True]
    incorrect_indices = [int(index) for index, value in label_data.items() if value is False]

    # Both the reference window below and create_valid_table need at least
    # one positive example; report that to the client instead of failing with
    # an IndexError.
    if not correct_indices:
        return jsonify({"error": "labelData must mark at least one window as true"}), 400

    window = data[correct_indices[0]]

    new_tables = []
    for t in tables.values():
        signature = ''.join((np.dot(window, t["hash"]) > 0).astype('int').astype('str'))
        # A missing bucket simply means the table cannot be valid; a set makes
        # the membership checks below O(1) each.
        bucket = set(t["entries"].get(signature, []))
        if bucket.issuperset(correct_indices) and bucket.isdisjoint(incorrect_indices):
            new_tables.append(t)

    missing = table_size - len(new_tables)
    if missing > 0:
        # Create the pool before the try block so a failing Pool() is not
        # masked by a NameError on ``pool`` in the finally clause.
        pool = Pool()
        try:
            func = partial(create_valid_table, data, window_size, hash_size, correct_indices, incorrect_indices)
            print('Starting pool: ' + str(time() - t0))
            new_tables.extend(pool.map(func, range(missing)))
        finally:
            pool.close()
            pool.join()

    print('Update time: ' + str(time() - t0))
    response = {
        table_index: {"hash": table["hash"], "entries": table["entries"]}
        for table_index, table in enumerate(new_tables)
    }
    return jsonify(response)