Source code for dsipts.data_structure.data_structure

import numpy as np
import plotly.express as px
import pandas as pd
from typing import List
from sklearn.preprocessing import LabelEncoder
from sklearn.preprocessing import * 
from torch.utils.data import DataLoader
from pytorch_lightning.callbacks import ModelCheckpoint
import pytorch_lightning as pl
from pytorch_lightning.loggers import CSVLogger
from typing import Union
import os
import torch
import pickle
from .utils import extend_time_df,MetricsCallback, MyDataset, ActionEnum,beauty_string
from datetime import datetime
from ..models.base import Base
from ..models.utils import weight_init_zeros,weight_init
import logging 
from .modifiers import *
from aim.pytorch_lightning import AimLogger
import time


# Disable pandas' chained-assignment (SettingWithCopy) check globally.
pd.options.mode.chained_assignment = None 
# Module-level logger; the NullHandler is attached so the module itself does
# not require any logging configuration from the application.
log = logging.getLogger(__name__)
log.addHandler(logging.NullHandler())      
      
      
class Categorical():
    """Toy categorical signal used to build synthetic time series."""

    def __init__(self, name: str, frequency: int, duration: List[int], classes: int, action: "ActionEnum", level: List[float]):
        """Class for generating toy categorical data

        Args:
            name (str): name of the categorical signal
            frequency (int): frequency of the signal
            duration (List[int]): duration of each class
            classes (int): number of classes
            action (str): one between additive or multiplicative
            level (List[float]): intensity of each class

        Raises:
            ValueError: if ``level`` does not contain one value per class
        """
        self.name = name
        self.frequency = frequency
        self.duration = duration
        self.classes = classes
        self.action = action
        self.level = level
        self.validate()

    def validate(self):
        """Validate, maybe there will be other checks in the future

        :meta private:
        """
        if len(self.level) != self.classes:
            raise ValueError("Length must match")

    def generate_signal(self, length: int) -> None:
        """Generate the response signal

        The result is stored in ``self.signal_array`` (intensity for each step)
        and ``self.classes_array`` (class index for each step, -1 when the
        signal is not active).

        Args:
            length (int): length of the signal

        Raises:
            ValueError: if ``action`` is neither 'multiplicative' nor 'additive'
        """
        # The neutral element depends on how the signal is combined later on.
        if self.action == 'multiplicative':
            signal = np.ones(length)
        elif self.action == 'additive':
            signal = np.zeros(length)
        else:
            raise ValueError(f"Unknown action {self.action!r}: use 'multiplicative' or 'additive'")

        classes = []
        _class = 0
        _level = self.level[0]
        _duration = self.duration[0]
        count = 0
        count_freq = 0
        for i in range(length):
            if count_freq % self.frequency == 0:
                # active step: write the level of the current class
                signal[i] = _level
                classes.append(_class)
                count += 1
                if count == _duration:
                    # change class
                    count = 0
                    _class = (_class + 1) % self.classes
                    count_freq += _duration
                    _level = self.level[_class]
                    _duration = self.duration[_class]
            else:
                # inactive step: neutral value and no class
                classes.append(-1)
                count_freq += 1
        self.classes_array = classes
        self.signal_array = signal

    def plot(self) -> None:
        """Plot the series

        ``generate_signal`` must be called before plotting.
        """
        tmp = pd.DataFrame({
            'time': range(len(self.classes_array)),
            # the generated values live in `signal_array` (there is no `signal` attribute)
            'signal': self.signal_array,
            'class': self.classes_array,
        })
        fig = px.scatter(tmp, x='time', y='signal', color='class', title=self.name)
        fig.show()
class TimeSeries():
    def __init__(self,name:str,stacked:bool=False):
        """Container for a time series, either loaded from data or generated.

        If you do not have any time series you can build a fake one using the
        helper classes (for instance Categorical).

        Args:
            name (str): name of the series
            stacked (bool): if true it is a stacked model

        Usage:
            For example we can generate a toy timeseries:

            - add a multiplicative categorical feature (weekly)

            >>> settimana = Categorical('settimanale',1,[1,1,1,1,1,1,1],7,'multiplicative',[0.9,0.8,0.7,0.6,0.5,0.99,0.99])

            - an additive monthly feature (here a year is composed by 5 months)

            >>> mese = Categorical('mensile',1,[31,28,20,10,33],5,'additive',[10,20,-10,20,0])

            - a spotted categorical variable that happens every 100 days and lasts 7 days

            >>> spot = Categorical('spot',100,[7],1,'additive',[10])
            >>> ts = TimeSeries('prova')
            >>> ts.generate_signal(length = 5000,categorical_variables = [settimana,mese,spot],noise_mean=1,type=0) ##we can add also noise
            >>> ts.plot()
        """
        # identity of the series
        self.name = name
        self.stacked = stacked
        # state: no group column, messages enabled, no trained model yet
        self.group = None
        self.verbose = True
        self.is_trained = False
def __str__(self) -> str: return f"Timeseries named {self.name} of length {self.dataset.shape[0]}.\n Categorical variable: {self.cat_var},\n Future variables: {self.future_variables},\n Past variables: {self.past_variables},\n Target variables: {self.target_variables} \n With {'no group' if self.group is None else self.group+' as group' }" def __repr__(self) -> str: return f"Timeseries named {self.name} of length {self.dataset.shape[0]}.\n Categorical variable: {self.cat_var},\n Future variables: {self.future_variables},\n Past variables: {self.past_variables},\n Target variables: {self.target_variables}\n With {'no group' if self.group is None else self.group+' as group' }"
def set_verbose(self,verbose:bool):
    """Enable or disable the informative messages of this object.

    Args:
        verbose (bool): value stored in ``self.verbose``
    """
    setattr(self, 'verbose', verbose)
