main.py 7.87 KB
Newer Older
1 2 3 4 5
from flask import Flask, jsonify, request
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
from flask_cors import CORS
6
from collections import defaultdict, Counter
7
from time import time
8 9 10 11 12 13 14
import dask.dataframe as dd
import os.path
import json
from sklearn import preprocessing
from functools import partial
from itertools import groupby
from multiprocessing import Pool
15 16
import rapidjson
import orjson
17 18 19 20 21 22 23 24 25 26

# Flask application object; the @app.route decorators below register their
# handlers on it.
app = Flask(__name__)
# Wrap the app with flask_cors so responses carry CORS headers (library
# defaults, no per-route configuration is given here).
CORS(app)

@app.route('/', methods=['GET'])
def index():
    """Respond to ``GET /`` with the fixed string ``"hi"``."""
    greeting = "hi"
    return greeting

@app.route('/read-data', methods=['GET'])
def read_data():
    """Serve the temperature series of station 14066001 as JSON.

    On the first call the raw CSV is filtered with dask and the result is
    cached as a pickle; later calls only read the pickle. Rows whose ``t``
    value is missing are dropped before the response is built.

    Returns:
        A Flask JSON response with two keys, ``"index"`` (the ``date`` column)
        and ``"values"`` (the ``t`` column). Each value is itself a
        JSON-encoded string holding a list of strings, so clients must decode
        it a second time.
    """
    cache_path = 'processed-data.pkl'
    cache_missing = not os.path.isfile(cache_path)
    if cache_missing:
        print("start")
        lazy_frame = dd.read_csv(
            "NW_Ground_Stations_2016.csv",
            usecols=['number_sta', 'date', 't'],
        )
        print("read file")
        station_rows = lazy_frame.loc[lazy_frame['number_sta'] == 14066001]
        print("split rows")
        # compute() materialises the dask frame as pandas before caching it.
        station_rows.compute().to_pickle(cache_path)
        print("to_pandas")

    frame = pd.read_pickle(cache_path)
    frame.dropna(subset=['t'], inplace=True)

    def column_as_json(column):
        # Stringify every cell, then JSON-encode the resulting list.
        return json.dumps(frame.loc[:, column].values.astype(str).tolist())

    payload = {
        "index": column_as_json('date'),
        "values": column_as_json('t'),
    }
    print("response ready")
    return jsonify(payload)

47 48 49 50 51 52 53 54 55 56 57 58
# @app.route('/read-data', methods=['GET'])
# def read_data():
#     df = pd.read_csv("1.csv", index_col=3)
#     df.index = pd.to_datetime(df.index)
#     df.sort_index(inplace=True)
#     meantemp = df.loc[:, 7].copy()
#     response = {
#         "index": meantemp.index.values.astype(str).tolist(),
#         "values": meantemp.values.tolist()
#     }
#     response = jsonify(response)
#     return response
59 60 61

@app.route('/create-windows', methods=['POST'])
def create_windows():
    """Cut the posted series into sliding windows scaled to [-1, 1].

    Request JSON:
        ``values``: the series to split.
        ``parameters.windowsize``: window length (converted with ``int``).

    Returns:
        orjson-encoded bytes holding a list of windows. Each window is
        min-max scaled on its own (``axis=1``) to the range (-1, 1). An empty
        list is returned when the window size is not positive or the series
        is shorter than one window.
    """
    t0 = time()
    raw_data = request.json
    values = raw_data["values"]
    window_size = int(raw_data['parameters']["windowsize"])

    # A series of n values holds n - window_size + 1 full windows; the "+ 1"
    # keeps the final window values[n - window_size:n], which was dropped before.
    window_count = len(values) - window_size + 1
    if window_size <= 0 or window_count <= 0:
        # Nothing to scale; minmax_scale would raise on an empty input.
        return orjson.dumps([])

    data = [values[start:start + window_size] for start in range(window_count)]
    data = preprocessing.minmax_scale(data, (-1, 1), axis=1)
    print("Created windows: " + str(time()-t0))
    data = data.tolist()
    print("data converted: " + str(time()-t0))
    print("Sending response: " + str(time()-t0))
    response = orjson.dumps(data)
    print("Sending response: " + str(time()-t0))
    return response

