Commit 778969bb authored by Yuncong Yu

Complete docstring; minor refactoring.

parent d56df385
import logging
import math
import os.path
import random
from collections import defaultdict
from pathlib import Path
from time import time, perf_counter
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union
import _lsh
import _ucrdtw
@@ -12,47 +14,74 @@ from tslearn.metrics import dtw
from libs.DBA_multivariate import performDBA
def get_lsh_parameters(data: np.ndarray, window_size: int) -> Tuple[float, float, float]:
"""Get the optimal LSH parameters.
If the optimal LSH parameters have already been calculated, load them from the cache; otherwise calculate them and save them to the cache.
Parameters
----------
data : np.ndarray
Preprocessed data, 3d array [m][t][d].
window_size : int
Window length.
Returns
-------
Tuple[float, float, float]
Optimal LSH parameters r, a and sd.
"""
path_parameters_npy = Path(f"cache/parameters_{window_size}.npy")
if not path_parameters_npy.is_file():
parameters = _calculate_lsh_parameters(data)
np.save(str(path_parameters_npy), list(map(float, parameters)))
return np.load(str(path_parameters_npy)).tolist()  # r, a, sd
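# Illustrative usage (a sketch; the file name, window size and data shape are
# assumptions, not part of this module):
#
#     data = np.load("cache/preprocessed_data.npy")  # 3d array [m][t][d]
#     r, a, sd = get_lsh_parameters(data, window_size=120)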
def lsh(
data: np.ndarray, query: np.ndarray, parameters: Optional[Sequence[float]] = None, weights: Optional[Sequence[float]] = None
) -> Dict[str, Any]:
"""Run locality-sensitive hashing.
Parameters
----------
data : np.ndarray
Preprocessed data with the shape 3d array [m][t][d].
query : np.ndarray
Query with the shape 2d array [t][d].
parameters : Optional[Sequence[float]]
Optimal LSH parameters r, a and sd.
weights : Optional[Sequence[float]]
Channel weights, 1d array [d].
Returns
-------
Dict[str, Any]
Response with the candidates, distances, hash tables, samples, prototype and LSH parameters.
"""
if parameters is None:
parameters = _calculate_lsh_parameters(data)
r = parameters[0]
a = parameters[1]
sd = parameters[2]
data = np.array(data, dtype="float32")
query = np.array(query, dtype="float32")
if weights is None:
candidates, distances, hf = _lsh.lsh(data, query, r, a, sd, 1)
else:
candidates, distances, hf = _lsh.lsh(data, query, r, a, sd, 1, weights)
dict_ = defaultdict(int)
for l in range(len(candidates)):
for k in range(len(candidates[0])):
for i in range(len(candidates[0][0])):
dict_[candidates[l][k][i]] += distances[l][k][i]
sorted_dict = {k: v for k, v in sorted(dict_.items(), key=lambda item: item[1])}
average_candidates = np.array(list(sorted_dict.keys())).tolist()
average_distances = np.array(list(sorted_dict.values())).tolist()
@@ -63,14 +92,14 @@ def lsh(data, query, parameters=None, weights=None):
for k in range(len(candidates[0])):
samples_set.update(random.sample(candidates[l][k], 5))
dict_ = defaultdict(list)
length = len(distances[l][k])
median = distances[l][k][math.ceil(length / 2)]
stepsize = median / 10
indices = list(map(lambda x: 19 if x > median * 2 else math.floor(x / stepsize), distances[l][k]))
for i in range(len(candidates[0][0])):
dict_[str(indices[i])].append(candidates[l][k][i])
tables.append(dict_)
length = len(average_distances)
median = average_distances[math.ceil(length / 2)]
@@ -86,11 +115,7 @@ def lsh(data, query, parameters=None, weights=None):
std_values = np.std(windows, 0)
max_values = average_values + std_values
min_values = average_values - std_values
prototype = {"average": average_values.tolist(), "max": max_values.tolist(), "min": min_values.tolist()}
samples = np.array(list(filter(lambda x: x in samples_set, average_candidates))).tolist()
@@ -102,16 +127,31 @@ def lsh(data, query, parameters=None, weights=None):
"average_distances": average_distances,
"tables": tables,
"average_table": average_table,
"samples": list(samples),
"samples": samples,
"prototype": prototype,
"parameters": [float(r), float(a), float(sd)]
"parameters": [float(r), float(a), float(sd)],
}
return response
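# Illustrative usage (a sketch; the file name and the choice of query are
# assumptions): run the initial LSH with the first window as query and read
# the top-ranked candidate window indices from the response.
#
#     data = np.load("cache/preprocessed_data.npy")  # 3d array [m][t][d]
#     response = lsh(data, data[0])
#     top_candidates = response["average_candidates"][:20]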
def _calculate_lsh_parameters(data: np.ndarray, r: Optional[float] = None) -> Tuple[float, float, float]:
"""Calculate the optimal LSH parameters.
Parameters
----------
data : np.ndarray
Preprocessed data with the shape 3d array [m][t][d].
r : Optional[float]
Initial LSH parameter r; defaults to the number of channels.
Returns
-------
Tuple[float, float, float]
Optimal LSH parameters r, a and sd.
"""
subset = []
time_start = perf_counter()
if r is None:
r = data.shape[2]
started = False
@@ -132,7 +172,7 @@ def preprocess(data, r=None):
i = i + 1
if len(subset) > 400:
print("bigger")
if not started:
first = r
last = last * 2
@@ -143,14 +183,14 @@ def preprocess(data, r=None):
i = 0
print("r = " + str(r))
elif (i == 10000 or i == len(data)) and len(subset) < 10:
print("smaller")
started = True
last = r
r = (first + last) / 2 # r / 2
subset = []
i = 0
print("r = " + str(r))
elif i == len(data):
break
dtw_distances = []
@@ -160,12 +200,12 @@ def preprocess(data, r=None):
if index_1 == index_2:
continue
e = np.linalg.norm(data[index_1] - data[index_2])
if math.isnan(e) or e == 0:
eq_distances.append(0.0001)
dtw_distances.append(0.0001)
continue
eq_distances.append(e)
d = dtw(data[index_1], data[index_2], global_constraint="sakoe_chiba", sakoe_chiba_radius=int(0.05 * 120))
dtw_distances.append(d)
ratios = np.array(dtw_distances) / np.array(eq_distances)
@@ -173,37 +213,67 @@ def preprocess(data, r=None):
sd_dtw = np.std(dtw_distances)
mean_eq = np.mean(eq_distances)
sd_eq = np.std(eq_distances)
a = float(np.mean(ratios))
sd = float(np.std(ratios))
theta = float(mean_dtw - 2.58 * sd_dtw)
# theta = mean_eq - 2.58 * sd_eq
r = float(theta / ((a - sd) * math.sqrt(120)))
if r < 0:
logging.info(f"Actual r: {r}")
r = mean_dtw / 100
# r = theta / (math.sqrt(120))
logging.info(f"Mean: {mean_dtw}")
logging.info(f"Std: {sd_dtw}")
logging.info(f"Ratio mean: {a}")
logging.info(f"Ratio std: {sd}")
logging.info(f"Theta: {theta}")
logging.info(f"r: {r}")
logging.info(f"Preprocessing time: {perf_counter() - time_start}")
return r, a, sd
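# Worked example of the parameter rule above (invented numbers, for
# illustration only): with mean_dtw = 50 and sd_dtw = 10,
# theta = 50 - 2.58 * 10 = 24.2; with ratio statistics a = 1.4 and sd = 0.2,
# r = 24.2 / ((1.4 - 0.2) * sqrt(120)) ≈ 1.84.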
def weights(
data: np.ndarray, query: np.ndarray, old_weights: Sequence[float], labels: Dict[str, bool], hash_functions: np.ndarray
) -> List[float]:
"""Update channel weights.
Parameters
----------
data : np.ndarray
Preprocessed data in shape 3d array [m][t][d].
query : np.ndarray
Query in shape 2d array [t][d].
old_weights : Sequence[float]
Old channel weights in shape 1d array [d].
labels : Dict[str, bool]
Window labels, mapping window index to whether it is a true positive.
hash_functions : np.ndarray
Correct hash functions in shape 2d array [?][d].
Returns
-------
List[float]
Updated channel weights in shape 1d array [d].
"""
def normalize(array: np.ndarray) -> np.ndarray:
"""Normalize a vector with its first norm."""
array /= np.sum(array)
array *= d
return np.sqrt(array)
def normalize_hash_functions(hfs: np.ndarray) -> np.ndarray:
"""Square the channel-wise sum of the hash functions and normalize the result."""
return normalize(np.square(np.sum(hfs, axis=0)))
alpha = 0.2
d = len(query)
logging.info(f"Dimensions: {d}")
all_good_windows = data[[[int(index) for index, value in labels.items() if value is True]]]
good_distances = np.zeros(len(query))
for window in all_good_windows:
for i in range(len(all_good_windows[0])):
@@ -215,32 +285,53 @@ def weights(data, query, old_weights, labels, hash_functions):
good_distances = np.ones(len(query)) - good_distances
good_distances = normalize(good_distances)
# if len(hash_functions) != 0:
#     summed_hash_functions = np.sum(hash_functions, axis=0)
#     summed_hash_functions = np.square(summed_hash_functions)
#     normalized_hash_functions = normalize(summed_hash_functions)
if len(hash_functions) + len(all_good_windows) == 0:
logging.info("No update required.")
new_weights = np.array(old_weights)  # Keep as an array so that `.tolist()` below works.
elif len(hash_functions) == 0:
logging.info("Only update with the true positive labels.")
new_weights = alpha * np.array(old_weights) + (1 - alpha) * good_distances
new_weights = normalize(np.square(new_weights))
elif len(all_good_windows) == 0:
logging.info("Only update with the correct hash tables.")
new_weights = alpha * np.array(old_weights) + (1 - alpha) * normalize_hash_functions(hash_functions)
new_weights = normalize(np.square(new_weights))
else:
logging.info("Update with both the true positive labels and the correct hash tables.")
new_weights = (
alpha * np.array(old_weights)
+ 0.5 * (1 - alpha) * good_distances
+ 0.5 * (1 - alpha) * normalize_hash_functions(hash_functions)
)
new_weights = normalize(np.square(new_weights))
logging.info(f"New weights: {new_weights}")
return new_weights.tolist()
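# Illustrative usage (a sketch; the label keys, shapes and weight values are
# assumptions): reweight three channels after the user marks windows 3 and 7
# as true positives, with no correct hash functions selected.
#
#     new_w = weights(
#         data, query, old_weights=[1.0, 1.0, 1.0],
#         labels={"3": True, "7": True}, hash_functions=np.empty((0, 3)),
#     )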
def table_info(data: np.ndarray, table: Sequence[Sequence[int]]) -> Dict[str, Any]:
"""Create a prototype for each cluster of a hash table.
Parameters
----------
data : np.ndarray
Preprocessed data in shape 3d array [m][t][d].
table : Sequence[Sequence[int]]
Hash table in shape 2d array [x][?], one list of window indices per bucket.
Returns
-------
Dict[str, Any]
Prototypes and distances.
"""
prototypes = []
for cluster in table:
windows = data[cluster]
@@ -248,16 +339,28 @@ def table_info(data, table):
std_values = np.std(windows, 0)
max_values = average_values + std_values
min_values = average_values - std_values
prototypes.append({"average": average_values.tolist(), "max": max_values.tolist(), "min": min_values.tolist()})
return {"prototypes": prototypes, "distances": []}
def query(data: np.ndarray, window_indices: Union[int, Dict[int, bool]]) -> List[List[float]]:
"""Prepare the query.
Define the query as the chosen window, or update the query with DBA over the windows labeled as true positives.
Parameters
----------
data : np.ndarray
Preprocessed data in shape 3d array [m][t][d].
window_indices : Union[int, Dict[int, bool]]
Index of the chosen window, or indices of the true positive labels.
Returns
-------
List[List[float]]
Updated query.
"""
if isinstance(window_indices, int):
output = data[window_indices]
print(output.tolist())
@@ -269,14 +372,14 @@ def query(data, window_indices):
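# Illustrative usage of `query` (a sketch; the indices are assumptions): take
# a single chosen window as query, or average the true positives with DBA.
#
#     q = query(data, 1234)                          # one chosen window
#     q = query(data, {3: True, 7: True, 9: False})  # DBA over true positives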
def debug_test_lsh():
data = np.load("cache/preprocessed_data.npy")
# data = np.repeat(data, repeats=7, axis=1)
print(data.shape)
data = np.reshape(data, (len(data), len(data[0][0]), len(data[0])))
data = np.array(data, dtype="float32")
print(data.shape)
r, a, sd = _calculate_lsh_parameters(data, 11.25)
query_n = 1234
t0 = time()
query = data[query_n]
@@ -295,7 +398,7 @@ def debug_test_lsh():
t0 = time()
# distances = [dtw_ndim.distance_fast(window, query) for window in data]
distances = [dtw(window, query, global_constraint="sakoe_chiba", sakoe_chiba_radius=int(0.05 * 120)) for window in data]
topk_dtw = sorted(range(len(distances)), key=lambda k: distances[k])
print("Calculated exact dtw in: " + str(time() - t0))
print(topk_dtw[0:20])
@@ -9,14 +9,27 @@ import yaml
from flask import Flask, request
from flask_cors import CORS
from src import lsh
from src.preprocessing import Preprocessor
logging.basicConfig(level=logging.INFO)
# Read config
def load_config(path_global_config_yml: Union[Path, str] = Path("../config.yml")) -> Dict[str, Any]:
"""Read the global configuration file.
Parameters
----------
path_global_config_yml : Union[Path, str], default Path("../config.yml")
Path of the global configuration YAML file.
Returns
-------
Dict[str, Any]
Global configuration, including which dataset to use.
"""
with open(path_global_config_yml, "r") as fp:
config = yaml.load(fp, yaml.FullLoader)
# reload = False
@@ -28,7 +41,7 @@ def read_config(path_global_config_yml: Union[Path, str] = Path("../config.yml")
return config
config = load_config()
preprossessor = Preprocessor(
dataset=config["dataset"],
path_data_hdf=Path(config["path_data_hdf"]),
@@ -47,18 +60,17 @@ def index():
return "hi"
@app.route("/read-data", methods=["GET"])
def read_data():
"""
Load raw data.
@app.route("/load-data", methods=["GET"])
def load_data():
"""Load raw data.
Returns
-------
response : List[Dict[str, np.ndarray]]
bytes
Loaded data with the following interface
{
index: 1d array [x]
values: 1d array [x]
index: 1d array [x],
values: 1d array [x],
name: str
}[]
@@ -67,30 +79,29 @@ def read_data():
logging.info("Loading data ...")
time_start = perf_counter()
response = preprossessor.load_data()
response = orjson.dumps(response)
logging.info(f"Completed loading data with {perf_counter() - time_start:.2f} second(s).")
logging.info(f"Completed loading data in {perf_counter() - time_start:.2f} second(s).")
return response
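# Example request (a sketch; host and port are assumptions):
#
#     import requests
#     series = requests.get("http://localhost:5000/load-data").json()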
@app.route("/create-windows", methods=["POST"])
def create_windows():
"""
Creates windows to transform the local pattern search problem to time series indexing.
"""Creates windows to transform the local pattern search problem to time series indexing.
Returns
-------
'1'
Use dynamic input from request with the interface
Input from request with interface
{
parameters: {
windowssize: int
}
}
Returns
-------
'1'
"""
logging.info("Creating window ...")
@@ -101,41 +112,40 @@ def create_windows():
window_size = int(raw_data["parameters"]["windowsize"])
preprossessor.create_windows(window_size)
logging.info(f"Completed windows with {perf_counter() - time_start:.2f} second(s).")
logging.info(f"Completed windows in {perf_counter() - time_start:.2f} second(s).")
return "1"
@app.route("/initialize", methods=["POST"])
def initialize():
"""
Conduct the initial LSH.
"""Conduct the initial LSH.
Input from request with interface
{
query: 2d array [d][t]
}
Returns
-------
response : Dict[str, Any]
Response with the interface
bytes
Response with interface
{
hash_functions: 3d array [k][l][d]
candidates: 3d array [k][l][i]
distances: 3d array [k][l][i]
average_candidates: 1d array [i]
average_distances: 1d array [i]
hash_functions: 3d array [k][l][d],
candidates: 3d array [k][l][i],
distances: 3d array [k][l][i],
average_candidates: 1d array [i],
average_distances: 1d array [i],
tables: [{
bucket: 1d array
}]
}],
average_table: {
bucket: 1d array
}
samples: 1d array
},
samples: 1d array,
parameters: 1d array
}
Use dynamic input from request have the interface
{
query: 2d array [d][t]
}
"""
logging.info("Starting the initial LSH ...")
@@ -143,8 +153,8 @@ def initialize():
# Read windows
raw_data = orjson.loads(request.data)
data = np.load(preprossessor.path_preprocessed_data_npy_)
data = np.swapaxes(data, 1, 2)  # Use a column vector for each channel
# Read the query
query = raw_data["query"]
@@ -153,64 +163,69 @@ def initialize():
# Run the initial LSH
logging.info("Executing the initial LSH ...")
lsh_data = lsh.lsh(data, query)
response = orjson.dumps(lsh_data)
logging.info(f"Completed the initial LSH in {perf_counter() - time_start:.2f} second(s).")
return response
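# Example request (a sketch; host, port and the query values are assumptions):
#
#     import requests
#     lsh_response = requests.post(
#         "http://localhost:5000/initialize",
#         json={"query": [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]]},  # 2d array [d][t]
#     ).json()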
@app.route("/get-lsh-parameters", methods=["POST"])
def get_lsh_parameters():
"""