# Source code for dsipts.data_structure.utils

from enum import Enum
from typing import Union
import pandas as pd
from torch.utils.data import Dataset
import numpy as np
from pytorch_lightning import Callback
import torch
import os
import logging
from typing import Union
def beauty_string(message:str,type:str,verbose:bool):
    """Log ``message`` with ``logging.info``, optionally framed by a ``#`` banner.

    Args:
        message (str): text to log
        type (str): layout selector. ``'block'`` logs a blank line, a full rule,
            a bordered spacer, the centred message, the spacer again and a
            closing rule; ``'section'`` logs a blank line and the centred
            message between two rules; ``'info'`` logs only the centred
            message; any other value logs ``message`` unchanged.
        verbose (bool): nothing is logged unless this is exactly ``True``
    """
    # Identity check on purpose: truthy values other than True stay silent.
    if verbose is not True:
        return

    width = 150
    emit = logging.info

    if type == 'block':
        # Side borders shrink as the message grows and vanish for long messages.
        pad = max((100 - len(message)) // 2 - 5, 0)
        side = '#' * pad
        spacer = f"{side}{' ' * (width - pad * 2)}{side}"
        rule = f"{'#' * width}"
        emit('\n')
        emit(rule)
        emit(spacer)
        emit(f"{ message:^{width}}")
        emit(spacer)
        emit(rule)
        return

    if type == 'section':
        rule = f"{'#' * width}"
        emit('\n')
        emit(rule)
        emit(f"{ message:^{width}}")
        emit(rule)
        return

    if type == 'info':
        emit(f"{ message:^{width}}")
        return

    # Unknown layout: plain message, no centring.
    emit(message)
def extend_time_df(x:pd.DataFrame,freq:Union[str,int],group:Union[str,None]=None,global_minmax:bool=False)-> pd.DataFrame:
    """Build the complete, gap-free time grid spanned by ``x``.

    The result contains only the ``time`` column (plus the ``group`` column when
    grouping is requested); the caller is expected to merge the real data onto it.

    Args:
        x (pd.DataFrame): dataframe containing the column ``time``
        freq (str or int): step of the resulting grid. A string is interpreted
            as a pandas frequency and passed to ``pd.date_range``; an ``int`` is
            used as the step of an integer time axis.
        group (str or None): if not None, one grid is generated for every
            distinct value of this column. Default None.
        global_minmax (bool): only used with ``group``. If True every group
            spans the global minimum/maximum of ``x.time``; otherwise each group
            spans its own minimum/maximum. Usually used for stacked models.

    Returns:
        pd.DataFrame: a dataframe whose ``time`` column runs from the minimum to
        the maximum time (both included when they lie on the grid) with step
        ``freq``.
    """
    def _grid(start, stop):
        # One time axis from start to stop, inclusive at both ends.
        if isinstance(freq, int):
            # arange is half-open: use stop + 1 so the maximum is kept, exactly
            # as pd.date_range keeps its end point in the other branch.
            return np.arange(start, stop + 1, freq)
        return pd.date_range(start, stop, freq=freq)

    if group is None:
        return pd.DataFrame({'time': _grid(x.time.min(), x.time.max())})

    keys = x[group].unique()
    if global_minmax:
        lo, hi = x.time.min(), x.time.max()
        bounds = [(lo, hi) for _ in keys]
    else:
        # Index the per-group extrema by group value instead of re-scanning a
        # mask for every group; .loc also keeps the original scalar type.
        times = x.groupby(group).time
        lows, highs = times.min(), times.max()
        bounds = [(lows.loc[key], highs.loc[key]) for key in keys]

    frames = [
        pd.DataFrame({group: key, 'time': _grid(start, stop)})
        for key, (start, stop) in zip(keys, bounds)
    ]
    return pd.concat(frames, ignore_index=True)
class MetricsCallback(Callback):
    """PyTorch Lightning callback that collects losses and dumps them to CSV.

    Only the series pre-declared in ``self.metrics`` (``val_loss`` and
    ``train_loss``) are recorded.

    :meta private:
    """

    def __init__(self,dirpath):
        """
        Args:
            dirpath: directory in which the CSV files are written
        """
        super().__init__()
        self.dirpath = dirpath
        self.metrics = {'val_loss':[],'train_loss':[]}

    def on_validation_end(self, trainer, pl_module):
        """Append the current tracked metrics and rewrite ``loss.csv``."""
        for name, value in trainer.callback_metrics.items():
            # callback_metrics may hold any metric the model logs; indexing
            # self.metrics blindly raised KeyError for anything untracked.
            if name in self.metrics:
                self.metrics[name].append(value.item())

        ## Write csv in a convenient way
        # Shallow copy so the truncation below does not touch self.metrics.
        tmp = self.metrics.copy()
        # The first two validation records are dropped, see on_train_end.
        tmp['val_loss'] = tmp['val_loss'][2:]
        losses = pd.DataFrame(tmp)
        losses.to_csv(os.path.join(self.dirpath,'loss.csv'),index=False)

    def on_train_end(self, trainer, pl_module):
        """Write the collected losses to a randomly named ``*__losses__.csv``.

        NOTE(review): ``self.metrics['val_loss']`` is truncated IN PLACE here
        (``losses`` aliases ``self.metrics``). This is kept as-is because code
        outside this file may read ``metrics`` after training - confirm before
        changing it.
        """
        losses = self.metrics
        ## Original author's note: unclear why, but the first two validation
        ## calls happen before training starts, so they are discarded.
        losses['val_loss'] = losses['val_loss'][2:]
        losses = pd.DataFrame(losses)
        ## Workaround for multi-GPU runs: a random prefix is put in the file name.
        losses.to_csv(os.path.join(self.dirpath,f'{np.random.randint(10000)}__losses__.csv'),index=False)
        print("Saving losses on file because multigpu not working")
class MyDataset(Dataset):
    """Dictionary-backed torch ``Dataset``; every item is a dict of per-key slices."""

    def __init__(self, data:dict,t:np.ndarray,groups:np.ndarray,idx_target:Union[np.ndarray,None],idx_target_future:Union[np.ndarray,None])->None:
        """
        Extension of Dataset class. While training the returned item is a batch containing the standard keys

        Args:
            data (dict): a dictionary. Each value is an array indexable along its
                first axis. ``'y'`` is required (it defines the dataset length).
                The keys listed by the original documentation are:
                    y : the target variable(s)
                    x_num_past: the numerical past variables
                    x_num_future: the numerical future variables
                    x_cat_past: the categorical past variables
                    x_cat_future: the categorical future variables
            t (np.ndarray): the time array related to the target variables
            groups (np.ndarray): stored as ``self.groups``; not used by this
                class itself (presumably the group label of each sample - TODO confirm)
            idx_target (Union[np.ndarray,None]): index in the past data of the
                target features (for differential analysis or detrending
                strategies). When not None it is added to every item as ``'idx_target'``.
            idx_target_future (Union[np.ndarray,None]): index in the future data
                of the target features (for differential analysis or detrending
                strategies). When not None it is added to every item as ``'idx_target_future'``.
        """
        self.data = data
        self.t = t
        self.groups = groups
        # Normalised to arrays so list inputs behave like array inputs.
        self.idx_target = np.array(idx_target) if idx_target is not None else None
        self.idx_target_future = np.array(idx_target_future) if idx_target_future is not None else None

    def __len__(self):
        """Number of samples, i.e. the length of ``data['y']``."""
        return len(self.data['y'])

    def __getitem__(self, idxs):
        """Return ``{key: data[key][idxs]}`` plus the target indexes when they are set."""
        sample = {key: values[idxs] for key, values in self.data.items()}
        if self.idx_target is not None:
            sample['idx_target'] = self.idx_target
        if self.idx_target_future is not None:
            sample['idx_target_future'] = self.idx_target_future
        return sample
class ActionEnum(Enum):
    """Ways in which a categorical variable can act.

    Each member's value is its own name as a string.

    :meta private:
    """

    multiplicative: str = "multiplicative"
    additive: str = "additive"