77 78
def fill_table(data, tables_hash_function, index):
    """Bucket every window of ``data`` by its signature under one hash function.

    A window's signature is the string of '1'/'0' characters obtained by
    projecting the window through the selected hash matrix and writing '1'
    for each strictly positive component, '0' otherwise.

    Args:
        data: NumPy array of windows, indexed along axis 0
            (presumably shaped (n_windows, window_size) - confirm with caller).
        tables_hash_function: sequence of projection matrices.
        index: position in ``tables_hash_function`` of the matrix to use;
            also printed as a progress marker.

    Returns:
        ``defaultdict(list)`` mapping each signature to the window indices
        that produced it, in ascending order.
    """
    print(index)
    # Look the matrix up once instead of on every window.
    hash_function = tables_hash_function[index]
    table = defaultdict(list)
    for window_index in range(data.shape[0]):
        bits = np.dot(data[window_index], hash_function) > 0
        signature = ''.join('1' if bit else '0' for bit in bits)
        table[signature].append(window_index)
    return table

86 87 88
@app.route('/create-tables', methods=['POST'])
def create_tables():
    """Build random-projection hash tables for the posted windows.

    Request body (parsed with orjson):
        ``windows``: list of windows.
        ``parameters.windowsize`` / ``hashsize`` / ``tablesize``: projection
        input width, number of signature bits, and number of tables.

    Side effects:
        Rebinds the module globals ``data`` (the windows as a NumPy array) and
        ``tables_hash_function`` (the list of generated projection matrices).

    Returns:
        orjson-encoded bytes mapping each table index (as a string) to
        ``{"hash": <projection matrix>, "entries": <signature -> window indices>}``.
    """
    global data
    global tables_hash_function

    started = time()
    payload = orjson.loads(request.data)
    print(time() - started)

    data = payload["windows"]
    parameters = payload['parameters']
    window_size = int(parameters["windowsize"])
    hash_size = int(parameters["hashsize"])
    table_size = int(parameters["tablesize"])
    data = np.array(data)
    print('Starting: ' + str(time() - started))

    # One uniformly random (window_size x hash_size) projection per table.
    tables_hash_function = [
        np.random.uniform(-1, 1, size=(window_size, hash_size))
        for _ in range(table_size)
    ]
    print('Init time: ' + str(time() - started))

    tables = []
    for index, hash_function in enumerate(tables_hash_function):
        table_started = time()
        print('------------')
        print(index)
        buckets = defaultdict(list)
        print(time() - table_started)
        # Sign pattern of each window's projection.
        sign_bits = [
            np.dot(data[row], hash_function) > 0
            for row in range(data.shape[0])
        ]
        print(time() - table_started)
        signatures = [
            ''.join('1' if bit else '0' for bit in bits)
            for bits in sign_bits
        ]
        print(time() - table_started)
        for row, signature in enumerate(signatures):
            buckets[signature].append(row)
        print(time() - table_started)
        tables.append(buckets)

    print('Creation time: ' + str(time() - started))
    hash_functions = np.array(tables_hash_function).tolist()
    response = {
        str(table_index): {"hash": hash_matrix, "entries": entries}
        for table_index, (hash_matrix, entries)
        in enumerate(zip(hash_functions, tables))
    }
    return orjson.dumps(response)

@app.route('/query', methods=['POST'])
def query():
    """Find windows that share a hash bucket with the posted query window.

    Request JSON:
        ``window``: the query window.
        ``tables``: mapping of tables, each with ``"hash"`` (projection
        matrix) and ``"entries"`` (signature -> list of window indices).

    Returns:
        orjson-encoded bytes mapping a collision count (as a string key) to
        the list of window indices that collided with the query in exactly
        that many tables. Tables with no bucket for the query's signature
        contribute nothing.
    """
    raw_data = request.json
    window = raw_data["window"]
    tables = raw_data["tables"]

    neighbours = []
    for t in tables.values():
        signature = ''.join((np.dot(window, t["hash"]) > 0).astype('int').astype('str'))
        # An arbitrary query window may hash to a bucket no stored window
        # occupies; that is "no neighbours", not an error.
        neighbours.extend(t["entries"].get(signature, []))

    output = {}
    for index, frequency in Counter(neighbours).items():
        # orjson only accepts str dict keys by default, so the integer
        # frequency is stringified (JSON object keys are strings anyway).
        output.setdefault(str(frequency), []).append(index)
    return orjson.dumps(output)