def _generate_base(self,length:int,type:int=0)-> None: """Generate a basic timeseries Args: length (int): length type (int, optional): Type of the generated timeseries. Defaults to 0. """ if type==0: self.base_signal = 10*np.cos(np.arange(length)/(2*np.pi*length/100)) self.out_vars = 1 else: beauty_string('Please implement your own method','block',True) """ """
def generate_signal(self,length:int=5000,categorical_variables:'List[Categorical]'=[],noise_mean:int=1,type:int=0)->None:
    """Generate a synthetic signal with a selected length, a noise level and some categorical variables.

    The multiplicative series act on the base signal while the sum of all the
    additive series is added at the end. The TS structure (dataset and
    variable lists) will be populated.

    Args:
        length (int, optional): length of the signal. Defaults to 5000.
        categorical_variables (List[Categorical], optional): list of Categorical variables. Defaults to [].
        noise_mean (int, optional): multiplier of the standard normal noise added at the end. Defaults to 1.
        type (int, optional): type of the timeseries (only type=0 available right now). Defaults to 0.
    """
    dataset = pd.DataFrame({'time':range(length)})
    self._generate_base(length,type)
    signal = self.base_signal.copy()

    # Start from zeros so that every additive variable is accumulated (and the
    # sum exists even when there is no additive variable at all) without
    # modifying the arrays owned by the Categorical objects.
    additive = np.zeros(length)
    self.cat_var = []
    for c in categorical_variables:
        c.generate_signal(length)
        dataset[c.name] = c.classes_array
        self.cat_var.append(c.name)
        if c.action=='multiplicative':
            signal *= c.signal_array
        else:
            additive += c.signal_array
    signal += additive

    dataset['signal'] = signal + noise_mean*np.random.randn(len(signal))
    self.dataset = dataset
    self.past_variables = ['signal']
    self.future_variables = []
    self.target_variables = ['signal']
    self.num_var = list(set(self.past_variables).union(set(self.future_variables)).union(set(self.target_variables)))
def enrich(self,dataset,columns):
    """Add a calendar column derived from ``dataset.time`` (in place).

    Args:
        dataset (pd.DataFrame): dataset with a datetime-like ``time`` column
        columns (str): one of 'hour', 'dow', 'month' or 'minute'. For any other
            name a message is printed if the column is not in the dataset.
    """
    # requested name -> attribute of the `.dt` accessor used to compute it
    supported = (('hour','hour'),('dow','weekday'),('month','month'),('minute','minute'))
    for label, accessor in supported:
        if columns == label:
            dataset[columns] = getattr(dataset.time.dt, accessor)
            return
    if columns not in dataset.columns:
        beauty_string(f'I can not automatically enrich column {columns}. Please contact the developers or add it manually to your dataset.','section',True)
def load_signal(self,data:pd.DataFrame,
                enrich_cat:List[str] = [],
                past_variables:List[str]=[],
                future_variables:List[str]=[],
                target_variables:List[str]=[],
                cat_var:List[str]=[],
                check_past:bool=True,
                group:Union[None,str]=None,
                check_holes_and_duplicates:bool=True,
                silly_model:bool=False)->None:
    """Load a dataset into the time series structure.

    This is a crucial point in the data structure. We expect here to have a
    dataset with time as timestamp. There are some checks:

    1. the duplicates will be removed taking the first instance
    2. the frequency will be inferred taking the minimum time distance between samples
    3. the dataset will be filled completing the missing timestamps

    Args:
        data (pd.DataFrame): input dataset, the column indicating the time must be called `time`
        enrich_cat (List[str], optional): it is possible to let this function enrich the dataset for example adding the standard columns: hour, dow, month and minute. Defaults to [].
        past_variables (List[str], optional): list of column names of past variables not available for future times. Defaults to [].
        future_variables (List[str], optional): list of future variables available for future times. Defaults to [].
        target_variables (List[str], optional): list of the target variables. They will be added to past_variables by default unless `check_past` is false. Defaults to [].
        cat_var (List[str], optional): list of the categorical variables (same for past and future). Defaults to [].
        check_past (bool, optional): see `target_variables`. Defaults to True.
        group (str or None, optional): if not None the dataset is considered composed by homogeneous timeseries coming from different realizations (for example point of sales, cities, locations) and the relative series are not split during the sample generation. Defaults to None.
        check_holes_and_duplicates (bool, optional): if False duplicates or holes will not be checked, the dataloader may not work correctly, disable at your own risk. Defaults to True.
        silly_model (bool, optional): if True, target variables will be added to the pool of the future variables. This can be useful to see if information passes through the decoder part of your model (if any).

    Raises:
        AssertionError: if no target variable is given or `time` is not a column of `data`
        TypeError: if the `time` column is neither integer nor datetime
    """
    # Validate before touching the data: sorting by 'time' below would
    # otherwise fail with a less helpful error.
    assert len(target_variables)>0, 'Provide at least one column for target'
    assert 'time' in data.columns, 'The temporal column must be called time'

    # Work on copies: the lists are extended below and must not leak into the
    # caller's objects (or into the shared default arguments).
    enrich_cat = list(enrich_cat)
    past_variables = list(past_variables)
    future_variables = list(future_variables)
    target_variables = list(target_variables)
    cat_var = list(cat_var)

    dataset = data.copy()
    dataset.sort_values(by='time',inplace=True)

    if check_holes_and_duplicates:
        beauty_string('I will drop duplicates, I dont like them','section',self.verbose)
        dataset.drop_duplicates(subset=['time'] if group is None else [group,'time'], keep='first', inplace=True, ignore_index=True)

        if group is None:
            differences = dataset.time.diff()[1:]
        else:
            # with groups the frequency is inferred from the first group only
            differences = dataset[dataset[group]==dataset[group].unique()[0]].time.diff()[1:]

        if isinstance(dataset.time[0], datetime):
            freq = pd.to_timedelta(differences.min())
        else:
            if int(dataset.time[0])==dataset.time[0]:
                freq = int(differences.min())
            else:
                raise TypeError("time must be integer or datetime")
        self.freq = freq

        if differences.nunique()>1:
            beauty_string("There are holes in the dataset i will try to extend the dataframe inserting NAN",'info',self.verbose)
            beauty_string(f'Detected minumum frequency: {freq}','section',self.verbose)
            dataset = extend_time_df(dataset,freq,group).merge(dataset,how='left')
    else:
        beauty_string("I will compute the frequency as minimum of the time difference",'info',self.verbose)
        self.freq = dataset.time.diff()[1:].min()
        # check the dtype of the column (a dtype object is never a datetime instance)
        if pd.api.types.is_datetime64_any_dtype(dataset.time):
            self.freq = pd.to_timedelta(self.freq)

    if set(target_variables).intersection(set(past_variables))!= set(target_variables):
        if check_past:
            beauty_string('I will update past column adding all target columns, if you want to avoid this beahviour please use check_pass as false','info',self.verbose)
            past_variables = list(set(past_variables).union(set(target_variables)))

    self.cat_var = cat_var
    self.group = group
    if group is not None:
        if group not in cat_var:
            beauty_string(f'I will add {group} to the categorical variables','info',self.verbose)
            self.cat_var.append(group)

    self.enrich_cat = enrich_cat
    for c in enrich_cat:
        self.cat_var = list(set(self.cat_var+[c]))
        if c in dataset.columns:
            beauty_string(f'Categorical {c} already present, it will be added to categorical variable but not call the enriching function','info',self.verbose)
        else:
            self.enrich(dataset,c)

    self.dataset = dataset
    self.past_variables = past_variables
    self.future_variables = future_variables
    self.target_variables = target_variables
    self.out_vars = len(target_variables)
    self.num_var = list(set(self.past_variables).union(set(self.future_variables)).union(set(self.target_variables)))

    if silly_model:
        beauty_string('YOU ARE TRAINING A SILLY MODEL WITH THE TARGETS IN THE INPUTS','section',self.verbose)
        # build a new list instead of extending in place
        self.future_variables = self.future_variables + self.target_variables
