Commit 33ced1b8 authored by Yuncong Yu

Unify input APIs; add EEG eye state dataset.

parent be2e7008
import logging
from pathlib import Path
from time import time, perf_counter
from typing import Any, Dict, List, Union
import numpy as np
import orjson
import yaml
from flask import Flask, request
from flask_cors import CORS
from src import preprocessing
from src import pseudo
from src.preprocessing import Preprocessor
logging.basicConfig(level=logging.INFO)
# Read config
......@@ -27,13 +29,14 @@ def read_config(path_global_config_yml: Union[Path, str] = Path("../config.yml")
config = read_config()
# path_preprocessed_data_npy = 'cache/preprocessed-data.npy'
path_data_hdf = Path(config["path_data_hdf"])
path_meta_json = Path(config["path_meta_json"])
path_preprocessed_data_npy = Path(config["dir_cache"]) / "preprocessed_data.npy"
channel_names = config["channels"]
dir_in_hdf = config["dir_in_hdf"]
logging.basicConfig(level=logging.INFO)
preprocessor = Preprocessor(
dataset=config["dataset"],
path_data_hdf=Path(config["path_data_hdf"]),
path_meta_json=Path(config["path_meta_json"]),
channel_names=config["channels"],
dir_in_hdf=config["dir_in_hdf"],
dir_cache=Path(config["dir_cache"]),
)
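# The constructor arguments above come from config.yml via read_config(). An
# illustrative config sketch (only the key names come from this file; the path
# values are placeholders, while dir_in_hdf="resampled" and the channel names
# appear in the EGR reader below):
#
#   dataset: egr
#   path_data_hdf: ../data/egr/<recording>.h5
#   path_meta_json: ../data/egr/<meta>.json
#   dir_cache: cache
#   dir_in_hdf: resampled
#   channels: [time, ACM_Egrrate_demand_managed, ACM_Egrrate_feedback_filt, ACM_Egr_enable]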
app = Flask(__name__)
CORS(app)
......@@ -64,11 +67,7 @@ def read_data():
logging.info("Loading data ...")
time_start = perf_counter()
# response = preprocessing.read_weather_data()
# response = preprocessing.read_egr_data()
response = preprocessing.read_data(
path_data_hdf=path_data_hdf, path_meta_json=path_meta_json, channel_names=channel_names, dir_in_hdf=dir_in_hdf
)
response = preprocessor.create_response()
response = orjson.dumps(response)
logging.info(f"Completed loading data with {perf_counter() - time_start:.2f} second(s).")
......@@ -97,11 +96,10 @@ def create_windows():
logging.info("Creating window ...")
time_start = perf_counter()
if not Path(path_preprocessed_data_npy).is_file():
if not preprocessor.path_preprocessed_data_npy_.is_file():
raw_data = request.json
window_size = int(raw_data["parameters"]["windowsize"])
# preprocessing.create_eeg_windows(window_size, 5)
preprocessing.create_egr_windows(window_size)
preprocessor.create_windows(window_size)
logging.info(f"Completed windows with {perf_counter() - time_start:.2f} second(s).")
......@@ -145,7 +143,7 @@ def initialize():
# Read windows
raw_data = orjson.loads(request.data)
data_windowized = np.load(path_preprocessed_data_npy)
data_windowized = np.load(preprocessor.path_preprocessed_data_npy_)
data_windowized = np.swapaxes(data_windowized, 1, 2) # Use a column vector for each channel
# Read the query
......@@ -179,7 +177,7 @@ def get_lsh_parameters():
t0 = time()
raw_data = orjson.loads(request.data)
window_size = raw_data["windowsize"]
data = np.load(path_preprocessed_data_npy)
data = np.load(preprocessor.path_preprocessed_data_npy_)
data = np.swapaxes(data, 1, 2)
parameters = pseudo.get_lsh_parameters(data, window_size)
......@@ -214,7 +212,7 @@ def update():
"""
t0 = time()
raw_data = orjson.loads(request.data)
data = np.load(path_preprocessed_data_npy)
data = np.load(preprocessor.path_preprocessed_data_npy_)
data = np.swapaxes(data, 1, 2)
query = raw_data["query"]
query = np.swapaxes(query, 0, 1)
......@@ -247,7 +245,7 @@ def weights():
hash_functions = raw_data["hash_functions"]
query = raw_data["query"]
old_weights = raw_data["weights"]
data = np.load(path_preprocessed_data_npy)
data = np.load(preprocessor.path_preprocessed_data_npy_)
new_weights = pseudo.weights(data, query, old_weights, labels, hash_functions)
......@@ -271,17 +269,15 @@ def query():
time_start = perf_counter()
raw_data = orjson.loads(request.data)
# print(raw_data)
start_index = raw_data["start_index"]
query_size = raw_data["query_size"]
window_indices = raw_data["indices"]
if start_index is not None:
# preprocessing.create_weather_windows(query_size)
preprocessing.create_egr_windows(query_size)
preprocessor.create_windows(query_size)
window_indices = int(start_index)
data_windowized = np.load(path_preprocessed_data_npy)
data_windowized = np.load(preprocessor.path_preprocessed_data_npy_)
response = pseudo.query(data_windowized, window_indices)
response = orjson.dumps(response)
......@@ -305,7 +301,7 @@ def window():
raw_data = orjson.loads(request.data)
indices = raw_data["indices"]
output = np.load(path_preprocessed_data_npy)[indices]
output = np.load(preprocessor.path_preprocessed_data_npy_)[indices]
response = orjson.dumps(output.tolist())
print("Window(s) done: " + str(time() - t0))
......@@ -333,7 +329,7 @@ def table_info():
t0 = time()
raw_data = orjson.loads(request.data)
table = raw_data["table"]
data = np.load(path_preprocessed_data_npy)
data = np.load(preprocessor.path_preprocessed_data_npy_)
response = pseudo.table_info(data, table)
......
import json
import logging
import os.path
from pathlib import Path
from typing import Any, Dict, List, Union
from typing import Dict, List, Union
# from libs import bigwig
# import bbi
import dask.dataframe as dd
# import dask.dataframe as dd
import numpy as np
import pandas as pd
# import pandas as pd
import tables
from sklearn.preprocessing import minmax_scale
......@@ -16,249 +16,363 @@ logging.basicConfig(level=logging.INFO)
processed_data_path = "cache/preprocessed-data.npy"
# def read_data():
# size = bbi.chromsizes("test.bigWig")["chr1"]
# bins = 100000
# data = bigwig.get("data/test.bigWig", "chr1", 0, size, bins)
# print(data.shape)
# response = [
# {"index": list(range(0, size, int(size / (bins)))), "values": data.tolist()},
# {"index": list(range(0, size, int(size / (bins)))), "values": data.tolist()},
# {"index": list(range(0, size, int(size / (bins)))), "values": data.tolist()},
# ]
# return response
def create_peax_windows_12kb(window_size):
data = bigwig.chunk("test.bigWig", 12000, int(12000 / window_size), int(12000 / 6), ["chr1"], verbose=True,)
data = np.reshape(data, (len(data), 1, len(data[0])))
np.save(processed_data_path, data)
return "1"
def create_peax_windows_12kb_mts(window_size):
data = bigwig.chunk("test.bigWig", 12000, int(12000 / window_size), int(12000 / 6), ["chr1"], verbose=True,)
data = np.reshape(data, (len(data), 1, len(data[0])))
data2 = np.copy(data)
np.random.shuffle(data2)
data3 = np.copy(data)
np.random.shuffle(data3)
data = np.concatenate((data, data2), axis=1)
data = np.concatenate((data, data3), axis=1)
np.save(processed_data_path, data)
return "1"
def read_eeg_data(nr_of_channels):
response = []
datafile = "../data/21.csv"
data = pd.read_csv(datafile, header=None)
npdata = np.array(data, dtype="float32")
del data
for i in range(4, 4 + nr_of_channels):
response.append({"index": list(range(0, len(npdata), 100)), "values": npdata[::100, i].tolist()})
print(npdata.shape)
return response
def create_eeg_windows(window_size, nr_of_channels):
data_path = "../data/processed-data_" + str(window_size) + ".npy"
datafile = "../data/21.csv"
if not os.path.isfile(data_path):
data = pd.read_csv(datafile, header=None)
npdata = np.array(data, dtype="float32")
del data
np_window_data = np.array(
[
minmax_scale(npdata[i : i + window_size, 4 : nr_of_channels + 4])
for i in range(0, npdata.shape[0] - window_size, int(window_size / 8))
]
)
del npdata
data = np.reshape(np_window_data, (len(np_window_data), nr_of_channels, len(np_window_data[0])))
np.save(data_path, data)
np.save("../data/processed-data", np.load(data_path))
return "1"
def read_weather_data():
filename = "../data/weather.pkl"
if not os.path.isfile(filename):
print("start")
df = dd.read_csv("../data/NW_Ground_Stations_2016.csv", usecols=["number_sta", "date", "t", "hu", "td", "dd", "ff", "psl", "precip"])
print("read file")
df = df.loc[df["number_sta"].isin([14066001, 14137001, 14216001, 14372001, 22092001, 22113006, 22135001])].fillna(0)
df["date"] = dd.to_datetime(df["date"], format="%Y%m%d %H:%M")
print("split rows")
df = df.compute()
df.to_pickle(filename)
print("to_pandas")
df = pd.read_pickle(filename)
# df.dropna(subset=['t'], inplace=True)
response = [
{
"index": (df.loc[df["number_sta"] == 14137001].loc[:, "date"].values.astype(int) / 10 ** 6).tolist(),
"values": df.loc[df["number_sta"] == 14137001].loc[:, "t"].values.tolist(),
"name": "t",
},
{
"index": (df.loc[df["number_sta"] == 14137001].loc[:, "date"].values.astype(int) / 10 ** 6).tolist(),
"values": df.loc[df["number_sta"] == 14137001].loc[:, "hu"].values.tolist(),
"name": "hu",
},
{
"index": (df.loc[df["number_sta"] == 14137001].loc[:, "date"].values.astype(int) / 10 ** 6).tolist(),
"values": df.loc[df["number_sta"] == 14137001].loc[:, "td"].values.tolist(),
"name": "td",
},
{
"index": (df.loc[df["number_sta"] == 14137001].loc[:, "date"].values.astype(int) / 10 ** 6).tolist(),
"values": df.loc[df["number_sta"] == 14137001].loc[:, "dd"].values.tolist(),
"name": "dd",
},
{
"index": (df.loc[df["number_sta"] == 14137001].loc[:, "date"].values.astype(int) / 10 ** 6).tolist(),
"values": df.loc[df["number_sta"] == 14137001].loc[:, "ff"].values.tolist(),
"name": "ff",
},
{
"index": (df.loc[df["number_sta"] == 14137001].loc[:, "date"].values.astype(int) / 10 ** 6).tolist(),
"values": df.loc[df["number_sta"] == 14137001].loc[:, "psl"].values.tolist(),
"name": "psl",
},
{
"index": (df.loc[df["number_sta"] == 14137001].loc[:, "date"].values.astype(int) / 10 ** 6).tolist(),
"values": df.loc[df["number_sta"] == 14137001].loc[:, "precip"].values.tolist(),
"name": "precip",
},
]
return response
def create_weather_windows(window_size):
if not os.path.isfile("cache/weather-" + str(window_size) + ".npy"):
filename = "../data/weather.pkl"
df = pd.read_pickle(filename)
channels = list()
channels.append(df.loc[df["number_sta"] == 14137001].loc[:, "t"].fillna(0).values.tolist())
channels.append(df.loc[df["number_sta"] == 14137001].loc[:, "hu"].fillna(0).values.tolist())
channels.append(df.loc[df["number_sta"] == 14137001].loc[:, "td"].fillna(0).values.tolist())
channels.append(df.loc[df["number_sta"] == 14137001].loc[:, "dd"].fillna(0).values.tolist())
channels.append(df.loc[df["number_sta"] == 14137001].loc[:, "ff"].fillna(0).values.tolist())
channels.append(df.loc[df["number_sta"] == 14137001].loc[:, "psl"].fillna(0).values.tolist())
channels.append(df.loc[df["number_sta"] == 14137001].loc[:, "precip"].fillna(0).values.tolist())
data = [([values[i : i + window_size] for values in channels]) for i in range(0, len(channels[0]) - window_size, 1)]
windows = []
for i in range(len(data)):
if i % 5000 == 0:
print(i)
windows.append(minmax_scale(data[i], (-1, 1), axis=1))
print("dims:")
print(windows[0].size)
np.save("../data/weather-" + str(window_size), windows)
data = np.load("../data/weather-" + str(window_size) + ".npy")
np.save("../data/processed-data", data)
return "1"
def read_egr_data():
"""Read EGR data from Roxane and Kai from TP."""
# Config
path_data_original: Union[
Path, str
] = "../data/egr/201207_IAVHeKu_212-SM-9221_WMA4ID41_DS18_TestV_10_EU5FM_800m_0C_freie_Fahrt_nrm_01_compressed.h5"
channel_names = ["time", "ACM_Egrrate_demand_managed", "ACM_Egrrate_feedback_filt", "ACM_Egr_enable"]
# Load data
logging.info(f"Loading data from {path_data_original}")
print(os.getcwd())
with tables.open_file(path_data_original) as fp:
data: np.ndarray = fp.root.resampled[:, :]
logging.info(f"Completed loading data with {data.shape[1] - 1} channels and {data.shape[0]} time steps.")
# Create response
response = [
{"index": list(range(0, len(data))), "values": channel.tolist(), "name": channel_name}
for channel, channel_name in zip(data.T, channel_names)
]
return response
def create_egr_windows(window_size):
"""Create windows for EGR dataset."""
# Config
path_data_original_hdf = "../data/egr/201207_IAVHeKu_212-SM-9221_WMA4ID41_DS18_TestV_10_EU5FM_800m_0C_freie_Fahrt_nrm_01_compressed.h5"
path_data_cached_npy = f"cache/egr_cached_{window_size}.npy"
path_data_preprocessed_npy = "cache/preprocessed_data.npy"
# Create cached data
if not Path(path_data_cached_npy).is_file():
with tables.open_file(path_data_original_hdf) as fp:
data = fp.root.resampled[:, :]
data = [minmax_scale(data[id_time_step : (id_time_step + window_size)]).T for id_time_step in range(len(data) - window_size)]
np.save(path_data_cached_npy, data)
data = np.load(path_data_cached_npy)
np.save(path_data_preprocessed_npy, data)
return "1"
def read_data(path_data_hdf: Union[Path, str], path_meta_json: Union[Path, str], channel_names: List[str], dir_in_hdf: str) -> List[Dict[str, Any]]:
"""Read named channels in the given data file.
class Preprocessor:
"""Load and preprocess the data.
Parameters
----------
dataset : str
Name of the dataset; it is used in the names of cached files.
path_data_hdf : Path | str
Path of the HDF file with data values. Rows correspond to time steps and columns to channels.
path_meta_json : Path | str
Path of the JSON file with meta information.
channel_names : List[str]
Names of channels.
dir_in_hdf : str
Directory (node) in the HDF file.
dir_cache : Path | str
Cache directory.
Returns
-------
List[Dict]
Response with loaded data. It has the shape {"index": [0, 1, ..., n_time_steps-1], "values": 1D-array, "name": str}[].
Attributes
----------
path_preprocessed_data_npy_ : Path
Path of the preprocessed npy data.
path_cached_data_npy_ : Path | None
Path of the cached npy data.
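Examples
--------
A minimal usage sketch; the paths and the window size are illustrative
placeholders, only the keyword names mirror the constructor:
>>> preprocessor = Preprocessor(
...     dataset="egr",
...     path_data_hdf="../data/egr/recording.h5",
...     path_meta_json="../data/egr/meta.json",
...     channel_names=["time", "ACM_Egrrate_demand_managed"],
...     dir_in_hdf="resampled",
...     dir_cache="cache",
... )
>>> response = preprocessor.create_response()
>>> preprocessor.create_windows(window_size=600)
'1'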
"""
# Load data
logging.info(f"Loading data from {path_data_hdf}")
with tables.open_file(path_data_hdf) as fp:
data: np.ndarray = getattr(fp.root, dir_in_hdf)[:, :]
logging.info(f"Completed loading data with {data.shape[1] - 1} channels and {data.shape[0]} time steps.")
# Load channel names
with open(path_meta_json, "r") as fp:
meta = json.load(fp)
channel_names_in_file = ["time"] + meta["short_names"][1:]
# Select and reorder channels to match the requested channel names
sorted_indices = [channel_names_in_file.index(channel_name) for channel_name in channel_names]
data = data[:, sorted_indices]
# Create response
response = [
{"index": list(range(0, len(data))), "values": channel.tolist(), "name": channel_name}
for channel, channel_name in zip(data.T, channel_names)
]
return response
# def create_windows()
def __init__(
self,
dataset: str,
path_data_hdf: Union[Path, str],
path_meta_json: Union[Path, str],
channel_names: List[str],
dir_in_hdf: str,
dir_cache: Union[Path, str],
):
self.dataset = dataset
self.path_data_hdf = Path(path_data_hdf)
self.path_meta_json = Path(path_meta_json)
self.channel_names = channel_names
self.dir_in_hdf = dir_in_hdf
self.dir_cache = Path(dir_cache)
self.path_preprocessed_data_npy_ = self.dir_cache / "preprocessed_data.npy"
self.path_cached_data_npy_ = None
def create_response(self):
"""Pack the loaded data into response.
Returns
-------
List[Dict]
Response with loaded data. It has the shape {"index": [0, 1, ..., n_time_steps-1], "values": 1D-array, "name": str}[].
"""
data = self._load_data()
response = [
{"index": list(range(0, len(data))), "values": channel.tolist(), "name": channel_name}
for channel, channel_name in zip(data.T, self.channel_names)
]
return response
def create_windows(self, window_size):
"""Create slices of the time series with sliding windows.
Returns
-------
"1"
"""
self.path_cached_data_npy_ = self.dir_cache / f"{self.dataset}_cached_{window_size}.npy"
if not self.path_cached_data_npy_.is_file():
data = self._load_data()
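# Slide a window of window_size time steps over the series with stride 1; each
# slice is min-max scaled per channel and transposed, so the saved array has
# shape (n_time_steps - window_size, n_channels, window_size).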
data = [minmax_scale(data[id_time_step : (id_time_step + window_size)]).T for id_time_step in range(len(data) - window_size)]
np.save(self.path_cached_data_npy_, data)
data = np.load(self.path_cached_data_npy_)
np.save(self.path_preprocessed_data_npy_, data)
return "1"
def _load_data(self):
"""Load data
Returns
-------
np.ndarray
Loaded data, rows corresponds to time steps and columns channels. The first column is time.
"""
# Load raw data
logging.info(f"Loading data from {self.path_data_hdf}")
with tables.open_file(self.path_data_hdf) as fp:
data: np.ndarray = getattr(fp.root, self.dir_in_hdf)[:, :]
logging.info(f"Completed loading data with {data.shape[1] - 1} channels and {data.shape[0]} time steps.")
# Load channel names
with open(self.path_meta_json, "r") as fp:
meta = json.load(fp)
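# The first data column is time, so the first entry of meta["short_names"] is replaced by "time".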
channel_names_in_file = ["time"] + meta["short_names"][1:]
# Select and reorder channels to match the requested channel names
sorted_indices = [channel_names_in_file.index(channel_name) for channel_name in self.channel_names]
data = data[:, sorted_indices]
return data
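# How the Flask app in this commit consumes this class (for orientation):
# the read_data endpoint calls create_response(), create_windows() is invoked
# when path_preprocessed_data_npy_ does not exist yet, and the query/LSH
# endpoints np.load() path_preprocessed_data_npy_ directly.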
# Archived code below
# def read_data(
# path_data_hdf: Union[Path, str], path_meta_json: Union[Path, str], channel_names: List[str], dir_in_hdf: str
# ) -> List[Dict[str, Any]]:
# """Read named channels in the given data file.
#
# Parameters
# ----------
# path_data_hdf : Path | str
# Path of the HDF file with data values. Rows corresponds to time steps and columns to channels.
# path_meta_json : Path | str
# Path of the JSON file with meta information.
# channel_names : List[str]
# Names of channels.
# dir_in_hdf : str
# Directory in the HDF-file.
#
# Returns
# -------
# List[Dict]
# Response with loaded data. It has the shape {"index": [0, 1, ..., n_time_steps-1], "values": 1D-array, "name": str}[].
#
# """
# # Load data
# logging.info(f"Loading data from {path_data_hdf}")
# with tables.open_file(path_data_hdf) as fp:
# data: np.ndarray = getattr(fp.root, dir_in_hdf)[:, :]
# logging.info(f"Completed loading data with {data.shape[1] - 1} channels and {data.shape[0]} time steps.")
#
# # Load channel names
# with open(path_meta_json, "r") as fp:
# meta = json.load(fp)
# channel_names_in_file = ["time"] + meta["short_names"][1:]
#
# # Sort channels
# sorted_indices = [channel_names_in_file.index(channel_name) for channel_name in channel_names]
# data = data[:, sorted_indices]
#
# # Create response
# response = [
# {"index": list(range(0, len(data))), "values": channel.tolist(), "name": channel_name}
# for channel, channel_name in zip(data.T, channel_names)
# ]
#
# return response
#
#
# def read_data():
# size = bbi.chromsizes("test.bigWig")["chr1"]
# bins = 100000
# data = bigwig.get("data/test.bigWig", "chr1", 0, size, bins)
# print(data.shape)
# response = [
# {"index": list(range(0, size, int(size / (bins)))), "values": data.tolist()},
# {"index": list(range(0, size, int(size / (bins)))), "values": data.tolist()},
# {"index": list(range(0, size, int(size / (bins)))), "values": data.tolist()},
# ]
# return response
#
#
# def create_peax_windows_12kb(window_size):
# data = bigwig.chunk("test.bigWig", 12000, int(12000 / window_size), int(12000 / 6), ["chr1"], verbose=True,)
# data = np.reshape(data, (len(data), 1, len(data[0])))
# np.save(processed_data_path, data)
# return "1"
#
#
# def create_peax_windows_12kb_mts(window_size):
# data = bigwig.chunk("test.bigWig", 12000, int(12000 / window_size), int(12000 / 6), ["chr1"], verbose=True,)
# data = np.reshape(data, (len(data), 1, len(data[0])))
# data2 = np.copy(data)
# np.random.shuffle(data2)
# data3 = np.copy(data)
# np.random.shuffle(data3)
#
# data = np.concatenate((data, data2), axis=1)
# data = np.concatenate((data, data3), axis=1)
# np.save(processed_data_path, data)
# return "1"
#
#
# def read_eeg_data(nr_of_channels):
# response = []
# datafile = "../data/21.csv"
# data = pd.read_csv(datafile, header=None)
# npdata = np.array(data, dtype="float32")
# del data
# for i in range(4, 4 + nr_of_channels):
# response.append({"index": list(range(0, len(npdata), 100)), "values": npdata[::100, i].tolist()})
# print(npdata.shape)
#
# return response
#
#
# def create_eeg_windows(window_size, nr_of_channels):
# data_path = "../data/processed-data_" + str(window_size) + ".npy"
# datafile = "../data/21.csv"
#
# if not os.path.isfile(data_path):
# data = pd.read_csv(datafile, header=None)
# npdata = np.array(data, dtype="float32")
# del data