159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177
def create_valid_table(data, window_size, hash_size, correct_indices, incorrect_indices, index):
    """Draw random hash functions until one agrees with the labels, then build its table.

    A hash function is accepted when every window in ``correct_indices`` gets
    the same signature and no window in ``incorrect_indices`` gets that
    signature. With no correct indices there is no target bucket, so the
    first draw is accepted.

    NOTE: the search is unbounded. If the labels cannot be separated (for
    example identical windows labelled differently) this never returns.

    Args:
        data: NumPy array of windows, indexed along axis 0.
        window_size: number of rows of the projection matrix.
        hash_size: number of columns of the projection matrix (signature bits).
        correct_indices: window indices that must share one bucket.
        incorrect_indices: window indices that must stay out of that bucket.
        index: unused; present so the function can be mapped over a range
            with ``Pool.map``.

    Returns:
        ``{"hash": <matrix as nested lists>, "entries": <defaultdict mapping
        signature -> window indices>}``.
    """
    # Use a freshly seeded generator rather than the global one: forked pool
    # workers inherit identical global RNG state and would otherwise produce
    # the same "random" hash functions as each other.
    rng = np.random.RandomState()

    while True:
        hash_function = rng.randn(window_size, hash_size)
        if not correct_indices:
            break
        target = _window_signature(data[correct_indices[0]], hash_function)
        correct_agree = all(
            _window_signature(data[i], hash_function) == target
            for i in correct_indices[1:]
        )
        # Short-circuits: incorrect windows are only hashed if needed.
        if correct_agree and all(
            _window_signature(data[i], hash_function) != target
            for i in incorrect_indices
        ):
            break

    entries = defaultdict(list)
    for window_index in range(data.shape[0]):
        entries[_window_signature(data[window_index], hash_function)].append(window_index)
    return {
        "hash": hash_function.tolist(),
        "entries": entries
    }


def _window_signature(window, hash_function):
    """Return the '0'/'1' sign pattern of ``window`` projected through ``hash_function``."""
    return ''.join((np.dot(window, hash_function) > 0).astype('int').astype('str'))

178 179 180 181
@app.route('/update', methods=['POST'])
def update():
    """Keep the hash tables that agree with user labels and regenerate the rest.

    Request JSON:
        ``windows``: list of windows.
        ``labelData``: mapping of window index -> ``true`` / ``false``.
        ``tables``: existing tables, each with ``"hash"`` and ``"entries"``.
        ``parameters.windowsize`` / ``hashsize`` / ``tablesize``.

    A table is kept when the bucket of the first positively labelled window
    contains every positively labelled index and no negatively labelled one.
    Missing tables (up to ``tablesize``) are created in a process pool with
    ``create_valid_table``.

    Returns:
        JSON response mapping table position to ``{"hash", "entries"}``, or a
        400 response with an ``"error"`` message when no window is labelled
        ``true``.
    """
    t0 = time()
    raw_data = request.json

    data = np.array(raw_data["windows"])
    label_data = raw_data["labelData"]
    tables = raw_data["tables"]

    window_size = int(raw_data['parameters']["windowsize"])
    hash_size = int(raw_data['parameters']["hashsize"])
    table_size = int(raw_data['parameters']["tablesize"])

    correct_indices = [int(index) for index, value in label_data.items() if value is True]
    incorrect_indices = [int(index) for index, value in label_data.items() if value is False]

    if not correct_indices:
        # Without a positive example there is no reference bucket to check.
        return jsonify({"error": "labelData must mark at least one window as true"}), 400

    window = data[correct_indices[0]]

    new_tables = []
    for t in tables.values():
        signature = ''.join((np.dot(window, t["hash"]) > 0).astype('int').astype('str'))
        # A set gives O(1) membership tests; a missing bucket means the table
        # cannot contain the correct windows, so it is simply not kept.
        neighbours = set(t["entries"].get(signature, ()))
        keeps_correct = all(index in neighbours for index in correct_indices)
        leaks_incorrect = any(index in neighbours for index in incorrect_indices)
        if keeps_correct and not leaks_incorrect:
            new_tables.append(t)

    missing = table_size - len(new_tables)
    if missing > 0:
        func = partial(create_valid_table, data, window_size, hash_size, correct_indices, incorrect_indices)
        # Create the pool outside the try so the cleanup below never runs
        # against an unbound name if Pool() itself fails.
        pool = Pool()
        try:
            print('Starting pool: ' + str(time() - t0))
            new_tables.extend(pool.map(func, range(missing)))
        finally:
            pool.close()
            pool.join()

    print('Update time: ' + str(time() - t0))
    response = {
        table_index: {
            "hash": table["hash"],
            "entries": table["entries"]
        }
        for table_index, table in enumerate(new_tables)
    }
    return jsonify(response)