def plot(self):
    """Quick visual check of the loaded data.

    Only the target variables are drawn; when a group column is set one facet
    row per group is used.

    Returns:
        plotly.graph_objects._figure.Figure: figure of the target variables
    """
    beauty_string('Plotting only target variables','block',self.verbose)
    has_group = self.group is not None
    id_columns = ['time',self.group] if has_group else ['time']
    tmp = self.dataset[id_columns+self.target_variables].melt(id_vars=id_columns)
    if has_group:
        fig = px.line(tmp,x='time',y='value',color='variable',title=self.name,facet_row=self.group)
    else:
        fig = px.line(tmp,x='time',y='value',color='variable',title=self.name)
    fig.show()
    return fig
def create_data_loader(self,data:pd.DataFrame,
                       past_steps:int,
                       future_steps:int,
                       shift:int=0,
                       keep_entire_seq_while_shifting:bool=False,
                       starting_point:Union[None,dict]=None,
                       skip_step:int=1,
                       is_inference:bool=False
                       )->MyDataset:
    """Create the dataset for the training/inference step

    Args:
        data (pd.DataFrame): input dataset, usually a subset of self.dataset. Note that columns are added to / overwritten in this object.
        past_steps (int): past context length
        future_steps (int): future lags to predict
        shift (int, optional): if >0 the future input variables will be shifted (categorical and numerical). For example for attention model it is better to start with a known value of y and use it during the process. Defaults to 0.
        keep_entire_seq_while_shifting (bool, optional): if the dataset is shifted, you may want the future data be of length future_step+shift (like informer). Defaults to False.
        starting_point (Union[None,dict], optional): a dictionary indicating if a sample must be considered. It is checked for the first lag in the future (useful in the case your model has to predict only starting from hour 12). Only the first key is used. Defaults to None.
        skip_step (int, optional): step between two consecutive samples. Usually it is one but for debugging or training time purposes you can skip some samples. Defaults to 1.
        is_inference (bool, optional): if False a sample is kept only when its target window is finite too; if True the target values are not checked. Defaults to False.

    Returns:
        MyDataset: class that extends torch.utils.data.Dataset (see utils)
            keys of a batch:
            y : the target variable(s)
            x_num_past: the numerical past variables
            x_num_future: the numerical future variables
            x_cat_past: the categorical past variables
            x_cat_future: the categorical future variables
            idx_target: index of target features in the past array

    Raises:
        AssertionError: if the starting point column is one of the categorical variables
        ValueError: if no valid sample can be extracted from `data`
    """
    beauty_string('Creating data loader','block',self.verbose)

    x_num_past_samples = []
    x_num_future_samples = []
    x_cat_past_samples = []
    x_cat_future_samples = []
    y_samples = []
    t_samples = []
    g_samples = []

    start_key = None
    if starting_point is not None:
        start_key = list(starting_point.keys())[0]
        if start_key in self.cat_var:
            message = f'CAN NOT USE FEATURE {start_key} as starting point it may have a different value due to the normalization step, please add a second column with a suitable name'
            beauty_string(message,'info',True)
            raise AssertionError(message)

    ## overwrite categorical columns
    for c in self.cat_var:
        self.enrich(data,c)

    # '_GROUP_' keeps the original (not encoded) group label of every row
    if self.group is None:
        data['_GROUP_'] = '1'
    else:
        data['_GROUP_'] = data[self.group].values

    if self.normalize_per_group:
        tot = []
        groups = data[self.group].unique()
        data[self.group] = self.scaler_cat[self.group].transform(data[self.group].values.ravel()).flatten()
        for group in groups:
            tmp = data[data['_GROUP_']==group].copy()
            for c in self.num_var:
                tmp[c] = self.scaler_num[f'{c}_{group}'].transform(tmp[c].values.reshape(-1,1)).flatten()
            for c in self.cat_var:
                if c!=self.group:
                    tmp[c] = self.scaler_cat[f'{c}_{group}'].transform(tmp[c].values.ravel()).flatten()
            tot.append(tmp)
        data = pd.concat(tot,ignore_index=True)
    else:
        for c in self.cat_var:
            data[c] = self.scaler_cat[c].transform(data[c].values.ravel()).flatten()
        for c in self.num_var:
            data[c] = self.scaler_num[c].transform(data[c].values.reshape(-1,1)).flatten()

    # position of the targets inside the past (and, if present, future) arrays
    idx_target = [self.past_variables.index(c) for c in self.target_variables]
    idx_target_future = [self.future_variables.index(c) for c in self.target_variables if c in self.future_variables]
    if len(idx_target_future)==0:
        idx_target_future = None

    if self.stacked:
        skip_stacked = future_steps*future_steps-future_steps
    else:
        skip_stacked = 0

    has_future = len(self.future_variables)>0
    has_cat = len(self.cat_var)>0

    for group in data['_GROUP_'].unique():
        tmp = data[data['_GROUP_']==group]
        groups = tmp['_GROUP_'].values
        t = tmp.time.values
        x_num_past = tmp[self.past_variables].values
        if has_future:
            x_num_future = tmp[self.future_variables].values
        if has_cat:
            x_cat = tmp[self.cat_var].values
        y_target = tmp[self.target_variables].values

        if starting_point is not None:
            check = tmp[start_key].values == starting_point[start_key]
        else:
            check = [True]*len(y_target)

        for i in range(past_steps,tmp.shape[0]-future_steps-skip_stacked,skip_step):
            if not check[i]:
                continue

            # window of the future inputs and of the targets for this sample
            future_start = i-shift+skip_stacked
            if keep_entire_seq_while_shifting:
                future_end = i+future_steps+skip_stacked
            else:
                future_end = i+future_steps-shift+skip_stacked
            target_start = i+skip_stacked
            target_end = i+future_steps+skip_stacked

            # xx is finite only if every value the sample relies on is finite
            if has_future:
                xx = x_num_future[future_start:future_end].mean()
            else:
                xx = 0.0
            if is_inference is False:
                xx+=y_target[target_start:target_end].min()

            if np.isfinite(x_num_past[i-past_steps:i].min() + xx):
                x_num_past_samples.append(x_num_past[i-past_steps:i])
                if has_future:
                    x_num_future_samples.append(x_num_future[future_start:future_end])
                if has_cat:
                    x_cat_past_samples.append(x_cat[i-past_steps:i])
                    x_cat_future_samples.append(x_cat[future_start:future_end])
                y_samples.append(y_target[target_start:target_end])
                t_samples.append(t[target_start:target_end])
                g_samples.append(groups[i])

    if len(y_samples)==0:
        # np.stack would fail below with a much less informative ValueError
        raise ValueError('No valid sample can be created: the data is too short for the requested past_steps/future_steps or contains only non finite values')

    if has_future:
        try:
            x_num_future_samples = np.stack(x_num_future_samples)
        except Exception:
            beauty_string('WARNING x_num_future_samples is empty and it should not','info',True)

    y_samples = np.stack(y_samples)
    t_samples = np.stack(t_samples)
    g_samples = np.stack(g_samples)

    if has_cat:
        x_cat_past_samples = np.stack(x_cat_past_samples)
        x_cat_future_samples = np.stack(x_cat_future_samples)
    x_num_past_samples = np.stack(x_num_past_samples)

    # stacked models receive the past numerical input zeroed out
    if self.stacked:
        mod = 0
    else:
        mod = 1.0

    dd = {'y':y_samples.astype(np.float32),
          'x_num_past':(x_num_past_samples*mod).astype(np.float32)}
    if has_cat:
        dd['x_cat_past'] = x_cat_past_samples
        dd['x_cat_future'] = x_cat_future_samples
    if has_future:
        dd['x_num_future'] = x_num_future_samples.astype(np.float32)

    return MyDataset(dd,t_samples,g_samples,idx_target,idx_target_future)
def split_for_train(self,
                    perc_train:Union[float,None]=0.6,
                    perc_valid:Union[float,None]=0.2,
                    range_train:Union[List[Union[datetime, str]],None]=None,
                    range_validation:Union[List[Union[datetime, str]],None]=None,
                    range_test:Union[List[Union[datetime, str]],None]=None,
                    past_steps:int = 100,
                    future_steps:int=20,
                    shift:int = 0,
                    keep_entire_seq_while_shifting:bool=False,
                    starting_point:Union[None, dict]=None,
                    skip_step:int=1,
                    normalize_per_group: bool=False,
                    check_consecutive: bool=True,
                    scaler: str='StandardScaler()'
                    )->List[DataLoader]:
    """Split the data and create the datasets.

    Args:
        perc_train (Union[float,None], optional): fraction of the training set. Defaults to 0.6.
        perc_valid (Union[float,None], optional): fraction of the validation set (the remaining part is the test set). Defaults to 0.2.
        range_train (Union[List[Union[datetime, str]],None], optional): a list of two elements indicating the starting point and end point of the training set (string date style or datetime). Defaults to None.
        range_validation (Union[List[Union[datetime, str]],None], optional): a list of two elements indicating the starting point and end point of the validation set (string date style or datetime). Required when `range_train` is given. Defaults to None.
        range_test (Union[List[Union[datetime, str]],None], optional): a list of two elements indicating the starting point and end point of the test set (string date style or datetime). Required when `range_train` is given. Defaults to None.
        past_steps (int, optional): past step to consider for making the prediction. Defaults to 100.
        future_steps (int, optional): future step to predict. Defaults to 20.
        shift (int, optional): see `create_data_loader`. Defaults to 0.
        keep_entire_seq_while_shifting (bool, optional): if the dataset is shifted, you may want the future data be of length future_step+shift (like informer). Defaults to False.
        starting_point (Union[None, dict], optional): see `create_data_loader`. Defaults to None.
        skip_step (int, optional): see `create_data_loader`. Defaults to 1.
        normalize_per_group (boolean, optional): if true and self.group is not None, the variables are scaled respect to the groups. Defaults to False.
        check_consecutive (boolean, optional): if false it skips the check on the consecutive ranges. Defaults to True.
        scaler (str, optional): python expression that builds a sklearn.preprocessing scaler. Defaults to 'StandardScaler()'.

    Returns:
        List[DataLoader,DataLoader,DataLoader]: train, validation and test datasets (the test one is None when the test split is empty); three None if no dataset has been loaded

    Raises:
        TypeError: if `range_train` is given without `range_validation` and `range_test`
        AssertionError: if the ranges are not consecutive and `check_consecutive` is true
    """
    beauty_string('Splitting for train','block',self.verbose)

    try:
        ls = self.dataset.shape[0]
    except AttributeError:
        # no dataset loaded or generated yet
        beauty_string('Empty dataset','info', True)
        return None, None, None

    if range_train is None:
        if self.group is None:
            beauty_string(f'Split temporally using perc_train: {perc_train} and perc_valid:{perc_valid}','section',self.verbose)
            end_train = int(perc_train*ls)
            end_valid = int(perc_train*ls+perc_valid*ls)
            train = self.dataset.iloc[0:end_train]
            validation = self.dataset.iloc[end_train:end_valid]
            test = self.dataset.iloc[end_valid:]
        else:
            beauty_string(f'Split temporally using perc_train: {perc_train} and perc_valid:{perc_valid} for each group!','info',self.verbose)
            train = []
            validation = []
            test = []
            # every group is split with the same fractions of its own length
            ls = self.dataset.groupby(self.group).time.count().reset_index()
            for group in self.dataset[self.group].unique():
                tmp = self.dataset[self.dataset[self.group]==group]
                lt = ls[ls[self.group]==group].time.values[0]
                end_train = int(perc_train*lt)
                end_valid = int(perc_train*lt+perc_valid*lt)
                train.append(tmp[0:end_train])
                validation.append(tmp[end_train:end_valid])
                test.append(tmp[end_valid:])
            train = pd.concat(train,ignore_index=True)
            validation = pd.concat(validation,ignore_index=True)
            test = pd.concat(test,ignore_index=True)
    else:
        if range_validation is None or range_test is None:
            raise TypeError('range_validation and range_test must be provided together with range_train')
        if check_consecutive:
            consecutive = range_train[0]<range_train[1]<=range_validation[0]<range_validation[1]<=range_test[0]<range_test[1]
            if not consecutive:
                message = 'The range are not correct'
                beauty_string(message,'info',True)
                raise AssertionError(message)
        beauty_string('Split temporally using the time intervals provided','section',self.verbose)
        # NOTE(review): `between` includes both ends, so a boundary shared by
        # two ranges puts that timestamp in both splits.
        train = self.dataset[self.dataset.time.between(range_train[0],range_train[1])]
        validation = self.dataset[self.dataset.time.between(range_validation[0],range_validation[1])]
        test = self.dataset[self.dataset.time.between(range_test[0],range_test[1])]

    beauty_string('Train categorical and numerical scalers','block',self.verbose)

    # Once the model is trained the fitted scalers are reused as they are.
    if not self.is_trained:
        self.scaler_cat = {}
        self.scaler_num = {}
        # SECURITY: `scaler` is evaluated as python code, it must never come
        # from untrusted input.
        if self.group is None or normalize_per_group is False:
            self.normalize_per_group = False
            for c in self.num_var:
                self.scaler_num[c] = eval(scaler)
                self.scaler_num[c].fit(train[c].values.reshape(-1,1))
            for c in self.cat_var:
                self.scaler_cat[c] = LabelEncoder()
                self.scaler_cat[c].fit(train[c].values.ravel())
        else:
            self.normalize_per_group = True
            self.scaler_cat[self.group] = LabelEncoder()
            self.scaler_cat[self.group].fit(train[self.group].values.ravel())
            for group in train[self.group].unique():
                tmp = train[train[self.group]==group]
                for c in self.num_var:
                    self.scaler_num[f'{c}_{group}'] = eval(scaler)
                    self.scaler_num[f'{c}_{group}'].fit(tmp[c].values.reshape(-1,1))
                for c in self.cat_var:
                    if c!=self.group:
                        self.scaler_cat[f'{c}_{group}'] = LabelEncoder()
                        self.scaler_cat[f'{c}_{group}'].fit(tmp[c].values.ravel())

    dl_train = self.create_data_loader(train,past_steps,future_steps,shift,keep_entire_seq_while_shifting,starting_point,skip_step)
    dl_validation = self.create_data_loader(validation,past_steps,future_steps,shift,keep_entire_seq_while_shifting,starting_point,skip_step)
    if test.shape[0]>0:
        dl_test = self.create_data_loader(test,past_steps,future_steps,shift,keep_entire_seq_while_shifting,starting_point,skip_step)
    else:
        dl_test = None
    return dl_train,dl_validation,dl_test
def set_model(self,model:Base,config:dict=None,custom_init:bool=False):
    """Attach the model that will be trained.

    Args:
        model (Base): see `models`
        config (dict, optional): usually the configuration used by the model. Defaults to None.
        custom_init (bool, optional): if true the weights are initialized applying `weight_init` (see models/utils.py). Defaults to False.
    """
    self.model = model
    if custom_init:
        model.apply(weight_init)
    self.config = config

    for message, style in (('Setting the model','block'),(model,'')):
        beauty_string(message,style,self.verbose)
def train_model(self,dirpath:str,
                split_params:dict,
                batch_size:int=100,
                num_workers:int=4,
                max_epochs:int=500,
                auto_lr_find:bool=True,
                gradient_clip_val:Union[float,None]=None,
                gradient_clip_algorithm:str="value",
                devices:Union[str,List[int]]='auto',
                precision:Union[str,int]=32,
                modifier:Union[None,str]=None,
                modifier_params:Union[None,dict]=None,
                seed:int=42
                )-> float:
    """Train the model

    Args:
        dirpath (str): path where to put all the useful things
        split_params (dict): see `split_for_train`
        batch_size (int, optional): batch size. Defaults to 100.
        num_workers (int, optional): num_workers for the dataloader. Defaults to 4.
        max_epochs (int, optional): maximum epochs to perform. Defaults to 500.
        auto_lr_find (bool, optional): find initial learning rate, see `pytorch-lightning`. Defaults to True.
        gradient_clip_val (Union[float,None], optional): gradient_clip_val. Defaults to None. See https://lightning.ai/docs/pytorch/stable/advanced/training_tricks.html
        gradient_clip_algorithm (str, optional): gradient_clip_algorithm. Defaults to 'value'. See https://lightning.ai/docs/pytorch/stable/advanced/training_tricks.html
        devices (Union[str,List[int]], optional): devices to use. Use auto if cpu or the list of gpu to use otherwise. Defaults to 'auto'.
        precision (Union[str,int], optional): precision to use. Usually 32 bit is fine but for larger model you should try 'bf16'. If 'auto' it will use bf16 for GPU and 32 for cpu. Defaults to 32.
        modifier (Union[None,str], optional): name of a modifier class; if not None the modifier is applied to the datasets. Sometimes lightning has very restrictive rules on the dataloader, or we want to use a ML model before or after the DL model (see readme for more information). Defaults to None.
        modifier_params (Union[None,dict], optional): parameters of the modifier. Defaults to None (no parameters).
        seed (int, optional): seed for reproducibility. Defaults to 42.

    Returns:
        float: last validation loss (100 if it can not be extracted)
    """
    beauty_string('Training the model','block',self.verbose)
    self.split_params = split_params
    self.check_custom = False
    train,validation,test = self.split_for_train(**self.split_params)

    accelerator = 'gpu' if torch.cuda.is_available() else "cpu"
    strategy = "auto"  ## TODO in future investigate on this
    if accelerator == 'gpu':
        if precision=='auto':
            # maybe this should go in the config in the future: some models
            # may not work with bfloat
            precision = 'bf16'
        torch.set_float32_matmul_precision('medium')
        beauty_string('Setting multiplication precision to medium','info',self.verbose)
    else:
        devices = 'auto'
        if precision=='auto':
            precision = 32

    beauty_string(f'train:{len(train)}, validation:{len(validation)}, test:{len(test) if test is not None else 0}','section',self.verbose)
    persistent_workers = (accelerator=='gpu') and (num_workers>0)

    if modifier is not None:
        # SECURITY: `modifier` is evaluated as python code, it must never come
        # from untrusted input.
        modifier = eval(modifier)
        modifier = modifier(**(modifier_params or {}))
        train, validation = modifier.fit_transform(train=train,val=validation)
        self.modifier = modifier
    else:
        self.modifier = None

    train_dl = DataLoader(train, batch_size = batch_size , shuffle=True,drop_last=True,num_workers=num_workers,persistent_workers=persistent_workers)
    valid_dl = DataLoader(validation, batch_size = batch_size , shuffle=False,drop_last=True,num_workers=num_workers,persistent_workers=persistent_workers)

    checkpoint_callback = ModelCheckpoint(dirpath=dirpath,
                                          monitor='val_loss',
                                          save_last = True,
                                          every_n_epochs =1,
                                          verbose = self.verbose,
                                          save_top_k = 1,
                                          filename='checkpoint')

    aim_logger = AimLogger(
        experiment=self.name,
        train_metric_prefix='train_',
        val_metric_prefix='val_',
    )

    # https://stackoverflow.com/questions/49201236/check-the-total-number-of-parameters-in-a-pytorch-model
    # (keyed by data pointer so that shared parameters are counted once)
    n_params = sum(dict((p.data_ptr(), p.numel()) for p in self.model.parameters()).values())
    # https://discuss.pytorch.org/t/finding-model-size/130275
    param_size = sum(param.nelement() * param.element_size() for param in self.model.parameters())
    buffer_size = sum(buffer.nelement() * buffer.element_size() for buffer in self.model.buffers())
    size_all_mb = (param_size + buffer_size) / 1024**2

    aim_logger.experiment.track(n_params,name='N-parameters')
    aim_logger.experiment.track(size_all_mb,name='dim-model-MB')
    aim_logger.experiment.track(len(train_dl.dataset),name='len-train')
    aim_logger.experiment.track(len(valid_dl.dataset),name='len-valid')

    # `set_model` allows config=None: log only the model name in that case
    tmp = self.config.copy() if self.config is not None else {}
    tmp['model_name'] = self.model.name
    aim_logger._run['hyperparameters'] = tmp

    ## TODO with 2 or more gpus MetricsCallback does not work (probably one
    ## instance is created for every data-parallel process and the
    ## information can not be collected afterwards)
    mc = MetricsCallback(dirpath)
    pl.seed_everything(seed, workers=True)
    self.model.max_epochs = max_epochs

    trainer = pl.Trainer(default_root_dir=dirpath,
                         logger = aim_logger,
                         max_epochs=max_epochs,
                         callbacks=[checkpoint_callback,mc],
                         auto_lr_find=auto_lr_find,
                         accelerator=accelerator,
                         devices=devices,
                         strategy=strategy,
                         enable_progress_bar=False,
                         precision=precision,
                         gradient_clip_val=gradient_clip_val,
                         gradient_clip_algorithm=gradient_clip_algorithm)

    tot_seconds = time.time()
    if auto_lr_find:
        trainer.tune(self.model,train_dataloaders=train_dl,val_dataloaders = valid_dl)
        # remove the temporary files left by the learning rate finder
        for f in os.listdir(dirpath):
            if '.lr_find' in f:
                os.remove(os.path.join(dirpath,f))

    trainer.fit(self.model, train_dl,valid_dl)
    self.checkpoint_file_best = checkpoint_callback.best_model_path
    self.checkpoint_file_last = checkpoint_callback.last_model_path
    if self.checkpoint_file_last=='':
        beauty_string('There is a bug on saving last model I will try to fix it','info',self.verbose)
        self.checkpoint_file_last = checkpoint_callback.best_model_path.replace('checkpoint','last')

    self.dirpath = dirpath
    self.losses = mc.metrics

    ## workaround for multi gpu: the losses may be available only on file
    for f in os.listdir(dirpath):
        if '__losses__.csv' in f:
            losses_file = os.path.join(dirpath,f)
            if len(self.losses['val_loss'])>0:
                self.losses = pd.DataFrame(self.losses)
            else:
                self.losses = pd.read_csv(losses_file)
            os.remove(losses_file)
    if isinstance(self.losses,dict):
        self.losses = pd.DataFrame()

    try:
        self.model = self.model.load_from_checkpoint(self.checkpoint_file_last)
    except Exception as _:
        beauty_string(f'There is a problem loading the weights on file MAYBE CHANGED HOW WEIGHTS ARE LOADED {self.checkpoint_file_last}','section',self.verbose)

    try:
        val_loss = self.losses.val_loss.values[-1]
    except Exception as _:
        beauty_string('Can not extract the validation loss, maybe it is a persistent model','info',self.verbose)
        val_loss = 100

    self.is_trained = True
    beauty_string('END of the training process','block',self.verbose)
    aim_logger.experiment.track((time.time()-tot_seconds),name='seconds-training')
    aim_logger.experiment.track(val_loss,name='val-loss-end-train')
    return val_loss
def inference_on_set(self, batch_size: int = 100, num_workers: int = 4, split_params: Union[None, dict] = None, set: str = 'test', rescaling: bool = True, data: Union[None, torch.utils.data.Dataset] = None) -> pd.DataFrame:
    """Compute the predictions on a particular set (train, validation, test or custom).

    Args:
        batch_size (int, optional): batch size. Defaults to 100.
        num_workers (int, optional): number of workers of the data loader. Defaults to 4.
        split_params (Union[None,dict], optional): if not None the splitting procedure uses these
            parameters, otherwise the configuration stored in `self.split_params` is used.
            Defaults to None.
        set (str, optional): one of 'train', 'validation', 'test' or 'custom'. Defaults to 'test'.
        rescaling (bool, optional): if True the output is scaled back with the scalers stored in
            `self.scaler_num`. Defaults to True.
        data (Union[None,torch.utils.data.Dataset], optional): dataset used when `set` is 'custom'.
            For custom data prefer `inference`, which builds this dataset for you.
            Defaults to None.

    Raises:
        ValueError: if `set` is not a supported value, if `data` is given together with a
            non-custom set, or if `set` is 'custom' and `data` is None.

    Returns:
        pd.DataFrame: one row per (sample, lag) with the time, the lag, the real values, the
            predicted values and the `prediction_time` column.
    """
    beauty_string('Inference on a set (train, validation o test)','block',self.verbose)

    if set == 'custom':
        if not self.check_custom:
            beauty_string('If you are here something went wrong, please report it','section',self.verbose)
        if data is None:
            raise ValueError("set='custom' requires a dataset in `data`")
        dataset = data
    elif set in ('train', 'validation', 'test'):
        if data is not None:
            raise ValueError("`data` can only be used together with set='custom'")
        if split_params is None:
            beauty_string(f'splitting using train parameters {self.split_params}','section',self.verbose)
            train, validation, test = self.split_for_train(**self.split_params)
        else:
            train, validation, test = self.split_for_train(**split_params)
        dataset = {'train': train, 'validation': validation, 'test': test}[set]
    else:
        beauty_string('Select one of train, test, or validation set','section',self.verbose)
        raise ValueError(f"Unknown set '{set}': select one of train, validation, test or custom")

    if self.modifier is not None:
        dataset = self.modifier.transform(dataset)
    dl = DataLoader(dataset, batch_size=batch_size, shuffle=False, drop_last=False, num_workers=num_workers)

    self.model.eval()
    res = []
    real = []
    self.model.to(torch.device("cuda:0" if torch.cuda.is_available() else "cpu"))
    beauty_string(f'Device used: {self.model.device}','info',self.verbose)

    # No gradient is needed for prediction: avoid building the autograd graph.
    with torch.no_grad():
        for batch in dl:
            res.append(self.model.inference(batch).cpu().detach().numpy())
            real.append(batch['y'].cpu().detach().numpy())
    res = np.vstack(res)
    real = np.vstack(real)
    # Named `times` so the module-level `time` import is not shadowed.
    times = dl.dataset.t
    groups = dl.dataset.groups

    if self.modifier is not None:
        res, real = self.modifier.inverse_transform(res, real)

    if rescaling:
        beauty_string('Scaling back','info',self.verbose)
        if self.normalize_per_group is False:
            for i, c in enumerate(self.target_variables):
                real[:, :, i] = self.scaler_num[c].inverse_transform(real[:, :, i].reshape(-1, 1)).reshape(-1, real.shape[1])
                for j in range(res.shape[3]):
                    res[:, :, i, j] = self.scaler_num[c].inverse_transform(res[:, :, i, j].reshape(-1, 1)).reshape(-1, res.shape[1])
        else:
            # One scaler per (target, group): rescale only the rows of that group.
            for group in np.unique(groups):
                idx = np.where(groups == group)[0]
                for i, c in enumerate(self.target_variables):
                    scaler = self.scaler_num[f'{c}_{group}']
                    real[idx, :, i] = scaler.inverse_transform(real[idx, :, i].reshape(-1, 1)).reshape(-1, real.shape[1])
                    for j in range(res.shape[3]):
                        res[idx, :, i, j] = scaler.inverse_transform(res[idx, :, i, j].reshape(-1, 1)).reshape(-1, res.shape[1])

    lags = [i + 1 for i in range(res.shape[1])]

    def _melt_values(values, column_name):
        # Long-format column aligned with the melted time frame (same column-major order).
        return pd.DataFrame(values, columns=lags).melt().rename(columns={'value': column_name}).drop(columns=['variable'])

    time_df = pd.DataFrame(times, columns=lags)
    if self.group is not None:
        time_df[self.group] = groups
        # Keep the group column as identifier, whatever its name is.
        time_df = time_df.melt(id_vars=[self.group])
    else:
        time_df = time_df.melt()
    time_df.rename(columns={'value': 'time', 'variable': 'lag'}, inplace=True)

    # Last axis of `res`: three entries (low, median, high) with quantiles, otherwise one.
    if self.model.use_quantiles:
        suffixes = ['_low', '_median', '_high']
    else:
        suffixes = ['_pred']

    tot = [time_df]
    for i, c in enumerate(self.target_variables):
        tot.append(_melt_values(real[:, :, i], c))
        for j, suffix in enumerate(suffixes):
            tot.append(_melt_values(res[:, :, i, j], c + suffix))

    res = pd.concat(tot, axis=1)
    res['prediction_time'] = res.apply(lambda x: x.time - self.freq * x.lag, axis=1)
    return res
def inference(self, batch_size: int = 100, num_workers: int = 4, split_params: Union[None, dict] = None, rescaling: bool = True, data: pd.DataFrame = None, steps_in_future: int = 0, check_holes_and_duplicates: bool = True, is_inference: bool = False) -> pd.DataFrame:
    """Compute the predictions on a custom dataframe.

    Similar to `inference_on_set`; the only change is `split_params`. When it is None the keys
    'past_steps', 'future_steps', 'shift', 'keep_entire_seq_while_shifting' and 'starting_point'
    are copied from `self.split_params` and `skip_step` is set to 1 (generally you want all the
    predictions). When it is given it is passed to `create_data_loader` as it is.

    Args:
        batch_size (int, optional): see `inference_on_set`. Defaults to 100.
        num_workers (int, optional): see `inference_on_set`. Defaults to 4.
        split_params (Union[None,dict], optional): see above. Defaults to None.
        rescaling (bool, optional): see `inference_on_set`. Defaults to True.
        data (pd.DataFrame, optional): starting dataset, it must contain the `time` column
            (and the group column if `self.group` is set). Defaults to None.
        steps_in_future (int, optional): if >0 the dataset is extended in order to make
            predictions in the future. Defaults to 0.
        check_holes_and_duplicates (bool, optional): if False the routine does not rebuild the
            time axis (no check for holes or duplicates), set to False for stacked models.
            Defaults to True.
        is_inference (bool, optional): forwarded to `create_data_loader`. Defaults to False.

    Returns:
        pd.DataFrame: predicted values
    """
    beauty_string('Inference on a custom dataset','block',self.verbose)
    self.check_custom = True  ## this is a check for the dataset loading
    try:
        if check_holes_and_duplicates:
            ## enlarge the dataset in order to have all the rows needed
            extra_steps = steps_in_future + self.split_params['past_steps'] + self.split_params['future_steps']
            if self.group is None:
                freq = self.freq
                beauty_string(f'Detected minumum frequency: {freq}','section',self.verbose)
                empty = pd.DataFrame({'time': pd.date_range(data.time.min(), data.time.max() + freq * extra_steps, freq=freq)})
            else:
                # Frequency estimated on the first group only.
                first_group = data[self.group].unique()[0]
                freq = pd.to_timedelta(np.diff(data[data[self.group] == first_group].time).min())
                beauty_string(f'Detected minumum frequency: {freq} supposing constant frequence inside the groups','section',self.verbose)
                # Series indexed by group value: first and last timestamp of each group.
                _min = data.groupby(self.group).time.min()
                _max = data.groupby(self.group).time.max()
                empty = [
                    pd.DataFrame({self.group: c, 'time': pd.date_range(_min.loc[c], _max.loc[c] + freq * extra_steps, freq=freq)})
                    for c in data[self.group].unique()
                ]
                empty = pd.concat(empty, ignore_index=True)
            dataset = empty.merge(data, how='left')
            for c in self.cat_var:
                self.enrich(dataset, c)
        else:
            dataset = data.copy()

        if split_params is None:
            allowed = ['past_steps', 'future_steps', 'shift', 'keep_entire_seq_while_shifting', 'starting_point']
            split_params = {k: self.split_params[k] for k in self.split_params.keys() if k in allowed}
            split_params['skip_step'] = 1

        # Always build the loader from the (possibly enlarged) dataset, not from the raw input.
        data = self.create_data_loader(dataset, **split_params, is_inference=is_inference)
        res = self.inference_on_set(batch_size=batch_size, num_workers=num_workers, split_params=None, set='custom', rescaling=rescaling, data=data)
    finally:
        # Reset the flag even if something fails, otherwise it stays True for later calls.
        self.check_custom = False
    return res
def save(self, filename: str) -> None:
    """Pickle the attributes of the timeseries object, leaving the model out.

    Args:
        filename (str): path of the output file without extension, `.pkl` is appended
    """
    beauty_string('Saving','block',self.verbose)
    excluded = ('model',)
    with open(f'{filename}.pkl','wb') as f:
        # Shallow snapshot of the instance attributes without the excluded ones.
        state = {key: value for key, value in self.__dict__.items() if key not in excluded}
        pickle.dump(state, f)
def load(self, model: Base, filename: str, load_last: bool = True, dirpath: Union[str, None] = None, weight_path: Union[str, None] = None) -> None:
    """Load a saved model.

    Args:
        model (Base): class of the model to load (it is instantiated with the saved configuration)
        filename (str): filename of the saved object without the `.pkl` extension
        load_last (bool, optional): if True the last checkpoint is loaded, otherwise the best one.
            If the requested one is not defined the other is tried. Defaults to True.
        dirpath (Union[str,None], optional): folder containing the checkpoints. If None the
            `dirpath` stored in the saved object is used (model loaded on the same machine where
            it was trained), otherwise the given folder takes precedence. Defaults to None.
        weight_path (Union[str, None], optional): full path of the weights, if None the path is
            built from the folder and the saved checkpoint names. Defaults to None.
    """
    beauty_string('Loading','block',self.verbose)
    self.modifier = None
    self.check_custom = False
    self.is_trained = True
    # SECURITY NOTE: pickle.load can execute arbitrary code, only load files from trusted sources.
    with open(filename+'.pkl','rb') as f:
        params = pickle.load(f)
        for p in params:
            setattr(self, p, params[p])

    # `verbose` is passed explicitly below, drop the saved one to avoid a duplicated keyword.
    if 'verbose' in self.config['model_configs'].keys():
        self.config['model_configs'].pop('verbose')
    self.model = model(**self.config['model_configs'], optim_config=self.config['optim_config'], scheduler_config=self.config['scheduler_config'], verbose=self.verbose)

    if weight_path is not None:
        tmp_path = weight_path
    else:
        # An explicit `dirpath` wins over the one saved at training time, so that the model
        # can be loaded on a machine different from the one used for training.
        if dirpath is not None:
            directory = dirpath
        else:
            directory = getattr(self, 'dirpath', None)

        if load_last:
            preferred, fallback = 'checkpoint_file_last', 'checkpoint_file_best'
            message = 'checkpoint_file_last not defined try to load best'
        else:
            preferred, fallback = 'checkpoint_file_best', 'checkpoint_file_last'
            message = 'checkpoint_file_best not defined try to load last'
        try:
            tmp_path = os.path.join(directory, getattr(self, preferred).split('/')[-1])
        except Exception as _:
            beauty_string(message,'section',self.verbose)
            tmp_path = os.path.join(directory, getattr(self, fallback).split('/')[-1])

    try:
        self.model = self.model.load_from_checkpoint(tmp_path, verbose=self.verbose)
    except Exception as e:
        # Best effort: the freshly initialised model is kept if the weights cannot be loaded.
        beauty_string(f'There is a problem loading the weights on file {tmp_path} {e}','section',self.verbose)